// websocket dispatch // @date 2026-04-19 11:42:16Z enum{ws_b64_len_of_sha1=28}; static str compute_websocket_accept(arena *A, str const client_key){ // Concatenate with magic GUID char combined[256]; str const magic = kstr("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); snprintf(combined, sizeof(combined), "%.*s%.*s", pstr(client_key), pstr(magic)); // SHA-1 hash (20 bytes) unsigned char sha[SHA_DIGEST_LENGTH]; SHA1((unsigned char*)combined, strlen(combined), sha); // Base64 encode (28 chars) byte *output = new(A, byte, ws_b64_len_of_sha1); if (!output) return (str){0}; base64_encode_std(output, ws_b64_len_of_sha1, sha, SHA_DIGEST_LENGTH); return str_from_buf(output, ws_b64_len_of_sha1); } static void ws_on_close(Client *cli){ log("ws_on_close called (fd:%d)\n", cli->fd); } static void ws_on_error(Client *cli, const char *error){ log("ws_on_error called (fd:%d). error: %s\n", cli->fd, error); } static void ws_on_open(Client *cli){ log("ws_on_open [%p][%d]\n", cli, cli->fd); str reply = kstr("#(:type 'connected')"); unless(ws_send_frame(cli->fd, ws_opcode_binary, reply)){ warnsys("ws_send_frame len:%ld", reply.len); } debug("ws_on_open exits\n"); } // Broadcast a message to all clients in a room static void ws_broadcast_room(str const room_id, str const message) { debug("broadcast[channel: %.*s]: %.*s\n", pstr(room_id), pstr(message)); for (int i = 0; i < max_clients; i++) { Client *cli = &clients[i]; unless(is_active_client(cli)) continue; if (str_eq(cli->room_id, room_id)) { debug("broadcast message (len:%ld) to [fd:%d]\n", message.len, cli->fd); if(!ws_send_frame(cli->fd, ws_opcode_binary, message)){ log("WARN: ws_broadcast_room [%.*s] failed to send to cli (fd:%d)\n", pstr(room_id), cli->fd); } } } } // handle websocket request type=send_message (user sent a message, store and broadcast it) static void ws_on_send_message(Client *cli, str msg) { bl_result room = bl_find_map_string(msg, kstr("room")); bl_result text = bl_find_map_string(msg, kstr("text")); if (room.ok && text.ok) { room.s = str_trim(room.s); text.s = str_trim(text.s); if (room.s.len == 0 || text.s.len == 0) return; str attachments = bl_find_map_key(msg, kstr("attachments")); if(bl_fail(msg, attachments)) attachments = (str){0}; // Get current timestamp struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); enum{nanoseconds_per_millisecond=1000000}; uint64_t now_ms = (uint64_t)ts.tv_sec * 1000 + ts.tv_nsec / nanoseconds_per_millisecond; str const msg_id = generate_ulid(cli->A); if (0 == msg_id.len){ debug("generate ulid failed message not stored in db\n"); return; } if (!db_is_user_member_of_room(cli->user_id, room.s)) { log("WARN: User not authorized to post in room %.*s\n", pstr(room.s)); return; } // Insert into database if(!db_insert_message(room.s, cli->user_id, text.s, attachments, msg_id, now_ms)){ // Send error back to client? debug("db_insert_message failed [%s]\n", db_errmsg()); return; } // change type of incoming message to new_message, add msg_id, and broadcast str const prefix = kstr("#(:type 'send_message' "); unless(str_starts_with(msg, prefix)){ debug("expected [%.*s] at start of message\n", pstr(prefix)); return; } str const suffix = str_drop(msg, prefix.len); str const broadcast = str_printf(cli->A, "#(:type 'new_message' :id '%.*s' :sender_id '%.*s' :sender_name #%u'%.*s' %.*s\n" , pstr(msg_id), pstr(cli->user_id), cli->username.len, pstr(cli->username), pstr(suffix)); ws_broadcast_room(room.s, broadcast); } } // handle websocket request type=edit_message (user edits their own message) static void ws_on_edit_message(Client *cli, str msg) { bl_result msg_id = bl_find_map_string(msg, kstr("id")); bl_result room_id = bl_find_map_string(msg, kstr("room_id")); bl_result new_text = bl_find_map_string(msg, kstr("text")); if (!msg_id.ok || !room_id.ok || !new_text.ok) { debug("ws_on_edit_message: missing required fields\n"); return; } msg_id.s = str_trim(msg_id.s); room_id.s = str_trim(room_id.s); new_text.s = str_trim(new_text.s); if (msg_id.s.len == 0 || room_id.s.len == 0 || new_text.s.len == 0) { debug("ws_on_edit_message: empty fields after trim\n"); return; } // Check if user is member of the room if (!db_is_user_member_of_room(cli->user_id, room_id.s)) { log("WARN: User(fd:%d) [%.*s] not authorized to edit message in room %.*s\n", cli->fd, pstr(cli->username), pstr(room_id.s)); return; } // Get current message and verify ownership str current_text; str current_sender_id; uint64_t current_created_at; unless(db_get_message_by_id(cli->A, msg_id.s, ¤t_text, ¤t_sender_id, ¤t_created_at)) { debug("ws_on_edit_message: message not found: %.*s\n", pstr(msg_id.s)); return; } // Verify user owns the message if (!str_eq(current_sender_id, cli->user_id)) { log("WARN: User(fd:%d) [%.*s] tried to edit someone else's message %.*s\n", cli->fd, pstr(cli->username), pstr(msg_id.s)); return; } // Don't update if text hasn't changed if (str_eq(current_text, new_text.s)) { debug("ws_on_edit_message: text unchanged, skipping update\n"); return; } // Update message in database uint64_t edited_at_ms = 0; struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); enum{nanoseconds_per_millisecond=1000000}; edited_at_ms = (uint64_t)ts.tv_sec * 1000 + ts.tv_nsec / nanoseconds_per_millisecond; if (!db_update_message_text(msg_id.s, new_text.s, edited_at_ms)) { debug("ws_on_edit_message: db_update failed for message %.*s\n", pstr(msg_id.s)); return; } // Broadcast the update to all clients in the room str const broadcast = str_printf(cli->A, "#(:type 'message.updated' :id '%.*s' :room '%.*s' :text #%d'%.*s' :edited_at #d%lu)\n", pstr(msg_id.s), pstr(room_id.s), (int)new_text.s.len, pstr(new_text.s), edited_at_ms); ws_broadcast_room(room_id.s, broadcast); } // handle websocket request type=delete_message (user deletes their own message) static void ws_on_delete_message(Client *cli, str msg) { bl_result msg_id = bl_find_map_string(msg, kstr("id")); bl_result room_id = bl_find_map_string(msg, kstr("room_id")); if (!msg_id.ok || !room_id.ok) { debug("ws_on_delete_message: missing required fields\n"); return; } msg_id.s = str_trim(msg_id.s); room_id.s = str_trim(room_id.s); if (msg_id.s.len == 0 || room_id.s.len == 0) { debug("ws_on_delete_message: empty fields after trim\n"); return; } // Check if user is member of the room if (!db_is_user_member_of_room(cli->user_id, room_id.s)) { log("WARN: User(fd:%d) [%.*s] not authorized to delete message in room %.*s\n", cli->fd, pstr(cli->username), pstr(room_id.s)); return; } // Get current message and verify ownership str current_text; str current_sender_id; uint64_t current_created_at; unless(db_get_message_by_id(cli->A, msg_id.s, ¤t_text, ¤t_sender_id, ¤t_created_at)) { debug("ws_on_delete_message: message not found: %.*s\n", pstr(msg_id.s)); return; } // Verify user owns the message if (!str_eq(current_sender_id, cli->user_id)) { log("WARN: User(fd:%d) [%.*s] tried to delete someone else's message %.*s\n", cli->fd, pstr(cli->username), pstr(msg_id.s)); return; } // Delete message from database (cascade will handle attachments and deliveries) if (!db_delete_message(msg_id.s)) { debug("ws_on_delete_message: db_delete failed for message %.*s\n", pstr(msg_id.s)); return; } // Broadcast the deletion to all clients in the room str const broadcast = str_printf(cli->A, "#(:type 'message.deleted' :id '%.*s' :room '%.*s')\n", pstr(msg_id.s), pstr(room_id.s)); ws_broadcast_room(room_id.s, broadcast); } static bool update_current_room_id_(Client *cli, str room_id){ debug("update_current_room_id_(fd:%d) [%.*s] => [%.*s]\n", cli->fd, pstr(cli->room_id), pstr(room_id)); if ((size_t)room_id.len > sizeof(cli->room_id_buf)) { warn("room_id too long: %zu bytes\n", room_id.len); return false; } memcpy(cli->room_id_buf, room_id.data, room_id.len); cli->room_id = str_from_buf(cli->room_id_buf, room_id.len); return true; } static void ws_broadcast_event_(Client *cli, str room_id, str const event_type) { str broadcast = str_printf(cli->A, "#(:type '%.*s' :room_id '%.*s' :user_id '%.*s' :username '%.*s')\n", pstr(event_type), pstr(room_id), pstr(cli->user_id), pstr(cli->username)); ws_broadcast_room(room_id, broadcast); } // Handle incoming WebSocket message from client (replace the placeholder) // return false to close the socket static bool ws_on_message(Client *cli, str const msg) { debug("ws_on_message called (fd:%d) msg: [%.*s]\n", cli->fd, pstr(msg)); bl_result type = bl_find_map_string(msg, kstr("type")); unless(type.ok){ warn("ws_on_message(fd:%d): missing :type key\n", cli->fd); return false; } if (str_eq(type.s, kstr("send_message"))) { ws_on_send_message(cli, msg); } else if (str_eq(type.s, kstr("edit_message"))) { ws_on_edit_message(cli, msg); } else if (str_eq(type.s, kstr("delete_message"))) { ws_on_delete_message(cli, msg); } else if (str_eq(type.s, kstr("room.enter"))) { bl_result room_id = bl_find_map_string(msg, kstr("room_id")); room_id.s = str_trim(room_id.s); if (room_id.ok && room_id.s.len) { if (db_is_user_member_of_room(cli->user_id, room_id.s)) { if(cli->room_id.len){ ws_broadcast_event_(cli, cli->room_id, kstr("room.exited")); } if(update_current_room_id_(cli, room_id.s)){ // TODO we have already sent room.exited before check this, the reason is cos this overwrites cli->room_id ws_broadcast_event_(cli, room_id.s, kstr("room.entered")); } }else{ log("WARN: User(fd:%d) [%.*s] not authorized to post in room %.*s\n", cli->fd, pstr(cli->username), pstr(room_id.s)); return false; } } } else if (str_eq(type.s, kstr("typing.start"))) { bl_result room_id = bl_find_map_string(msg, kstr("room_id")); room_id.s = str_trim(room_id.s); if (room_id.ok && room_id.s.len) { ws_broadcast_event_(cli, room_id.s, kstr("typing.start")); } } else if (str_eq(type.s, kstr("typing.stop"))) { bl_result room_id = bl_find_map_string(msg, kstr("room_id")); room_id.s = str_trim(room_id.s); if (room_id.ok && room_id.s.len) { ws_broadcast_event_(cli, room_id.s, kstr("typing.stop")); } }else{ warn("ws_on_message(fd:%d): unknown :type key [%.*s], ignoring.\n", cli->fd, pstr(type.s)); } return true; // keep alive }