Tools: Building a Producer-Consumer Queue with Redis and Haskell Using Hedis

TL;DR: We'll build a production-grade producer-consumer queue in Haskell using Redis as the message broker via the Hedis client library. By the end, you'll have a working system that can handle high-throughput job dispatch and consumption — the same pattern I used to process 1M+ payment refunds at Juspay.

Why Redis for a Queue?

When people think "message queue," they reach for Kafka or RabbitMQ. But Redis is often the right call when you need:

- Low latency — sub-millisecond enqueue/dequeue
- Simplicity — no broker clusters to manage
- Atomicity — LPUSH/BRPOP are atomic operations, safe under concurrency
- Visibility — you can inspect the queue state instantly with LLEN

At Juspay, we routed payment refunds through a Redis-backed producer-consumer system. The queue absorbed burst traffic from merchant-triggered refund events and fed a pool of consumers that processed each refund, updated sub-statuses, and called downstream banking APIs — all without a single dropped message.

What We're Building

- Producer pushes JSON-encoded jobs onto a Redis list using LPUSH
- Consumer blocks on BRPOP — waking up the instant a job arrives
- Multiple consumers can run in parallel, each pulling distinct jobs atomically

```
┌──────────────┐    LPUSH     ┌─────────────────┐    BRPOP     ┌──────────────┐
│   Producer   │ ───────────▶ │   Redis Queue   │ ───────────▶ │   Consumer   │
│ (Job sender) │              │  (List: jobs)   │              │ (Job worker) │
└──────────────┘              └─────────────────┘              └──────────────┘
```

Prerequisites

- GHC + Cabal (or Stack) installed
- A running Redis instance (redis-server or Docker: docker run -p 6379:6379 redis)
- Basic familiarity with Haskell (do notation, IO)

Project Setup

Create a new Cabal project:

```shell
mkdir redis-queue && cd redis-queue
cabal init --non-interactive
```

Add dependencies to your redis-queue.cabal file:

```
build-depends: base >= 4.14
             , hedis >= 0.15
             , aeson >= 2.0
             , text >= 1.2
             , bytestring >= 0.11
             , async >= 2.2
```

Install and confirm Hedis is available:

```shell
cabal build
```

Understanding Hedis Basics

Hedis wraps all Redis commands in the Redis monad, which you run against a Connection. Here's the mental model:

```haskell
-- Open a connection pool
conn <- connect defaultConnectInfo

-- Run Redis commands inside runRedis
runRedis conn $ do
  set "hello" "world"
  get "hello"   -- returns Right (Just "world")
```

Every command returns Either Reply a — the Left branch is a Redis protocol error, Right is success. In practice you'll pattern-match or use either to handle errors.

Step 1 — Define the Job Type

Create src/Job.hs:

```haskell
{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}
module Job where

import Data.Aeson (FromJSON, ToJSON)
import Data.ByteString (ByteString)  -- Hedis keys are strict ByteStrings
import Data.Text (Text)
import GHC.Generics (Generic)

-- Our job payload — swap this for whatever your domain needs
data Job = Job
  { jobId   :: Text
  , jobType :: Text
  , payload :: Text
  } deriving (Show, Eq, Generic)

instance ToJSON Job
instance FromJSON Job

-- The Redis key we'll use as our queue
queueKey :: ByteString
queueKey = "jobs:queue"
```

Keeping the job type generic means you can serialise anything that has a ToJSON instance — refund requests, email notifications, image processing tasks, whatever fits your system.

Step 2 — The Producer

Create src/Producer.hs:

```haskell
module Producer where

import Control.Monad (forM_)
import Data.Aeson (encode)
import Data.ByteString.Lazy (toStrict)
import Database.Redis

import Job

-- Push a single job onto the left end of the list
enqueue :: Connection -> Job -> IO ()
enqueue conn job = do
  let encoded = toStrict (encode job)
  result <- runRedis conn $ lpush queueKey [encoded]
  case result of
    Left err    -> putStrLn $ "Enqueue error: " ++ show err
    Right count -> putStrLn $ "Job enqueued. Queue depth: " ++ show count

-- Simulate a burst of jobs — e.g. an end-of-day refund batch
producerMain :: Connection -> IO ()
producerMain conn = do
  let jobs =
        [ Job "txn-001" "refund" "{\"amount\": 500, \"currency\": \"INR\"}"
        , Job "txn-002" "refund" "{\"amount\": 1200, \"currency\": \"INR\"}"
        , Job "txn-003" "notify" "{\"email\": \"[email protected]\"}"
        , Job "txn-004" "refund" "{\"amount\": 300, \"currency\": \"USD\"}"
        , Job "txn-005" "notify" "{\"email\": \"[email protected]\"}"
        ]
  putStrLn "Producer starting — pushing jobs..."
  forM_ jobs (enqueue conn)
  putStrLn "Producer done."
```

Key point: lpush is atomic. Even if 100 producers call it simultaneously, each job lands on the queue exactly once. Redis serialises concurrent writes internally — no locks needed on your side.

Step 3 — The Consumer

Create src/Consumer.hs:

```haskell
module Consumer where

import Control.Monad (forever)
import Data.Aeson (decode)
import Data.ByteString.Lazy (fromStrict)
import Database.Redis

import Job

-- Process a single job — replace this with your real business logic
processJob :: Job -> IO ()
processJob job = putStrLn $
  "[Worker] Processing " ++ show (jobType job)
    ++ " | ID: "      ++ show (jobId job)
    ++ " | Payload: " ++ show (payload job)

-- Block until a job is available, then process it
consumeOne :: Connection -> IO ()
consumeOne conn = do
  result <- runRedis conn $ brpop [queueKey] 30   -- 30s timeout
  case result of
    Left err      -> putStrLn $ "Redis error: " ++ show err
    Right Nothing -> putStrLn "Timeout — no jobs in 30s, polling again..."
    Right (Just (_, raw)) ->
      case decode (fromStrict raw) of
        Nothing  -> putStrLn $ "Failed to decode job: " ++ show raw
        Just job -> processJob job

-- Run forever, consuming jobs as they arrive
consumerMain :: Connection -> IO ()
consumerMain conn = do
  putStrLn "Consumer started — waiting for jobs..."
  forever (consumeOne conn)
```

brpop is the magic here. It blocks the connection until an item is available on any of the listed keys, then atomically pops and returns it. The 30 is a timeout in seconds — after which it returns Right Nothing so you can loop cleanly rather than hanging forever. This is fundamentally different from polling (RPOP in a loop with threadDelay) — blocking means zero CPU burn while the queue is empty.

Step 4 — Wire It Together

Create src/Main.hs:

```haskell
module Main where

import Control.Concurrent.Async (concurrently_)
import Database.Redis

import Consumer
import Producer

main :: IO ()
main = do
  -- Connect to local Redis; swap defaultConnectInfo for your host/port/auth
  conn <- connect defaultConnectInfo
  -- Run producer and consumer concurrently
  -- In production you'd run these as separate processes/services
  concurrently_ (producerMain conn) (consumerMain conn)
```

concurrently_ from the async package runs both actions in parallel in separate lightweight threads, waiting for both to finish. In a real deployment you'd run the producer and consumer as separate services — this just wires them together for a clean demo.

Running It

```shell
# Terminal 1 — start Redis
redis-server

# Terminal 2 — run the app
cabal run redis-queue
```

You should see output like:

```
Producer starting — pushing jobs...
Job enqueued. Queue depth: 1
Job enqueued. Queue depth: 2
Job enqueued. Queue depth: 3
Job enqueued. Queue depth: 4
Job enqueued. Queue depth: 5
Producer done.
Consumer started — waiting for jobs...
[Worker] Processing "refund" | ID: "txn-001" | Payload: "{"amount": 500, "currency": "INR"}"
[Worker] Processing "refund" | ID: "txn-002" | Payload: "{"amount": 1200, "currency": "INR"}"
[Worker] Processing "notify" | ID: "txn-003" | Payload: "{"email": "[email protected]"}"
[Worker] Processing "refund" | ID: "txn-004" | Payload: "{"amount": 300, "currency": "USD"}"
[Worker] Processing "notify" | ID: "txn-005" | Payload: "{"email": "[email protected]"}"
```

Step 5 — Scaling to Multiple Consumers

Want parallel workers? Spawn multiple consumers against the same queue:

```haskell
import Control.Concurrent.Async (concurrently_, replicateConcurrently_)

main :: IO ()
main = do
  conn <- connect defaultConnectInfo
  -- Run 4 parallel consumer workers
  concurrently_
    (producerMain conn)
    (replicateConcurrently_ 4 (consumerMain conn))
```

Because BRPOP is atomic, each job is delivered to exactly one consumer — no double-processing. Redis handles the fan-out natively. You can verify this live:

```shell
# In a Redis CLI while the app runs:
redis-cli LLEN jobs:queue   # current queue depth
redis-cli MONITOR           # watch every command in real time
```

Step 6 — Dead Letter Handling (Production Hardening)

In production, jobs can fail. You don't want failed jobs silently disappearing. Add a dead-letter queue:

```haskell
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception (SomeException, catch)

deadLetterKey :: ByteString
deadLetterKey = "jobs:dead"

-- Consume with failure handling
consumeSafe :: Connection -> IO ()
consumeSafe conn = do
  result <- runRedis conn $ brpop [queueKey] 30
  case result of
    Right (Just (_, raw)) ->
      case decode (fromStrict raw) of
        Nothing -> do
          -- Malformed payload — send to the dead letter queue
          _ <- runRedis conn $ lpush deadLetterKey [raw]
          putStrLn "Malformed job moved to dead letter queue"
        Just job ->
          -- Wrap in an exception handler for business logic failures
          processJob job `catch` \(e :: SomeException) -> do
            _ <- runRedis conn $ lpush deadLetterKey [raw]
            putStrLn $ "Job failed, dead-lettered: " ++ show e
    _ -> pure ()
```

Now failed jobs accumulate in jobs:dead where you can inspect, replay, or alert on them — no silent data loss.

Connecting to a Real Redis Host

For production (Redis Cloud, AWS ElastiCache, etc.):

```haskell
import Database.Redis

productionConnInfo :: ConnectInfo
productionConnInfo = defaultConnectInfo
  { connectHost           = "your-redis-host.example.com"
  , connectPort           = PortNumber 6379
  , connectAuth           = Just "your-auth-password"
  , connectDatabase       = 0
  , connectMaxConnections = 50   -- connection pool size
  }

main :: IO ()
main = do
  conn <- connect productionConnInfo
  ...
```

For TLS (Redis Cloud, Upstash, etc.), use checkedConnect with connectTLSParams set.

What We Built vs. What Juspay Ran

The pattern here is the same core design behind Juspay's refund processing pipeline, with a few additions at scale. The Redis primitives (LPUSH, BRPOP, atomic pops) are identical. Scaling up is mostly operational — more consumer replicas, queue-per-priority, monitoring via LLEN metrics fed into dashboards.

The complete working project is on GitHub: https://github.com/arnabdas1999/redis-hedis-queue

Key Takeaways

- LPUSH + BRPOP is Redis's native producer-consumer primitive — atomic, fast, and simple
- Hedis gives you a type-safe, monadic interface to Redis in Haskell with connection pooling built in
- Blocking pop (BRPOP) beats polling — zero CPU overhead while the queue is idle
- Dead-letter queues are non-negotiable in production — never let failed jobs disappear silently
- This pattern scales horizontally: add consumers, add producers, the queue fans out automatically

What's Next?

- Priority queues — use multiple Redis lists (jobs:high, jobs:low) and pass both keys to BRPOP; Redis pops from the first non-empty list
- Delayed jobs — use a Redis Sorted Set with the scheduled timestamp as the score; a scheduler process moves ready jobs to the main queue
- Exactly-once delivery — combine BRPOPLPUSH with a processing list and a visibility timeout

Drop questions in the comments — happy to dig into any of these.

Arnab Das is an MS student at NYU Tandon and a software engineer who worked on payment infrastructure at Juspay, processing 200M+ daily transactions. Find him on LinkedIn and GitHub.
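The priority-queue idea maps almost directly onto Hedis, because brpop accepts a list of keys and checks them left to right. Here is a minimal sketch; the key names jobs:high and jobs:low and the function consumePriority are illustrative, not part of the project code above:

```haskell
{-# LANGUAGE OverloadedStrings #-}
module PriorityConsumer where

import Database.Redis

-- BRPOP scans its keys in order, so "jobs:high" is always
-- drained before "jobs:low" is even considered.
consumePriority :: Connection -> IO ()
consumePriority conn = do
  result <- runRedis conn $ brpop ["jobs:high", "jobs:low"] 30
  case result of
    Right (Just (key, raw)) ->
      putStrLn $ "Popped from " ++ show key ++ ": " ++ show raw
    Right Nothing -> putStrLn "No jobs in 30s"
    Left err      -> putStrLn $ "Redis error: " ++ show err
```

Producers pick the list at enqueue time; nothing else in the consumer changes, and BRPOP's result includes the key so you know which priority level the job came from.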
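The delayed-jobs idea can be sketched as a sorted set plus a mover loop. This is a deliberate simplification: the zrangebyscore/zrem pair below is not atomic, so a production version would wrap the move in a Lua script; the key name jobs:delayed and the helpers scheduleJob and schedulerLoop are made up for illustration:

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Scheduler where

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Data.ByteString (ByteString)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Database.Redis

-- Schedule a job to run at a POSIX timestamp (seconds),
-- using the timestamp as the sorted-set score.
scheduleJob :: Connection -> Double -> ByteString -> IO ()
scheduleJob conn runAt raw = do
  _ <- runRedis conn $ zadd "jobs:delayed" [(runAt, raw)]
  pure ()

-- Mover loop: once per second, push every due job onto the
-- main queue and drop it from the delayed set.
schedulerLoop :: Connection -> IO ()
schedulerLoop conn = forever $ do
  now <- realToFrac <$> getPOSIXTime
  _ <- runRedis conn $ do
    due <- zrangebyscore "jobs:delayed" 0 now
    case due of
      Right jobs | not (null jobs) -> do
        _ <- lpush "jobs:queue" jobs
        _ <- zrem "jobs:delayed" jobs
        pure ()
      _ -> pure ()
  threadDelay 1000000  -- check once per second
```

The consumers from the main article need no changes; once the scheduler moves a job onto jobs:queue, it is delivered like any other.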
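The exactly-once idea builds on BRPOPLPUSH: the pop atomically lands the job on a processing list, and the consumer acknowledges by removing it only after the work succeeds. Below is a sketch under those assumptions; the reaper that re-queues jobs stuck past a visibility timeout is left out, and jobs:processing and consumeReliable are illustrative names. (BRPOPLPUSH is deprecated in newer Redis in favour of BLMOVE, but is still supported.)

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Reliable where

import Data.ByteString (ByteString)
import Database.Redis

-- Pop atomically into a processing list; ack only after success.
-- If the worker crashes mid-job, the payload survives in
-- "jobs:processing" for a reaper process to re-queue.
consumeReliable :: Connection -> (ByteString -> IO ()) -> IO ()
consumeReliable conn process = do
  result <- runRedis conn $ brpoplpush "jobs:queue" "jobs:processing" 30
  case result of
    Right (Just raw) -> do
      process raw
      -- Ack: remove exactly one copy from the processing list
      _ <- runRedis conn $ lrem "jobs:processing" 1 raw
      pure ()
    _ -> pure ()
```

Strictly this gives at-least-once delivery with deduplication left to the reaper; true exactly-once also needs the business logic itself to be idempotent.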