# Building a Fault-Tolerant Distributed Task Queue from Scratch

I implemented Raft consensus so my task queue could survive server crashes. Here's what I learned.

## The Problem

You're running a data pipeline that processes thousands of images. Workers are humming along, everything's fine. Then your coordinator server crashes. Tasks vanish. Two workers start processing the same image. Your pipeline grinds to a halt.

This is the reality of distributed systems. Machines fail. Networks partition. If you haven't designed for failure, failure cascades into disaster.

I wanted to understand how production systems handle this, so I built a distributed task queue from scratch. Not using Redis, but implementing the coordination layer myself using the Raft consensus algorithm. The result: a system that executes Python code across multiple workers, and when a broker crashes, the others continue seamlessly with no data loss.

## Architecture

The system has three components:

- **The broker cluster** runs the Raft consensus protocol across three nodes. They elect a leader, and all task state is replicated to every node. If one broker crashes, the remaining two continue operating.
- **Workers** are stateless. They poll the leader for tasks, execute code in sandboxed subprocesses, and report results. I can scale them horizontally as needed.
- **Clients** submit tasks and poll for results.

Both workers and clients automatically discover the current leader.

An important distinction: the Raft cluster guarantees broker failures don't lose data. Worker failures are detected but handled differently, which I'll cover later.

## Task Structure

A task packages Python code for remote execution:

```json
{
  "task_id": "uuid-1234",
  "task_type": "python_exec",
  "payload": {
    "code": "def count_words(text): return len(text.split())",
    "function": "count_words",
    "args": ["hello world"]
  },
  "status": "pending"
}
```

The lifecycle is simple: pending → processing → completed. The broker tracks each transition, and Raft ensures all brokers agree on the current state.
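To make the payload concrete, here's a minimal sketch of how a runner could evaluate it: define the function from `code`, look it up by name, and call it with `args`. The `run_payload` name is mine, not from the project, and the real worker (covered in the sandboxing section below) does this inside a subprocess rather than in-process.

```python
# Illustrative sketch only; run_payload is a hypothetical helper, not the
# project's actual worker code.
def run_payload(payload: dict):
    namespace = {}
    exec(payload["code"], namespace)       # defines count_words
    func = namespace[payload["function"]]  # look the function up by name
    return func(*payload["args"])          # call it with the supplied args

payload = {
    "code": "def count_words(text): return len(text.split())",
    "function": "count_words",
    "args": ["hello world"],
}
print(run_payload(payload))  # -> 2
```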
## Raft Consensus

The core problem: three brokers need to maintain identical task state. If I just broadcast updates to all of them, messages arrive at different times, packets get dropped, and each broker ends up with a different view of reality.

Raft solves this through leader election and log replication.

## Leader Election

Every node starts as a follower, waiting to hear from a leader. If a follower doesn't receive a heartbeat within a randomized timeout (1.5 to 3 seconds), it becomes a candidate and requests votes from its peers.

```python
def _on_election_timeout(self):
    self.state = RaftState.CANDIDATE
    self.current_term += 1
    self.voted_for = self.node_id
    # Request votes from all peers...
```

The randomized timeout prevents deadlock. If all nodes timed out simultaneously, they'd all vote for themselves and no one would win. Randomization ensures one node usually starts its campaign first.
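The other side of the election is the vote handler. Here's a rough sketch of how a node might decide whether to grant its vote, assuming the same field names as the snippet above; `_on_request_vote` and `_reset_election_timer` are names I made up, and real Raft also rejects candidates whose log is less up to date, which I've omitted.

```python
# Sketch, not the project's code: grant at most one vote per term, and step
# down if the candidate's term is newer. The log up-to-date check from the
# Raft paper is omitted for brevity.
def _on_request_vote(self, candidate_id, candidate_term):
    if candidate_term < self.current_term:
        return {"term": self.current_term, "vote_granted": False}

    if candidate_term > self.current_term:
        self.current_term = candidate_term
        self.voted_for = None
        self.state = RaftState.FOLLOWER

    if self.voted_for in (None, candidate_id):
        self.voted_for = candidate_id
        self._reset_election_timer()  # don't start our own campaign right away
        return {"term": self.current_term, "vote_granted": True}

    return {"term": self.current_term, "vote_granted": False}
```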
## Log Replication

The leader appends commands to its log and sends them to followers. The critical rule: an entry is only "committed" once a majority of nodes have it.

```python
def _try_commit(self):
    majority = (len(self.peers) + 1) // 2 + 1  # 2 of 3 nodes
    for n in range(len(self.log) - 1, self.commit_index, -1):
        # Count how many nodes have this entry
        count = 1  # Leader has it
        for peer in self.peers:
            if self.match_index.get(peer, -1) >= n:
                count += 1
        # Commit requires a majority
        if count >= majority:
            self.commit_index = n
            self._apply_committed()
            break
```

This majority requirement is what makes Raft fault-tolerant. If the leader dies after replicating to only one follower, that entry isn't committed yet. The surviving nodes elect a new leader and that uncommitted entry gets rolled back. Consistency is preserved.
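The follower side is roughly the mirror image: accept the leader's entries only if the log up to that point matches, then advance the commit index. This is my own simplified sketch, with method and field names assumed to mirror the snippets above, not the repo's code.

```python
# Simplified follower-side AppendEntries handler; a sketch, not the project's
# implementation. Log entries are dicts carrying at least a "term".
def _on_append_entries(self, term, prev_index, prev_term, entries, leader_commit):
    if term < self.current_term:
        return {"term": self.current_term, "success": False}

    self.current_term = term
    self.state = RaftState.FOLLOWER
    self._reset_election_timer()

    # Consistency check: we must already hold the entry the leader builds on
    if prev_index >= 0 and (prev_index >= len(self.log)
                            or self.log[prev_index]["term"] != prev_term):
        return {"term": self.current_term, "success": False}

    # Drop any conflicting suffix and append the leader's entries
    self.log = self.log[:prev_index + 1] + entries

    if leader_commit > self.commit_index:
        self.commit_index = min(leader_commit, len(self.log) - 1)
        self._apply_committed()

    return {"term": self.current_term, "success": True}
```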
## A Note on Persistence

My implementation keeps everything in memory. Production Raft requires writing currentTerm, votedFor, and the log to disk before acknowledging messages. Without persistence, a crashed node could restart with blank state, vote again in the same term, and violate Raft's safety guarantees. This is acceptable for learning, but not for production.
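If I were to add it, the minimum viable version is small: fsync the hard state before replying to any RPC. A sketch under that assumption (the file layout and function name are mine):

```python
# Hypothetical sketch: persist Raft's hard state atomically before replying.
import json
import os

def save_hard_state(path, current_term, voted_for, log):
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"current_term": current_term,
                   "voted_for": voted_for,
                   "log": log}, f)
        f.flush()
        os.fsync(f.fileno())  # make sure it's on disk before we acknowledge
    os.replace(tmp, path)     # atomic rename so a crash never leaves half a file
```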
## Failure Scenarios

The real test of a distributed system is how it behaves when things go wrong. Let me walk through the key failure scenarios.

## Scenario 1: Leader Broker Dies

This is the scenario Raft is designed to handle.

**Initial state:** Broker 1 is the leader. Brokers 2 and 3 are followers. Two workers are connected, processing tasks.

**The failure:** Broker 1 crashes (or gets network partitioned).

**What happens:**

- Brokers 2 and 3 stop receiving heartbeats from Broker 1
- Their election timers expire (within 1.5-3 seconds)
- One of them (say Broker 2) times out first, becomes a candidate, and requests votes
- Broker 3 grants its vote (Broker 1 is unreachable)
- Broker 2 becomes the new leader and starts sending heartbeats

**Worker behavior:** Workers' next request to Broker 1 fails with a connection error. They iterate through known broker addresses, query `/status`, find that Broker 2 is now leader, and reconnect. This takes 1-2 seconds.

**Data safety:** Any task that was committed before the crash is safe. It exists on a majority of nodes, and the new leader has it. Uncommitted entries (acknowledged by fewer than 2 nodes) are lost, but these were never confirmed to clients anyway.
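The discovery loop on the worker and client side can be as simple as the sketch below; the broker addresses and the shape of the `/status` response here are my assumptions, not the repo's exact API.

```python
# Hypothetical leader-discovery helper; endpoints and fields are assumptions.
import requests

BROKERS = ["http://localhost:5001", "http://localhost:5002", "http://localhost:5003"]

def find_leader():
    for addr in BROKERS:
        try:
            status = requests.get(f"{addr}/status", timeout=1).json()
            if status.get("is_leader"):
                return addr
        except requests.RequestException:
            continue  # crashed or partitioned away; try the next broker
    return None  # no leader reachable (e.g., during an election)
```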
## Scenario 2: Follower Broker Dies

This is simpler than leader failure.

**Initial state:** Broker 1 is leader. Broker 3 crashes.

**What happens:** Almost nothing visible. The leader notices Broker 3 isn't responding to AppendEntries, but it can still reach Broker 2. With 2 of 3 nodes alive, the cluster has quorum and continues normally.

**Writes:** Still succeed. The leader replicates to Broker 2, gets acknowledgment, commits.

**If Broker 3 comes back:** It rejoins as a follower, receives missing log entries from the leader, and catches up.

## Scenario 3: Worker Dies Mid-Task

The broker detects dead workers and reassigns their tasks automatically.

**Initial state:** Worker 1 is processing Task A. Worker 2 is idle.

**The failure:** Worker 1 crashes while executing Task A.

**What happens:**

1. Worker 1 stops sending heartbeats (normally sent every 3 seconds)
2. The broker's health check thread runs every 5 seconds
3. After 30 seconds without a heartbeat, the broker marks Worker 1 as dead
4. The broker finds all tasks assigned to Worker 1 that are still "processing"
5. For each stuck task, the broker appends a `reassign_task` command to the Raft log
6. Once committed, the task status returns to "pending" and `worker_id` is cleared
7. Worker 2 polls for work and picks up Task A

**Why Raft matters here:** The reassignment goes through Raft, not just local state. If the leader crashes right after detecting the dead worker, the new leader will have the reassignment in its log. The task won't get stuck.

**The trade-off:** There's a 30-second window where the task is stuck. This is intentional. Too aggressive a timeout causes false positives (a slow task looks like a dead worker). Production systems tune this based on expected task duration and can implement task-level timeouts separately from worker health checks.
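Steps 2 through 6 can be sketched as a single loop, with the intervals taken from the prose above. The helper names (`worker_heartbeats`, `_tasks_processing_on`, `_append_command`) are stand-ins for whatever the real broker calls them.

```python
# Sketch of the broker's health-check thread; constants mirror the article
# (5 s sweep, 30 s heartbeat timeout), helper names are hypothetical.
import time

HEARTBEAT_TIMEOUT = 30
CHECK_INTERVAL = 5

def _health_check_loop(self):
    while self.running:
        now = time.time()
        for worker_id, last_seen in list(self.worker_heartbeats.items()):
            if now - last_seen > HEARTBEAT_TIMEOUT:
                # Reassignment goes through the Raft log, so a newly elected
                # leader would still see it if we crash right after this.
                for task in self._tasks_processing_on(worker_id):
                    self._append_command({"type": "reassign_task",
                                          "task_id": task["task_id"]})
                self.worker_heartbeats.pop(worker_id, None)
        time.sleep(CHECK_INTERVAL)
```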
## Scenario 4: Network Partition

A network partition splits the cluster into groups that can't communicate.

**Initial state:** Broker 1 (leader) gets partitioned from Brokers 2 and 3.

**The minority side (Broker 1):**

- Continues thinking it's leader
- Cannot commit any new entries (can't reach quorum)
- Workers connected to it get timeout errors

**The majority side (Brokers 2 and 3):**

- Election timeout triggers
- One becomes the new leader
- Continues serving requests normally

**When the partition heals:**

- Broker 1 sees a higher term number from the new leader
- Broker 1 steps down to follower
- Any uncommitted entries on Broker 1 are discarded
- Broker 1 syncs its log with the new leader

This is why the system is CP (Consistent and Partition-tolerant) rather than AP. During the partition, the minority side becomes unavailable rather than serving potentially inconsistent data.

## The Duplicate Assignment Problem

There's a subtle bug that's easy to miss when building a task queue. A worker asks for a task. The leader finds a pending task, marks it as "processing," and returns it to the worker. Straightforward enough.

But what if the leader crashes immediately after sending the response, before replicating the "processing" status to followers? The new leader doesn't know the task was assigned. Another worker requests a task and receives the same one. Two workers are now running identical jobs.

The fix is to wait for Raft to commit before responding:

```python
target_index = self._append_and_get_index(command)

if not self._wait_for_commit(target_index, timeout=1.0):
    return jsonify({"error": "Commit timeout"}), 503

return jsonify(task), 200  # Only return after majority has it
```

If a worker receives a task, that assignment is guaranteed to exist on a majority of brokers. Even if the leader dies immediately, the new leader knows about it.

**An important caveat:** This prevents duplicate assignment, not duplicate execution. Consider this scenario:

- Worker receives task
- Worker executes it (charges a credit card)
- Worker crashes before reporting completion
- Task times out and gets reassigned
- Another worker executes it again

The credit card gets charged twice. Solving this requires idempotent task design, like using unique transaction IDs that the payment processor can deduplicate. The queue can guarantee safe assignment, but end-to-end exactly-once execution requires cooperation from the task implementation.
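What that cooperation looks like in practice: generate the deduplication key when the task is submitted, not when it runs, so every retry carries the same ID. The snippet below is purely illustrative; `make_charge_task` and the `charge` function are hypothetical.

```python
# Illustrative idempotent task design; not code from the project.
import uuid

def make_charge_task(amount_cents, card_token):
    return {
        "task_type": "python_exec",
        "payload": {
            "code": "def charge(tx_id, amount_cents, token): ...",  # calls the payment API
            "function": "charge",
            # The transaction ID is fixed at submission time, so a reassigned
            # retry sends the same ID and the processor can deduplicate it.
            "args": [str(uuid.uuid4()), amount_cents, card_token],
        },
        "status": "pending",
    }
```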
## Sandboxed Execution

Workers execute arbitrary Python code. Running `exec()` directly in the worker process would be dangerous: malicious code could access credentials, memory leaks would accumulate, and crashes would bring down the entire worker.

Each task runs in its own subprocess:

```python
result = subprocess.run(
    ['python3', script_path, json.dumps(args)],
    capture_output=True,
    timeout=30
)
```

This provides memory isolation, timeout enforcement, and crash containment. A proper production sandbox would use containers with CPU and memory limits, but subprocess isolation handles the common cases.
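Short of containers, you can claw back some of those limits with `resource.setrlimit` in a `preexec_fn` (POSIX only). This is a sketch of the idea, not something the repo does, and the numbers are arbitrary:

```python
# Hedged sketch: cap the child's memory and CPU before it execs python3.
# script_path and args are the same values used in the call above.
import json
import resource
import subprocess

def limit_resources():
    resource.setrlimit(resource.RLIMIT_AS, (256 * 1024**2, 256 * 1024**2))  # 256 MB address space
    resource.setrlimit(resource.RLIMIT_CPU, (10, 10))                       # 10 s of CPU time

result = subprocess.run(
    ['python3', script_path, json.dumps(args)],
    capture_output=True,
    timeout=30,
    preexec_fn=limit_resources,  # runs in the child between fork and exec
)
```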
## Why Three Brokers?

Three is the minimum for meaningful fault tolerance with Raft. Two brokers can't tolerate any failures because you need both for quorum. Three brokers tolerate one failure. Five brokers tolerate two failures but add latency since you wait for more acknowledgments.

A subtle point: four brokers isn't better than three. You still need three for quorum, so you can still only lose one. Same fault tolerance, more overhead. Odd numbers are preferable.
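The arithmetic is just quorum = n // 2 + 1 and failures tolerated = n - quorum:

```python
# Quorum size vs. failures tolerated for small clusters.
for n in (2, 3, 4, 5):
    quorum = n // 2 + 1
    print(f"{n} brokers: quorum {quorum}, tolerates {n - quorum} failure(s)")
# 2 brokers: quorum 2, tolerates 0 failure(s)
# 3 brokers: quorum 2, tolerates 1 failure(s)
# 4 brokers: quorum 3, tolerates 1 failure(s)
# 5 brokers: quorum 3, tolerates 2 failure(s)
```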
) CODE_BLOCK:
## Threading Model

The system uses threads extensively:

- Flask runs with `threaded=True`, handling concurrent HTTP requests
- Raft messages are sent in background threads to avoid blocking
- Election and heartbeat timers run in separate threads
- Workers send heartbeats in a background thread while processing tasks

The system is I/O-bound (waiting on network and subprocesses), so threads work well. Shared state is protected with locks:

```python
self.commit_lock = threading.Lock()
self.commit_condition = threading.Condition(self.commit_lock)
```
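That condition variable is what lets the HTTP handler from the duplicate-assignment section block until Raft catches up. A plausible `_wait_for_commit` built on it might look like the sketch below; the repo's version may differ.

```python
# Sketch: block the HTTP handler until commit_index reaches target_index, or
# give up after the timeout. Assumes the commit path calls notify_all().
import time

def _wait_for_commit(self, target_index, timeout=1.0):
    deadline = time.monotonic() + timeout
    with self.commit_condition:
        while self.commit_index < target_index:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            self.commit_condition.wait(remaining)
    return True

def _apply_committed(self):
    with self.commit_condition:
        # ...apply newly committed log entries to the task table...
        self.commit_condition.notify_all()  # wake any waiting request handlers
```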
## Lessons Learned

**Consensus is subtle.** The Raft paper appears straightforward, but edge cases are everywhere. Getting term numbers right during elections took several iterations. Figure 8 in the paper describes a scenario where committing old-term entries causes inconsistency. It's easy to miss and painful to debug.

**Design for failure first.** Every network call can fail. Every node can crash. The question isn't "what if something fails?" but "what happens when it fails?"

**Testing distributed systems is hard.** I tested by manually killing processes and observing recovery. That works for demos but misses subtle race conditions. Production systems use deterministic simulation to explore failure scenarios exhaustively.

## Limitations

- **No persistence.** Everything is in memory. A full cluster restart loses all data. This also violates Raft's safety assumptions, since a restarted node could vote twice in the same term.
- **Slow recovery from dead workers.** If a worker dies mid-task, the task sits in "processing" for up to the 30-second heartbeat timeout before the health check reassigns it, and there are no task-level timeouts.
- **Single leader bottleneck.** All writes go through one node, limiting write throughput.
- **Basic sandboxing.** Subprocess isolation doesn't provide resource limits or network isolation.

## Conclusion

Building this system taught me more about distributed systems than any textbook. Raft's elegance, CAP's trade-offs, the subtlety of exactly-once semantics: these become concrete when you're debugging why your leader election won't converge or why tasks are getting assigned twice.

The core Raft implementation is about 400 lines of Python. If you're interested in distributed systems, I'd recommend building something similar. Start with the Raft paper, implement leader election, add log replication, then build an application on top. The bugs you encounter will be your best teachers.

Source code available on GitHub: https://github.com/haechan01/distributed-task-queue

Watch the demo video: https://www.loom.com/share/9c9d5846450b443c9e86d2fb5a64a9d7