feat(chat-worker): implement ChatRoom Durable Object with WebSocket hibernation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
f6a56fdbdd
commit
986b31eff0
1 changed files with 155 additions and 0 deletions
155
apps/chat-worker/src/durable-objects/ChatRoom.ts
Normal file
155
apps/chat-worker/src/durable-objects/ChatRoom.ts
Normal file
|
|
@ -0,0 +1,155 @@
|
|||
import { DurableObject } from "cloudflare:workers";
|
||||
import type { Env, ClientMessage, ServerMessage } from "../lib/types";
|
||||
import { PostgREST } from "../lib/supabase";
|
||||
|
||||
export class ChatRoom extends DurableObject<Env> {
|
||||
private postgrest: PostgREST | null = null;
|
||||
|
||||
private getPostgREST(): PostgREST {
|
||||
if (!this.postgrest) {
|
||||
this.postgrest = new PostgREST(this.env.SUPABASE_URL, this.env.SUPABASE_SERVICE_ROLE_KEY);
|
||||
}
|
||||
return this.postgrest;
|
||||
}
|
||||
|
||||
async handleWebSocket(request: Request, userId: string, channelId: string): Promise<Response> {
|
||||
const pair = new WebSocketPair();
|
||||
const [client, server] = [pair[0], pair[1]];
|
||||
|
||||
const stored = await this.ctx.storage.get<string>("channelId");
|
||||
if (!stored) {
|
||||
await this.ctx.storage.put("channelId", channelId);
|
||||
}
|
||||
|
||||
this.ctx.acceptWebSocket(server, [userId]);
|
||||
|
||||
this.broadcast({
|
||||
type: "presence.update",
|
||||
userId,
|
||||
status: "online",
|
||||
}, server);
|
||||
|
||||
return new Response(null, { status: 101, webSocket: client });
|
||||
}
|
||||
|
||||
async webSocketMessage(ws: WebSocket, raw: string | ArrayBuffer): Promise<void> {
|
||||
const tags = this.ctx.getTags(ws);
|
||||
const userId = tags[0];
|
||||
if (!userId) {
|
||||
ws.close(4001, "Missing user identity");
|
||||
return;
|
||||
}
|
||||
|
||||
let msg: ClientMessage;
|
||||
try {
|
||||
msg = JSON.parse(typeof raw === "string" ? raw : new TextDecoder().decode(raw));
|
||||
} catch {
|
||||
this.sendTo(ws, { type: "error", code: "PARSE_ERROR", message: "Invalid JSON" });
|
||||
return;
|
||||
}
|
||||
|
||||
switch (msg.type) {
|
||||
case "message.send":
|
||||
await this.handleSendMessage(ws, userId, msg.text, msg.clientId);
|
||||
break;
|
||||
case "typing.start":
|
||||
this.broadcast({ type: "typing", userId, isTyping: true }, ws);
|
||||
break;
|
||||
case "typing.stop":
|
||||
this.broadcast({ type: "typing", userId, isTyping: false }, ws);
|
||||
break;
|
||||
case "presence.ping":
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise<void> {
|
||||
const tags = this.ctx.getTags(ws);
|
||||
const userId = tags[0];
|
||||
if (userId) {
|
||||
const remaining = this.ctx.getWebSockets(userId);
|
||||
if (remaining.length === 0) {
|
||||
this.broadcast({ type: "presence.update", userId, status: "offline" });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
|
||||
console.error("WebSocket error:", error);
|
||||
ws.close(1011, "Internal error");
|
||||
}
|
||||
|
||||
private async handleSendMessage(ws: WebSocket, userId: string, text: string, clientId: string): Promise<void> {
|
||||
if (!text || text.trim().length === 0) {
|
||||
this.sendTo(ws, { type: "error", code: "EMPTY_MESSAGE", message: "Message text is required" });
|
||||
return;
|
||||
}
|
||||
|
||||
const id = crypto.randomUUID();
|
||||
const createdAt = new Date().toISOString();
|
||||
const channelId = await this.getChannelId();
|
||||
|
||||
const serverMsg: ServerMessage = {
|
||||
type: "message.new",
|
||||
id,
|
||||
userId,
|
||||
text: text.trim(),
|
||||
createdAt,
|
||||
clientId,
|
||||
};
|
||||
|
||||
this.broadcast(serverMsg);
|
||||
this.ctx.waitUntil(this.persistMessage(channelId, id, userId, text.trim(), createdAt));
|
||||
}
|
||||
|
||||
private async persistMessage(channelId: string, id: string, userId: string, text: string, createdAt: string): Promise<void> {
|
||||
const db = this.getPostgREST();
|
||||
const maxRetries = 3;
|
||||
|
||||
for (let attempt = 0; attempt < maxRetries; attempt++) {
|
||||
try {
|
||||
await db.insert("messages", {
|
||||
id,
|
||||
channel_id: channelId,
|
||||
user_id: userId,
|
||||
text,
|
||||
created_at: createdAt,
|
||||
});
|
||||
return;
|
||||
} catch (error) {
|
||||
console.error(`Message persist attempt ${attempt + 1} failed:`, error);
|
||||
if (attempt < maxRetries - 1) {
|
||||
await new Promise((r) => setTimeout(r, 100 * (attempt + 1)));
|
||||
}
|
||||
}
|
||||
}
|
||||
console.error(`Failed to persist message ${id} after ${maxRetries} attempts`);
|
||||
}
|
||||
|
||||
private async getChannelId(): Promise<string> {
|
||||
const channelId = await this.ctx.storage.get<string>("channelId");
|
||||
if (!channelId) throw new Error("channelId not stored in DO");
|
||||
return channelId;
|
||||
}
|
||||
|
||||
private sendTo(ws: WebSocket, msg: ServerMessage): void {
|
||||
try {
|
||||
ws.send(JSON.stringify(msg));
|
||||
} catch {
|
||||
// Connection already closed
|
||||
}
|
||||
}
|
||||
|
||||
private broadcast(msg: ServerMessage, exclude?: WebSocket): void {
|
||||
const payload = JSON.stringify(msg);
|
||||
for (const ws of this.ctx.getWebSockets()) {
|
||||
if (ws !== exclude) {
|
||||
try {
|
||||
ws.send(payload);
|
||||
} catch {
|
||||
// Connection already closed
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue