Tools: Networking Deep Dive With io_uring, Part 2 - Bridging the C# Async Model
In part 1 we built the minimal io_uring loop: setup, mmaps, SQE/CQE draining, accept/recv/send via opcode dispatch. That was the first step to understanding how the kernel interface works, but the dispatch logic was essentially hand-coded, a state machine that would not scale. This second part is all about introducing the asynchronous model: awaiting data pushed by the kernel. To keep the scope manageable, in this article we will simply return the number of bytes received; later parts will describe how to adapt this to return the actual request data and parse it. As usual, the full source code can be found at ....

Why not just use Task?

The direct option would be to return a Task that the dispatcher completes when a CQE arrives. To avoid the allocation this would cause for every asynchronous read, we are going with the zero-allocation option: a ValueTask over a reusable IValueTaskSource. This is the high-performance path; the source is a single object that lives as long as the TCP connection.

The interface's three methods (GetResult, GetStatus and OnCompleted) are all the runtime needs to drive the await "machinery". The token is just a safety net: every Reset() bumps an internal version, like a generation counter, so that a stale awaiter passing an old token gets caught. Typically we don't need to implement these three methods from scratch; the BCL provides ManualResetValueTaskSourceCore<T>, a struct that holds and owns the value, the version, the captured continuation and the scheduling context. We can delegate the interface methods to it.

The generic T is RecvSnapshot, a small struct that captures what is available to read; the handler awaits this snapshot and drains the items it covers. This will be covered further on. For now, just think of it as a snapshot pointing at a circular buffer's tail. The reason we need this, and can't just drain the entire buffer, is that we can receive data while we process it: the tail can change at any time, and that would potentially cause a desync when extracting data.
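To make the delegation concrete, here is a minimal sketch of what such a reusable source could look like. ReadSignal is an illustrative name, and the RecvSnapshot fields shown are a simplified stand-in for the real struct, not necessarily its exact shape in the repository:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Simplified stand-in for the snapshot struct described above.
public readonly struct RecvSnapshot
{
    public readonly int Tail;
    public readonly bool IsClosed;
    public RecvSnapshot(int tail, bool isClosed) { Tail = tail; IsClosed = isClosed; }
}

// A reusable source: one instance per connection, reset between completions.
public sealed class ReadSignal : IValueTaskSource<RecvSnapshot>
{
    private ManualResetValueTaskSourceCore<RecvSnapshot> _core;

    // The surface the producer/consumer use:
    public short Version => _core.Version;
    public void Reset() => _core.Reset();                       // bumps the version
    public void SetResult(RecvSnapshot value) => _core.SetResult(value);
    public void SetException(Exception ex) => _core.SetException(ex);

    // The three methods the runtime calls to drive the await machinery,
    // all delegated to the BCL core:
    public RecvSnapshot GetResult(short token) => _core.GetResult(token);
    public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
    public void OnCompleted(Action<object?> continuation, object? state,
        short token, ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}
```

A stale awaiter holding an old Version will be rejected by the core's token check, which is exactly the safety net described above.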
_readSignal exposes Version, Reset(), SetResult(value) and SetException(ex); that is the whole API surface we will use. The "Manual" in ManualResetValueTaskSourceCore is the contract: Reset() must be called between completions, and without it the next SetResult() would throw. This is intentional design; reuse without explicit resets could mask state-management bugs.

Each Connection has two sides: a producer, the dispatcher completing CQEs, and a consumer, the handler awaiting ReadAsync. Most of the time these two are synchronized in the simplest way: the consumer parks waiting for data, the producer wakes it. There are, however, gaps that need to be addressed: data can arrive before the consumer calls ReadAsync, and more data can arrive while the consumer is still processing. Both cases boil down to the same need, a buffer between the producer and consumer, so that data is preserved no matter which side runs first.

A solution is a bounded single-producer single-consumer ring: the producer enqueues from the dispatcher and the consumer dequeues from ReadAsync. The implementation of this structure can be found in SpscRecvRing.cs. One important characteristic is that this is a ring (circular) buffer whose size must be a power of 2; indices wrap with a cheap bitmask, which removes the need to ever clear or reset the buffer.

In this part 2 scope we only care about Len, the number of bytes received; Bid (buffer id) and HasBuffer are for returning the buffer to the kernel ring and will be properly covered in part 3. The two interesting methods are SnapshotTail and TryDequeueUntil: together they let the consumer take a "picture" of what was available at that moment and drain everything up to that point in a single batch, without chasing a moving tail, as explained above.

Before moving on I'd like to leave a "small" paragraph here. I've been confronted many times with "but why do you say run synchronously? Don't we want everything asynchronous to be more efficient?". Typically, when we hear about asynchronous execution we think about parallelism, the thread pool and multithreading. This is generally not wrong, but not precise either.
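Here is a minimal sketch of that idea, assuming a simplified item that only carries Len (the real SpscRecvRing.cs also carries Bid and HasBuffer, covered in part 3):

```csharp
using System;
using System.Threading;

public struct RecvItem { public int Len; }

// Minimal sketch of a bounded SPSC ring with snapshot-based draining.
public sealed class SpscRing
{
    private readonly RecvItem[] _items;
    private readonly int _mask;   // capacity is a power of 2, so index = pos & _mask
    private int _head;            // consumer position (only the consumer advances it)
    private int _tail;            // producer position (only the producer advances it)

    public SpscRing(int capacityPow2)
    {
        if (capacityPow2 <= 0 || (capacityPow2 & (capacityPow2 - 1)) != 0)
            throw new ArgumentException("capacity must be a power of 2");
        _items = new RecvItem[capacityPow2];
        _mask = capacityPow2 - 1;
    }

    // Producer side (dispatcher): publish the slot, then the new tail.
    public bool TryEnqueue(in RecvItem item)
    {
        int tail = Volatile.Read(ref _tail);
        if (tail - Volatile.Read(ref _head) == _items.Length) return false; // full
        _items[tail & _mask] = item;
        Volatile.Write(ref _tail, tail + 1);
        return true;
    }

    // Consumer side: freeze the visible end of the ring at this moment.
    public int SnapshotTail() => Volatile.Read(ref _tail);

    // Dequeue one item, but never past the snapshot, even if the
    // producer has moved _tail further since the snapshot was taken.
    public bool TryDequeueUntil(int snapshotTail, out RecvItem item)
    {
        int head = _head;
        if (head == snapshotTail) { item = default; return false; }
        item = _items[head & _mask];
        Volatile.Write(ref _head, head + 1);
        return true;
    }
}
```

Because head and tail are free-running counters that are only ever masked on access, wraparound is automatic and no slot ever needs clearing, which is exactly why the power-of-2 size matters.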
The typical async workflow is to tell the CPU to execute a piece of code in the "future": a callback, a promise, or however we name it. In C# this callback will typically execute on a thread from the thread pool (as .ConfigureAwait(false) is typically used), and you could say that is one of the core mechanisms that makes the C# async/await model so good. But at the end of the day, if we can avoid this future callback and immediately execute the logic, which can happen if the CQE was received before we call ReadAsync, that is the most efficient scenario: data is already available when we call ReadAsync, so we short-circuit the whole callback machinery and run the logic inline.

Delegating continuation execution to the thread pool sounds great, but it comes with a price that becomes noticeable when we want to deal with millions of requests per second. More importantly, more threads does not always mean faster. Software runs on a CPU with a limited number of hardware threads, typically the number of physical cores or 2x that value. While we can create thousands or tens of thousands of software threads in C#, those will run on a limited number of CPU threads.

Here is where IValueTaskSource can shine. By default it runs the OnCompleted callback synchronously, but do not get confused here: "synchronously" means the callback will run on the same thread as the caller, not that the await will block.

Before moving into how IValueTaskSource is bridged between the handler and the reactor loop, let's first understand the four flags/states owned by the Connection class. _armed means the consumer is parked. After enqueueing the data into the ring, the producer checks _armed: if set, it fires SetResult with a fresh snapshot to wake the consumer. If not armed, the data just sits in the ring, waiting for the next ReadAsync to pick it up via the synchronous fast path.
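The inline-continuation behaviour is easy to demonstrate in isolation. The sketch below (illustrative names, not from the repository) awaits a pending source on one thread and completes it from another; with RunContinuationsAsynchronously left at its default of false and no SynchronizationContext captured, the continuation resumes on whichever thread called SetResult, with no thread-pool hop:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Minimal source that delegates everything to the BCL core;
// RunContinuationsAsynchronously defaults to false, i.e. inline continuations.
public sealed class InlineSignal : IValueTaskSource<int>
{
    private ManualResetValueTaskSourceCore<int> _core;
    public short Version => _core.Version;
    public void SetResult(int v) => _core.SetResult(v);
    public int GetResult(short t) => _core.GetResult(t);
    public ValueTaskSourceStatus GetStatus(short t) => _core.GetStatus(t);
    public void OnCompleted(Action<object?> c, object? s, short t, ValueTaskSourceOnCompletedFlags f)
        => _core.OnCompleted(c, s, t, f);
}

public static class InlineDemo
{
    public static int ContinuationThreadId;

    public static async Task Consume(InlineSignal sig)
    {
        await new ValueTask<int>(sig, sig.Version);   // parks: the source is still pending
        // Resumes inline on the thread that called SetResult.
        ContinuationThreadId = Environment.CurrentManagedThreadId;
    }
}
```

In the reactor this is exactly what keeps the handler's continuation on the reactor thread instead of bouncing it through the thread pool.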
There is no separate pending-data mechanism; the SPSC ring is the bridge from edge-triggered wakes (one CQE at a time) to level-triggered consumption (the handler reads when it can). _closed is sticky: once set, all future ReadAsync calls will return immediately. On the failure path (res <= 0, or the item can't be enqueued) we hand the buffer back to the kernel ring so that it doesn't slowly run out of recv buffers. If the consumer is already parked, we wake it with SetResult and pass a snapshot of the ring's tail at that moment. The snapshot is what makes this meaningful: it basically tells the handler "everything from your current head up to this tail can be drained in a single batch". If three CQEs arrived back to back while the handler was busy, all three are visible in the next snapshot.

ReadAsync then has a few paths. If the ring has data, take a fresh snapshot and return it as a completed ValueTask, with no state-machine yield, allocation or wake required: the synchronous fast path. If the ring is empty and the connection is closed, return a closed snapshot. The check on _armed == 1 is a guardrail: two simultaneous ReadAsync calls on the same connection would "clobber" each other's continuation. Then the typical case: set _armed and hand the awaiter a ValueTask(this, _readSignal.Version); the handler suspends, the runtime stores the continuation on _readSignal, and the thread is freed.

Draining the snapshot

Once the handler receives a snapshot, it pulls items out one by one with TryGetItem. TryDequeueUntil advances the consumer's head only as far as the snapshot's tail, even if the producer has since moved ahead. The handler always sees a stable batch.

IValueTaskSource "magic"

In simple terms, the "trick" here is that the continuation callback will always run on the same thread, avoiding the thread-pool cost. The reactor/dispatcher loop always runs on the same software AND CPU thread, even though it is asynchronous and await boundaries are hit.
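Putting the paths together, here is a self-contained sketch of the control flow. To keep it short, the ring is a plain Queue, the snapshot only carries a count, and all thread-safety is omitted: this illustrates the shape of ReadAsync and Complete, not the real lock-free implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

public readonly struct Snapshot
{
    public readonly int Tail; public readonly bool IsClosed;
    public Snapshot(int tail, bool closed) { Tail = tail; IsClosed = closed; }
}

// Sketch of a connection that is its own IValueTaskSource (illustrative).
public sealed class SketchConnection : IValueTaskSource<Snapshot>
{
    private ManualResetValueTaskSourceCore<Snapshot> _readSignal;
    private readonly Queue<int> _recv = new();
    private bool _armed, _closed;

    // Producer side: called by the reactor for each recv CQE.
    public void Complete(int len)
    {
        _recv.Enqueue(len);
        if (_armed) { _armed = false; _readSignal.SetResult(new Snapshot(_recv.Count, false)); }
    }

    public void Close()   // sticky; wakes a parked consumer with a closed snapshot
    {
        _closed = true;
        if (_armed) { _armed = false; _readSignal.SetResult(new Snapshot(0, true)); }
    }

    public ValueTask<Snapshot> ReadAsync()
    {
        if (_recv.Count > 0)                                   // 1) synchronous fast path
            return new ValueTask<Snapshot>(new Snapshot(_recv.Count, false));
        if (_closed)                                           // 2) closed snapshot
            return new ValueTask<Snapshot>(new Snapshot(0, true));
        if (_armed)                                            // 3) guardrail
            throw new InvalidOperationException("concurrent ReadAsync on one connection");
        _armed = true;                                         // 4) park on the reusable source
        return new ValueTask<Snapshot>(this, _readSignal.Version);
    }

    public void ResetRead() => _readSignal.Reset();            // must run between completions

    // IValueTaskSource plumbing, delegated to the BCL core.
    public Snapshot GetResult(short t) => _readSignal.GetResult(t);
    public ValueTaskSourceStatus GetStatus(short t) => _readSignal.GetStatus(t);
    public void OnCompleted(Action<object?> c, object? s, short t, ValueTaskSourceOnCompletedFlags f)
        => _readSignal.OnCompleted(c, s, t, f);
}
```

Note that in the fast path nothing touches _readSignal at all: the ValueTask wraps the snapshot value directly, which is where the zero-allocation, zero-wake claim comes from.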
When a recv CQE arrives, the reactor calls Complete, which enqueues into the ring and calls SetResult with a fresh snapshot. With RunContinuationsAsynchronously left at its default of false and no captured SynchronizationContext, the stored continuation is invoked inline, on the reactor thread:

Reactor.Run
  Dispatch(recv_cqe)
    Connection.Complete(res, bid, hasBuffer)
      _recv.TryEnqueue(...)
      _readSignal.SetResult(new RecvSnapshot(tail, isClosed))
        continuation(state)               ← function pointer call
          HandleAsync.MoveNext            ← state machine resumes after the await
            RecvSnapshot snap = result;   ← GetResult returns the snapshot
            while (TryGetItem(...)) { ... process, send response, return buffer ... }
            ResetRead, await ReadAsync
            // ...the loop starts over, still on the reactor thread

The handler runs on top of the reactor's call stack. No thread-pool hop, no scheduler. SetResult is, mechanically, just a function call into the handler's continuation that happens to carry a snapshot as its payload. When the handler finishes draining, hits the next await and parks, the stack unwinds back to Dispatch, which moves on to the next CQE. The reactor/dispatcher loop always runs on the same software AND CPU thread, and so does the handler, even though the code is fully asynchronous and await boundaries are hit on every iteration.