// main network loop enum { net_read_size = (1024*1024), netloop_epoll_timeout_ms = 5*1000, netloop_ping_check_rate_limit = 1*1000, ws_ping_interval_ms = 45*1000, ws_pong_timeout_ms = 10*1000, }; static int epoll_fd; // Helper: get current time in milliseconds static int64_t current_time_ms(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return ts.tv_sec * 1000 + ts.tv_nsec / 1000000; } // Helper: make socket non-blocking static void set_nonblocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); } // Helper: find a free client slot static Client *find_free_client(void) { for (int i = 0; i < max_clients; i++) { unless(is_active_client(&clients[i])){ clients[i] = (Client){0}; return &clients[i]; } } return NULL; } // Helper: find client by fd static Client *find_client_by_fd(int fd) { for (int i = 0; i < max_clients; i++) { if (clients[i].fd == fd) return &clients[i]; } return NULL; } static void netloop_close_and_del(int fd){ debug("netloop_close_and_del (fd:%d)\n", fd); close(fd); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL); } static void netloop_shutdown_websocket(Client *cli, uint16_t close_code, str const reason) { debug("netloop_shutdown_websocket(fd:%d) code: %d, reason: %.*s\n", cli->fd, close_code, (int)reason.len, reason.data); assert(cli->is_websocket); // Send proper WebSocket close frame ws_send_close_frame(cli->fd, close_code, reason); // Then close the connection close(cli->fd); ws_on_close(cli); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, cli->fd, NULL); invalidate_client(cli); } static void netloop_shutdown_http(Client *cli) { debug("netloop_shutdown_http (fd:%d)\n", cli->fd); assert(!cli->is_websocket); close(cli->fd); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, cli->fd, NULL); invalidate_client(cli); } static void netloop_shutdown_any(Client *cli) { if(cli->is_websocket){ netloop_shutdown_websocket(cli, ws_close_code_normal, (str){0}); }else{ netloop_shutdown_http(cli); } } enum {ws_custom_close_code_session_expired = 4001 }; // Check all clients for ping/pong timeouts and expiry static void check_ping_timeouts(int64_t now) { static int64_t last_ping_check; int64_t time_since_last_check = (now - last_ping_check); if(time_since_last_check < netloop_ping_check_rate_limit) return; last_ping_check = now; for(int i = 0; i < max_clients; i++){ Client *cli = &clients[i]; unless(is_active_client(cli)) continue; // Check for session expiry if (cli->expiry_monotonic_ms > 0 && now >= cli->expiry_monotonic_ms) { debug("cli(fd:%d) (all times clock monotonic ms) now:%ld >= expiry:%ld (%d)\n", cli->fd, now, cli->expiry_monotonic_ms, (now >= cli->expiry_monotonic_ms)); log("Session expired for fd=%d (expiry: %ld, now: %ld)\n", cli->fd, cli->expiry_monotonic_ms, now); ws_on_error(cli, "Session expired"); netloop_shutdown_websocket(cli, ws_custom_close_code_session_expired, kstr("Session expired")); continue; } ping_state *ps = &cli->ping_state; switch(ps->state){ default: die("bad ping state: %d\n", ps->state); break; case wait_to_ping: if(now >= ps->target_time){ unless(quieter_logs){ debug("\n"); log("send_ping(%d) @%ld\n", cli->fd, now); } str empty = {0}; if(ws_send_frame(cli->fd, ws_opcode_ping, empty)){ ps->state = wait_for_pong; ps->target_time = now + ws_pong_timeout_ms; // unlikely to overflow before heat death of universe } } break; case wait_for_pong: if(now >= ps->target_time){ debug("\n"); log("Pong timeout for fd=%d [ping@%ld now@%ld ws_pong_timeout_ms=%d]\n", cli->fd, (ps->target_time-ws_pong_timeout_ms), now, ws_pong_timeout_ms); ws_on_error(cli, "Pong timeout"); netloop_shutdown_websocket(cli, ws_close_code_going_away, kstr("Pong timeout")); } break; } } } static void init_new_client(Client *cli, int client_fd){ *cli = (Client){.fd=client_fd}; arena_init(&cli->perm, cli->perm_buf, sizeof(cli->perm_buf)); } // Accept a new connection static void accept_new_connection(int listen_fd) { int client_fd = accept(listen_fd, NULL, NULL); if (client_fd < 0) { perror("accept"); return; } debug("accept_new_connection (fd:%d)\n", client_fd); assert(client_fd>0); // we use fd=0 as invalid in table (taken by stdout in most normal cases) set_nonblocking(client_fd); Client *cli = find_free_client(); if (!cli) { log("No slots available, reject connection\n"); str const msg = kstr("HTTP/1.1 503 Service Unavailable" CRLF CRLF ); (void)write_all(client_fd, msg.data, msg.len); close(client_fd); return; } init_new_client(cli, client_fd); // Add to epoll for future events struct epoll_event ev = {0}; ev.events = EPOLLIN | EPOLLRDHUP; // Level-triggered ev.data.fd = client_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) < 0) { perror("epoll_ctl ADD"); close(client_fd); invalidate_client(cli); return; } } static void ws_shutdown(Client *cli){ ws_send_frame(cli->fd, ws_opcode_close, (str){0}); netloop_shutdown_websocket(cli, ws_close_code_normal, (str){0}); } static void handle_http_request(Client *cli, str rqbuf) { http_dispatch(cli, rqbuf); if (cli->is_websocket) { ws_on_open(cli); }else{ debug("HTTP connection (fd:%d) closed after response (normal case)\n", cli->fd); netloop_shutdown_http(cli); return; } } // Handle data from an existing WebSocket client static void handle_websocket_message(Client *cli, str data, int64_t const now) { str remaining = data; while (remaining.len > 0) { ws_frame_t frame; unless(ws_parse_frame(remaining, &frame)) { ws_on_error(cli, "Failed to parse WebSocket frame"); netloop_shutdown_websocket(cli, ws_close_code_protocol_error, kstr("bad frame")); return; } switch (frame.opcode) { case ws_opcode_pong: unless(quieter_logs) debug("received pong (fd:%d)\n", cli->fd); cli->ping_state.state = wait_to_ping; cli->ping_state.target_time = now + ws_ping_interval_ms; break; case ws_opcode_close: // Client sent close frame - acknowledge and close // Extract close code from payload if present uint16_t client_close_code = ws_close_code_no_status_received; if (frame.payload.len >= 2) { client_close_code = (frame.payload.data[0] << 8) | frame.payload.data[1]; } log("Received close frame from fd=%d, code=%d\n", cli->fd, client_close_code); // Send close frame response (echo their code) ws_send_close_frame(cli->fd, client_close_code, (str){0}); netloop_shutdown_websocket(cli, client_close_code, (str){0}); return; case ws_opcode_ping: debug("received ping, reply with pong (fd:%d)\n", cli->fd); unless(ws_send_frame(cli->fd, ws_opcode_pong, frame.payload)){ ws_on_error(cli, "Failed to reply to ping"); netloop_shutdown_websocket(cli, ws_close_code_protocol_error, (str){0}); return; } break; case ws_opcode_text: case ws_opcode_binary: if (!ws_on_message(cli, frame.payload)) ws_shutdown(cli); break; default: debug("websocket dispatch ignored opcode [%d]\n", frame.opcode); break; } remaining = frame.remaining; } } // Handle data from an existing client (called by epoll loop) static void netloop_read_event(Client *cli) { assert(is_active_client(cli)); uint8_t buf[net_read_size]; int64_t const now = current_time_ms(); cli->last_activity_ms = now; ssize_t n = read(cli->fd, buf, sizeof(buf)); if (n < 0) { warnsys("netloop_read_event (fd:%d) error", cli->fd); if (errno == EAGAIN || errno == EWOULDBLOCK) { return; // No more data to read } perror("read"); ws_on_error(cli, "Read error"); netloop_shutdown_any(cli); return; } if (n == 0) { // Client closed connection netloop_shutdown_any(cli); return; } MAKE_STD_ARENA(scratch); cli->A = &scratch; str data = {.data = buf, .len = n}; if(cli->is_websocket){ handle_websocket_message(cli, data, now); }else{ handle_http_request(cli, data); } cli->A = 0; } // Main event loop static void event_loop(int listen_fd) { epoll_fd = epoll_create1(0); if (epoll_fd < 0) { perror("epoll_create1"); return; } // Add listen socket struct epoll_event listen_ev = {0}; listen_ev.events = EPOLLIN; listen_ev.data.fd = listen_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &listen_ev) < 0) { perror("epoll_ctl ADD listen"); close(epoll_fd); return; } struct epoll_event events[max_clients + 1]; for(;;){ int nfds = epoll_wait(epoll_fd, events, max_clients + 1, netloop_epoll_timeout_ms); unless(quieter_logs) debug("%s", (0==nfds)? "." : "\n"); check_ping_timeouts(current_time_ms()); if (nfds < 0) { warnsys("epoll_wait (fd:%d)", epoll_fd); if (errno == EINTR){ continue; } perror("epoll_wait"); break; } for (int i = 0; i < nfds; i++) { int fd = events[i].data.fd; unless(quieter_logs) debug("epoll received event on (fd:%d)\n", fd); if (fd == listen_fd) { accept_new_connection(listen_fd); } else { Client *cli = find_client_by_fd(fd); if(cli){ netloop_read_event(cli); }else{ warn("epoll received event on (fd:%d) an INACTIVE client\n", fd); netloop_close_and_del(fd); } } } } close(epoll_fd); }