diff --git a/apps/chat-worker/src/durable-objects/ChatRoom.ts b/apps/chat-worker/src/durable-objects/ChatRoom.ts new file mode 100644 index 0000000..8c1982c --- /dev/null +++ b/apps/chat-worker/src/durable-objects/ChatRoom.ts @@ -0,0 +1,155 @@ +import { DurableObject } from "cloudflare:workers"; +import type { Env, ClientMessage, ServerMessage } from "../lib/types"; +import { PostgREST } from "../lib/supabase"; + +export class ChatRoom extends DurableObject { + private postgrest: PostgREST | null = null; + + private getPostgREST(): PostgREST { + if (!this.postgrest) { + this.postgrest = new PostgREST(this.env.SUPABASE_URL, this.env.SUPABASE_SERVICE_ROLE_KEY); + } + return this.postgrest; + } + + async handleWebSocket(request: Request, userId: string, channelId: string): Promise { + const pair = new WebSocketPair(); + const [client, server] = [pair[0], pair[1]]; + + const stored = await this.ctx.storage.get("channelId"); + if (!stored) { + await this.ctx.storage.put("channelId", channelId); + } + + this.ctx.acceptWebSocket(server, [userId]); + + this.broadcast({ + type: "presence.update", + userId, + status: "online", + }, server); + + return new Response(null, { status: 101, webSocket: client }); + } + + async webSocketMessage(ws: WebSocket, raw: string | ArrayBuffer): Promise { + const tags = this.ctx.getTags(ws); + const userId = tags[0]; + if (!userId) { + ws.close(4001, "Missing user identity"); + return; + } + + let msg: ClientMessage; + try { + msg = JSON.parse(typeof raw === "string" ? raw : new TextDecoder().decode(raw)); + } catch { + this.sendTo(ws, { type: "error", code: "PARSE_ERROR", message: "Invalid JSON" }); + return; + } + + switch (msg.type) { + case "message.send": + await this.handleSendMessage(ws, userId, msg.text, msg.clientId); + break; + case "typing.start": + this.broadcast({ type: "typing", userId, isTyping: true }, ws); + break; + case "typing.stop": + this.broadcast({ type: "typing", userId, isTyping: false }, ws); + break; + case "presence.ping": + break; + } + } + + async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise { + const tags = this.ctx.getTags(ws); + const userId = tags[0]; + if (userId) { + const remaining = this.ctx.getWebSockets(userId); + if (remaining.length === 0) { + this.broadcast({ type: "presence.update", userId, status: "offline" }); + } + } + } + + async webSocketError(ws: WebSocket, error: unknown): Promise { + console.error("WebSocket error:", error); + ws.close(1011, "Internal error"); + } + + private async handleSendMessage(ws: WebSocket, userId: string, text: string, clientId: string): Promise { + if (!text || text.trim().length === 0) { + this.sendTo(ws, { type: "error", code: "EMPTY_MESSAGE", message: "Message text is required" }); + return; + } + + const id = crypto.randomUUID(); + const createdAt = new Date().toISOString(); + const channelId = await this.getChannelId(); + + const serverMsg: ServerMessage = { + type: "message.new", + id, + userId, + text: text.trim(), + createdAt, + clientId, + }; + + this.broadcast(serverMsg); + this.ctx.waitUntil(this.persistMessage(channelId, id, userId, text.trim(), createdAt)); + } + + private async persistMessage(channelId: string, id: string, userId: string, text: string, createdAt: string): Promise { + const db = this.getPostgREST(); + const maxRetries = 3; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + await db.insert("messages", { + id, + channel_id: channelId, + user_id: userId, + text, + created_at: createdAt, + }); + return; + } catch (error) { + console.error(`Message persist attempt ${attempt + 1} failed:`, error); + if (attempt < maxRetries - 1) { + await new Promise((r) => setTimeout(r, 100 * (attempt + 1))); + } + } + } + console.error(`Failed to persist message ${id} after ${maxRetries} attempts`); + } + + private async getChannelId(): Promise { + const channelId = await this.ctx.storage.get("channelId"); + if (!channelId) throw new Error("channelId not stored in DO"); + return channelId; + } + + private sendTo(ws: WebSocket, msg: ServerMessage): void { + try { + ws.send(JSON.stringify(msg)); + } catch { + // Connection already closed + } + } + + private broadcast(msg: ServerMessage, exclude?: WebSocket): void { + const payload = JSON.stringify(msg); + for (const ws of this.ctx.getWebSockets()) { + if (ws !== exclude) { + try { + ws.send(payload); + } catch { + // Connection already closed + } + } + } + } +}