diff --git a/apps/chat-worker/src/index.ts b/apps/chat-worker/src/index.ts new file mode 100644 index 0000000..ec03c56 --- /dev/null +++ b/apps/chat-worker/src/index.ts @@ -0,0 +1,158 @@ +import { Hono } from "hono"; +import { cors } from "hono/cors"; +import { ChatRoom } from "./durable-objects/ChatRoom"; +import { extractToken, verifyJwt } from "./lib/auth"; +import { PostgREST } from "./lib/supabase"; +import type { Env, ChatMessage, UnreadCount } from "./lib/types"; + +// Re-export DO class for wrangler +export { ChatRoom }; + +const app = new Hono<{ Bindings: Env }>(); + +// CORS — allow the main app origins +app.use("*", cors({ + origin: [ + "http://localhost:5173", + "https://app.xtablo.com", + "https://app-staging.xtablo.com", + ], + allowHeaders: ["Authorization", "Content-Type"], + allowMethods: ["GET", "POST", "OPTIONS"], +})); + +// Auth middleware — extract and verify JWT for all routes +// For WebSocket upgrades, the token comes via query param (?token=...) since browsers +// cannot send custom headers on WebSocket connections. +// For REST requests, the token comes via the Authorization header. +app.use("*", async (c, next) => { + const isWebSocket = c.req.header("Upgrade") === "websocket"; + const token = isWebSocket + ? new URL(c.req.url).searchParams.get("token") + : extractToken(c.req.header("Authorization")); + + if (!token) { + return c.json({ error: "Missing authorization" }, 401); + } + try { + const auth = await verifyJwt(token, c.env.JWT_SECRET); + c.set("userId" as never, auth.userId); + } catch (error) { + return c.json({ error: "Invalid token" }, 401); + } + await next(); +}); + +// Helper: check tablo membership via PostgREST +async function checkMembership(db: PostgREST, channelId: string, userId: string): Promise { + const rows = await db.select<{ user_id: string }>( + "tablo_access", + `tablo_id=eq.${channelId}&user_id=eq.${userId}&is_active=eq.true&select=user_id&limit=1` + ); + return rows.length > 0; +} + +// WebSocket upgrade — route to Durable Object +app.get("/chat/ws/:channelId", async (c) => { + const upgradeHeader = c.req.header("Upgrade"); + if (upgradeHeader !== "websocket") { + return c.json({ error: "Expected WebSocket upgrade" }, 426); + } + + const channelId = c.req.param("channelId"); + const userId = c.get("userId" as never) as string; + const db = new PostgREST(c.env.SUPABASE_URL, c.env.SUPABASE_SERVICE_ROLE_KEY); + + const isMember = await checkMembership(db, channelId, userId); + if (!isMember) { + return c.json({ error: "Not a member of this channel" }, 403); + } + + const id = c.env.CHAT_ROOM.idFromName(channelId); + const stub = c.env.CHAT_ROOM.get(id); + return (stub as any).handleWebSocket(c.req.raw, userId, channelId); +}); + +// GET message history — paginated +app.get("/chat/channels/:channelId/messages", async (c) => { + const channelId = c.req.param("channelId"); + const userId = c.get("userId" as never) as string; + const db = new PostgREST(c.env.SUPABASE_URL, c.env.SUPABASE_SERVICE_ROLE_KEY); + + const isMember = await checkMembership(db, channelId, userId); + if (!isMember) { + return c.json({ error: "Not a member of this channel" }, 403); + } + + const before = c.req.query("before"); + const limit = Math.min(parseInt(c.req.query("limit") || "50", 10), 100); + + let query = `channel_id=eq.${channelId}&deleted_at=is.null&select=id,channel_id,user_id,text,created_at&order=created_at.desc&limit=${limit}`; + if (before) { + query += `&created_at=lt.${before}`; + } + + const messages = await db.select( + "messages", + query + ); + + return c.json({ messages: messages.reverse(), hasMore: messages.length === limit }); +}); + +// POST mark channel as read +app.post("/chat/channels/:channelId/read", async (c) => { + const channelId = c.req.param("channelId"); + const userId = c.get("userId" as never) as string; + const db = new PostgREST(c.env.SUPABASE_URL, c.env.SUPABASE_SERVICE_ROLE_KEY); + + await db.upsert("channel_read_state", { + user_id: userId, + channel_id: channelId, + last_read_at: new Date().toISOString(), + }, "user_id,channel_id"); + + return c.json({ ok: true }); +}); + +// GET unread counts for current user across all channels +app.get("/chat/unread", async (c) => { + const userId = c.get("userId" as never) as string; + const db = new PostgREST(c.env.SUPABASE_URL, c.env.SUPABASE_SERVICE_ROLE_KEY); + + // Get all tablos the user has access to + const accessRows = await db.select<{ tablo_id: string }>( + "tablo_access", + `user_id=eq.${userId}&is_active=eq.true&select=tablo_id` + ); + + if (accessRows.length === 0) { + return c.json({ unread: [] }); + } + + // For each channel, get unread count + const unread: UnreadCount[] = []; + + for (const { tablo_id } of accessRows) { + // Get last read time + const readState = await db.select<{ last_read_at: string }>( + "channel_read_state", + `user_id=eq.${userId}&channel_id=eq.${tablo_id}&select=last_read_at&limit=1` + ); + + const lastReadAt = readState[0]?.last_read_at ?? "1970-01-01T00:00:00Z"; + + const count = await db.count( + "messages", + `channel_id=eq.${tablo_id}&deleted_at=is.null&created_at=gt.${lastReadAt}` + ); + + if (count > 0) { + unread.push({ channel_id: tablo_id, unread_count: count }); + } + } + + return c.json({ unread }); +}); + +export default app;