┌──────────────┐ LPUSH ┌─────────────────┐ BRPOP ┌──────────────┐
│ Producer │ ──────────────────▶ │ Redis Queue │ ─────────────────▶ │ Consumer │
│ (Job sender)│ │ (List: jobs) │ │ (Job worker) │
└──────────────┘ └─────────────────┘ └──────────────┘
┌──────────────┐ LPUSH ┌─────────────────┐ BRPOP ┌──────────────┐
│ Producer │ ──────────────────▶ │ Redis Queue │ ─────────────────▶ │ Consumer │
│ (Job sender)│ │ (List: jobs) │ │ (Job worker) │
└──────────────┘ └─────────────────┘ └──────────────┘
┌──────────────┐ LPUSH ┌─────────────────┐ BRPOP ┌──────────────┐
│ Producer │ ──────────────────▶ │ Redis Queue │ ─────────────────▶ │ Consumer │
│ (Job sender)│ │ (List: jobs) │ │ (Job worker) │
└──────────────┘ └─────────────────┘ └──────────────┘
# Create the project directory and a default cabal package skeleton
mkdir redis-queue && cd redis-queue
cabal init --non-interactive
mkdir redis-queue && cd redis-queue
cabal init --non-interactive
mkdir redis-queue && cd redis-queue
cabal init --non-interactive
-- hedis = Redis client, aeson = JSON, async = concurrency helpers
build-depends: base >= 4.14, hedis >= 0.15, aeson >= 2.0, text >= 1.2, bytestring >= 0.11, async >= 2.2
build-depends: base >= 4.14, hedis >= 0.15, aeson >= 2.0, text >= 1.2, bytestring >= 0.11, async >= 2.2
build-depends: base >= 4.14, hedis >= 0.15, aeson >= 2.0, text >= 1.2, bytestring >= 0.11, async >= 2.2
# Compile the project (downloads and builds the dependencies on first run)
cabal build
cabal build
cabal build
-- Open a connection pool
conn <- connect defaultConnectInfo

-- Run Redis commands inside runRedis.
-- NOTE: two statements need separate do-block lines (the original
-- one-liner would not parse); set's reply is bound and discarded.
runRedis conn $ do
  _ <- set "hello" "world"
  get "hello" -- returns Right (Just "world")
-- Open a connection pool
conn <- connect defaultConnectInfo

-- Run Redis commands inside runRedis.
-- NOTE: two statements need separate do-block lines (the original
-- one-liner would not parse); set's reply is bound and discarded.
runRedis conn $ do
  _ <- set "hello" "world"
  get "hello" -- returns Right (Just "world")
-- Open a connection pool
conn <- connect defaultConnectInfo

-- Run Redis commands inside runRedis.
-- NOTE: two statements need separate do-block lines (the original
-- one-liner would not parse); set's reply is bound and discarded.
runRedis conn $ do
  _ <- set "hello" "world"
  get "hello" -- returns Right (Just "world")
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-} -- for the ByteString literal in queueKey

-- | Shared job type and queue key used by both producer and consumer.
module Job where

import Data.Aeson (FromJSON, ToJSON)
-- Strict ByteString: hedis command arguments (lpush/brpop keys) are strict,
-- so the lazy variant would not type-check at the call sites.
import Data.ByteString (ByteString)
import Data.Text (Text)
import GHC.Generics (Generic)

-- | Our job payload — swap this for whatever your domain needs.
data Job = Job
  { jobId   :: Text -- ^ unique identifier, e.g. a transaction id
  , jobType :: Text -- ^ discriminator the worker dispatches on ("refund", "notify", ...)
  , payload :: Text -- ^ opaque JSON payload for the handler
  } deriving (Show, Eq, Generic)

-- JSON instances derived from the Generic representation.
instance ToJSON Job
instance FromJSON Job

-- | The Redis key we'll use as our queue.
queueKey :: ByteString
queueKey = "jobs:queue"
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-} -- for the ByteString literal in queueKey

-- | Shared job type and queue key used by both producer and consumer.
module Job where

import Data.Aeson (FromJSON, ToJSON)
-- Strict ByteString: hedis command arguments (lpush/brpop keys) are strict,
-- so the lazy variant would not type-check at the call sites.
import Data.ByteString (ByteString)
import Data.Text (Text)
import GHC.Generics (Generic)

-- | Our job payload — swap this for whatever your domain needs.
data Job = Job
  { jobId   :: Text -- ^ unique identifier, e.g. a transaction id
  , jobType :: Text -- ^ discriminator the worker dispatches on ("refund", "notify", ...)
  , payload :: Text -- ^ opaque JSON payload for the handler
  } deriving (Show, Eq, Generic)

-- JSON instances derived from the Generic representation.
instance ToJSON Job
instance FromJSON Job

-- | The Redis key we'll use as our queue.
queueKey :: ByteString
queueKey = "jobs:queue"
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-} -- for the ByteString literal in queueKey

-- | Shared job type and queue key used by both producer and consumer.
module Job where

import Data.Aeson (FromJSON, ToJSON)
-- Strict ByteString: hedis command arguments (lpush/brpop keys) are strict,
-- so the lazy variant would not type-check at the call sites.
import Data.ByteString (ByteString)
import Data.Text (Text)
import GHC.Generics (Generic)

-- | Our job payload — swap this for whatever your domain needs.
data Job = Job
  { jobId   :: Text -- ^ unique identifier, e.g. a transaction id
  , jobType :: Text -- ^ discriminator the worker dispatches on ("refund", "notify", ...)
  , payload :: Text -- ^ opaque JSON payload for the handler
  } deriving (Show, Eq, Generic)

-- JSON instances derived from the Generic representation.
instance ToJSON Job
instance FromJSON Job

-- | The Redis key we'll use as our queue.
queueKey :: ByteString
queueKey = "jobs:queue"
{-# LANGUAGE OverloadedStrings #-} -- Text literals in the Job constructors

-- | Pushes JSON-encoded jobs onto the Redis list with LPUSH.
module Producer where

import Control.Monad (forM_)
import Data.Aeson (encode)
import Data.ByteString.Lazy (toStrict)
import Database.Redis

import Job

-- | Push a single job onto the left end of the list.
enqueue :: Connection -> Job -> IO ()
enqueue conn job = do
  -- aeson's encode yields a lazy ByteString; hedis wants strict.
  let encoded = toStrict (encode job)
  result <- runRedis conn $ lpush queueKey [encoded]
  case result of
    Left err    -> putStrLn $ "Enqueue error: " ++ show err
    Right count -> putStrLn $ "Job enqueued. Queue depth: " ++ show count

-- | Simulate a burst of jobs — e.g. end-of-day refund batch.
producerMain :: Connection -> IO ()
producerMain conn = do
  let jobs =
        [ Job "txn-001" "refund" "{\"amount\": 500, \"currency\": \"INR\"}"
        , Job "txn-002" "refund" "{\"amount\": 1200, \"currency\": \"INR\"}"
        , Job "txn-003" "notify" "{\"email\": \"[email protected]\"}"
        , Job "txn-004" "refund" "{\"amount\": 300, \"currency\": \"USD\"}"
        , Job "txn-005" "notify" "{\"email\": \"[email protected]\"}"
        ]
  putStrLn "Producer starting — pushing jobs..."
  forM_ jobs (enqueue conn)
  putStrLn "Producer done."
{-# LANGUAGE OverloadedStrings #-} -- Text literals in the Job constructors

-- | Pushes JSON-encoded jobs onto the Redis list with LPUSH.
module Producer where

import Control.Monad (forM_)
import Data.Aeson (encode)
import Data.ByteString.Lazy (toStrict)
import Database.Redis

import Job

-- | Push a single job onto the left end of the list.
enqueue :: Connection -> Job -> IO ()
enqueue conn job = do
  -- aeson's encode yields a lazy ByteString; hedis wants strict.
  let encoded = toStrict (encode job)
  result <- runRedis conn $ lpush queueKey [encoded]
  case result of
    Left err    -> putStrLn $ "Enqueue error: " ++ show err
    Right count -> putStrLn $ "Job enqueued. Queue depth: " ++ show count

-- | Simulate a burst of jobs — e.g. end-of-day refund batch.
producerMain :: Connection -> IO ()
producerMain conn = do
  let jobs =
        [ Job "txn-001" "refund" "{\"amount\": 500, \"currency\": \"INR\"}"
        , Job "txn-002" "refund" "{\"amount\": 1200, \"currency\": \"INR\"}"
        , Job "txn-003" "notify" "{\"email\": \"[email protected]\"}"
        , Job "txn-004" "refund" "{\"amount\": 300, \"currency\": \"USD\"}"
        , Job "txn-005" "notify" "{\"email\": \"[email protected]\"}"
        ]
  putStrLn "Producer starting — pushing jobs..."
  forM_ jobs (enqueue conn)
  putStrLn "Producer done."
{-# LANGUAGE OverloadedStrings #-} -- Text literals in the Job constructors

-- | Pushes JSON-encoded jobs onto the Redis list with LPUSH.
module Producer where

import Control.Monad (forM_)
import Data.Aeson (encode)
import Data.ByteString.Lazy (toStrict)
import Database.Redis

import Job

-- | Push a single job onto the left end of the list.
enqueue :: Connection -> Job -> IO ()
enqueue conn job = do
  -- aeson's encode yields a lazy ByteString; hedis wants strict.
  let encoded = toStrict (encode job)
  result <- runRedis conn $ lpush queueKey [encoded]
  case result of
    Left err    -> putStrLn $ "Enqueue error: " ++ show err
    Right count -> putStrLn $ "Job enqueued. Queue depth: " ++ show count

-- | Simulate a burst of jobs — e.g. end-of-day refund batch.
producerMain :: Connection -> IO ()
producerMain conn = do
  let jobs =
        [ Job "txn-001" "refund" "{\"amount\": 500, \"currency\": \"INR\"}"
        , Job "txn-002" "refund" "{\"amount\": 1200, \"currency\": \"INR\"}"
        , Job "txn-003" "notify" "{\"email\": \"[email protected]\"}"
        , Job "txn-004" "refund" "{\"amount\": 300, \"currency\": \"USD\"}"
        , Job "txn-005" "notify" "{\"email\": \"[email protected]\"}"
        ]
  putStrLn "Producer starting — pushing jobs..."
  forM_ jobs (enqueue conn)
  putStrLn "Producer done."
-- | Blocking consumer: BRPOPs jobs off the Redis list and processes them.
module Consumer where

import Control.Monad (forever)
import Data.Aeson (decode)
import Data.ByteString.Lazy (fromStrict)
import Database.Redis

import Job

-- | Process a single job — replace this with your real business logic.
processJob :: Job -> IO ()
processJob job =
  putStrLn $
    "[Worker] Processing " ++ show (jobType job)
      ++ " | ID: " ++ show (jobId job)
      ++ " | Payload: " ++ show (payload job)

-- | Block until a job is available, then process it.
consumeOne :: Connection -> IO ()
consumeOne conn = do
  result <- runRedis conn $ brpop [queueKey] 30 -- 30s timeout
  case result of
    Left err      -> putStrLn $ "Redis error: " ++ show err
    Right Nothing -> putStrLn "Timeout — no jobs in 30s, polling again..."
    Right (Just (_, raw)) ->
      -- brpop returns (key, value); decode expects a lazy ByteString.
      case decode (fromStrict raw) of
        Nothing  -> putStrLn $ "Failed to decode job: " ++ show raw
        Just job -> processJob job

-- | Run forever, consuming jobs as they arrive.
consumerMain :: Connection -> IO ()
consumerMain conn = do
  putStrLn "Consumer started — waiting for jobs..."
  forever (consumeOne conn)
-- | Blocking consumer: BRPOPs jobs off the Redis list and processes them.
module Consumer where

import Control.Monad (forever)
import Data.Aeson (decode)
import Data.ByteString.Lazy (fromStrict)
import Database.Redis

import Job

-- | Process a single job — replace this with your real business logic.
processJob :: Job -> IO ()
processJob job =
  putStrLn $
    "[Worker] Processing " ++ show (jobType job)
      ++ " | ID: " ++ show (jobId job)
      ++ " | Payload: " ++ show (payload job)

-- | Block until a job is available, then process it.
consumeOne :: Connection -> IO ()
consumeOne conn = do
  result <- runRedis conn $ brpop [queueKey] 30 -- 30s timeout
  case result of
    Left err      -> putStrLn $ "Redis error: " ++ show err
    Right Nothing -> putStrLn "Timeout — no jobs in 30s, polling again..."
    Right (Just (_, raw)) ->
      -- brpop returns (key, value); decode expects a lazy ByteString.
      case decode (fromStrict raw) of
        Nothing  -> putStrLn $ "Failed to decode job: " ++ show raw
        Just job -> processJob job

-- | Run forever, consuming jobs as they arrive.
consumerMain :: Connection -> IO ()
consumerMain conn = do
  putStrLn "Consumer started — waiting for jobs..."
  forever (consumeOne conn)
-- | Blocking consumer: BRPOPs jobs off the Redis list and processes them.
module Consumer where

import Control.Monad (forever)
import Data.Aeson (decode)
import Data.ByteString.Lazy (fromStrict)
import Database.Redis

import Job

-- | Process a single job — replace this with your real business logic.
processJob :: Job -> IO ()
processJob job =
  putStrLn $
    "[Worker] Processing " ++ show (jobType job)
      ++ " | ID: " ++ show (jobId job)
      ++ " | Payload: " ++ show (payload job)

-- | Block until a job is available, then process it.
consumeOne :: Connection -> IO ()
consumeOne conn = do
  result <- runRedis conn $ brpop [queueKey] 30 -- 30s timeout
  case result of
    Left err      -> putStrLn $ "Redis error: " ++ show err
    Right Nothing -> putStrLn "Timeout — no jobs in 30s, polling again..."
    Right (Just (_, raw)) ->
      -- brpop returns (key, value); decode expects a lazy ByteString.
      case decode (fromStrict raw) of
        Nothing  -> putStrLn $ "Failed to decode job: " ++ show raw
        Just job -> processJob job

-- | Run forever, consuming jobs as they arrive.
consumerMain :: Connection -> IO ()
consumerMain conn = do
  putStrLn "Consumer started — waiting for jobs..."
  forever (consumeOne conn)
-- | Entry point: runs producer and consumer against the same pool.
module Main where

import Control.Concurrent.Async (concurrently_)
import Database.Redis

import Consumer
import Producer

main :: IO ()
main = do
  -- Connect to local Redis; swap defaultConnectInfo for your host/port/auth.
  conn <- connect defaultConnectInfo
  -- Run producer and consumer concurrently.
  -- In production you'd run these as separate processes/services.
  concurrently_ (producerMain conn) (consumerMain conn)
-- | Entry point: runs producer and consumer against the same pool.
module Main where

import Control.Concurrent.Async (concurrently_)
import Database.Redis

import Consumer
import Producer

main :: IO ()
main = do
  -- Connect to local Redis; swap defaultConnectInfo for your host/port/auth.
  conn <- connect defaultConnectInfo
  -- Run producer and consumer concurrently.
  -- In production you'd run these as separate processes/services.
  concurrently_ (producerMain conn) (consumerMain conn)
-- | Entry point: runs producer and consumer against the same pool.
module Main where

import Control.Concurrent.Async (concurrently_)
import Database.Redis

import Consumer
import Producer

main :: IO ()
main = do
  -- Connect to local Redis; swap defaultConnectInfo for your host/port/auth.
  conn <- connect defaultConnectInfo
  -- Run producer and consumer concurrently.
  -- In production you'd run these as separate processes/services.
  concurrently_ (producerMain conn) (consumerMain conn)
# Run Redis and the app in separate terminals
# Terminal 1 — start Redis
redis-server # Terminal 2 — run the app
cabal run redis-queue
# Terminal 1 — start Redis
redis-server # Terminal 2 — run the app
cabal run redis-queue
# Terminal 1 — start Redis
redis-server # Terminal 2 — run the app
cabal run redis-queue
Producer starting — pushing jobs...
Job enqueued. Queue depth: 1
Job enqueued. Queue depth: 2
Job enqueued. Queue depth: 3
Job enqueued. Queue depth: 4
Job enqueued. Queue depth: 5
Producer done.
Consumer started — waiting for jobs...
[Worker] Processing "refund" | ID: "txn-001" | Payload: "{"amount": 500, "currency": "INR"}"
[Worker] Processing "refund" | ID: "txn-002" | Payload: "{"amount": 1200, "currency": "INR"}"
[Worker] Processing "notify" | ID: "txn-003" | Payload: "{"email": "[email protected]"}"
[Worker] Processing "refund" | ID: "txn-004" | Payload: "{"amount": 300, "currency": "USD"}"
[Worker] Processing "notify" | ID: "txn-005" | Payload: "{"email": "[email protected]"}"
Producer starting — pushing jobs...
Job enqueued. Queue depth: 1
Job enqueued. Queue depth: 2
Job enqueued. Queue depth: 3
Job enqueued. Queue depth: 4
Job enqueued. Queue depth: 5
Producer done.
Consumer started — waiting for jobs...
[Worker] Processing "refund" | ID: "txn-001" | Payload: "{"amount": 500, "currency": "INR"}"
[Worker] Processing "refund" | ID: "txn-002" | Payload: "{"amount": 1200, "currency": "INR"}"
[Worker] Processing "notify" | ID: "txn-003" | Payload: "{"email": "[email protected]"}"
[Worker] Processing "refund" | ID: "txn-004" | Payload: "{"amount": 300, "currency": "USD"}"
[Worker] Processing "notify" | ID: "txn-005" | Payload: "{"email": "[email protected]"}"
Producer starting — pushing jobs...
Job enqueued. Queue depth: 1
Job enqueued. Queue depth: 2
Job enqueued. Queue depth: 3
Job enqueued. Queue depth: 4
Job enqueued. Queue depth: 5
Producer done.
Consumer started — waiting for jobs...
[Worker] Processing "refund" | ID: "txn-001" | Payload: "{"amount": 500, "currency": "INR"}"
[Worker] Processing "refund" | ID: "txn-002" | Payload: "{"amount": 1200, "currency": "INR"}"
[Worker] Processing "notify" | ID: "txn-003" | Payload: "{"email": "[email protected]"}"
[Worker] Processing "refund" | ID: "txn-004" | Payload: "{"amount": 300, "currency": "USD"}"
[Worker] Processing "notify" | ID: "txn-005" | Payload: "{"email": "[email protected]"}"
-- NOTE: concurrently_ must be imported too (the original import list omitted it).
import Control.Concurrent.Async (concurrently_, replicateConcurrently_)

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  -- Run 4 parallel consumer workers alongside the single producer;
  -- each BRPOP pops a distinct job, so the workers never collide.
  concurrently_
    (producerMain conn)
    (replicateConcurrently_ 4 (consumerMain conn))
-- NOTE: concurrently_ must be imported too (the original import list omitted it).
import Control.Concurrent.Async (concurrently_, replicateConcurrently_)

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  -- Run 4 parallel consumer workers alongside the single producer;
  -- each BRPOP pops a distinct job, so the workers never collide.
  concurrently_
    (producerMain conn)
    (replicateConcurrently_ 4 (consumerMain conn))
-- NOTE: concurrently_ must be imported too (the original import list omitted it).
import Control.Concurrent.Async (concurrently_, replicateConcurrently_)

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  -- Run 4 parallel consumer workers alongside the single producer;
  -- each BRPOP pops a distinct job, so the workers never collide.
  concurrently_
    (producerMain conn)
    (replicateConcurrently_ 4 (consumerMain conn))
# Inspect queue state live without touching the app
# In a Redis CLI while the app runs:
redis-cli LLEN jobs:queue # current queue depth
redis-cli MONITOR # watch every command in real time
# In a Redis CLI while the app runs:
redis-cli LLEN jobs:queue # current queue depth
redis-cli MONITOR # watch every command in real time
# In a Redis CLI while the app runs:
redis-cli LLEN jobs:queue # current queue depth
redis-cli MONITOR # watch every command in real time
-- Requires {-# LANGUAGE ScopedTypeVariables #-}
-- and: import Control.Exception (SomeException, catch)

-- | Where malformed or failed jobs are parked for inspection.
deadLetterKey :: ByteString
deadLetterKey = "jobs:dead"

-- | Consume with failure handling: malformed payloads and jobs whose
-- handler throws are both moved to the dead letter queue instead of
-- being lost.
consumeSafe :: Connection -> IO ()
consumeSafe conn = do
  result <- runRedis conn $ brpop [queueKey] 30
  case result of
    Right (Just (_, raw)) ->
      case decode (fromStrict raw) of
        Nothing -> do
          -- Malformed payload — send to dead letter queue
          _ <- runRedis conn $ lpush deadLetterKey [raw]
          putStrLn "Malformed job moved to dead letter queue"
        Just job ->
          -- Wrap in exception handler for business logic failures.
          -- Catching SomeException is deliberate: anything that escapes
          -- the handler is dead-lettered and logged, never dropped.
          processJob job `catch` \(e :: SomeException) -> do
            _ <- runRedis conn $ lpush deadLetterKey [raw]
            putStrLn $ "Job failed, dead-lettered: " ++ show e
    _ -> pure () -- Left err or timeout: nothing popped, try again next round
-- Requires {-# LANGUAGE ScopedTypeVariables #-}
-- and: import Control.Exception (SomeException, catch)

-- | Where malformed or failed jobs are parked for inspection.
deadLetterKey :: ByteString
deadLetterKey = "jobs:dead"

-- | Consume with failure handling: malformed payloads and jobs whose
-- handler throws are both moved to the dead letter queue instead of
-- being lost.
consumeSafe :: Connection -> IO ()
consumeSafe conn = do
  result <- runRedis conn $ brpop [queueKey] 30
  case result of
    Right (Just (_, raw)) ->
      case decode (fromStrict raw) of
        Nothing -> do
          -- Malformed payload — send to dead letter queue
          _ <- runRedis conn $ lpush deadLetterKey [raw]
          putStrLn "Malformed job moved to dead letter queue"
        Just job ->
          -- Wrap in exception handler for business logic failures.
          -- Catching SomeException is deliberate: anything that escapes
          -- the handler is dead-lettered and logged, never dropped.
          processJob job `catch` \(e :: SomeException) -> do
            _ <- runRedis conn $ lpush deadLetterKey [raw]
            putStrLn $ "Job failed, dead-lettered: " ++ show e
    _ -> pure () -- Left err or timeout: nothing popped, try again next round
-- Requires {-# LANGUAGE ScopedTypeVariables #-}
-- and: import Control.Exception (SomeException, catch)

-- | Where malformed or failed jobs are parked for inspection.
deadLetterKey :: ByteString
deadLetterKey = "jobs:dead"

-- | Consume with failure handling: malformed payloads and jobs whose
-- handler throws are both moved to the dead letter queue instead of
-- being lost.
consumeSafe :: Connection -> IO ()
consumeSafe conn = do
  result <- runRedis conn $ brpop [queueKey] 30
  case result of
    Right (Just (_, raw)) ->
      case decode (fromStrict raw) of
        Nothing -> do
          -- Malformed payload — send to dead letter queue
          _ <- runRedis conn $ lpush deadLetterKey [raw]
          putStrLn "Malformed job moved to dead letter queue"
        Just job ->
          -- Wrap in exception handler for business logic failures.
          -- Catching SomeException is deliberate: anything that escapes
          -- the handler is dead-lettered and logged, never dropped.
          processJob job `catch` \(e :: SomeException) -> do
            _ <- runRedis conn $ lpush deadLetterKey [raw]
            putStrLn $ "Job failed, dead-lettered: " ++ show e
    _ -> pure () -- Left err or timeout: nothing popped, try again next round
-- Requires {-# LANGUAGE OverloadedStrings #-} for the host/auth literals.
import Database.Redis

-- | Production connection settings; all other fields keep their defaults.
productionConnInfo :: ConnectInfo
productionConnInfo = defaultConnectInfo
  { connectHost           = "your-redis-host.example.com"
  , connectPort           = PortNumber 6379
  , connectAuth           = Just "your-auth-password"
  , connectDatabase       = 0
  , connectMaxConnections = 50 -- connection pool size
  }

main :: IO ()
main = do
  conn <- connect productionConnInfo
  ...
-- Requires {-# LANGUAGE OverloadedStrings #-} for the host/auth literals.
import Database.Redis

-- | Production connection settings; all other fields keep their defaults.
productionConnInfo :: ConnectInfo
productionConnInfo = defaultConnectInfo
  { connectHost           = "your-redis-host.example.com"
  , connectPort           = PortNumber 6379
  , connectAuth           = Just "your-auth-password"
  , connectDatabase       = 0
  , connectMaxConnections = 50 -- connection pool size
  }

main :: IO ()
main = do
  conn <- connect productionConnInfo
  ...
-- Requires {-# LANGUAGE OverloadedStrings #-} for the host/auth literals.
import Database.Redis

-- | Production connection settings; all other fields keep their defaults.
productionConnInfo :: ConnectInfo
productionConnInfo = defaultConnectInfo
  { connectHost           = "your-redis-host.example.com"
  , connectPort           = PortNumber 6379
  , connectAuth           = Just "your-auth-password"
  , connectDatabase       = 0
  , connectMaxConnections = 50 -- connection pool size
  }

main :: IO ()
main = do
  conn <- connect productionConnInfo
  ...

- Low latency — sub-millisecond enqueue/dequeue
- Simplicity — no broker clusters to manage
- Atomicity — LPUSH/BRPOP are atomic operations, safe under concurrency
- Visibility — you can inspect the queue state instantly with LLEN - Producer pushes JSON-encoded jobs onto a Redis list using LPUSH
- Consumer blocks on BRPOP — waking up the instant a job arrives
- Multiple consumers can run in parallel, each pulling distinct jobs atomically - GHC + Cabal (or Stack) installed
- A running Redis instance (redis-server or Docker: docker run -p 6379:6379 redis)
- Basic familiarity with Haskell (do notation, IO) - LPUSH + BRPOP is Redis's native producer-consumer primitive — atomic, fast, and simple
- Hedis gives you a type-safe, monadic interface to Redis in Haskell with connection pooling built in
- Blocking pop (BRPOP) beats polling — zero CPU overhead while the queue is idle
- Dead-letter queues are non-negotiable in production — never let failed jobs disappear silently
- This pattern scales horizontally: add consumers, add producers, the queue fans out automatically - Priority queues — use multiple Redis lists (jobs:high, jobs:low) and pass both keys to BRPOP; Redis pops from the first non-empty list
- Delayed jobs — use a Redis Sorted Set with the scheduled timestamp as the score; a scheduler process moves ready jobs to the main queue
- Reliable (at-least-once) delivery — combine BRPOPLPUSH with a processing list and a visibility timeout; true exactly-once semantics additionally require idempotent job handlers