xtablo-source/apps/chat-worker/src/durable-objects/ChatRoom.ts
Arthur Belleville 513aa0a316 fix: use fetch() instead of RPC for DO WebSocket upgrades
DO RPC doesn't support WebSocket upgrade requests. Forward the request
via stub.fetch() and pass userId/channelId via custom headers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 16:58:31 +02:00

166 lines
4.9 KiB
TypeScript

import { DurableObject } from "cloudflare:workers";
import type { Env, ClientMessage, ServerMessage } from "../lib/types";
import { PostgREST } from "../lib/supabase";
/**
 * Durable Object that fans chat traffic out to every WebSocket attached to it.
 *
 * The Worker forwards WebSocket upgrade requests to `fetch()`, identifying the
 * caller through the `X-User-Id` / `X-Channel-Id` headers. Each accepted socket
 * is tagged with its user id so later events can be attributed to a user.
 * Chat messages are broadcast first and written to the `messages` table
 * afterwards, in the background.
 */
export class ChatRoom extends DurableObject<Env> {
  /** Number of insert attempts `persistMessage` makes before giving up. */
  private static readonly MAX_PERSIST_ATTEMPTS = 3;

  /** Lazily created PostgREST client; built on first use by `getPostgREST()`. */
  private postgrest: PostgREST | null = null;

  /** Returns the shared PostgREST client, creating it from `env` on first call. */
  private getPostgREST(): PostgREST {
    if (!this.postgrest) {
      this.postgrest = new PostgREST(this.env.SUPABASE_URL, this.env.SUPABASE_SERVICE_ROLE_KEY);
    }
    return this.postgrest;
  }

  /**
   * Handle incoming fetch requests — WebSocket upgrades are forwarded here by the Worker.
   * userId and channelId are passed via custom headers set by the Worker.
   *
   * @param request - The forwarded upgrade request.
   * @returns 400 when an identity header is missing, 426 when the request is
   *   not a WebSocket upgrade, otherwise a 101 response carrying the client
   *   end of a new socket pair.
   */
  async fetch(request: Request): Promise<Response> {
    const userId = request.headers.get("X-User-Id");
    const channelId = request.headers.get("X-Channel-Id");
    if (!userId || !channelId) {
      return new Response("Missing user or channel identity", { status: 400 });
    }

    // A 101 response with a socket attached only makes sense for an upgrade
    // request; reject anything else before allocating a socket pair.
    if (request.headers.get("Upgrade")?.toLowerCase() !== "websocket") {
      return new Response("Expected WebSocket upgrade", { status: 426 });
    }

    // Remember the channel on first connect so messages can be persisted
    // against it later (see getChannelId).
    const stored = await this.ctx.storage.get<string>("channelId");
    if (!stored) {
      await this.ctx.storage.put("channelId", channelId);
    }

    // Create the pair only after the fallible storage work has succeeded.
    const pair = new WebSocketPair();
    const client = pair[0];
    const server = pair[1];

    // Tag the server end with the user id; handlers read it back via getTags().
    this.ctx.acceptWebSocket(server, [userId]);

    // Tell everyone except the newly connected socket that the user is online.
    this.broadcast({ type: "presence.update", userId, status: "online" }, server);

    return new Response(null, { status: 101, webSocket: client });
  }

  /**
   * Dispatches one inbound client frame.
   *
   * Frames that are not valid JSON, or whose JSON value is not an object with
   * a string `type`, are answered with a `PARSE_ERROR` and otherwise ignored.
   *
   * @param ws - Socket the frame arrived on.
   * @param raw - Frame payload; binary payloads are decoded as text before parsing.
   */
  async webSocketMessage(ws: WebSocket, raw: string | ArrayBuffer): Promise<void> {
    const userId = this.ctx.getTags(ws)[0];
    if (!userId) {
      ws.close(4001, "Missing user identity");
      return;
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(typeof raw === "string" ? raw : new TextDecoder().decode(raw));
    } catch {
      this.sendTo(ws, { type: "error", code: "PARSE_ERROR", message: "Invalid JSON" });
      return;
    }

    // JSON.parse happily returns null, numbers, strings and arrays; only an
    // object carrying a string `type` can be a ClientMessage.
    if (
      typeof parsed !== "object" ||
      parsed === null ||
      typeof (parsed as { type?: unknown }).type !== "string"
    ) {
      this.sendTo(ws, { type: "error", code: "PARSE_ERROR", message: "Invalid message format" });
      return;
    }
    const msg = parsed as ClientMessage;

    switch (msg.type) {
      case "message.send":
        await this.handleSendMessage(ws, userId, msg.text, msg.clientId);
        break;
      case "typing.start":
        this.broadcast({ type: "typing", userId, isTyping: true }, ws);
        break;
      case "typing.stop":
        this.broadcast({ type: "typing", userId, isTyping: false }, ws);
        break;
      case "presence.ping":
        // Nothing to do for a ping.
        break;
    }
  }

  /**
   * Runs when a socket closes: announces the user as offline if this was
   * their last connection, then answers the close.
   *
   * @param ws - Socket that closed.
   * @param code - Close code reported for the socket.
   * @param reason - Close reason reported for the socket.
   * @param wasClean - Whether the close was clean (unused here).
   */
  async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise<void> {
    this.announceIfOffline(ws);
    // Reply with our own close so the closing handshake completes. Some codes
    // cannot be sent back and the socket may already be gone; closeQuietly
    // swallows those failures.
    this.closeQuietly(ws, code, reason);
  }

  /**
   * Runs when a socket errors: logs the error, closes the socket with 1011
   * and announces the user as offline if no other connection remains.
   *
   * @param ws - Socket that errored.
   * @param error - Error reported for the socket.
   */
  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    console.error("WebSocket error:", error);
    this.closeQuietly(ws, 1011, "Internal error");
    // NOTE(review): a close event may not follow an error, so presence is
    // updated here as well — confirm against runtime behaviour.
    this.announceIfOffline(ws);
  }

  /**
   * Broadcasts `presence.update: offline` for the user owning `ws` when that
   * user has no other socket attached. Does nothing for an untagged socket.
   */
  private announceIfOffline(ws: WebSocket): void {
    const userId = this.ctx.getTags(ws)[0];
    if (!userId) {
      return;
    }
    // The departing socket may still be listed while its handler runs, so it
    // is excluded explicitly rather than relying on the runtime to drop it.
    const others = this.ctx.getWebSockets(userId).filter((other) => other !== ws);
    if (others.length === 0) {
      this.broadcast({ type: "presence.update", userId, status: "offline" }, ws);
    }
  }

  /** Closes `ws`, ignoring failures such as an already-closed socket. */
  private closeQuietly(ws: WebSocket, code: number, reason: string): void {
    try {
      ws.close(code, reason);
    } catch {
      // Already closed, or the code cannot be sent in a close frame.
    }
  }

  /**
   * Validates a chat message, broadcasts it to every socket (sender included)
   * and schedules persistence in the background.
   *
   * @param ws - Sender's socket; receives the error reply on invalid input.
   * @param userId - Sender's user id.
   * @param text - Message body as supplied by the client.
   * @param clientId - Client-supplied id echoed back in the broadcast.
   * @throws Error if no channel id has been stored for this object.
   */
  private async handleSendMessage(ws: WebSocket, userId: string, text: string, clientId: string): Promise<void> {
    // `text` comes from untrusted JSON, so its runtime type is checked even
    // though the signature says string.
    const trimmed = typeof text === "string" ? text.trim() : "";
    if (trimmed.length === 0) {
      this.sendTo(ws, { type: "error", code: "EMPTY_MESSAGE", message: "Message text is required" });
      return;
    }

    const id = crypto.randomUUID();
    const createdAt = new Date().toISOString();
    const channelId = await this.getChannelId();

    const serverMsg: ServerMessage = {
      type: "message.new",
      id,
      userId,
      text: trimmed,
      createdAt,
      clientId,
    };
    this.broadcast(serverMsg);

    // Persist after broadcasting; waitUntil lets the insert (and its retries)
    // finish without delaying this handler.
    this.ctx.waitUntil(this.persistMessage(channelId, id, userId, trimmed, createdAt));
  }

  /**
   * Inserts a message row, retrying with a linearly growing delay
   * (100 ms, 200 ms, …). Failures are logged and never thrown.
   */
  private async persistMessage(channelId: string, id: string, userId: string, text: string, createdAt: string): Promise<void> {
    const db = this.getPostgREST();
    const maxRetries = ChatRoom.MAX_PERSIST_ATTEMPTS;
    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        await db.insert("messages", {
          id,
          channel_id: channelId,
          user_id: userId,
          text,
          created_at: createdAt,
        });
        return;
      } catch (error) {
        console.error(`Message persist attempt ${attempt + 1} failed:`, error);
        if (attempt < maxRetries - 1) {
          await new Promise((r) => setTimeout(r, 100 * (attempt + 1)));
        }
      }
    }
    console.error(`Failed to persist message ${id} after ${maxRetries} attempts`);
  }

  /**
   * Reads the channel id saved by `fetch()`.
   *
   * @throws Error if nothing has been stored under `channelId`.
   */
  private async getChannelId(): Promise<string> {
    const channelId = await this.ctx.storage.get<string>("channelId");
    if (!channelId) throw new Error("channelId not stored in DO");
    return channelId;
  }

  /** Sends one message to one socket, ignoring send failures. */
  private sendTo(ws: WebSocket, msg: ServerMessage): void {
    try {
      ws.send(JSON.stringify(msg));
    } catch {
      // Connection already closed
    }
  }

  /**
   * Sends `msg` to every attached socket except `exclude`, serializing it
   * once. A failed send on one socket does not stop delivery to the others.
   */
  private broadcast(msg: ServerMessage, exclude?: WebSocket): void {
    const payload = JSON.stringify(msg);
    for (const ws of this.ctx.getWebSockets()) {
      if (ws === exclude) {
        continue;
      }
      try {
        ws.send(payload);
      } catch {
        // Connection already closed
      }
    }
  }
}