Tools: Async without async (2026)


Background

Why not async?

Objective

Handling IO

Asynchronous IO (or not really)

A look inside

Gotchas with writes

Time driven action

Summary - Good Bad and Ugly

Conclusion

To start with a short TL;DR: this article is my exploration of implementing an asynchronous networking application without using async Rust.

Background

Over the past months (if not years at this point) I have been playing around with some sane approaches to implementing consensus algorithms, and perhaps more general distributed systems. As part of this journey I am seeking ways to have more control over the whole "application framework". This recently led me to ask a question: can I have a performant, IO-heavy application without using async Rust?

Why not async?

In many ways async support in Rust is great. If you are just writing a web application, the async and await keywords make it very easy to write the code as you would with sync Rust. However, everything comes at a cost: async Rust brings in the complexity of the whole async runtime, and hides a lot of what is going on from our sight.

One of the reasons for this is that async Rust and the accompanying runtimes are built to be a generalized solution, to support a lot of different cases and be robust in many different ways. Some complexity arises naturally from that, and it is then well hidden from us by async and friends. Not all of this complexity is needed for every use case, and since there is no such thing as a free lunch, there may come a time to pay for it.

Another thing we sacrifice by using an async runtime is control. It is programmed in a specific way, with some knobs that we are able to tweak and some we are not. Until we understand the code thoroughly and grasp the possible code paths, there will always be a black-box aspect to it.

What I have learned over the years is that sometimes it is better to ditch a one-size-fits-all, batteries-included solution and build something simpler and use-case-specific, sacrificing some time but sparing yourself a lot of complexity, and retaining full control and a better understanding of the system. Part of this exploration is to answer the question of whether it is worth it in this case.

Objective

Not using async Rust is not a goal in itself, but only a means to an end.
The main objective remains to build a proof of concept of a simple system that could be used to implement more sophisticated software on top of it. The goal is exploration, but there are a few constraints I want to satisfy: the system should run in a single thread, avoid busy looping, add no artificial latency, and be driven by more than just incoming requests.

Handling IO

Since I am ditching async Rust, and IO is still at the core of the application, the first step is to figure out how to handle it without the magic of Tokio. Let's take a look at the possibilities.

If I were asked to write the simplest echo server to handle just one connection, I would end up with something like this:

And that is the first, likely simplest approach to handling IO: blocking IO. The application will block on the connection.read call until there is something to read.

Now, if I had to handle multiple connections, there are a few ways to extend it. I suppose the most intuitive one is to handle each connection in a separate thread and keep accepting in the main one:

Clearly, this approach is more versatile than handling only one connection, but it is also clear that it violates one of my objectives -- being single threaded.

Note: a variation of this could be process per connection, which is used by some systems. Still, they often use async IO anyway.

The other option that we have allows us to keep our single thread; all we need is to make the sockets non-blocking and add a bit more code:

However, a careful observer can immediately see that this violates another constraint, as the loop will just keep spinning, burning all the CPU cycles. We could avoid busy looping by adding a short sleep between iterations, but that is added latency I want to avoid as well.

With all of those out of the way and not suitable, we need to come full circle and go back to async. Not necessarily async Rust, but async IO nonetheless.

Asynchronous IO (or not really)

As the name suggests, asynchronous IO is not synchronous. But what that really means is a bit complicated.
Async IO can work in different ways, and I am not sure there is a single correct definition of what is async and what is not. In general, when we talk about async, it is understood as something that happens "in the background", with some notification when "things are ready".

Different systems work in different ways. With io_uring, IO happens in kernel space and the user space application receives a notification when the work is completed, while with epoll the application still does the dirty work of the IO syscalls and just receives a notification when there is progress to be made.

Deeper tangent: I would say that async is in the eye of the beholder. One can argue that epoll is not "real" async, since the application only receives the event and all work still happens synchronously (in a non-blocking way, let's say). However, if you follow this line of thinking, then Rust's Tokio isn't really async either, since there too it is the application that does the IO. "But io_uring is real async!", you may object. In its case it is not the application that does the IO, but the kernel itself. It must be true async then! However, if you look at it from the perspective of the CPU (or even the kernel), it all happens on the same silicon (perhaps on different cores, but that is not for us to decide), so is it "async" after all?

Different operating systems have different APIs for async IO, to name a few: epoll and io_uring on Linux, kqueue on the BSDs and macOS, and IOCP on Windows. There are other, older mechanisms on Linux as well, such as poll and select, but these days epoll is likely the most prevalent, with io_uring being the newest and slowly gaining adoption.

Since penguins dominate the server world, I focused on Linux and took a deeper look into epoll. I have known about epoll since the first time I asked myself "But how does async actually work?", a question that led me into the deep rabbit hole of various kernel mechanisms, down to the realm of CPU interrupts (if you have never gone there, I highly recommend the journey). However, not being a C programmer, I had never used it "directly".
Most of the "async" web dev libraries in all languages rely on it, but hide it carefully under a few layers of abstraction, mainly because they are meant to work on all OSes and not just Linux. But let's get to the point...

A look inside

Epoll as a whole is an API in the Linux kernel that allows a process to register interest in IO events for a set of file descriptors. There are four syscalls listed under the epoll man page: epoll_create, epoll_create1, epoll_ctl, and epoll_wait. The names are somewhat self-explanatory, so I will not copy-paste definitions from the man page; feel free to check it out on your own.

I will not leave you empty-handed, however, and will give you a quick intuition of how things work. Epoll is about events. Instead of constantly checking whether there is any IO to be done, the user space application receives a "notification" when there is "progress" to be made. Since the kernel sits behind this mechanism, the waiting thread can "go to sleep" while waiting for events and get woken up when an IO event arrives, hence not wasting CPU cycles by spinning around checking all connections, and also not adding latency with an actual sleep.

One might ask: how do we know when to stop reading or writing, then? Well, if you ask the socket politely, it will tell you. As long as it is a non-blocking socket, that is, since async IO is usually used in conjunction with those. And by the socket "telling you", what I mean is it returning EAGAIN or EWOULDBLOCK.

For each epoll instance created in user space there is an eventpoll allocated on the kernel side. It contains a red-black tree of epitems keyed by file pointer and file descriptor. When we register interest, a new tree node is inserted and a callback is added to the file descriptor's wait queue. This callback is where the magic happens: whenever we call epoll_wait, our thread will be parked (if no interests are ready), and it is this callback's job to wake it up (if the interest mask matches). Additionally, when this happens, a reference to the epitem is inserted into eventpoll's ready list.
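To make the EAGAIN/EWOULDBLOCK answer concrete, here is a minimal std-only Rust sketch of the "read until the socket says stop" pattern; the drain helper and the mock reader are my own illustration, not code from this article's repository:

```rust
use std::io::{self, Read};

/// Read from `r` until it reports WouldBlock (EAGAIN/EWOULDBLOCK) or EOF.
/// Returns all bytes that were available right now.
fn drain<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    let mut buf = [0u8; 1024];
    loop {
        match r.read(&mut buf) {
            Ok(0) => break, // EOF: peer closed the connection
            Ok(n) => out.extend_from_slice(&buf[..n]),
            // The socket politely told us there is nothing more right now.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}

/// Mock non-blocking source: yields chunks, then WouldBlock.
struct Mock(Vec<Vec<u8>>);

impl Read for Mock {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.0.pop() {
            Some(chunk) => {
                buf[..chunk.len()].copy_from_slice(&chunk);
                Ok(chunk.len())
            }
            None => Err(io::Error::new(io::ErrorKind::WouldBlock, "EAGAIN")),
        }
    }
}

fn main() -> io::Result<()> {
    let mut m = Mock(vec![b"world".to_vec(), b"hello ".to_vec()]);
    let data = drain(&mut m)?;
    assert_eq!(data, b"hello world");
    println!("drained {} bytes", data.len());
    Ok(())
}
```

The same loop works against a real non-blocking TcpStream, since std maps EAGAIN/EWOULDBLOCK to io::ErrorKind::WouldBlock.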
Now, to the more interesting part: how to actually use it. My goal here is to get a real glimpse of epoll in all its glory, not covered by compatibility layers and easy-to-use abstractions. Fine, fine... using libc is not the lowest one can go, but it is good enough for today...

First things first, I need something that listens on a port and accepts connections. No epoll magic here, no async IO, just good old C:

This will do the work as a simple echo server. However, as an example of single-threaded blocking IO (just in a different language), it can only handle one connection at a time.

Since async IO only makes sense with non-blocking sockets, the first step is to make the listening socket as such:

Now, this breaks the echo server, since accept will no longer block, but return an error instead:

Which basically means: "if the socket were blocking, this call would block". Now it is time to create an epoll instance and register read interest (EPOLLIN) for socket_fd on it:

Next, instead of just calling accept and handling the connection directly in the main thread, we call epoll_wait inside the loop. When the socket is able to accept a connection, epoll_wait returns, putting an event into the buffer we pass to it. We then iterate through the new events, checking whether the associated file descriptor is the listening socket -- in which case we accept all new connections and add them to epoll -- or a regular connection socket otherwise.

Gotchas with writes

Epoll interest can be registered as level-triggered or edge-triggered. Level-triggered is the default option, and it will keep notifying while the interest "is fulfilled".
So if I register a TCP connection socket with read interest, epoll_wait will keep waking with an event until I read all available data from that socket. Edge-triggered (the EPOLLET option), on the other hand, will notify (at least once) only when the interest "becomes fulfilled" -- in the example above, only when new data arrives on the socket. More details can be found on the already familiar man page.

Here I add TCP connections to epoll as edge-triggered; however, in this case it does not really matter, since I read all available data each time and we are not working around any constraints. I also do not care about writes, as they are done in a best-effort fashion.

Write interest is slightly more complicated. If we were to use level-triggered epoll, we would get wake-up events as long as the socket is writable, which, if we have nothing to write, will be all the time, hence the application would never "sleep". One option here is to register write interest only when there is data to be written, and then remove it. This is not a problem with edge-triggered epoll; however, there we need to be mindful that we only get notified when the socket state changes to writable. Therefore, if the socket was already writable and we have new data to send, we will not be notified. So either, again, we re-arm the epoll with write interest only when we have data to write, or, whenever we have new data, we attempt to write to the socket immediately and stop when the write would block.

Handling existing connections changes slightly as well, as now we also need to handle EAGAIN and EWOULDBLOCK, since the connection sockets are non-blocking too:

The write part is a bit simplified, since I do not want to store extra state about what has been successfully written and what has not (I could register connection fds with write interest (EPOLLOUT) as well and get notified when there is some progress to be made writing).
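The "queue bytes, flush as far as the socket allows, keep the leftovers for the next wake-up" idea described above can be sketched in std-only Rust; the WriteBuf type and the capacity-limited mock socket are hypothetical illustrations, not this article's actual code:

```rust
use std::io::{self, Write};

/// Per-connection outgoing buffer: bytes are queued here first and
/// flushed to the socket whenever it can make progress.
struct WriteBuf {
    pending: Vec<u8>,
}

impl WriteBuf {
    fn new() -> Self { Self { pending: Vec::new() } }

    fn queue(&mut self, data: &[u8]) {
        self.pending.extend_from_slice(data);
    }

    /// Write as much as possible; on WouldBlock keep the rest for the
    /// next wake-up. Returns true when everything was flushed.
    fn flush_to<W: Write>(&mut self, w: &mut W) -> io::Result<bool> {
        while !self.pending.is_empty() {
            match w.write(&self.pending) {
                Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
                Ok(n) => { self.pending.drain(..n); }
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(false),
                Err(e) => return Err(e),
            }
        }
        Ok(true)
    }
}

/// Mock socket that accepts only `capacity` bytes before "blocking".
struct Mock { capacity: usize, written: Vec<u8> }

impl Write for Mock {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.capacity == 0 {
            return Err(io::Error::new(io::ErrorKind::WouldBlock, "EAGAIN"));
        }
        let n = buf.len().min(self.capacity);
        self.capacity -= n;
        self.written.extend_from_slice(&buf[..n]);
        Ok(n)
    }
    fn flush(&mut self) -> io::Result<()> { Ok(()) }
}

fn main() -> io::Result<()> {
    let mut sock = Mock { capacity: 4, written: Vec::new() };
    let mut out = WriteBuf::new();
    out.queue(b"status\n");
    // First attempt: only 4 bytes fit, 3 stay pending.
    assert!(!out.flush_to(&mut sock)?);
    // The "socket" becomes writable again (edge event) and we retry.
    sock.capacity = 16;
    assert!(out.flush_to(&mut sock)?);
    assert_eq!(sock.written, b"status\n".to_vec());
    Ok(())
}
```

With edge-triggered epoll, flush_to returning false is exactly the point where you wait for the next writable event instead of spinning.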
Now to compile and run it:

And connect from two separate terminal windows:

If I now start writing to the connections, I can see messages being echoed back, and the server logs show its hard work. It can handle multiple connections, runs in a single thread, does not add any artificial latency, and is not busy looping. This checks the requirements. So, as all software does eventually, it is time to rewrite it in Rust. The full code is available on GitHub.

Since I am not planning to become a C wizard anytime soon, to build the foundation for something more complex and come back to the idea of "async without async", there actually needs to be some async that I ditch, so Rust it is. Do not expect fireworks, though, just a slightly more "flashy" echo server...

To not libc myself into oblivion or unsafe my way to hell, I decided to take an easier path, a path well trodden by others: the secret async source behind Tokio, Mio. Mio not only wraps epoll with a nice, easy-to-use API, but also does so over other OSes' async APIs, making our app cross-platform! We can now forget about epoll's naked glory. The overall approach, however, is the same as in C:

And to be fair, there is not much more to it, since Mio handles all the dirty work behind the scenes.

Poll's API is quite similar to what we saw in the C code, but without getting your hands dirty with direct syscalls. As with the C code, the first thing to do is register a listener socket. To use it with Poll, it needs to be wrapped with mio::net::TcpListener, which provides the aforementioned Source trait implementation expected by the Registry::register(...) method (Registry lives inside Poll):

While in C events are associated with a specific file descriptor, to be cross-platform Mio uses Token, which is a wrapper around usize and allows us to map an event back to its Source -- for example a specific TCP connection, or, as is the purpose of listener_token, the TCP listener.
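The token-to-connection bookkeeping Mio pushes onto the application can be sketched with a plain HashMap; this Connections type (and using a bare usize where Mio has its Token newtype) is a hypothetical illustration, not the article's exact code:

```rust
use std::collections::HashMap;

/// Mio's Token is just a usize wrapper; a plain usize stands in for it here.
/// Token 0 is reserved for the listener; connections get fresh tokens.
const LISTENER_TOKEN: usize = 0;

struct Connections<T> {
    next_token: usize,
    by_token: HashMap<usize, T>,
}

impl<T> Connections<T> {
    fn new() -> Self {
        Self { next_token: LISTENER_TOKEN + 1, by_token: HashMap::new() }
    }

    /// Register a new connection and return the token to use with Poll.
    fn insert(&mut self, conn: T) -> usize {
        let token = self.next_token;
        self.next_token += 1;
        self.by_token.insert(token, conn);
        token
    }

    /// Map an event's token back to its connection.
    fn get_mut(&mut self, token: usize) -> Option<&mut T> {
        self.by_token.get_mut(&token)
    }

    fn remove(&mut self, token: usize) -> Option<T> {
        self.by_token.remove(&token)
    }
}

fn main() {
    let mut conns: Connections<&str> = Connections::new();
    let t1 = conns.insert("conn A");
    let t2 = conns.insert("conn B");
    assert_ne!(t1, LISTENER_TOKEN);
    assert_eq!(conns.get_mut(t2), Some(&mut "conn B"));
    conns.remove(t1);
    assert!(conns.get_mut(t1).is_none());
}
```

In the event loop, an event whose token equals LISTENER_TOKEN means "accept", and any other token is looked up here to find the connection to read from or write to.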
With Poll initialized, we can wait for events and process them:

It is analogous to our previous echo server, differentiating between events on the listening socket and connection sockets, with the difference that here we compare Tokens instead of file descriptors.

Unlike in the C implementation, however, here we handle writes properly by registering both READABLE and WRITABLE interests on the TCP stream of an established connection. For that to work, writes are initially appended to an in-memory buffer and then written to the connection whenever we can make progress:

When reading from and writing to the connection, we need to remember to handle WouldBlock errors as "cannot do more, wait for the next epoll event":

That gives us all the necessary ingredients to be async in a non-async world, and completes the echo server. However, if you paid attention, there is one more objective to be taken care of.

Time driven action

A lot of applications have to do more than just handle IO; they also do some work periodically, be it sending a heartbeat or requesting some data from another system. Let's consider those time-based work, as opposed to the request-based work that is triggered by IO events.

It is then time to add a killer feature to the mighty echo server: every 5 seconds it is going to send its "status" to all connected clients. Yes, that's it. In async Rust one could simply do some tokio::select magic with an interval timer as one of the branches.

I still want the application to stay single threaded, so a separate sleeping thread is not an option either. Fortunately, epoll (and thus Mio) has exactly what we need. With epoll_wait we can specify a timeout for how long we want to wait for events before the function returns, presumably coming back on the next iteration of the loop. Mio's Poll exposes the same functionality in its poll method as well. I already pass a timeout through the wait_for_events function.
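The timeout bookkeeping behind that idea is simple: before each poll, compute how long we may sleep before the next tick is due. A small hypothetical helper (not the article's exact code) in std-only Rust:

```rust
use std::time::{Duration, Instant};

/// Compute how long Poll::poll may sleep so we still wake up in time
/// for the next periodic status broadcast.
fn poll_timeout(last_tick: Instant, interval: Duration, now: Instant) -> Duration {
    let next_tick = last_tick + interval;
    // saturating_duration_since yields zero if the deadline already passed,
    // which makes poll return immediately so the tick fires right away.
    next_tick.saturating_duration_since(now)
}

fn main() {
    let interval = Duration::from_secs(5);
    let start = Instant::now();

    // 2s after the last tick: we may sleep at most 3 more seconds.
    let t = poll_timeout(start, interval, start + Duration::from_secs(2));
    assert_eq!(t, Duration::from_secs(3));

    // Deadline already missed: do not sleep at all.
    let t = poll_timeout(start, interval, start + Duration::from_secs(7));
    assert_eq!(t, Duration::ZERO);
}
```

Whether poll returns because of IO events or because the timeout elapsed, the loop checks the deadline, broadcasts if it is due, and recomputes the next timeout.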
And that brings me to the last piece of this puzzle, the aforementioned loop that sends the status periodically and waits for IO events when idle:

To set things straight, broadcasting the message is non-blocking as well. All it does is extend the write buffer for all connections and try to progress each write as far as it can until hitting a WouldBlock error. In case of fatal connection errors, we just drop them:

Just extending the buffer would not be enough; we would not get any event if the connection were currently writable, since its state did not change. This is the thing I mentioned earlier.

There is not much more magic here than in the C implementation, but for the sake of completeness, let's run it. You can find the full source code on GitHub. And if we connect from two terminal windows, we can see that the echo server is cruising.

Summary - Good Bad and Ugly

This concludes the echo server, and thus the proof of concept. As intended, I have achieved "async IO" without using async Rust. Well, if we come back to the rant in the Asynchronous IO section, maybe it is not so async. Perhaps this article should be named "Non-blocking, event-driven IO without async", but hey, that is not very catchy. Anyway, I am digressing...

It does beg the question, though: was it worth it? If I were to give you advice on writing echo servers, you are probably better off just using async, and that is likely true for most simple applications. With this simple example it is hard to argue the case of async without async, yet I am going to give it a shot.

Writing an echo server in async Rust could probably be done in a third of the lines of code. For all IO operations, instead of catching would-block errors, we would simply await. Mio's Poll, although still there, would be conveniently hidden inside the belly of the Tokio runtime.

Things would get a bit ugly if we were to go deep into some more complex network protocol. Let's consider that we need to perform a simple handshake.
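Without async, such a handshake has to be driven as an explicit state machine that is re-entered on every IO wake-up. A minimal hypothetical sketch, where the HELLO/AUTH messages and the advance method are invented purely for illustration:

```rust
/// A hypothetical two-step handshake driven as an explicit state machine.
/// Each IO wake-up calls `advance` again; a state only moves forward when
/// its input is complete, mirroring what `await` points would do for us.
#[derive(Debug, PartialEq)]
enum Handshake {
    AwaitingHello,
    AwaitingAuth,
    Established,
}

impl Handshake {
    /// Feed whatever message arrived; may be called many times per state
    /// if input arrives fragmented.
    fn advance(&mut self, input: &[u8]) -> Result<(), &'static str> {
        match self {
            Handshake::AwaitingHello if input == b"HELLO" => {
                *self = Handshake::AwaitingAuth;
                Ok(())
            }
            Handshake::AwaitingAuth if input == b"AUTH" => {
                *self = Handshake::Established;
                Ok(())
            }
            Handshake::Established => Ok(()), // nothing left to do
            _ => Err("unexpected message for current state"),
        }
    }
}

fn main() {
    let mut hs = Handshake::AwaitingHello;
    hs.advance(b"HELLO").unwrap();
    hs.advance(b"AUTH").unwrap();
    assert_eq!(hs, Handshake::Established);

    // Out-of-order input is rejected instead of advancing.
    let mut bad = Handshake::AwaitingHello;
    assert!(bad.advance(b"AUTH").is_err());
}
```

A real protocol would also carry partial-read buffers and pending-write queues per state, which is exactly where the extra bookkeeping the next paragraphs describe comes from.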
With Tokio we could simply:

What happens under the hood is that the compiler converts this code into a state machine, with each await marking a state transition where it can yield. This convenience is gone when we drop async Rust. For it to work with our framework, we would need to build this kind of state machine ourselves (unless we chose to block the thread); a greatly simplified example could be:

Each state's logic could be called multiple times, in case our "hands" are so big that they do not fit into the send queues, or the acks did not all arrive at once, so there can be much more logic hidden inside the function calls. Without async, buffers and queues become your friends.

Enough with the beating, though. I was supposed to argue for, not against, my own creation! One thing we do not see in this code is Mutexes. Since everything is single threaded and there are no Tasks ready to jump around different threads at the next await, no Send + Sync + 'static, no Pins and Arcs, we do not annoy the borrow checker, nor ourselves. There is no async runtime, so when things get hot, it should be easier to troubleshoot and debug.

The whole system is much more deterministic, IO can more easily be separated from the application logic (think some kind of event loop), and, perhaps a small thing, but there is no function coloring and no async spreading over the whole codebase. Of course, some of those things could be achieved with async as well.

Conclusion

To wrap up these lengthy conclusions, I am yet to experiment with this approach in more sophisticated systems and see whether it has some juice in it. How it turns out, maybe you will find out in the next article. Until then, I hope you enjoyed this experiment and perhaps even learned something. If you have any questions, comments, or suggestions, feel free to reach out. Thanks for reading!

let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; let (mut connection, addr) = listener.accept()?; loop { let mut buffer = [0u8; 1024]; let bytes_read = connection.read(&mut buffer)?; if bytes_read == 0 { // Connection closed by client break; } connection.write_all(&buffer[..bytes_read])?; } let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; let (mut connection, addr) = listener.accept()?; loop { let mut buffer = [0u8; 1024]; let bytes_read = connection.read(&mut buffer)?; if bytes_read == 0 { // Connection closed by client break; } connection.write_all(&buffer[..bytes_read])?; } let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; let (mut connection, addr) = listener.accept()?; loop { let mut buffer = [0u8; 1024]; let bytes_read = connection.read(&mut buffer)?; if bytes_read == 0 { // Connection closed by client break; } connection.write_all(&buffer[..bytes_read])?; } connection.read let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; loop { let (mut connection, addr) = listener.accept()?; let _handle = std::thread::spawn(move || -> anyhow::Result<()> { loop { let mut buffer = [0u8; 1024]; let bytes_read = connection.read(&mut buffer)?; if bytes_read == 0 { // Connection closed by client break; } connection.write_all(&buffer[..bytes_read])?; } Ok(()) }); } let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; loop { let (mut connection, addr) = listener.accept()?; let _handle = std::thread::spawn(move || -> anyhow::Result<()> { loop { let mut buffer = [0u8; 1024]; let bytes_read = connection.read(&mut buffer)?; if bytes_read == 0 { // Connection closed by client break; } connection.write_all(&buffer[..bytes_read])?; } Ok(()) }); } let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; loop { let (mut connection, addr) = listener.accept()?; let _handle = std::thread::spawn(move || -> anyhow::Result<()> { loop { let mut buffer = [0u8; 1024]; let bytes_read = connection.read(&mut buffer)?; if bytes_read 
== 0 { // Connection closed by client break; } connection.write_all(&buffer[..bytes_read])?; } Ok(()) }); } let mut connections = Vec::new(); let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; listener.set_nonblocking(true)?; loop { // accept all pending connections for cnn_res in listener.incoming() { match cnn_res { Ok(connection) => { // set socket non-blocking connection.set_nonblocking(true)?; connections.push(connection); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No more incoming connections at the moment break; } Err(e) => { return Err(e.into()); } } } // Go through all connections. Read, write, and retain // only not closed connections. let mut new_connections = Vec::new(); for connection in connections.iter_mut() { let mut drop = false; let mut buffer = [0u8; 1024]; loop { match connection.read(&mut buffer) { Ok(bytes_read) => { if bytes_read == 0 { // Connection closed by client drop = true; break; } // If we would like to do it properly, we should // handle would block errors as well connection.write_all(&buffer[..bytes_read])?; } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No data to read at the moment break } Err(e) => { return Err(e.into()); } } } if !drop { new_connections.push(connection.try_clone()?); } } connections = new_connections; } let mut connections = Vec::new(); let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; listener.set_nonblocking(true)?; loop { // accept all pending connections for cnn_res in listener.incoming() { match cnn_res { Ok(connection) => { // set socket non-blocking connection.set_nonblocking(true)?; connections.push(connection); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No more incoming connections at the moment break; } Err(e) => { return Err(e.into()); } } } // Go through all connections. Read, write, and retain // only not closed connections. 
let mut new_connections = Vec::new(); for connection in connections.iter_mut() { let mut drop = false; let mut buffer = [0u8; 1024]; loop { match connection.read(&mut buffer) { Ok(bytes_read) => { if bytes_read == 0 { // Connection closed by client drop = true; break; } // If we would like to do it properly, we should // handle would block errors as well connection.write_all(&buffer[..bytes_read])?; } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No data to read at the moment break } Err(e) => { return Err(e.into()); } } } if !drop { new_connections.push(connection.try_clone()?); } } connections = new_connections; } let mut connections = Vec::new(); let listener = std::net::TcpListener::bind("127.0.0.1:9000")?; listener.set_nonblocking(true)?; loop { // accept all pending connections for cnn_res in listener.incoming() { match cnn_res { Ok(connection) => { // set socket non-blocking connection.set_nonblocking(true)?; connections.push(connection); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No more incoming connections at the moment break; } Err(e) => { return Err(e.into()); } } } // Go through all connections. Read, write, and retain // only not closed connections. 
let mut new_connections = Vec::new(); for connection in connections.iter_mut() { let mut drop = false; let mut buffer = [0u8; 1024]; loop { match connection.read(&mut buffer) { Ok(bytes_read) => { if bytes_read == 0 { // Connection closed by client drop = true; break; } // If we would like to do it properly, we should // handle would block errors as well connection.write_all(&buffer[..bytes_read])?; } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No data to read at the moment break } Err(e) => { return Err(e.into()); } } } if !drop { new_connections.push(connection.try_clone()?); } } connections = new_connections; } epoll_create epoll_create1 EWOULDBLOCK #define _GNU_SOURCE #include <netinet/in.h> #include <stdio.h> #include <errno.h> #include <arpa/inet.h> #include <sys/socket.h> #include <unistd.h> int main(void) { int socket_fd = socket(AF_INET, SOCK_STREAM, 0); // create TCP socket if (socket_fd < 0) { perror("socket"); return 1; } // check return value int yes = 1; setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); struct sockaddr_in addr = { // bind address .sin_family = AF_INET, // IPv4 .sin_port = htons(9000), // port 9000 // Convert from host byte order to network byte order .sin_addr = { htonl(INADDR_LOOPBACK) } // localhost }; if (bind(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind"); return 1; } if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; } printf("Listening on 127.0.0.1:9000...\n"); struct sockaddr_in clientaddr; socklen_t clientaddr_size = sizeof(clientaddr); int connection_fd = accept(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size); if (connection_fd < 0) { perror("accept"); return 1; } printf("accepted connection from %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port)); char buf[4096]; for (;;) { ssize_t r = read(connection_fd, buf, sizeof buf); // read some if (r == 0) { // peer closed printf("read 0 bytes, closing fd\n"); 
close(connection_fd); break; } else if (r < 0) { perror("read"); // read error close(connection_fd); break; } else { // got r bytes printf("read %zd bytes\n", r); ssize_t off = 0; while (off < r) { // echo back, in a blocking way ssize_t w = send(connection_fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL); if (w < 0) { perror("send"); // write error close(connection_fd); break; } off += w; // advance } } } return 0; } #define _GNU_SOURCE #include <netinet/in.h> #include <stdio.h> #include <errno.h> #include <arpa/inet.h> #include <sys/socket.h> #include <unistd.h> int main(void) { int socket_fd = socket(AF_INET, SOCK_STREAM, 0); // create TCP socket if (socket_fd < 0) { perror("socket"); return 1; } // check return value int yes = 1; setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); struct sockaddr_in addr = { // bind address .sin_family = AF_INET, // IPv4 .sin_port = htons(9000), // port 9000 // Convert from host byte order to network byte order .sin_addr = { htonl(INADDR_LOOPBACK) } // localhost }; if (bind(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind"); return 1; } if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; } printf("Listening on 127.0.0.1:9000...\n"); struct sockaddr_in clientaddr; socklen_t clientaddr_size = sizeof(clientaddr); int connection_fd = accept(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size); if (connection_fd < 0) { perror("accept"); return 1; } printf("accepted connection from %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port)); char buf[4096]; for (;;) { ssize_t r = read(connection_fd, buf, sizeof buf); // read some if (r == 0) { // peer closed printf("read 0 bytes, closing fd\n"); close(connection_fd); break; } else if (r < 0) { perror("read"); // read error close(connection_fd); break; } else { // got r bytes printf("read %zd bytes\n", r); ssize_t off = 0; while (off < r) { // echo back, in a blocking way ssize_t w = send(connection_fd, buf + off, 
```c
#define _GNU_SOURCE
#include <netinet/in.h>
#include <stdio.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int socket_fd = socket(AF_INET, SOCK_STREAM, 0); // create TCP socket
    if (socket_fd < 0) { perror("socket"); return 1; } // check return value

    int yes = 1;
    setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

    struct sockaddr_in addr = {    // bind address
        .sin_family = AF_INET,     // IPv4
        .sin_port = htons(9000),   // port 9000
        // Convert from host byte order to network byte order
        .sin_addr = { htonl(INADDR_LOOPBACK) } // localhost
    };
    if (bind(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind"); return 1; }
    if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }
    printf("Listening on 127.0.0.1:9000...\n");

    struct sockaddr_in clientaddr;
    socklen_t clientaddr_size = sizeof(clientaddr);
    int connection_fd = accept(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size);
    if (connection_fd < 0) { perror("accept"); return 1; }
    printf("accepted connection from %s:%d\n",
           inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));

    char buf[4096];
    for (;;) {
        ssize_t r = read(connection_fd, buf, sizeof buf); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(connection_fd);
            break;
        } else if (r < 0) {
            perror("read"); // read error
            close(connection_fd);
            break;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            while (off < r) { // echo back, in a blocking way
                ssize_t w = send(connection_fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    perror("send"); // write error
                    close(connection_fd);
                    break;
                }
                off += w; // advance
            }
        }
    }
    return 0;
}
```

To stop `accept` and `read` from blocking the thread, the first step is to mark the socket as non-blocking:

```c
#include <fcntl.h>

static int set_nonblocking(int fd) {
    // F_GETFL - return (as the function result) the file access mode and
    // the file status flags; arg is ignored.
    int fl = fcntl(fd, F_GETFL, 0);
    if (fl == -1) return -1;
    // Preserve the current flags and add O_NONBLOCK.
    return fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}

...

int yes = 1;
setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

// new line - setting socket_fd as non-blocking
if (set_nonblocking(socket_fd) < 0) { perror("fcntl"); return 1; }

...
```

With the listener non-blocking, calling accept when no connection is pending no longer waits - it fails immediately:

```
Listening on 127.0.0.1:9000...
accept4: Resource temporarily unavailable
```

Instead of spinning on failed calls, we ask the kernel to tell us when the socket is ready, using epoll:

```c
#include <sys/epoll.h>

...

if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }

// Create epoll instance and return its fd
int ep = epoll_create1(EPOLL_CLOEXEC);
if (ep < 0) { perror("epoll_create1"); return 1; }

// watch socket_fd for READ events, and register it with epoll instance
struct epoll_event ev = { .events = EPOLLIN, .data.fd = socket_fd };
if (epoll_ctl(ep, EPOLL_CTL_ADD, socket_fd, &ev) < 0) { perror("epoll_ctl ADD lfd"); return 1; }

...
```

```c
...

struct epoll_event events[128]; // event buffer
char buf[4096];                 // I/O buffer
printf("Listening on 127.0.0.1:9000...\n");

for (;;) {
    // epoll_wait is a system call. It will populate the events array and
    // return the number of events that were triggered.
    // We also specify max events to return - calculated based on the size of the events array.
    int n = epoll_wait(ep, events, (int)(sizeof events / sizeof events[0]), -1);
    if (n < 0) {
        if (errno == EINTR) continue; // interrupted -> retry
        perror("epoll_wait");
        break;
    }
    printf("epoll_wait wake up: %d events\n", n);

    for (int i = 0; i < n; i++) { // handle ready fds
        int fd = events[i].data.fd;    // fd associated with the event
        uint32_t e = events[i].events; // event flags - readable, writable etc

        if (fd == socket_fd) {
            // socket_fd is readable, that may mean new connections to accept
            for (;;) { // accept until EAGAIN or EWOULDBLOCK
                struct sockaddr_in clientaddr;
                socklen_t clientaddr_size = sizeof(clientaddr);
                // now, accept with SOCK_NONBLOCK so that the new fd is already
                // set to non-blocking, so we can avoid fcntl call
                int connection_fd = accept4(socket_fd, (struct sockaddr*)&clientaddr,
                                            &clientaddr_size, SOCK_NONBLOCK);
                if (connection_fd < 0) {
                    if (errno != EAGAIN && errno != EWOULDBLOCK) perror("accept4");
                    break; // no more connections (or a real error)
                }
                printf("accepted connection from %s:%d\n",
                       inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));

                // We have accepted a new connection. Now we add it to the
                // epoll instance to monitor it for events.
                struct epoll_event cev = {
                    // Edge-triggered epoll will notify only once when fd is
                    // ready, it will not keep notifying until we read all data.
                    .events = EPOLLIN | EPOLLET | EPOLLHUP | EPOLLERR, // read, edge-triggered, hup/err
                    .data.fd = connection_fd
                };
                // Add new connection to epoll
                if (epoll_ctl(ep, EPOLL_CTL_ADD, connection_fd, &cev) < 0) {
                    perror("epoll_ctl ADD connection_fd");
                    close(connection_fd);
                }
            }
        } else { // handle existing connection
            if (e & (EPOLLHUP | EPOLLERR)) { // hangup/error
                printf("epollhup | epollerr, closing fd\n");
                // fd is removed from the epoll when the last reference
                // is closed. We never dup it so it is the only one.
                close(fd);
                continue;
            }
            if (e & EPOLLIN) { // readable
                if (handle_connection_io(fd, buf, sizeof buf) < 0) {
                    continue; // connection closed on error; move on to the next event
                }
            }
        }
    }
}

...
```
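Because the connection sockets are registered edge-triggered, every readable event must be drained until EAGAIN - exactly what `handle_connection_io` below does. The same drain-until-`WouldBlock` pattern, sketched in Rust (the language this article moves to later) against a hypothetical mock stream; `MockStream` and `drain` are illustrative names, not from the article:

```rust
use std::io::{self, ErrorKind, Read};

// Hypothetical mock of a non-blocking socket: yields queued chunks,
// then WouldBlock, like a socket whose receive buffer has been drained.
struct MockStream {
    chunks: Vec<Vec<u8>>,
}

impl Read for MockStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.chunks.pop() {
            Some(c) => {
                buf[..c.len()].copy_from_slice(&c); // chunks are assumed to fit
                Ok(c.len())
            }
            None => Err(io::Error::new(ErrorKind::WouldBlock, "drained")),
        }
    }
}

// Read everything that is currently available, stopping at WouldBlock.
fn drain<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    let mut buf = [0u8; 1024];
    loop {
        match r.read(&mut buf) {
            Ok(0) => break,                                       // peer closed
            Ok(n) => out.extend_from_slice(&buf[..n]),            // got n bytes
            Err(e) if e.kind() == ErrorKind::WouldBlock => break, // drained
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}

fn main() {
    // pop() takes from the back, so "hello " is read first
    let mut s = MockStream { chunks: vec![b"world".to_vec(), b"hello ".to_vec()] };
    let data = drain(&mut s).unwrap();
    assert_eq!(data, b"hello world");
    println!("{}", String::from_utf8_lossy(&data)); // prints "hello world"
}
```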
The per-connection read/echo logic moves into a helper that drains the socket until `EAGAIN` or `EWOULDBLOCK`:

```c
int handle_connection_io(int fd, char *buf, size_t buf_size) {
    for (;;) { // drain the connection until EAGAIN or EWOULDBLOCK
        ssize_t r = read(fd, buf, buf_size); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(fd);
            return 0;
        } else if (r < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break; // drained
            perror("read");
            close(fd);
            return 0;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            while (off < r) { // echo back
                ssize_t w = send(fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    // Write what we can in best effort manner. We would need
                    // EPOLLOUT and some kind of buffer per connection to do it
                    // properly in non-blocking way.
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                    perror("send");
                    close(fd);
                    return -1; // signal to skip to next event
                }
                off += w;
            }
        }
    }
    return 0;
}
```

Building and running:

```
clang main.c
./a.out
```

Then, connecting with netcat from two terminals:

```
nc 127.0.0.1 9000
```

```
Listening on 127.0.0.1:9000...
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50496
epoll_wait wake up: 1 events
read 5 bytes
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50870
epoll_wait wake up: 1 events
read 6 bytes
epoll_wait wake up: 1 events
read 6 bytes
```

On the Rust side, the listener becomes a `mio::net::TcpListener`, registered with the poll via `Registry::register(...)`:

```rust
...

let listener = std::net::TcpListener::bind(addr)
    .context(format!("Failed to bind to address {}", addr))?;
listener
    .set_nonblocking(true)
    .context("Failed to set listener to non-blocking mode")?;

let poll = mio::Poll::new().context("Failed to create Poll instance")?;
let mut listener = mio::net::TcpListener::from_std(listener);
let listener_token = mio::Token(0);
poll.registry()
    .register(&mut listener, listener_token, mio::Interest::READABLE)
    .context("Failed to register listener with Poll")?;

...
```
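Before wiring up the full event loop, it is worth seeing what `set_nonblocking(true)` alone buys us: std maps EAGAIN/EWOULDBLOCK to `std::io::ErrorKind::WouldBlock`, so accepting with no pending connection fails instantly instead of blocking. A minimal, self-contained check (std only, no mio):

```rust
use std::io::ErrorKind;
use std::net::TcpListener;

fn main() -> std::io::Result<()> {
    // Bind to an ephemeral localhost port and switch to non-blocking mode.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?;

    // No client has connected, so accept() returns immediately with
    // WouldBlock rather than parking the thread.
    match listener.accept() {
        Ok((_stream, addr)) => println!("accepted {}", addr),
        Err(e) if e.kind() == ErrorKind::WouldBlock => println!("WouldBlock"), // prints "WouldBlock"
        Err(e) => return Err(e),
    }
    Ok(())
}
```

This is the same "Resource temporarily unavailable" we saw in the C version, just surfaced through `io::ErrorKind`.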
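The server code further down pops connection tokens from a `free_tokens` queue on accept and pushes them back when a connection is dropped. A std-only sketch of such a pool - the `TokenPool` type and its method names are hypothetical; only the reserve-token-zero-for-the-listener and recycle-on-close behaviour mirrors the article's code:

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Token(usize);

// Hypothetical pool: token 0 is reserved for the listener, tokens
// 1..=max_connections are handed out to connections and recycled.
struct TokenPool {
    free: VecDeque<Token>,
}

impl TokenPool {
    fn new(max_connections: usize) -> Self {
        Self {
            free: (1..=max_connections).map(Token).collect(),
        }
    }

    // None means the connection limit is reached.
    fn acquire(&mut self) -> Option<Token> {
        self.free.pop_front()
    }

    // Called when a connection is dropped, making its token reusable.
    fn release(&mut self, t: Token) {
        self.free.push_back(t);
    }
}

fn main() {
    let mut pool = TokenPool::new(2);
    let a = pool.acquire().unwrap();
    let _b = pool.acquire().unwrap();
    assert!(pool.acquire().is_none()); // maximum connections reached
    pool.release(a);
    assert!(pool.acquire().is_some()); // token recycled after close
    println!("token pool ok");
}
```

Bounding the pool up front gives a hard cap on concurrent connections, which is exactly the check the accept loop performs before calling `accept()`.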
Waiting for events is a thin wrapper around `Poll::poll`:

```rust
fn wait_for_events(&mut self, duration: std::time::Duration) -> anyhow::Result<()> {
    let mut events = mio::Events::with_capacity(1024);
    self.poll
        .poll(&mut events, Some(duration))
        .context("Failed to poll events")?;
    for event in events.iter() {
        self.handle_io_for_event(event)?;
    }
    Ok(())
}
```

Each event carries the token it was registered with, so handling an event means either accepting new connections (for `listener_token`) or driving IO on an existing connection. Anything registered with the poll must implement `event::Source`; mio's own types already do:

```rust
fn handle_io_for_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> {
    let token = event.token();
    if token == self.listener_token {
        if !event.is_readable() {
            tracing::warn!("Listener event is not readable");
            return Ok(());
        }
        loop {
            if self.free_tokens.is_empty() {
                tracing::warn!("Maximum connections reached, cannot accept new connection");
                break;
            }
            // Accept on mio::net::TcpListener already sets socket to
            // non-blocking mode and returns mio::net::TcpStream.
            let (mut stream, addr) = match self.listener.accept() {
                Ok((stream, addr)) => (stream, addr),
                // EAGAIN and EWOULDBLOCK are both mapped to std::io::ErrorKind::WouldBlock
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No more connections to accept at the moment
                    break;
                }
                Err(ref e) if is_transient_listener_error(e) => {
                    tracing::warn!(error = ?e, "transient error accepting connection, retrying");
                    continue;
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("Failed to accept new connection: {:?}", e));
                }
            };
            tracing::info!("Accepted new connection from {}", addr);
            let token = self
                .free_tokens
                .pop_front()
                .expect("we checked above that free_tokens is not empty");
            // On Linux Mio by default uses edge-triggered epoll
            let result = self.poll.registry().register(
                &mut stream,
                token,
                mio::Interest::READABLE.add(mio::Interest::WRITABLE),
            );
            match result {
                Ok(_) => {}
                Err(err) => {
                    self.free_tokens.push_back(token);
                    return Err(anyhow::anyhow!(
                        "Failed to register new connection with Poll: {:?}",
                        err
                    ));
                }
            }
            self.connections_by_token
                .insert(token, Connection::new(stream));
        }
    } else if let Some(connection) = self.connections_by_token.get_mut(&token) {
        match connection.process_io_event(event) {
            Ok(_) => {}
            Err(err) => {
                tracing::error!(
                    "Error processing IO event for token {:?}. Dropping connection. Error: {:?}",
                    token,
                    err
                );
                self.connections_by_token.remove(&token);
                self.free_tokens.push_back(token);
            }
        }
    } else {
        tracing::warn!("Received event for unknown token: {:?}", token);
    }
    Ok(())
}
```

```rust
pub struct Connection {
    stream: mio::net::TcpStream,
    write_buffer: Vec<u8>,
}

impl Connection {
    fn new(stream: mio::net::TcpStream) -> Self {
        Connection {
            stream,
            write_buffer: Vec::new(),
        }
    }

    fn process_io_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> {
        let mut try_write = false;
        if event.is_readable() {
            let data = read_from_stream(&mut self.stream)
                .context("failed to read data from tcp stream")?;
            // dump to logs just for our example
            tracing::debug!(
                "Read from connection: {} bytes: {}",
                data.len(),
                String::from_utf8_lossy(&data)
            );
            // Since we want to write data back, if we read something we will
            // try to write it to connection immediately.
            self.write_buffer.extend_from_slice(&data);
            try_write = !data.is_empty();
        }
        if event.is_writable() || try_write {
            let written = self.write_pending()?;
            tracing::debug!("Wrote {} bytes to connection", written);
        }
        Ok(())
    }

    fn write_pending(&mut self) -> anyhow::Result<usize> {
        let n_written = write_to_stream(&mut self.stream, &self.write_buffer)
            .context("failed to write data to tcp stream")?;
        self.write_buffer.drain(0..n_written);
        Ok(n_written)
    }
}
```

```rust
use std::io::{Read, Write};

pub fn read_from_stream<T: Read>(stream: &mut T) -> anyhow::Result<Vec<u8>> {
    let mut all_bytes = Vec::new();
    let mut read_buffer: [u8; 1024] = [0; 1024];
    let mut interrupted = false;
    loop {
        match stream.read(&mut read_buffer) {
            Ok(read) => {
                if read == 0 {
                    return Err(anyhow::anyhow!("Connection closed by peer"));
                }
                all_bytes.extend_from_slice(&read_buffer[..read]);
            }
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // No more data to read at the moment
                break;
            }
            // Retry once if we hit interrupted error
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => {
                interrupted = true;
                continue;
            }
            Err(e) => {
                return Err(anyhow::anyhow!("Failed to read from TCP stream: {}", e));
            }
        }
    }
    Ok(all_bytes)
}

pub fn write_to_stream<T: Write>(stream: &mut T, buffer: &[u8]) -> anyhow::Result<usize> {
    let mut interrupted = false;
    let mut written = 0;
    loop {
        if buffer.is_empty() || written >= buffer.len() {
            break;
        }
        match stream.write(&buffer[written..]) {
            Ok(n) => {
                if n == 0 {
                    return Err(anyhow::anyhow!("Connection closed by peer"));
                }
                written += n;
            }
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // Cannot write more at the moment
                break;
            }
            // Retry once if we hit interrupted error
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => {
                interrupted = true;
                continue;
            }
            Err(e) => {
                return Err(anyhow::anyhow!("Failed to write to TCP stream: {}", e));
            }
        }
    }
    Ok(written)
}
```

Unlike `tokio::select`, there is no built-in timer facility here - but `Poll::poll` accepts a timeout:

```rust
fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>
```

That timeout is already threaded through `wait_for_events`, so the main loop can interleave time-driven work (here, a periodic status broadcast) with IO:

```rust
pub fn run(&mut self, exit: Arc<AtomicBool>) -> anyhow::Result<()> {
    tracing::info!(
        "Echo server starting up, listening on {:?}",
        self.listener.local_addr()?
    );
    let mut status_timer = std::time::Instant::now();
    loop {
        tracing::debug!("Echo server main loop iteration starting");
        if exit.load(std::sync::atomic::Ordering::Relaxed) {
            tracing::info!("Echo server exiting as requested");
            return Ok(());
        }
        if status_timer.elapsed() >= self.status_interval {
            let num_connections = self.connections_by_token.len();
            tracing::info!(
                num_connections,
                "Broadcasting server status to all connections"
            );
            let message = format!("Server status: {} active connections\n", num_connections);
            self.broadcast_message(message.as_bytes())?;
            status_timer = std::time::Instant::now();
        }
        let time_remaining = self
            .status_interval
            .checked_sub(status_timer.elapsed())
            .unwrap_or_default();
        self.wait_for_events(time_remaining)?;
    }
}
```

```rust
fn broadcast_message(&mut self, message: &[u8]) -> anyhow::Result<()> {
    let mut connections_to_drop = Vec::new();
    for (token, connection) in self.connections_by_token.iter_mut() {
        connection.write_buffer.extend_from_slice(message);
        match connection.write_pending() {
            Ok(_) => {}
            Err(err) => {
                tracing::error!(
                    "Error writing broadcast message to connection: {:?}. Dropping connection. Error: {:?}",
                    token,
                    err
                );
                connections_to_drop.push(*token);
            }
        }
    }
    for token in connections_to_drop {
        self.connections_by_token.remove(&token);
        self.free_tokens.push_back(token);
    }
    Ok(())
}
```

```
RUST_LOG=echo_server_rust=debug cargo r
2026-03-23T05:55:00.083791Z  INFO echo_server_rust::echo_server: Echo server starting up, listening on 127.0.0.1:9000
2026-03-23T05:55:00.083820Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
```

```
nc 127.0.0.1 9000
test
test
Server status: 1 active connections
Server status: 2 active connections
```

```
2026-03-23T05:59:00.484727Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
2026-03-23T05:59:00.484814Z  INFO echo_server_rust::echo_server: Broadcasting server status to all connections num_connections=2
2026-03-23T05:59:02.280579Z DEBUG echo_server_rust::echo_server: Read from connection: 5 bytes: test
2026-03-23T05:59:02.280732Z DEBUG echo_server_rust::echo_server: Wrote 5 bytes to connection
```

With async, a multi-step exchange such as a handshake reads naturally as sequential awaits:

```rust
send_handshake(connection).await;
let ack = wait_for_ack(connection).await;
confirm_connection(connection).await;
```

Without async there are no await points to implicitly hold state across IO, so the state must become explicit - a state machine advanced one step per readiness event:

```rust
enum OutgoingConnectionState {
    Initiated,
    SentHandshake,
    AwaitingAck,
    Connected,
}

fn handle_io_event(...) -> ... {
    ...
    match connection.state {
        Initiated => {
            try_send_handshake(connection);
            return SentHandshake;
        }
        SentHandshake => {
            try_progress_write(connection);
            if !has_pending_write(connection) {
                return AwaitingAck;
            } else {
                return SentHandshake;
            }
        }
        AwaitingAck => {
            if let Some(ack) = try_read_handshake_ack(connection) {
                return Connected;
            }
            return AwaitingAck;
        }
    }
    ...
```
Send + Sync + 'static

- Keep it simple: The foundation needs to be simple, easy to reason about, troubleshoot, and understand. Let the complexity arise from the problems that applications on top of it will be solving, not from its fundamental parts. Given that, I want it to be single threaded, at least for as long as that is not a performance limitation.
- Keep it real: This application aims to be a proof of concept of something that could be turned into a functional system. For me this implies:
  - No busy waiting: I do not want to burn CPU when nothing is going on.
  - No added latency: When IO is ready, it should be processed, not wait until a few-milliseconds sleep between loop iterations finishes.
- Not just request trigger: It is not a REST API I want to use this for, so applications built on top need a way of triggering logic not only when a request arrives. To be more specific, I am thinking of time-based triggers, be it intervals or timeouts: there needs to be a way to run logic based on those, and not just on incoming IO.

- kqueue: macOS, BSD
- epoll and io_uring: Linux
- IOCP: Windows

- epoll_create
- epoll_create1

- Create a Poll instance (which on Linux uses epoll)
- Register the listener and accepted connections as a Source
- Wait for events, and handle IO in a non-blocking way