Tools
Using LMAX Disruptor to build a high-performance in-memory event broker in Java.
2025-12-29
admin
- Why Java when Python and Go exist?
- Typical event bus based on locks
- The ring buffer
- Dispatch strategy
- Choosing a WaitStrategy
- Additional features of this experimental event bus
- Scope-based security
- Copy-on-Write subscribers
- Benchmark results
- What this experimental event bus does not do
- To conclude

I spend most of my time writing Python code for data analysis and engineering, filesystem operations, and web projects, with some Go now and then because its concurrency feels simpler, or Rust for novelty. But I started programming in C and learned OOP through Java back in the day. After seven years away from the JVM, I got curious about what had changed and decided to dive back in by building something performance-critical. The result is an event router that processes 7+ million dispatches per second on a laptop by eliminating the two main bottlenecks: locks and memory allocation.

## Why Java when Python and Go exist?

As a Python developer, I'd typically reach for libraries like PyPubSub or Blinker for event handling. They work well for I/O-bound applications but struggle with CPU-intensive event processing because of the GIL in standard CPython builds (the free-threaded build, officially supported since Python 3.14, lifts it). Go's channel-based concurrency model handles events elegantly with goroutines, and libraries like EventBus provide pub-sub patterns that feel natural in Go's ecosystem.

However, neither ecosystem has a direct equivalent to Disruptor's mechanical sympathy approach. Python's interpreter overhead and Go's garbage collector (though better than Python's) both introduce latency that becomes visible at millions of events per second. If you're building a system where a few microseconds per event multiplied by millions of events actually matters (financial systems, real-time analytics, game servers), Java's mature JIT compilation, fine-tuned GC options, and libraries like Disruptor that exploit CPU cache behavior offer performance that's hard to match.

## Typical event bus based on locks

A typical event bus uses queues wrapped in synchronized blocks.
When thread A wants to publish an event, it locks the queue, adds the event, and unlocks. Thread B does the same. At high throughput, threads spend more time waiting for locks than doing actual work. Java's ConcurrentLinkedQueue helps but still uses compare-and-swap operations that create memory barriers.
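
To make the lock-based pattern concrete, here is a minimal sketch of such a bus. `LockedEventBus`, its nested `Event` record, and the method names are hypothetical, invented for this illustration; they are not taken from any library or from my router.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical lock-based bus: every publish and poll takes the same monitor. */
class LockedEventBus {
    /** One new object per message (illustrative type, not the router's Event). */
    record Event(String type, String payload) {}

    private final Queue<Event> queue = new ArrayDeque<>();

    /** Lock, enqueue, unlock. Concurrent publishers serialize here. */
    public synchronized void publish(String type, String payload) {
        queue.add(new Event(type, payload));
    }

    /** Lock, dequeue, unlock. Returns null when the queue is empty. */
    public synchronized Event poll() {
        return queue.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        LockedEventBus bus = new LockedEventBus();
        Runnable producer = () -> {
            for (int i = 0; i < 1_000; i++) bus.publish("tick", "n=" + i);
        };
        Thread a = new Thread(producer);
        Thread b = new Thread(producer);
        a.start();
        b.start();
        a.join();
        b.join();

        int drained = 0;
        while (bus.poll() != null) drained++;
        System.out.println("drained " + drained + " events"); // drained 2000 events
    }
}
```

The code is correct, and that is the point: correctness comes from making every thread queue up behind one monitor, which is exactly the cost that grows with the number of publishers.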
Another issue is object allocation. Creating a new Event object for each message generates garbage. With millions of events per second, the garbage collector runs constantly and pauses the application.

## The ring buffer

Disruptor is basically a very good application of the circular buffer, a fundamental data structure where a fixed-size array wraps around to reuse slots. Anyone who's studied data structures and algorithms has likely encountered this pattern, but Disruptor optimizes it specifically for multi-threaded event processing with lock-free mechanics.

Animation of a circular buffer, from https://upload.wikimedia.org/wikipedia/commons/f/fd/Circular_Buffer_Animation.gif

Picture a circular array of 16,384 pre-allocated Event slots. Publishers claim the next available slot, write their data directly into it, and mark it ready. Consumers read from their current position and advance forward. No locks. No allocations. Just atomic sequence numbers that coordinate who writes where. The ring wraps around, reusing the same Event objects forever.

When a producer publishes, it calls ringBuffer.next() to claim a sequence number, copies data into ringBuffer.get(sequence), and calls ringBuffer.publish(sequence). By copying, I mean setting the values of the attributes of the current Event instance, so there is no need to create a new instance. The EventRouter.publish listing further down this post is a simple example. The consumer sees published sequences and processes them in order.

## Dispatch strategy

Once an event is in the ring buffer, the router dispatches it to subscribers. Each event type has a subscriber list, so all its subscribers can be found quickly. When an event arrives, the router looks up its type and submits a task to a thread pool for each subscriber. The thread pool size matches the number of CPU cores. This parallelizes subscriber callbacks across multiple threads while keeping the main ring buffer processing single-threaded and fast.
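
Going back to the ring buffer mechanics described above (claim a sequence, write into a pre-allocated slot, publish), the following is a deliberately simplified sketch for one producer and one consumer. It is not Disruptor's implementation: `TinyRing`, `Slot`, `tryPublish`, and `poll` are names I made up, and unlike Disruptor this version returns `false` when the ring is full instead of waiting.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Simplified single-producer, single-consumer ring; not Disruptor's implementation. */
class TinyRing {
    /** Mutable slot, allocated once and reused on every lap around the ring. */
    static final class Slot {
        String type;
        String payload;
    }

    private final Slot[] slots;
    private final int mask;                                  // size - 1; size is a power of two
    private final AtomicLong published = new AtomicLong(-1); // last sequence visible to the consumer
    private final AtomicLong consumed = new AtomicLong(-1);  // last sequence the consumer finished
    private long nextToClaim = 0;                            // touched by the single producer only

    TinyRing(int size) {
        if (Integer.bitCount(size) != 1) throw new IllegalArgumentException("size must be a power of two");
        slots = new Slot[size];
        for (int i = 0; i < size; i++) slots[i] = new Slot(); // pre-allocate every slot up front
        mask = size - 1;
    }

    /** Claim a slot, overwrite its fields, then make it visible. Returns false when the ring is full. */
    boolean tryPublish(String type, String payload) {
        long seq = nextToClaim;
        if (seq - consumed.get() > slots.length) return false; // would overwrite an unread slot
        Slot slot = slots[(int) (seq & mask)];
        slot.type = type;
        slot.payload = payload;
        published.set(seq); // volatile write: makes the field writes above visible
        nextToClaim = seq + 1;
        return true;
    }

    /** Read the next published slot, or return null when nothing new is available. */
    String poll() {
        long seq = consumed.get() + 1;
        if (seq > published.get()) return null;
        Slot slot = slots[(int) (seq & mask)];
        String out = slot.type + ":" + slot.payload; // the demo builds a String; a real consumer would not
        consumed.set(seq); // frees the slot for reuse by the producer
        return out;
    }

    /** The slot object backing a sequence, exposed only to demonstrate reuse. */
    Slot slotFor(long seq) {
        return slots[(int) (seq & mask)];
    }

    public static void main(String[] args) {
        TinyRing ring = new TinyRing(4);
        for (int i = 0; i < 10; i++) {
            ring.tryPublish("tick", "n=" + i);
            System.out.println(ring.poll());
        }
        System.out.println(ring.slotFor(0) == ring.slotFor(4)); // true: sequence 4 reuses slot 0
    }
}
```

The `seq & mask` trick is why ring sizes such as 16,384 are powers of two: wrapping around becomes a bit mask instead of a modulo.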
Each subscriber has its own single-threaded executor, fed by onEvent, that queues incoming events. This preserves ordering per subscriber (event A is always processed before event B if A was published first) while preventing one slow subscriber from blocking others.

## Choosing a WaitStrategy

Disruptor offers several wait strategies that control how consumers behave when no new events are available. Among others, there are BlockingWaitStrategy, BusySpinWaitStrategy, and SleepingWaitStrategy; the list further down this post summarizes each.

I chose YieldingWaitStrategy because it sits in the middle ground. When no events are ready, it calls Thread.yield() to hint to the scheduler that other threads can run, then checks again. A waiting consumer still keeps its core busy, but unlike pure busy spinning it gives the core up readily when other threads need it. For a system that processes bursts of events with occasional quiet periods, yielding provides good throughput and low latency while leaving room for other threads during idle times.

## Additional features of this experimental event bus

## Scope-based security

Events have scope levels: PUBLIC, FEDERATED, PRIVATE, and ROOT (each is described in the list further down this post). Components can only interact with events at or below their scope level.
When registering an event type, its scope must be specified. When subscribing to an event type, the router checks that the specified scope is broad enough to encompass the event type's scope. When publishing an event, the router verifies that the event type exists. This prevents untrusted code from accessing sensitive event streams.
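
The three checks above (register, subscribe, publish) can be sketched as follows. This is an illustration under assumptions, not the router's actual code: `ScopeRegistry` and its method names are hypothetical, the event type names are made up, and I assume the scopes are ordered from narrowest (PUBLIC) to broadest (ROOT) so that an enum comparison is enough.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry illustrating the scope checks; names are not from the real router. */
class ScopeRegistry {
    /** Declared from narrowest to broadest access; this ordering is an assumption. */
    enum Scope { PUBLIC, FEDERATED, PRIVATE, ROOT }

    private final Map<String, Scope> eventScopes = new ConcurrentHashMap<>();

    /** Register an event type with its scope; a conflicting re-registration is rejected. */
    void register(String eventType, Scope scope) {
        Scope previous = eventScopes.putIfAbsent(eventType, scope);
        if (previous != null && previous != scope)
            throw new IllegalStateException(eventType + " already registered as " + previous);
    }

    /** A subscriber may attach only if its scope is at least as broad as the event's. */
    boolean maySubscribe(Scope subscriberScope, String eventType) {
        Scope eventScope = eventScopes.get(eventType);
        if (eventScope == null) throw new IllegalArgumentException("unknown event type: " + eventType);
        return subscriberScope.compareTo(eventScope) >= 0;
    }

    /** Publishing only requires that the event type exists. */
    boolean mayPublish(String eventType) {
        return eventScopes.containsKey(eventType);
    }

    public static void main(String[] args) {
        ScopeRegistry registry = new ScopeRegistry();
        registry.register("chat.message", Scope.PUBLIC);
        registry.register("keys.rotated", Scope.PRIVATE);
        System.out.println(registry.maySubscribe(Scope.PUBLIC, "chat.message")); // true
        System.out.println(registry.maySubscribe(Scope.PUBLIC, "keys.rotated")); // false
        System.out.println(registry.maySubscribe(Scope.ROOT, "keys.rotated"));   // true
        System.out.println(registry.mayPublish("never.registered"));             // false
    }
}
```

Relying on enum declaration order keeps the check to a single comparison, at the price of making the order of the constants part of the security model.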
The EventRegistry stores registered event types and their scopes as a ConcurrentHashMap<String, Scope>. Looking up an event's scope is a single hash table read, which adds little overhead.

## Copy-on-Write subscribers

Each event type maps to a SubscriberList containing an immutable List<Subscriber>. When someone subscribes, the router creates a new list with the additional subscriber and swaps it in atomically. Readers see a consistent snapshot without synchronization. Writers pay the cost of copying the list, but subscriptions are rare compared to event dispatches. This optimizes the hot path (dispatching) at the expense of the cold path (subscribing).

## Benchmark results

The benchmark spawns 4 producer threads that each publish 250,000 events across 5 event types. Each type has 4 subscribers, creating 4M total dispatch operations (1M events × 4 subscribers each). On an Intel Core i7-13600H with 16GB RAM (and other programs running), the average time over 50 runs was 922 ms, giving 4.34M dispatches per second. On an Intel Core Ultra 7-155H with 32GB RAM, the average time dropped to 528 ms, giving 7.58M dispatches per second.

These numbers were obtained with light string payloads and subscribers that just increment a counter. Real-world throughput depends on what subscribers actually do with the events, but the router itself adds minimal latency. Disruptor itself is advertised to perform far better.

## What this experimental event bus does not do

The limitations are itemized in the list near the end of this post.

## To conclude

Disruptor was developed by LMAX, a trading platform that aims to be the "fastest trading platform in the world". It was very fun to understand and use this library while diving back into the Java ecosystem after seven years away. The code is available on GitHub.
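
As an aside on the copy-on-write subscriber list described above, here is a minimal sketch of the idea. It makes assumptions: `CowSubscribers` is a hypothetical class, subscribers are modeled as plain `Consumer` callbacks, and the swap uses an `AtomicReference`, which may differ from how the router's SubscriberList does it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical copy-on-write holder: readers never lock, writers copy and swap. */
class CowSubscribers<E> {
    private final AtomicReference<List<Consumer<E>>> current = new AtomicReference<>(List.of());

    /** Cold path: copy the list, append, and swap it in; retry if another writer won the race. */
    void subscribe(Consumer<E> subscriber) {
        while (true) {
            List<Consumer<E>> before = current.get();
            List<Consumer<E>> after = new ArrayList<>(before);
            after.add(subscriber);
            if (current.compareAndSet(before, List.copyOf(after))) return;
        }
    }

    /** Hot path: one volatile read, then iterate an immutable snapshot. */
    void dispatch(E event) {
        for (Consumer<E> subscriber : current.get()) subscriber.accept(event);
    }

    /** The immutable list currently in use. */
    List<Consumer<E>> snapshot() {
        return current.get();
    }

    public static void main(String[] args) {
        CowSubscribers<String> subs = new CowSubscribers<>();
        subs.subscribe(e -> System.out.println("first got " + e));
        List<Consumer<String>> before = subs.snapshot();
        subs.subscribe(e -> System.out.println("second got " + e));
        subs.dispatch("hello");
        // The old snapshot is untouched by the second subscription.
        System.out.println(before.size() + " -> " + subs.snapshot().size()); // 1 -> 2
    }
}
```

The JDK's `java.util.concurrent.CopyOnWriteArrayList` packages the same trade-off, so a hand-rolled version like this is mostly useful for seeing what happens on each path.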
EventRouter.publish, the example for "The ring buffer":

```java
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
// @NotNull is assumed to be org.jetbrains.annotations.NotNull; the original excerpt does not show that import.

/**
 * A router is an event bus for subscribers to attach to and receive relevant events.
 */
public class EventRouter {
    private final @NotNull Disruptor<@NotNull Event> disruptor;
    private final @NotNull RingBuffer<@NotNull Event> ringBuffer;

    // Constructor and other members are omitted in this excerpt.

    /**
     * Publish an event in the event bus.
     */
    public void publish(@NotNull Event e) {
        // Claim the next slot. The claimed sequence must always be published, hence the finally block.
        long sequence = this.ringBuffer.next();
        try {
            // Reuse the pre-allocated Event in the slot instead of creating a new one.
            Event bufferedEvent = this.ringBuffer.get(sequence);
            // type, from, payload and timestamp are custom attributes of the custom Event class.
            bufferedEvent.setType(e.getType());
            bufferedEvent.setFrom(e.getFrom());
            bufferedEvent.setPayload(e.getPayload());
            bufferedEvent.setTimestamp(e.getTimestamp());
        } finally {
            this.ringBuffer.publish(sequence);
        }
    }
}
```
EventRouter.dispatch, the example for "Dispatch strategy":

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Excerpt from EventRouter.

/**
 * Thread pool for dispatching events to subscribers.
 */
private final ExecutorService DISPATCH_POOL =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

// ...

/**
 * Dispatch an event to all its subscribers.
 */
private void dispatch(@NotNull Event e, long sequence, boolean endOfBatch) {
    // See section 'Copy-on-Write subscribers'
    SubscriberList holder = this.subscribers.get(e.getType());
    if (holder == null) return;

    var subs = holder.list;
    if (subs.isEmpty()) return;

    if (subs.size() == 1) {
        // List.getFirst() requires Java 21 or later.
        subs.getFirst().onEvent(e);
    } else {
        // Caveat: e is a reused ring buffer slot, so a task that runs late may see overwritten fields.
        for (Subscriber sub : subs) DISPATCH_POOL.submit(() -> sub.onEvent(e));
    }
}
```
Subscriber, the per-subscriber executor described in "Dispatch strategy":

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A subscriber listens to a given number of event types in its scope's range.
 */
public abstract class Subscriber implements AutoCloseable {
    /**
     * This executor ensures events are processed one at a time, in the order they are received,
     * without blocking the event router.
     */
    @NotNull
    private final ExecutorService exec = Executors.newSingleThreadExecutor();

    /**
     * Return the scope of this subscriber.
     */
    @NotNull
    public abstract Scope scope();

    /**
     * Process an event received from a router.
     */
    protected abstract void processEvent(@NotNull Event e);

    /**
     * Send data to this subscriber. This is fast and does not block the caller.
     */
    public final void onEvent(@NotNull Event e) {
        this.exec.submit(() -> this.processEvent(e));
    }

    /**
     * Stop accepting events and wait up to one minute for queued events to be processed.
     */
    @Override
    public void close() throws TimeoutException, InterruptedException {
        this.exec.shutdown();
        if (!this.exec.awaitTermination(1, TimeUnit.MINUTES))
            throw new TimeoutException("subscriber's executor thread termination timed out");
    }
}
```
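
To see the ordering guarantee of the single-threaded executor in isolation, here is a small self-contained sketch. `OrderedInbox` is a hypothetical stand-in for a subscriber: it uses plain strings instead of Event objects and has no scope, so it only demonstrates the queueing behavior.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a subscriber: one thread, one FIFO queue of pending events. */
class OrderedInbox implements AutoCloseable {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private final List<String> processed = new CopyOnWriteArrayList<>();

    /** Returns immediately; the work runs later on this inbox's own thread. */
    void onEvent(String event) {
        exec.submit(() -> processed.add(event));
    }

    /** Events handled so far, in processing order. */
    List<String> processed() {
        return processed;
    }

    /** Stop accepting events and wait for the queued ones to finish. */
    @Override
    public void close() {
        exec.shutdown();
        try {
            if (!exec.awaitTermination(10, TimeUnit.SECONDS))
                throw new IllegalStateException("inbox did not drain in time");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        OrderedInbox inbox = new OrderedInbox();
        for (int i = 0; i < 5; i++) inbox.onEvent("event-" + i);
        inbox.close();
        System.out.println(inbox.processed()); // [event-0, event-1, event-2, event-3, event-4]
    }
}
```

The order is only guaranteed relative to the order in which `onEvent` is called, so whatever calls `onEvent` has to do so in publication order for the end-to-end guarantee to hold.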
Wait strategies mentioned in "Choosing a WaitStrategy":

- BlockingWaitStrategy uses locks and is CPU-friendly but adds latency.
- BusySpinWaitStrategy burns CPU cycles in a tight loop for the lowest possible latency.
- SleepingWaitStrategy backs off progressively, balancing CPU usage with reasonable latency.

Scope levels used in "Scope-based security":

- PUBLIC (visible to remote actors)
- FEDERATED (visible to trusted servers)
- PRIVATE (local trusted actors)
- ROOT (full access)

Limitations, as announced in "What this experimental event bus does not do":

- There's no filtering beyond event type.
- There are no wildcards, no content-based routing, no priority queues.
- There's no guaranteed delivery. If a subscriber falls too far behind and the ring buffer wraps around, old unprocessed events get overwritten. The ring buffer has 16K slots in the default configuration, so this requires being 16K events behind, but it can happen.
- There's no persistence. Events exist only in memory. If the process crashes, in-flight events are lost. This is an in-process event bus, not a message queue like Kafka.
- The single-threaded executor per subscriber means slow subscribers build up queues in their executor. There's no back-pressure mechanism to slow down publishers when consumers can't keep up.