// Blocking echo server: bind, accept a single client, then echo until EOF.
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
// accept() blocks until a client connects; `addr` is the peer address (unused here).
let (mut connection, addr) = listener.accept()?;
loop {
    let mut buffer = [0u8; 1024];
    // read() blocks the whole thread until data arrives, the peer closes, or an error occurs.
    let bytes_read = connection.read(&mut buffer)?;
    if bytes_read == 0 {
        // Connection closed by client
        break;
    }
    // Echo back exactly the bytes received; write_all retries partial writes.
    connection.write_all(&buffer[..bytes_read])?;
}
// Blocking echo server: bind, accept a single client, then echo until EOF.
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
// accept() blocks until a client connects; `addr` is the peer address (unused here).
let (mut connection, addr) = listener.accept()?;
loop {
    let mut buffer = [0u8; 1024];
    // read() blocks the whole thread until data arrives, the peer closes, or an error occurs.
    let bytes_read = connection.read(&mut buffer)?;
    if bytes_read == 0 {
        // Connection closed by client
        break;
    }
    // Echo back exactly the bytes received; write_all retries partial writes.
    connection.write_all(&buffer[..bytes_read])?;
}
// Blocking echo server: bind, accept a single client, then echo until EOF.
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
// accept() blocks until a client connects; `addr` is the peer address (unused here).
let (mut connection, addr) = listener.accept()?;
loop {
    let mut buffer = [0u8; 1024];
    // read() blocks the whole thread until data arrives, the peer closes, or an error occurs.
    let bytes_read = connection.read(&mut buffer)?;
    if bytes_read == 0 {
        // Connection closed by client
        break;
    }
    // Echo back exactly the bytes received; write_all retries partial writes.
    connection.write_all(&buffer[..bytes_read])?;
}
connection.read
// Thread-per-connection echo server: the main loop only accepts; each
// client is served on its own OS thread so clients do not block each other.
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
loop {
    // Blocks until the next client connects.
    let (mut connection, addr) = listener.accept()?;
    // NOTE(review): the JoinHandle is dropped, so the thread is detached and
    // any error it returns (the anyhow::Result) is silently discarded.
    let _handle = std::thread::spawn(move || -> anyhow::Result<()> {
        loop {
            let mut buffer = [0u8; 1024];
            // Blocking read; only stalls this thread, not other clients.
            let bytes_read = connection.read(&mut buffer)?;
            if bytes_read == 0 {
                // Connection closed by client
                break;
            }
            connection.write_all(&buffer[..bytes_read])?;
        }
        Ok(())
    });
}
// Thread-per-connection echo server: the main loop only accepts; each
// client is served on its own OS thread so clients do not block each other.
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
loop {
    // Blocks until the next client connects.
    let (mut connection, addr) = listener.accept()?;
    // NOTE(review): the JoinHandle is dropped, so the thread is detached and
    // any error it returns (the anyhow::Result) is silently discarded.
    let _handle = std::thread::spawn(move || -> anyhow::Result<()> {
        loop {
            let mut buffer = [0u8; 1024];
            // Blocking read; only stalls this thread, not other clients.
            let bytes_read = connection.read(&mut buffer)?;
            if bytes_read == 0 {
                // Connection closed by client
                break;
            }
            connection.write_all(&buffer[..bytes_read])?;
        }
        Ok(())
    });
}
// Thread-per-connection echo server: the main loop only accepts; each
// client is served on its own OS thread so clients do not block each other.
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
loop {
    // Blocks until the next client connects.
    let (mut connection, addr) = listener.accept()?;
    // NOTE(review): the JoinHandle is dropped, so the thread is detached and
    // any error it returns (the anyhow::Result) is silently discarded.
    let _handle = std::thread::spawn(move || -> anyhow::Result<()> {
        loop {
            let mut buffer = [0u8; 1024];
            // Blocking read; only stalls this thread, not other clients.
            let bytes_read = connection.read(&mut buffer)?;
            if bytes_read == 0 {
                // Connection closed by client
                break;
            }
            connection.write_all(&buffer[..bytes_read])?;
        }
        Ok(())
    });
}
// Single-threaded non-blocking echo server that busy-polls all sockets.
// NOTE(review): this loop never blocks, so it spins at 100% CPU even when
// idle — that is the motivation for epoll/mio later in the article.
let mut connections = Vec::new();
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
listener.set_nonblocking(true)?;
loop {
    // accept all pending connections
    for cnn_res in listener.incoming() {
        match cnn_res {
            Ok(connection) => {
                // set socket non-blocking
                connection.set_nonblocking(true)?;
                connections.push(connection);
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // No more incoming connections at the moment
                break;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }
    // Go through all connections. Read, write, and retain
    // only not closed connections.
    let mut new_connections = Vec::new();
    for connection in connections.iter_mut() {
        let mut drop = false;
        let mut buffer = [0u8; 1024];
        loop {
            match connection.read(&mut buffer) {
                Ok(bytes_read) => {
                    if bytes_read == 0 {
                        // Connection closed by client
                        drop = true;
                        break;
                    }
                    // If we would like to do it properly, we should
                    // handle would block errors as well
                    connection.write_all(&buffer[..bytes_read])?;
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No data to read at the moment
                    break
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
        if !drop {
            // NOTE(review): try_clone() dup()s the fd on every pass; Vec::retain
            // would keep the original sockets without duplicating descriptors.
            new_connections.push(connection.try_clone()?);
        }
    }
    connections = new_connections;
}
// Single-threaded non-blocking echo server that busy-polls all sockets.
// NOTE(review): this loop never blocks, so it spins at 100% CPU even when
// idle — that is the motivation for epoll/mio later in the article.
let mut connections = Vec::new();
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
listener.set_nonblocking(true)?;
loop {
    // accept all pending connections
    for cnn_res in listener.incoming() {
        match cnn_res {
            Ok(connection) => {
                // set socket non-blocking
                connection.set_nonblocking(true)?;
                connections.push(connection);
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // No more incoming connections at the moment
                break;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }
    // Go through all connections. Read, write, and retain
    // only not closed connections.
    let mut new_connections = Vec::new();
    for connection in connections.iter_mut() {
        let mut drop = false;
        let mut buffer = [0u8; 1024];
        loop {
            match connection.read(&mut buffer) {
                Ok(bytes_read) => {
                    if bytes_read == 0 {
                        // Connection closed by client
                        drop = true;
                        break;
                    }
                    // If we would like to do it properly, we should
                    // handle would block errors as well
                    connection.write_all(&buffer[..bytes_read])?;
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No data to read at the moment
                    break
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
        if !drop {
            // NOTE(review): try_clone() dup()s the fd on every pass; Vec::retain
            // would keep the original sockets without duplicating descriptors.
            new_connections.push(connection.try_clone()?);
        }
    }
    connections = new_connections;
}
// Single-threaded non-blocking echo server that busy-polls all sockets.
// NOTE(review): this loop never blocks, so it spins at 100% CPU even when
// idle — that is the motivation for epoll/mio later in the article.
let mut connections = Vec::new();
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
listener.set_nonblocking(true)?;
loop {
    // accept all pending connections
    for cnn_res in listener.incoming() {
        match cnn_res {
            Ok(connection) => {
                // set socket non-blocking
                connection.set_nonblocking(true)?;
                connections.push(connection);
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // No more incoming connections at the moment
                break;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }
    // Go through all connections. Read, write, and retain
    // only not closed connections.
    let mut new_connections = Vec::new();
    for connection in connections.iter_mut() {
        let mut drop = false;
        let mut buffer = [0u8; 1024];
        loop {
            match connection.read(&mut buffer) {
                Ok(bytes_read) => {
                    if bytes_read == 0 {
                        // Connection closed by client
                        drop = true;
                        break;
                    }
                    // If we would like to do it properly, we should
                    // handle would block errors as well
                    connection.write_all(&buffer[..bytes_read])?;
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No data to read at the moment
                    break
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
        if !drop {
            // NOTE(review): try_clone() dup()s the fd on every pass; Vec::retain
            // would keep the original sockets without duplicating descriptors.
            new_connections.push(connection.try_clone()?);
        }
    }
    connections = new_connections;
}
epoll_create
epoll_create1
EWOULDBLOCK
#define _GNU_SOURCE
#include <netinet/in.h>
#include <stdio.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Blocking single-client TCP echo server.
 * Binds 127.0.0.1:9000, accepts exactly one connection and echoes every
 * byte back until the peer closes the connection or an I/O error occurs.
 * Returns 0 on clean shutdown, 1 on setup failure.
 */
int main(void) {
    int socket_fd = socket(AF_INET, SOCK_STREAM, 0); // create TCP socket
    if (socket_fd < 0) { perror("socket"); return 1; } // check return value

    // Allow quick restarts: reuse the address even if it is in TIME_WAIT.
    int yes = 1;
    setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

    struct sockaddr_in addr = { // bind address
        .sin_family = AF_INET,   // IPv4
        .sin_port = htons(9000), // port 9000
        // Convert from host byte order to network byte order
        .sin_addr = { htonl(INADDR_LOOPBACK) } // localhost
    };
    if (bind(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind"); return 1; }
    if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }

    printf("Listening on 127.0.0.1:9000...\n");

    struct sockaddr_in clientaddr;
    socklen_t clientaddr_size = sizeof(clientaddr);
    // Blocks until a client connects.
    int connection_fd = accept(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size);
    if (connection_fd < 0) { perror("accept"); return 1; }
    printf("accepted connection from %s:%d\n",
           inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));

    char buf[4096];
    for (;;) {
        ssize_t r = read(connection_fd, buf, sizeof buf); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(connection_fd);
            break;
        } else if (r < 0) {
            perror("read"); // read error
            close(connection_fd);
            break;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            int send_failed = 0; // set when the connection was closed below
            while (off < r) { // echo back, in a blocking way
                ssize_t w = send(connection_fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    perror("send"); // write error
                    close(connection_fd);
                    send_failed = 1;
                    break;
                }
                off += w; // advance
            }
            // BUGFIX: the original only broke out of the inner while loop, so
            // after a send() failure the outer loop kept read()ing the already
            // closed descriptor and then close()d it a second time (EBADF /
            // double close). Leave the read loop once the fd is gone.
            if (send_failed) break;
        }
    }
    return 0;
}
#define _GNU_SOURCE
#include <netinet/in.h>
#include <stdio.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Blocking single-client TCP echo server.
 * Binds 127.0.0.1:9000, accepts exactly one connection and echoes every
 * byte back until the peer closes the connection or an I/O error occurs.
 * Returns 0 on clean shutdown, 1 on setup failure.
 */
int main(void) {
    int socket_fd = socket(AF_INET, SOCK_STREAM, 0); // create TCP socket
    if (socket_fd < 0) { perror("socket"); return 1; } // check return value

    // Allow quick restarts: reuse the address even if it is in TIME_WAIT.
    int yes = 1;
    setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

    struct sockaddr_in addr = { // bind address
        .sin_family = AF_INET,   // IPv4
        .sin_port = htons(9000), // port 9000
        // Convert from host byte order to network byte order
        .sin_addr = { htonl(INADDR_LOOPBACK) } // localhost
    };
    if (bind(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind"); return 1; }
    if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }

    printf("Listening on 127.0.0.1:9000...\n");

    struct sockaddr_in clientaddr;
    socklen_t clientaddr_size = sizeof(clientaddr);
    // Blocks until a client connects.
    int connection_fd = accept(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size);
    if (connection_fd < 0) { perror("accept"); return 1; }
    printf("accepted connection from %s:%d\n",
           inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));

    char buf[4096];
    for (;;) {
        ssize_t r = read(connection_fd, buf, sizeof buf); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(connection_fd);
            break;
        } else if (r < 0) {
            perror("read"); // read error
            close(connection_fd);
            break;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            int send_failed = 0; // set when the connection was closed below
            while (off < r) { // echo back, in a blocking way
                ssize_t w = send(connection_fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    perror("send"); // write error
                    close(connection_fd);
                    send_failed = 1;
                    break;
                }
                off += w; // advance
            }
            // BUGFIX: the original only broke out of the inner while loop, so
            // after a send() failure the outer loop kept read()ing the already
            // closed descriptor and then close()d it a second time (EBADF /
            // double close). Leave the read loop once the fd is gone.
            if (send_failed) break;
        }
    }
    return 0;
}
#define _GNU_SOURCE
#include <netinet/in.h>
#include <stdio.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Blocking single-client TCP echo server.
 * Binds 127.0.0.1:9000, accepts exactly one connection and echoes every
 * byte back until the peer closes the connection or an I/O error occurs.
 * Returns 0 on clean shutdown, 1 on setup failure.
 */
int main(void) {
    int socket_fd = socket(AF_INET, SOCK_STREAM, 0); // create TCP socket
    if (socket_fd < 0) { perror("socket"); return 1; } // check return value

    // Allow quick restarts: reuse the address even if it is in TIME_WAIT.
    int yes = 1;
    setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

    struct sockaddr_in addr = { // bind address
        .sin_family = AF_INET,   // IPv4
        .sin_port = htons(9000), // port 9000
        // Convert from host byte order to network byte order
        .sin_addr = { htonl(INADDR_LOOPBACK) } // localhost
    };
    if (bind(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind"); return 1; }
    if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }

    printf("Listening on 127.0.0.1:9000...\n");

    struct sockaddr_in clientaddr;
    socklen_t clientaddr_size = sizeof(clientaddr);
    // Blocks until a client connects.
    int connection_fd = accept(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size);
    if (connection_fd < 0) { perror("accept"); return 1; }
    printf("accepted connection from %s:%d\n",
           inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));

    char buf[4096];
    for (;;) {
        ssize_t r = read(connection_fd, buf, sizeof buf); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(connection_fd);
            break;
        } else if (r < 0) {
            perror("read"); // read error
            close(connection_fd);
            break;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            int send_failed = 0; // set when the connection was closed below
            while (off < r) { // echo back, in a blocking way
                ssize_t w = send(connection_fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    perror("send"); // write error
                    close(connection_fd);
                    send_failed = 1;
                    break;
                }
                off += w; // advance
            }
            // BUGFIX: the original only broke out of the inner while loop, so
            // after a send() failure the outer loop kept read()ing the already
            // closed descriptor and then close()d it a second time (EBADF /
            // double close). Leave the read loop once the fd is gone.
            if (send_failed) break;
        }
    }
    return 0;
}
#include <fcntl.h>

// Put `fd` into non-blocking mode while preserving every other status flag.
// Returns 0 on success, -1 on error (errno is set by fcntl).
static int set_nonblocking(int fd) {
    // F_GETFL - return (as the function result) the file access mode and
    // the file status flags; arg is ignored.
    int fl = fcntl(fd, F_GETFL, 0);
    if (fl == -1) return -1;
    // Preserve the current flags and add O_NONBLOCK.
    return fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
...
int yes = 1;
setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
// new line - setting socket_fd as non-blocking
// (accept() on this listener will now fail with EAGAIN/EWOULDBLOCK
// instead of blocking when no connection is pending)
if (set_nonblocking(socket_fd) < 0) { perror("fcntl"); return 1; }
...
#include <fcntl.h>

// Put `fd` into non-blocking mode while preserving every other status flag.
// Returns 0 on success, -1 on error (errno is set by fcntl).
static int set_nonblocking(int fd) {
    // F_GETFL - return (as the function result) the file access mode and
    // the file status flags; arg is ignored.
    int fl = fcntl(fd, F_GETFL, 0);
    if (fl == -1) return -1;
    // Preserve the current flags and add O_NONBLOCK.
    return fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
...
int yes = 1;
setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
// new line - setting socket_fd as non-blocking
// (accept() on this listener will now fail with EAGAIN/EWOULDBLOCK
// instead of blocking when no connection is pending)
if (set_nonblocking(socket_fd) < 0) { perror("fcntl"); return 1; }
...
#include <fcntl.h>

// Put `fd` into non-blocking mode while preserving every other status flag.
// Returns 0 on success, -1 on error (errno is set by fcntl).
static int set_nonblocking(int fd) {
    // F_GETFL - return (as the function result) the file access mode and
    // the file status flags; arg is ignored.
    int fl = fcntl(fd, F_GETFL, 0);
    if (fl == -1) return -1;
    // Preserve the current flags and add O_NONBLOCK.
    return fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
...
int yes = 1;
setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
// new line - setting socket_fd as non-blocking
// (accept() on this listener will now fail with EAGAIN/EWOULDBLOCK
// instead of blocking when no connection is pending)
if (set_nonblocking(socket_fd) < 0) { perror("fcntl"); return 1; }
...
Listening on 127.0.0.1:9000...
accept4: Resource temporarily unavailable
Listening on 127.0.0.1:9000...
accept4: Resource temporarily unavailable
Listening on 127.0.0.1:9000...
accept4: Resource temporarily unavailable
#include <sys/epoll.h>
...
if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }
// Create epoll instance and return its fd
int ep = epoll_create1(EPOLL_CLOEXEC);
if (ep < 0) { perror("epoll_create1"); return 1; }
// watch socket_fd for READ events, and register it with epoll instance
// (EPOLLIN on a listening socket means a connection is ready to accept)
struct epoll_event ev = { .events = EPOLLIN, .data.fd = socket_fd };
if (epoll_ctl(ep, EPOLL_CTL_ADD, socket_fd, &ev) < 0) { perror("epoll_ctl ADD lfd"); return 1; }
...
#include <sys/epoll.h>
...
if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }
// Create epoll instance and return its fd
int ep = epoll_create1(EPOLL_CLOEXEC);
if (ep < 0) { perror("epoll_create1"); return 1; }
// watch socket_fd for READ events, and register it with epoll instance
// (EPOLLIN on a listening socket means a connection is ready to accept)
struct epoll_event ev = { .events = EPOLLIN, .data.fd = socket_fd };
if (epoll_ctl(ep, EPOLL_CTL_ADD, socket_fd, &ev) < 0) { perror("epoll_ctl ADD lfd"); return 1; }
...
#include <sys/epoll.h>
...
if (listen(socket_fd, SOMAXCONN) < 0) { perror("listen"); return 1; }
// Create epoll instance and return its fd
int ep = epoll_create1(EPOLL_CLOEXEC);
if (ep < 0) { perror("epoll_create1"); return 1; }
// watch socket_fd for READ events, and register it with epoll instance
// (EPOLLIN on a listening socket means a connection is ready to accept)
struct epoll_event ev = { .events = EPOLLIN, .data.fd = socket_fd };
if (epoll_ctl(ep, EPOLL_CTL_ADD, socket_fd, &ev) < 0) { perror("epoll_ctl ADD lfd"); return 1; }
...
...
struct epoll_event events[128]; // event buffer
char buf[4096]; // I/O buffer

printf("Listening on 127.0.0.1:9000...\n");
for (;;) {
    // epoll_wait is a system call. It will populate the events array and
    // return the number of events that were triggered.
    // We also specify max events to return - calculated based on the size of the events array.
    int n = epoll_wait(ep, events, (int)(sizeof events / sizeof events[0]), -1);
    if (n < 0) {
        if (errno == EINTR) continue; // interrupted -> retry
        perror("epoll_wait");
        break;
    }
    printf("epoll_wait wake up: %d events\n", n);
    for (int i = 0; i < n; i++) { // handle ready fds
        int fd = events[i].data.fd;   // fd associated with the event
        uint32_t e = events[i].events;// event flags - readable, writable etc
        if (fd == socket_fd) {
            // socket_fd is readable, that may mean new connections to accept
            for (;;) { // accept until EAGAIN or EWOULDBLOCK
                struct sockaddr_in clientaddr;
                socklen_t clientaddr_size = sizeof(clientaddr);
                // now, accept with SOCK_NONBLOCK so that the new fd is already
                // set to non-blocking, so we can avoid fcntl call
                int connection_fd = accept4(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size, SOCK_NONBLOCK);
                if (connection_fd < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break; // no more connections
                    else perror("accept4");
                    break;
                }
                printf("accepted connection from %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
                // We have accepted a new connection. Now we add it to the
                // epoll instance to monitor it for events.
                struct epoll_event cev = {
                    // Edge-triggered epoll will notify only once when fd is
                    // ready, it will not keep notifying until we read all data.
                    // NOTE(review): per epoll_ctl(2), EPOLLHUP and EPOLLERR are
                    // always reported and do not need to be requested here.
                    .events = EPOLLIN | EPOLLET | EPOLLHUP | EPOLLERR, // read, edge-triggered, hup/err
                    .data.fd = connection_fd
                };
                // Add new connection to epoll
                if (epoll_ctl(ep, EPOLL_CTL_ADD, connection_fd, &cev) < 0) {
                    perror("epoll_ctl ADD connection_fd");
                    close(connection_fd);
                }
            }
        } else { // handle existing connection
            if (e & (EPOLLHUP | EPOLLERR)) { // hangup/error
                printf("epollhup | epollerr, closing fd\n");
                // fd is removed from the epoll when the last reference
                // is closed. We never dup it so it is the only one.
                close(fd);
                continue;
            }
            if (e & EPOLLIN) { // readable
                // A negative return aborts the rest of this event batch;
                // see handle_connection_io for the contract.
                if (handle_connection_io(fd, buf, sizeof buf) < 0) {
                    break;
                }
            }
        }
    }
}
...
struct epoll_event events[128]; // event buffer
char buf[4096]; // I/O buffer

printf("Listening on 127.0.0.1:9000...\n");
for (;;) {
    // epoll_wait is a system call. It will populate the events array and
    // return the number of events that were triggered.
    // We also specify max events to return - calculated based on the size of the events array.
    int n = epoll_wait(ep, events, (int)(sizeof events / sizeof events[0]), -1);
    if (n < 0) {
        if (errno == EINTR) continue; // interrupted -> retry
        perror("epoll_wait");
        break;
    }
    printf("epoll_wait wake up: %d events\n", n);
    for (int i = 0; i < n; i++) { // handle ready fds
        int fd = events[i].data.fd;   // fd associated with the event
        uint32_t e = events[i].events;// event flags - readable, writable etc
        if (fd == socket_fd) {
            // socket_fd is readable, that may mean new connections to accept
            for (;;) { // accept until EAGAIN or EWOULDBLOCK
                struct sockaddr_in clientaddr;
                socklen_t clientaddr_size = sizeof(clientaddr);
                // now, accept with SOCK_NONBLOCK so that the new fd is already
                // set to non-blocking, so we can avoid fcntl call
                int connection_fd = accept4(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size, SOCK_NONBLOCK);
                if (connection_fd < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break; // no more connections
                    else perror("accept4");
                    break;
                }
                printf("accepted connection from %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
                // We have accepted a new connection. Now we add it to the
                // epoll instance to monitor it for events.
                struct epoll_event cev = {
                    // Edge-triggered epoll will notify only once when fd is
                    // ready, it will not keep notifying until we read all data.
                    // NOTE(review): per epoll_ctl(2), EPOLLHUP and EPOLLERR are
                    // always reported and do not need to be requested here.
                    .events = EPOLLIN | EPOLLET | EPOLLHUP | EPOLLERR, // read, edge-triggered, hup/err
                    .data.fd = connection_fd
                };
                // Add new connection to epoll
                if (epoll_ctl(ep, EPOLL_CTL_ADD, connection_fd, &cev) < 0) {
                    perror("epoll_ctl ADD connection_fd");
                    close(connection_fd);
                }
            }
        } else { // handle existing connection
            if (e & (EPOLLHUP | EPOLLERR)) { // hangup/error
                printf("epollhup | epollerr, closing fd\n");
                // fd is removed from the epoll when the last reference
                // is closed. We never dup it so it is the only one.
                close(fd);
                continue;
            }
            if (e & EPOLLIN) { // readable
                // A negative return aborts the rest of this event batch;
                // see handle_connection_io for the contract.
                if (handle_connection_io(fd, buf, sizeof buf) < 0) {
                    break;
                }
            }
        }
    }
}
...
struct epoll_event events[128]; // event buffer
char buf[4096]; // I/O buffer

printf("Listening on 127.0.0.1:9000...\n");
for (;;) {
    // epoll_wait is a system call. It will populate the events array and
    // return the number of events that were triggered.
    // We also specify max events to return - calculated based on the size of the events array.
    int n = epoll_wait(ep, events, (int)(sizeof events / sizeof events[0]), -1);
    if (n < 0) {
        if (errno == EINTR) continue; // interrupted -> retry
        perror("epoll_wait");
        break;
    }
    printf("epoll_wait wake up: %d events\n", n);
    for (int i = 0; i < n; i++) { // handle ready fds
        int fd = events[i].data.fd;   // fd associated with the event
        uint32_t e = events[i].events;// event flags - readable, writable etc
        if (fd == socket_fd) {
            // socket_fd is readable, that may mean new connections to accept
            for (;;) { // accept until EAGAIN or EWOULDBLOCK
                struct sockaddr_in clientaddr;
                socklen_t clientaddr_size = sizeof(clientaddr);
                // now, accept with SOCK_NONBLOCK so that the new fd is already
                // set to non-blocking, so we can avoid fcntl call
                int connection_fd = accept4(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size, SOCK_NONBLOCK);
                if (connection_fd < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break; // no more connections
                    else perror("accept4");
                    break;
                }
                printf("accepted connection from %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
                // We have accepted a new connection. Now we add it to the
                // epoll instance to monitor it for events.
                struct epoll_event cev = {
                    // Edge-triggered epoll will notify only once when fd is
                    // ready, it will not keep notifying until we read all data.
                    // NOTE(review): per epoll_ctl(2), EPOLLHUP and EPOLLERR are
                    // always reported and do not need to be requested here.
                    .events = EPOLLIN | EPOLLET | EPOLLHUP | EPOLLERR, // read, edge-triggered, hup/err
                    .data.fd = connection_fd
                };
                // Add new connection to epoll
                if (epoll_ctl(ep, EPOLL_CTL_ADD, connection_fd, &cev) < 0) {
                    perror("epoll_ctl ADD connection_fd");
                    close(connection_fd);
                }
            }
        } else { // handle existing connection
            if (e & (EPOLLHUP | EPOLLERR)) { // hangup/error
                printf("epollhup | epollerr, closing fd\n");
                // fd is removed from the epoll when the last reference
                // is closed. We never dup it so it is the only one.
                close(fd);
                continue;
            }
            if (e & EPOLLIN) { // readable
                // A negative return aborts the rest of this event batch;
                // see handle_connection_io for the contract.
                if (handle_connection_io(fd, buf, sizeof buf) < 0) {
                    break;
                }
            }
        }
    }
}
EWOULDBLOCK
// Drain and echo all currently-available data on non-blocking `fd`.
// Returns 0 when the fd was fully drained, or was closed here after EOF or
// a read error; returns -1 only on a fatal send() error (the caller treats
// a negative return as "abort the rest of this event batch").
// The fd is close()d on EOF, read errors, and fatal send errors.
int handle_connection_io(int fd, char *buf, size_t buf_size) {
    for (;;) { // drain the connection until EAGAIN or EWOULDBLOCK
        ssize_t r = read(fd, buf, buf_size); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(fd);
            return 0;
        } else if (r < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break; // drained
            perror("read");
            close(fd);
            return 0;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            while (off < r) { // echo back
                ssize_t w = send(fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    // Write what we can in best effort manner. We would need
                    // EPOLLOUT and some kind of buffer per connection to do it
                    // properly in non-blocking way.
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                    perror("send");
                    close(fd);
                    return -1; // signal to skip to next_event
                }
                off += w;
            }
        }
    }
    return 0;
}
// Drain and echo all currently-available data on non-blocking `fd`.
// Returns 0 when the fd was fully drained, or was closed here after EOF or
// a read error; returns -1 only on a fatal send() error (the caller treats
// a negative return as "abort the rest of this event batch").
// The fd is close()d on EOF, read errors, and fatal send errors.
int handle_connection_io(int fd, char *buf, size_t buf_size) {
    for (;;) { // drain the connection until EAGAIN or EWOULDBLOCK
        ssize_t r = read(fd, buf, buf_size); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(fd);
            return 0;
        } else if (r < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break; // drained
            perror("read");
            close(fd);
            return 0;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            while (off < r) { // echo back
                ssize_t w = send(fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    // Write what we can in best effort manner. We would need
                    // EPOLLOUT and some kind of buffer per connection to do it
                    // properly in non-blocking way.
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                    perror("send");
                    close(fd);
                    return -1; // signal to skip to next_event
                }
                off += w;
            }
        }
    }
    return 0;
}
// Drain and echo all currently-available data on non-blocking `fd`.
// Returns 0 when the fd was fully drained, or was closed here after EOF or
// a read error; returns -1 only on a fatal send() error (the caller treats
// a negative return as "abort the rest of this event batch").
// The fd is close()d on EOF, read errors, and fatal send errors.
int handle_connection_io(int fd, char *buf, size_t buf_size) {
    for (;;) { // drain the connection until EAGAIN or EWOULDBLOCK
        ssize_t r = read(fd, buf, buf_size); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(fd);
            return 0;
        } else if (r < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break; // drained
            perror("read");
            close(fd);
            return 0;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            while (off < r) { // echo back
                ssize_t w = send(fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    // Write what we can in best effort manner. We would need
                    // EPOLLOUT and some kind of buffer per connection to do it
                    // properly in non-blocking way.
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                    perror("send");
                    close(fd);
                    return -1; // signal to skip to next_event
                }
                off += w;
            }
        }
    }
    return 0;
}
clang main.c
./a.out
clang main.c
./a.out
clang main.c
./a.out
nc 127.0.0.1 9000
nc 127.0.0.1 9000
nc 127.0.0.1 9000
Listening on 127.0.0.1:9000...
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50496
epoll_wait wake up: 1 events
read 5 bytes
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50870
epoll_wait wake up: 1 events
read 6 bytes
epoll_wait wake up: 1 events
read 6 bytes
Listening on 127.0.0.1:9000...
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50496
epoll_wait wake up: 1 events
read 5 bytes
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50870
epoll_wait wake up: 1 events
read 6 bytes
epoll_wait wake up: 1 events
read 6 bytes
Listening on 127.0.0.1:9000...
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50496
epoll_wait wake up: 1 events
read 5 bytes
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50870
epoll_wait wake up: 1 events
read 6 bytes
epoll_wait wake up: 1 events
read 6 bytes
mio::net::TcpListener
Registry::register(...)
...
// Bind with std's TcpListener first, then hand the socket over to mio.
let listener = std::net::TcpListener::bind(addr)
    .context(format!("Failed to bind to address {}", addr))?;
// The socket must be non-blocking before it is driven by Poll.
listener
    .set_nonblocking(true)
    .context("Failed to set listener to non-blocking mode")?;
let poll = mio::Poll::new().context("Failed to create Poll instance")?;
// Wrap the std listener into mio's type so it can be registered with Poll.
let mut listener = mio::net::TcpListener::from_std(listener);
// Token 0 identifies listener-readiness events in the event loop.
let listener_token = mio::Token(0);
poll.registry()
    .register(&mut listener, listener_token, mio::Interest::READABLE)
    .context("Failed to register listener with Poll")?;
...
...
// Bind with std's TcpListener first, then hand the socket over to mio.
let listener = std::net::TcpListener::bind(addr)
    .context(format!("Failed to bind to address {}", addr))?;
// The socket must be non-blocking before it is driven by Poll.
listener
    .set_nonblocking(true)
    .context("Failed to set listener to non-blocking mode")?;
let poll = mio::Poll::new().context("Failed to create Poll instance")?;
// Wrap the std listener into mio's type so it can be registered with Poll.
let mut listener = mio::net::TcpListener::from_std(listener);
// Token 0 identifies listener-readiness events in the event loop.
let listener_token = mio::Token(0);
poll.registry()
    .register(&mut listener, listener_token, mio::Interest::READABLE)
    .context("Failed to register listener with Poll")?;
...
...
// Bind with std's TcpListener first, then hand the socket over to mio.
let listener = std::net::TcpListener::bind(addr)
    .context(format!("Failed to bind to address {}", addr))?;
// The socket must be non-blocking before it is driven by Poll.
listener
    .set_nonblocking(true)
    .context("Failed to set listener to non-blocking mode")?;
let poll = mio::Poll::new().context("Failed to create Poll instance")?;
// Wrap the std listener into mio's type so it can be registered with Poll.
let mut listener = mio::net::TcpListener::from_std(listener);
// Token 0 identifies listener-readiness events in the event loop.
let listener_token = mio::Token(0);
poll.registry()
    .register(&mut listener, listener_token, mio::Interest::READABLE)
    .context("Failed to register listener with Poll")?;
...
listener_token
/// Block until readiness events arrive (or `duration` elapses), then
/// dispatch each one to `handle_io_for_event`, propagating the first error.
fn wait_for_events(&mut self, duration: std::time::Duration) -> anyhow::Result<()> {
    let mut event_buffer = mio::Events::with_capacity(1024);
    // Bounded wait: returns with zero events on timeout.
    self.poll
        .poll(&mut event_buffer, Some(duration))
        .context("Failed to poll events")?;
    for ready_event in event_buffer.iter() {
        self.handle_io_for_event(ready_event)?;
    }
    Ok(())
}
/// Block until readiness events arrive (or `duration` elapses), then
/// dispatch each one to `handle_io_for_event`, propagating the first error.
fn wait_for_events(&mut self, duration: std::time::Duration) -> anyhow::Result<()> {
    let mut event_buffer = mio::Events::with_capacity(1024);
    // Bounded wait: returns with zero events on timeout.
    self.poll
        .poll(&mut event_buffer, Some(duration))
        .context("Failed to poll events")?;
    for ready_event in event_buffer.iter() {
        self.handle_io_for_event(ready_event)?;
    }
    Ok(())
}
/// Block until readiness events arrive (or `duration` elapses), then
/// dispatch each one to `handle_io_for_event`, propagating the first error.
fn wait_for_events(&mut self, duration: std::time::Duration) -> anyhow::Result<()> {
    let mut event_buffer = mio::Events::with_capacity(1024);
    // Bounded wait: returns with zero events on timeout.
    self.poll
        .poll(&mut event_buffer, Some(duration))
        .context("Failed to poll events")?;
    for ready_event in event_buffer.iter() {
        self.handle_io_for_event(ready_event)?;
    }
    Ok(())
}
event::Source
// Dispatch a single mio readiness event: accept new connections when the
// event belongs to the listener token, otherwise delegate I/O to the
// connection registered under that token.
fn handle_io_for_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> {
    let token = event.token();
    if token == self.listener_token {
        if !event.is_readable() {
            tracing::warn!("Listener event is not readable");
            return Ok(());
        }
        loop {
            // free_tokens doubles as a connection budget: no free token,
            // no accept until an existing connection is dropped.
            if self.free_tokens.is_empty() {
                tracing::warn!("Maximum connections reached, cannot accept new connection");
                break;
            }
            // Accept on mio::net::TcpListener already sets socket to
            // non-blocking mode and returns mio::net::TcpStream.
            let (mut stream, addr) = match self.listener.accept() {
                Ok((stream, addr)) => (stream, addr),
                // EAGAIN and EWOULDBLOCK are both mapped to std::io::ErrorKind::WouldBlock
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No more connections to accept at the moment
                    break;
                }
                Err(ref e) if is_transient_listener_error(e) => {
                    tracing::warn!(error =? e, "transient error accepting connection, retrying");
                    continue;
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("Failed to accept new connection: {:?}", e));
                }
            };
            tracing::info!("Accepted new connection from {}", addr);
            let token = self
                .free_tokens
                .pop_front()
                .expect("we checked above that free_tokens is not empty");
            // On Linux Mio by default uses edge-triggered epoll
            let result = self.poll.registry().register(
                &mut stream,
                token,
                mio::Interest::READABLE.add(mio::Interest::WRITABLE),
            );
            match result {
                Ok(_) => {}
                Err(err) => {
                    // Return the token to the pool before bailing out so it
                    // can be reused for a later connection.
                    self.free_tokens.push_back(token);
                    return Err(anyhow::anyhow!(
                        "Failed to register new connection with Poll: {:?}",
                        err
                    ));
                }
            }
            self.connections_by_token
                .insert(token, Connection::new(stream));
        }
    } else if let Some(connection) = self.connections_by_token.get_mut(&token) {
        match connection.process_io_event(event) {
            Ok(_) => {}
            Err(err) => {
                // A connection-level failure only drops that one connection
                // (and recycles its token); the server itself keeps running.
                tracing::error!(
                    "Error processing IO event for token {:?}. Dropping connection. Error: {:?}",
                    token,
                    err
                );
                self.connections_by_token.remove(&token);
                self.free_tokens.push_back(token);
            }
        }
    } else {
        tracing::warn!("Received event for unknown token: {:?}", token);
    }
    Ok(())
}
// Dispatch a single mio readiness event: accept new connections when the
// event belongs to the listener token, otherwise delegate I/O to the
// connection registered under that token.
fn handle_io_for_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> {
    let token = event.token();
    if token == self.listener_token {
        if !event.is_readable() {
            tracing::warn!("Listener event is not readable");
            return Ok(());
        }
        loop {
            // free_tokens doubles as a connection budget: no free token,
            // no accept until an existing connection is dropped.
            if self.free_tokens.is_empty() {
                tracing::warn!("Maximum connections reached, cannot accept new connection");
                break;
            }
            // Accept on mio::net::TcpListener already sets socket to
            // non-blocking mode and returns mio::net::TcpStream.
            let (mut stream, addr) = match self.listener.accept() {
                Ok((stream, addr)) => (stream, addr),
                // EAGAIN and EWOULDBLOCK are both mapped to std::io::ErrorKind::WouldBlock
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No more connections to accept at the moment
                    break;
                }
                Err(ref e) if is_transient_listener_error(e) => {
                    tracing::warn!(error =? e, "transient error accepting connection, retrying");
                    continue;
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("Failed to accept new connection: {:?}", e));
                }
            };
            tracing::info!("Accepted new connection from {}", addr);
            let token = self
                .free_tokens
                .pop_front()
                .expect("we checked above that free_tokens is not empty");
            // On Linux Mio by default uses edge-triggered epoll
            let result = self.poll.registry().register(
                &mut stream,
                token,
                mio::Interest::READABLE.add(mio::Interest::WRITABLE),
            );
            match result {
                Ok(_) => {}
                Err(err) => {
                    // Return the token to the pool before bailing out so it
                    // can be reused for a later connection.
                    self.free_tokens.push_back(token);
                    return Err(anyhow::anyhow!(
                        "Failed to register new connection with Poll: {:?}",
                        err
                    ));
                }
            }
            self.connections_by_token
                .insert(token, Connection::new(stream));
        }
    } else if let Some(connection) = self.connections_by_token.get_mut(&token) {
        match connection.process_io_event(event) {
            Ok(_) => {}
            Err(err) => {
                // A connection-level failure only drops that one connection
                // (and recycles its token); the server itself keeps running.
                tracing::error!(
                    "Error processing IO event for token {:?}. Dropping connection. Error: {:?}",
                    token,
                    err
                );
                self.connections_by_token.remove(&token);
                self.free_tokens.push_back(token);
            }
        }
    } else {
        tracing::warn!("Received event for unknown token: {:?}", token);
    }
    Ok(())
}
fn handle_io_for_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> { let token = event.token(); if token == self.listener_token { if !event.is_readable() { tracing::warn!("Listener event is not readable"); return Ok(()); } loop { if self.free_tokens.is_empty() { tracing::warn!("Maximum connections reached, cannot accept new connection"); break; } // Accept on mio::net::TcpListener already sets socket to // non-blocking mode and returns mio::net::TcpStream. let (mut stream, addr) = match self.listener.accept() { Ok((stream, addr)) => (stream, addr), // EAGAIN and EWOULDBLOCK are both mapped to std::io::ErrorKind::WouldBlock Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No more connections to accept at the moment break; } Err(ref e) if is_transient_listener_error(e) => { tracing::warn!(error =? e, "transient error accepting connection, retrying"); continue; } Err(e) => { return Err(anyhow::anyhow!("Failed to accept new connection: {:?}", e)); } }; tracing::info!("Accepted new connection from {}", addr); let token = self .free_tokens .pop_front() .expect("we checked above that free_tokens is not empty"); // On Linux Mio by default uses edge-triggered epoll let result = self.poll.registry().register( &mut stream, token, mio::Interest::READABLE.add(mio::Interest::WRITABLE), ); match result { Ok(_) => {} Err(err) => { self.free_tokens.push_back(token); return Err(anyhow::anyhow!( "Failed to register new connection with Poll: {:?}", err )); } } self.connections_by_token .insert(token, Connection::new(stream)); } } else if let Some(connection) = self.connections_by_token.get_mut(&token) { match connection.process_io_event(event) { Ok(_) => {} Err(err) => { tracing::error!( "Error processing IO event for token {:?}. Dropping connection. Error: {:?}", token, err ); self.connections_by_token.remove(&token); self.free_tokens.push_back(token); } } } else { tracing::warn!("Received event for unknown token: {:?}", token); } Ok(())
}
pub struct Connection { stream: mio::net::TcpStream, write_buffer: Vec<u8>,
} impl Connection { fn new(stream: mio::net::TcpStream) -> Self { Connection { stream, write_buffer: Vec::new(), } } fn process_io_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> { let mut try_write = false; if event.is_readable() { let data = read_from_stream(&mut self.stream) .context("failed to read data from tcp stream")?; // dump to logs just for our example tracing::debug!( "Read from connection: {} bytes: {}", data.len(), String::from_utf8_lossy(&data) ); // Since we want to write data back, if we read something we will // try to write it to connection immediately. self.write_buffer.extend_from_slice(&data); try_write = data.len() > 0; } if event.is_writable() || try_write { let written = self.write_pending()?; tracing::debug!("Wrote {} bytes to connection", written); } Ok(()) } fn write_pending(&mut self) -> anyhow::Result<usize> { let n_written = write_to_stream(&mut self.stream, &self.write_buffer) .context("failed to write data to tcp stream")?; self.write_buffer.drain(0..n_written); Ok(n_written) }
}
pub struct Connection { stream: mio::net::TcpStream, write_buffer: Vec<u8>,
} impl Connection { fn new(stream: mio::net::TcpStream) -> Self { Connection { stream, write_buffer: Vec::new(), } } fn process_io_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> { let mut try_write = false; if event.is_readable() { let data = read_from_stream(&mut self.stream) .context("failed to read data from tcp stream")?; // dump to logs just for our example tracing::debug!( "Read from connection: {} bytes: {}", data.len(), String::from_utf8_lossy(&data) ); // Since we want to write data back, if we read something we will // try to write it to connection immediately. self.write_buffer.extend_from_slice(&data); try_write = data.len() > 0; } if event.is_writable() || try_write { let written = self.write_pending()?; tracing::debug!("Wrote {} bytes to connection", written); } Ok(()) } fn write_pending(&mut self) -> anyhow::Result<usize> { let n_written = write_to_stream(&mut self.stream, &self.write_buffer) .context("failed to write data to tcp stream")?; self.write_buffer.drain(0..n_written); Ok(n_written) }
}
pub struct Connection { stream: mio::net::TcpStream, write_buffer: Vec<u8>,
} impl Connection { fn new(stream: mio::net::TcpStream) -> Self { Connection { stream, write_buffer: Vec::new(), } } fn process_io_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> { let mut try_write = false; if event.is_readable() { let data = read_from_stream(&mut self.stream) .context("failed to read data from tcp stream")?; // dump to logs just for our example tracing::debug!( "Read from connection: {} bytes: {}", data.len(), String::from_utf8_lossy(&data) ); // Since we want to write data back, if we read something we will // try to write it to connection immediately. self.write_buffer.extend_from_slice(&data); try_write = data.len() > 0; } if event.is_writable() || try_write { let written = self.write_pending()?; tracing::debug!("Wrote {} bytes to connection", written); } Ok(()) } fn write_pending(&mut self) -> anyhow::Result<usize> { let n_written = write_to_stream(&mut self.stream, &self.write_buffer) .context("failed to write data to tcp stream")?; self.write_buffer.drain(0..n_written); Ok(n_written) }
}
use std::io::{Read, Write}; pub fn read_from_stream<T: Read>(stream: &mut T) -> anyhow::Result<Vec<u8>> { let mut all_bytes = Vec::new(); let mut read_buffer: [u8; 1024] = [0; 1024]; let mut interrupted = false; loop { match stream.read(&mut read_buffer) { Ok(read) => { if read == 0 { return Err(anyhow::anyhow!("Connection closed by peer")); } all_bytes.extend_from_slice(&read_buffer[..read]); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No more data to read at the moment break; } // Retry once if we hit interrupted error Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => { interrupted = true; continue; } Err(e) => { return Err(anyhow::anyhow!("Failed to read from TCP stream: {}", e)); } } } Ok(all_bytes)
} pub fn write_to_stream<T: Write>(stream: &mut T, buffer: &[u8]) -> anyhow::Result<usize> { let mut interrupted = false; let mut written = 0; loop { if buffer.is_empty() || written >= buffer.len() { break; } match stream.write(&buffer[written..]) { Ok(n) => { if n == 0 { return Err(anyhow::anyhow!("Connection closed by peer")); } written += n; } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // Cannot write more at the moment break; } // Retry once if we hit interrupted error Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => { interrupted = true; continue; } Err(e) => { return Err(anyhow::anyhow!("Failed to write to TCP stream: {}", e)); } } } Ok(written)
}
use std::io::{Read, Write}; pub fn read_from_stream<T: Read>(stream: &mut T) -> anyhow::Result<Vec<u8>> { let mut all_bytes = Vec::new(); let mut read_buffer: [u8; 1024] = [0; 1024]; let mut interrupted = false; loop { match stream.read(&mut read_buffer) { Ok(read) => { if read == 0 { return Err(anyhow::anyhow!("Connection closed by peer")); } all_bytes.extend_from_slice(&read_buffer[..read]); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No more data to read at the moment break; } // Retry once if we hit interrupted error Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => { interrupted = true; continue; } Err(e) => { return Err(anyhow::anyhow!("Failed to read from TCP stream: {}", e)); } } } Ok(all_bytes)
} pub fn write_to_stream<T: Write>(stream: &mut T, buffer: &[u8]) -> anyhow::Result<usize> { let mut interrupted = false; let mut written = 0; loop { if buffer.is_empty() || written >= buffer.len() { break; } match stream.write(&buffer[written..]) { Ok(n) => { if n == 0 { return Err(anyhow::anyhow!("Connection closed by peer")); } written += n; } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // Cannot write more at the moment break; } // Retry once if we hit interrupted error Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => { interrupted = true; continue; } Err(e) => { return Err(anyhow::anyhow!("Failed to write to TCP stream: {}", e)); } } } Ok(written)
}
use std::io::{Read, Write}; pub fn read_from_stream<T: Read>(stream: &mut T) -> anyhow::Result<Vec<u8>> { let mut all_bytes = Vec::new(); let mut read_buffer: [u8; 1024] = [0; 1024]; let mut interrupted = false; loop { match stream.read(&mut read_buffer) { Ok(read) => { if read == 0 { return Err(anyhow::anyhow!("Connection closed by peer")); } all_bytes.extend_from_slice(&read_buffer[..read]); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No more data to read at the moment break; } // Retry once if we hit interrupted error Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => { interrupted = true; continue; } Err(e) => { return Err(anyhow::anyhow!("Failed to read from TCP stream: {}", e)); } } } Ok(all_bytes)
} pub fn write_to_stream<T: Write>(stream: &mut T, buffer: &[u8]) -> anyhow::Result<usize> { let mut interrupted = false; let mut written = 0; loop { if buffer.is_empty() || written >= buffer.len() { break; } match stream.write(&buffer[written..]) { Ok(n) => { if n == 0 { return Err(anyhow::anyhow!("Connection closed by peer")); } written += n; } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // Cannot write more at the moment break; } // Retry once if we hit interrupted error Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => { interrupted = true; continue; } Err(e) => { return Err(anyhow::anyhow!("Failed to write to TCP stream: {}", e)); } } } Ok(written)
}
tokio::select
fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>
fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>
fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>
wait_for_events
/// Main server loop: processes I/O readiness, broadcasts a status line to all
/// clients every `status_interval`, and returns cleanly once `exit` is set.
pub fn run(&mut self, exit: Arc<AtomicBool>) -> anyhow::Result<()> {
    tracing::info!(
        "Echo server starting up, listening on {:?}",
        self.listener.local_addr()?
    );
    let mut last_broadcast = std::time::Instant::now();
    loop {
        tracing::debug!("Echo server main loop iteration starting");
        // Cooperative shutdown, checked once per wakeup.
        if exit.load(std::sync::atomic::Ordering::Relaxed) {
            tracing::info!("Echo server exiting as requested");
            return Ok(());
        }
        // Time-based trigger: periodic status broadcast.
        if last_broadcast.elapsed() >= self.status_interval {
            let num_connections = self.connections_by_token.len();
            tracing::info!(
                num_connections,
                "Broadcasting server status to all connections"
            );
            let message = format!("Server status: {} active connections\n", num_connections);
            self.broadcast_message(message.as_bytes())?;
            last_broadcast = std::time::Instant::now();
        }
        // Block in the poller no longer than the next broadcast deadline;
        // zero if the deadline has already passed.
        let until_next_broadcast = self
            .status_interval
            .checked_sub(last_broadcast.elapsed())
            .unwrap_or_default();
        self.wait_for_events(until_next_broadcast)?;
    }
}
pub fn run(&mut self, exit: Arc<AtomicBool>) -> anyhow::Result<()> { tracing::info!( "Echo server starting up, listening on {:?}", self.listener.local_addr()? ); let mut status_timer = std::time::Instant::now(); loop { tracing::debug!("Echo server main loop iteration starting"); if exit.load(std::sync::atomic::Ordering::Relaxed) { tracing::info!("Echo server exiting as requested"); return Ok(()); } if status_timer.elapsed() >= self.status_interval { let num_connections = self.connections_by_token.len(); tracing::info!( num_connections, "Broadcasting server status to all connections" ); let message = format!("Server status: {} active connections\n", num_connections); self.broadcast_message(message.as_bytes())?; status_timer = std::time::Instant::now(); } let time_remaining = self .status_interval .checked_sub(status_timer.elapsed()) .unwrap_or_default(); self.wait_for_events(time_remaining)?; }
}
pub fn run(&mut self, exit: Arc<AtomicBool>) -> anyhow::Result<()> { tracing::info!( "Echo server starting up, listening on {:?}", self.listener.local_addr()? ); let mut status_timer = std::time::Instant::now(); loop { tracing::debug!("Echo server main loop iteration starting"); if exit.load(std::sync::atomic::Ordering::Relaxed) { tracing::info!("Echo server exiting as requested"); return Ok(()); } if status_timer.elapsed() >= self.status_interval { let num_connections = self.connections_by_token.len(); tracing::info!( num_connections, "Broadcasting server status to all connections" ); let message = format!("Server status: {} active connections\n", num_connections); self.broadcast_message(message.as_bytes())?; status_timer = std::time::Instant::now(); } let time_remaining = self .status_interval .checked_sub(status_timer.elapsed()) .unwrap_or_default(); self.wait_for_events(time_remaining)?; }
}
fn broadcast_message(&mut self, message: &[u8]) -> anyhow::Result<()> { let mut connections_to_drop = Vec::new(); for (token, connection) in self.connections_by_token.iter_mut() { connection.write_buffer.extend_from_slice(message); match connection.write_pending() { Ok(_) => {} Err(err) => { tracing::error!( "Error writing broadcast message to connection: {:?}. Dropping connection. Error: {:?}", token, err ); connections_to_drop.push(*token); } } } for token in connections_to_drop { self.connections_by_token.remove(&token); self.free_tokens.push_back(token); } Ok(())
}
fn broadcast_message(&mut self, message: &[u8]) -> anyhow::Result<()> { let mut connections_to_drop = Vec::new(); for (token, connection) in self.connections_by_token.iter_mut() { connection.write_buffer.extend_from_slice(message); match connection.write_pending() { Ok(_) => {} Err(err) => { tracing::error!( "Error writing broadcast message to connection: {:?}. Dropping connection. Error: {:?}", token, err ); connections_to_drop.push(*token); } } } for token in connections_to_drop { self.connections_by_token.remove(&token); self.free_tokens.push_back(token); } Ok(())
}
fn broadcast_message(&mut self, message: &[u8]) -> anyhow::Result<()> { let mut connections_to_drop = Vec::new(); for (token, connection) in self.connections_by_token.iter_mut() { connection.write_buffer.extend_from_slice(message); match connection.write_pending() { Ok(_) => {} Err(err) => { tracing::error!( "Error writing broadcast message to connection: {:?}. Dropping connection. Error: {:?}", token, err ); connections_to_drop.push(*token); } } } for token in connections_to_drop { self.connections_by_token.remove(&token); self.free_tokens.push_back(token); } Ok(())
}
RUST_LOG=echo_server_rust=debug cargo r
2026-03-23T05:55:00.083791Z INFO echo_server_rust::echo_server: Echo server starting up, listening on 127.0.0.1:9000
2026-03-23T05:55:00.083820Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
RUST_LOG=echo_server_rust=debug cargo r
2026-03-23T05:55:00.083791Z INFO echo_server_rust::echo_server: Echo server starting up, listening on 127.0.0.1:9000
2026-03-23T05:55:00.083820Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
RUST_LOG=echo_server_rust=debug cargo r
2026-03-23T05:55:00.083791Z INFO echo_server_rust::echo_server: Echo server starting up, listening on 127.0.0.1:9000
2026-03-23T05:55:00.083820Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
nc 127.0.0.1 9000
test
test
Server status: 1 active connections
Server status: 2 active connections
nc 127.0.0.1 9000
test
test
Server status: 1 active connections
Server status: 2 active connections
nc 127.0.0.1 9000
test
test
Server status: 1 active connections
Server status: 2 active connections
2026-03-23T05:59:00.484727Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
2026-03-23T05:59:00.484814Z INFO echo_server_rust::echo_server: Broadcasting server status to all connections num_connections=2
2026-03-23T05:59:02.280579Z DEBUG echo_server_rust::echo_server: Read from connection: 5 bytes: test 2026-03-23T05:59:02.280732Z DEBUG echo_server_rust::echo_server: Wrote 5 bytes to connection
2026-03-23T05:59:00.484727Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
2026-03-23T05:59:00.484814Z INFO echo_server_rust::echo_server: Broadcasting server status to all connections num_connections=2
2026-03-23T05:59:02.280579Z DEBUG echo_server_rust::echo_server: Read from connection: 5 bytes: test 2026-03-23T05:59:02.280732Z DEBUG echo_server_rust::echo_server: Wrote 5 bytes to connection
2026-03-23T05:59:00.484727Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
2026-03-23T05:59:00.484814Z INFO echo_server_rust::echo_server: Broadcasting server status to all connections num_connections=2
2026-03-23T05:59:02.280579Z DEBUG echo_server_rust::echo_server: Read from connection: 5 bytes: test 2026-03-23T05:59:02.280732Z DEBUG echo_server_rust::echo_server: Wrote 5 bytes to connection
send_handshake(connection).await;
let ack = wait_for_ack(connection).await;
confirm_connection(connection).await;
send_handshake(connection).await;
let ack = wait_for_ack(connection).await;
confirm_connection(connection).await;
send_handshake(connection).await;
let ack = wait_for_ack(connection).await;
confirm_connection(connection).await;
enum OutgoingConnectionState { Initiated, SentHandshake, AwaitingAck, Connected,
}
fn handle_io_event(...) -> ... {
...
match connection.state { Initiated => { try_send_handshake(connection) return SentHandshake }, SentHandshake => { try_progress_write(connection) if !has_pending_write(connection) { return AwaitingAck } else { return SentHandshake } }, AwaitingAck => { if let Some(ack) = try_read_handshake_ack(connection) { return Connected } return AwaitingAck }
}
...
}
enum OutgoingConnectionState { Initiated, SentHandshake, AwaitingAck, Connected,
}
fn handle_io_event(...) -> ... {
...
match connection.state { Initiated => { try_send_handshake(connection) return SentHandshake }, SentHandshake => { try_progress_write(connection) if !has_pending_write(connection) { return AwaitingAck } else { return SentHandshake } }, AwaitingAck => { if let Some(ack) = try_read_handshake_ack(connection) { return Connected } return AwaitingAck }
}
...
}
enum OutgoingConnectionState { Initiated, SentHandshake, AwaitingAck, Connected,
}
fn handle_io_event(...) -> ... {
...
match connection.state { Initiated => { try_send_handshake(connection) return SentHandshake }, SentHandshake => { try_progress_write(connection) if !has_pending_write(connection) { return AwaitingAck } else { return SentHandshake } }, AwaitingAck => { if let Some(ack) = try_read_handshake_ack(connection) { return Connected } return AwaitingAck }
}
...
}
Send + Sync + 'static
- Keep it simple: The foundation needs to be simple, easy to reason about,
troubleshoot and understand. Let the complexity arise from the
problems that applications on top of it will be solving, not from
its fundamental parts. Given that I want it to be single threaded,
at least for as long as it is not a performance limitation.
- The foundation needs to be simple, easy to reason about,
troubleshoot and understand. Let the complexity arise from the
problems that applications on top of it will be solving, not from
its fundamental parts. Given that I want it to be single threaded,
at least for as long as it is not a performance limitation.
- Keep it real: This application aims to be a proof of concept of something that
could be turned into a functional system. For me this implies: No busy waiting - I do not want to burn the CPU when nothing
is going on.
No added latency - When IO is ready, it should be processed,
not wait until a few milliseconds sleep between loop iterations
finishes.
- This application aims to be proof of concept, of something that
could be turned into a functional system. For me this implies: No busy waiting - I do not want to burn the CPU when nothing
is going on.
No added latency - When IO is ready, it should be processed,
not wait until a few milliseconds sleep between loop iterations
finishes.
- No busy waiting - I do not want to burn the CPU when nothing
is going on.
- No added latency - When IO is ready, it should be processed,
not wait until a few milliseconds sleep between loop iterations
finishes.
- Not just request triggers: It is not a REST API I want to use it for, therefore applications
built on top need to have a way of "triggering" some logic not only
when a request arrives. To be more specific here, I am thinking of
time based triggers, be it intervals or timeouts, there needs to be
a way to run some logic based on those, and not just incoming IO.
- It is not a REST API I want to use it for, therefore applications
built on top need to have a way of "triggering" some logic not only
when a request arrives. To be more specific here, I am thinking of
time based triggers, be it intervals or timeouts, there needs to be
a way to run some logic based on those, and not just incoming IO. - The foundation needs to be simple, easy to reason about,
troubleshoot and understand. Let the complexity arise from the
problems that applications on top of it will be solving, not from
its fundamental parts. Given that I want it to be single threaded,
at least for as long as it is not a performance limitation. - This application aims to be proof of concept, of something that
could be turned into a functional system. For me this implies: No busy waiting - I do not want to burn the CPU when nothing
is going on.
No added latency - When IO is ready, it should be processed,
not wait until a few milliseconds sleep between loop iterations
finishes.
- No busy waiting - I do not want to burn the CPU when nothing
is going on.
- No added latency - When IO is ready, it should be processed,
not wait until a few milliseconds sleep between loop iterations
finishes. - No busy waiting - I do not want to burn the CPU when nothing
is going on.
- No added latency - When IO is ready, it should be processed,
not wait until a few milliseconds sleep between loop iterations
finishes. - It is not a REST API I want to use it for, therefore applications
built on top need to have a way of "triggering" some logic not only
when a request arrives. To be more specific here, I am thinking of
time based triggers, be it intervals or timeouts, there needs to be
a way to run some logic based on those, and not just incoming IO. - kqueue - MacOS, BSD
- epoll and io_uring - Linux
- IOCP - Windows
- epoll_create / epoll_create1
- Create a Poll instance (which on Linux uses Epoll)
- Register listener and accepted connections as a Source
- Wait for events, and handle IO in non-blocking way