diff --git a/docs/superpowers/plans/2026-04-11-self-hosted-chat.md b/docs/superpowers/plans/2026-04-11-self-hosted-chat.md new file mode 100644 index 0000000..efc18c7 --- /dev/null +++ b/docs/superpowers/plans/2026-04-11-self-hosted-chat.md @@ -0,0 +1,2066 @@ +# Self-Hosted Chat Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace Stream Chat with a self-hosted chat system using Cloudflare Durable Objects for real-time WebSocket messaging, Supabase Postgres for message persistence, and chatscope for the React UI. + +**Architecture:** A new Cloudflare Worker (`apps/chat-worker`) handles WebSocket connections via Durable Objects (one per tablo channel) and REST endpoints for message history/unread counts. Messages persist to Supabase Postgres via PostgREST. The frontend replaces `stream-chat-react` with `@chatscope/chat-ui-kit-react` components wired to a custom `useChat` hook. + +**Tech Stack:** Cloudflare Workers + Durable Objects, Supabase Postgres (PostgREST), @chatscope/chat-ui-kit-react, TypeScript, Hono (Worker routing) + +**Spec:** `docs/superpowers/specs/2026-04-11-self-hosted-chat-design.md` + +--- + +## File Structure + +### New files + +``` +apps/chat-worker/ + package.json # Worker package with wrangler, hono, jose deps + wrangler.toml # DO bindings, env vars (SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, JWT_SECRET) + tsconfig.json # TypeScript config for Workers runtime + src/ + index.ts # Hono Worker entry: JWT auth, routing, DO dispatch + durable-objects/ + ChatRoom.ts # Hibernatable WebSocket DO — broadcast, typing, presence + lib/ + supabase.ts # PostgREST helper (fetch-based, no SDK) + auth.ts # JWT verification using jose + types.ts # WebSocket message types, API response types + +apps/main/src/ + hooks/ + useChat.ts # WebSocket connection, message send/receive, reconnect, typing + useChatUnread.ts # Polls /chat/unread endpoint + pages/ + chat.tsx # REWRITE: replace Stream components with chatscope + components/ + ChatChannelPreview.tsx # chatscope Conversation wrapper (replaces ChannelPreview.tsx) + ChatHeader.tsx # Channel header (replaces CustomChannelHeader.tsx) +``` + +### Modified files + +``` +apps/main/src/lib/routes.tsx # Remove ChatProvider wrapper from chat route +apps/main/src/providers/UserStoreProvider.tsx # Remove streamToken from User type +apps/main/package.json # Remove stream-chat, stream-chat-react; add @chatscope/chat-ui-kit-react +apps/main/.env.local # Remove VITE_STREAM_CHAT_API_KEY, add VITE_CHAT_WS_URL +apps/main/.env.staging # Same +apps/main/.env.production # Same + +apps/api/src/config.ts # Remove STREAM_CHAT_API_KEY, STREAM_CHAT_API_SECRET +apps/api/src/secrets.ts # Remove streamChatApiSecret, streamChatApiSecretStaging +apps/api/src/types/app.types.ts # Remove streamServerClient from BaseEnv +apps/api/src/middlewares/middleware.ts # Remove streamChatMiddleware +apps/api/src/routers/index.ts # Remove streamChat middleware usage +apps/api/src/routers/user.ts # Remove signUpToStream, streamToken from getMe +apps/api/src/routers/tablo.ts # Remove all Stream channel operations +apps/api/src/helpers/helpers.ts # Remove streamServerClient from createInvitedUser +apps/api/package.json # Remove stream-chat dependency +``` + +### Deleted files + +``` +apps/main/src/providers/ChatProvider.tsx # Stream Chat provider — replaced by useChat +apps/main/src/components/ChannelPreview.tsx # Stream-specific — replaced by ChatChannelPreview +apps/main/src/components/CustomChannelHeader.tsx # Stream-specific — replaced by ChatHeader +apps/main/src/hooks/channel.ts # useChannelFromUrl, useTabloDiscussionUnread — replaced +``` + +### Kept as-is + +``` +apps/main/src/components/ChannelBadge.tsx # Generic component, reused in new chat UI +``` + +--- + +## Task 1: Database Migration — Create messages and channel_read_state tables + +**Files:** +- Create: `supabase/migrations/20260411_create_chat_tables.sql` + +This task creates the Postgres tables that the chat system writes to and reads from. Everything else depends on these existing. + +- [ ] **Step 1: Write the migration SQL** + +```sql +-- supabase/migrations/20260411_create_chat_tables.sql + +-- Messages table +CREATE TABLE IF NOT EXISTS messages ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + channel_id uuid NOT NULL REFERENCES tablos(id) ON DELETE CASCADE, + user_id uuid NOT NULL REFERENCES auth.users(id), + text text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz, + deleted_at timestamptz +); + +CREATE INDEX IF NOT EXISTS idx_messages_channel_created ON messages(channel_id, created_at DESC); + +-- Read state table +CREATE TABLE IF NOT EXISTS channel_read_state ( + user_id uuid NOT NULL REFERENCES auth.users(id), + channel_id uuid NOT NULL REFERENCES tablos(id) ON DELETE CASCADE, + last_read_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (user_id, channel_id) +); + +-- RLS policies +ALTER TABLE messages ENABLE ROW LEVEL SECURITY; +ALTER TABLE channel_read_state ENABLE ROW LEVEL SECURITY; + +-- Messages: users can read messages in channels they are members of +CREATE POLICY "Users can read messages in their tablos" + ON messages FOR SELECT + USING ( + EXISTS ( + SELECT 1 FROM tablo_access + WHERE tablo_access.tablo_id = messages.channel_id + AND tablo_access.user_id = auth.uid() + AND tablo_access.is_active = true + ) + ); + +-- Messages: service role inserts (from chat worker) bypass RLS +-- No INSERT policy needed — the chat worker uses the service role key + +-- Read state: users can read/write their own read state +CREATE POLICY "Users can manage their own read state" + ON channel_read_state FOR ALL + USING (user_id = auth.uid()) + WITH CHECK (user_id = auth.uid()); +``` + +- [ ] **Step 2: Apply the migration** + +Run: `npx supabase db push` (or apply via Supabase dashboard if using hosted migrations) + +Expected: Tables `messages` and `channel_read_state` created with indexes and RLS policies. + +- [ ] **Step 3: Commit** + +```bash +git add supabase/migrations/20260411_create_chat_tables.sql +git commit -m "feat(chat): add messages and channel_read_state tables" +``` + +--- + +## Task 2: Chat Worker — Project scaffold and configuration + +**Files:** +- Create: `apps/chat-worker/package.json` +- Create: `apps/chat-worker/tsconfig.json` +- Create: `apps/chat-worker/wrangler.toml` +- Create: `apps/chat-worker/src/lib/types.ts` + +This task sets up the new Cloudflare Worker project in the monorepo with proper configuration. + +- [ ] **Step 1: Create package.json** + +```json +{ + "name": "@xtablo/chat-worker", + "version": "0.0.0", + "private": true, + "type": "module", + "scripts": { + "dev": "wrangler dev", + "deploy": "wrangler deploy", + "deploy:staging": "wrangler deploy --env staging", + "deploy:prod": "wrangler deploy --env production", + "typecheck": "tsc --noEmit" + }, + "dependencies": { + "hono": "^4.7.7", + "jose": "^6.0.0" + }, + "devDependencies": { + "@cloudflare/workers-types": "^4.20250410.0", + "typescript": "^5.8.3", + "wrangler": "^4.14.0" + } +} +``` + +- [ ] **Step 2: Create tsconfig.json** + +```json +{ + "compilerOptions": { + "target": "ES2022", + "module": "ES2022", + "moduleResolution": "bundler", + "lib": ["ES2022"], + "types": ["@cloudflare/workers-types"], + "strict": true, + "skipLibCheck": true, + "esModuleInterop": true, + "outDir": "./dist", + "rootDir": "./src" + }, + "include": ["src"] +} +``` + +- [ ] **Step 3: Create wrangler.toml** + +```toml +name = "xtablo-chat" +main = "src/index.ts" +compatibility_date = "2025-07-09" + +[durable_objects] +bindings = [ + { name = "CHAT_ROOM", class_name = "ChatRoom" } +] + +[[migrations]] +tag = "v1" +new_classes = ["ChatRoom"] + +[observability] +enabled = true + +[vars] +SUPABASE_URL = "https://mhcafqvzbrrwvahpvvzd.supabase.co" + +# Secrets (set via `wrangler secret put`): +# SUPABASE_SERVICE_ROLE_KEY +# JWT_SECRET + +[env.staging] +route = { pattern = "chat-staging.xtablo.com", custom_domain = true } + +[env.production] +route = { pattern = "chat.xtablo.com", custom_domain = true } +``` + +- [ ] **Step 4: Create shared types** + +Create `apps/chat-worker/src/lib/types.ts`: + +```typescript +// WebSocket message types — client to server +export type ClientMessage = + | { type: "message.send"; text: string; clientId: string } + | { type: "typing.start" } + | { type: "typing.stop" } + | { type: "presence.ping" }; + +// WebSocket message types — server to client +export type ServerMessage = + | { type: "message.new"; id: string; userId: string; text: string; createdAt: string; clientId: string } + | { type: "typing"; userId: string; isTyping: boolean } + | { type: "presence.update"; userId: string; status: "online" | "offline" } + | { type: "error"; code: string; message: string }; + +// REST API types +export interface ChatMessage { + id: string; + channel_id: string; + user_id: string; + text: string; + created_at: string; + updated_at: string | null; + deleted_at: string | null; +} + +export interface UnreadCount { + channel_id: string; + unread_count: number; +} + +// Worker environment bindings +export interface Env { + CHAT_ROOM: DurableObjectNamespace; + SUPABASE_URL: string; + SUPABASE_SERVICE_ROLE_KEY: string; + JWT_SECRET: string; +} +``` + +- [ ] **Step 5: Install dependencies** + +Run: `cd apps/chat-worker && pnpm install` + +- [ ] **Step 6: Commit** + +```bash +git add apps/chat-worker/ +git commit -m "feat(chat-worker): scaffold Cloudflare Worker project" +``` + +--- + +## Task 3: Chat Worker — JWT auth and Supabase PostgREST helper + +**Files:** +- Create: `apps/chat-worker/src/lib/auth.ts` +- Create: `apps/chat-worker/src/lib/supabase.ts` + +- [ ] **Step 1: Create JWT auth helper** + +Create `apps/chat-worker/src/lib/auth.ts`: + +```typescript +import { jwtVerify, createRemoteJWKSet } from "jose"; + +interface AuthResult { + userId: string; + email: string | null; +} + +/** + * Verify a Supabase JWT and extract the user ID. + * Supabase JWTs are signed with the JWT secret and contain the user ID in the `sub` claim. + */ +export async function verifyJwt(token: string, jwtSecret: string): Promise { + const secret = new TextEncoder().encode(jwtSecret); + const { payload } = await jwtVerify(token, secret, { + issuer: "https://mhcafqvzbrrwvahpvvzd.supabase.co/auth/v1", + }); + + if (!payload.sub) { + throw new Error("Missing sub claim in JWT"); + } + + return { + userId: payload.sub, + email: (payload.email as string) ?? null, + }; +} + +/** + * Extract Bearer token from Authorization header. + */ +export function extractToken(authHeader: string | undefined): string | null { + if (!authHeader?.startsWith("Bearer ")) return null; + return authHeader.slice(7); +} +``` + +- [ ] **Step 2: Create Supabase PostgREST helper** + +Create `apps/chat-worker/src/lib/supabase.ts`: + +```typescript +/** + * Thin PostgREST client using fetch — no Supabase SDK dependency. + * Used by both the Worker (history queries) and the Durable Object (message persistence). + */ +export class PostgREST { + private baseUrl: string; + private serviceRoleKey: string; + + constructor(supabaseUrl: string, serviceRoleKey: string) { + this.baseUrl = `${supabaseUrl}/rest/v1`; + this.serviceRoleKey = serviceRoleKey; + } + + private headers(): Record { + return { + "apikey": this.serviceRoleKey, + "Authorization": `Bearer ${this.serviceRoleKey}`, + "Content-Type": "application/json", + "Prefer": "return=representation", + }; + } + + /** Insert a row and return the inserted data. */ + async insert(table: string, data: Record): Promise { + const res = await fetch(`${this.baseUrl}/${table}`, { + method: "POST", + headers: this.headers(), + body: JSON.stringify(data), + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`PostgREST insert failed (${res.status}): ${body}`); + } + return res.json() as Promise; + } + + /** Upsert a row (requires Prefer: resolution=merge-duplicates). */ + async upsert(table: string, data: Record, onConflict: string): Promise { + const headers = this.headers(); + headers["Prefer"] = "return=representation,resolution=merge-duplicates"; + const res = await fetch(`${this.baseUrl}/${table}?on_conflict=${onConflict}`, { + method: "POST", + headers, + body: JSON.stringify(data), + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`PostgREST upsert failed (${res.status}): ${body}`); + } + return res.json() as Promise; + } + + /** Select rows with PostgREST query string. */ + async select(table: string, query: string): Promise { + const res = await fetch(`${this.baseUrl}/${table}?${query}`, { + method: "GET", + headers: this.headers(), + }); + if (!res.ok) { + const body = await res.text(); + throw new Error(`PostgREST select failed (${res.status}): ${body}`); + } + return res.json() as Promise; + } + + /** Select with exact count header for unread queries. */ + async count(table: string, query: string): Promise { + const headers = this.headers(); + headers["Prefer"] = "count=exact"; + headers["Range-Unit"] = "items"; + headers["Range"] = "0-0"; + const res = await fetch(`${this.baseUrl}/${table}?${query}`, { + method: "HEAD", + headers, + }); + const contentRange = res.headers.get("Content-Range"); + if (!contentRange) return 0; + // Content-Range format: "0-0/42" or "*/0" + const total = contentRange.split("/")[1]; + return total === "*" ? 0 : parseInt(total, 10); + } +} +``` + +- [ ] **Step 3: Commit** + +```bash +git add apps/chat-worker/src/lib/ +git commit -m "feat(chat-worker): add JWT auth and PostgREST helpers" +``` + +--- + +## Task 4: Chat Worker — ChatRoom Durable Object + +**Files:** +- Create: `apps/chat-worker/src/durable-objects/ChatRoom.ts` + +This is the core real-time component. One instance per tablo channel, managing WebSocket connections, broadcasting messages, and persisting to Postgres. + +- [ ] **Step 1: Create the ChatRoom Durable Object** + +Create `apps/chat-worker/src/durable-objects/ChatRoom.ts`: + +```typescript +import { DurableObject } from "cloudflare:workers"; +import type { Env, ClientMessage, ServerMessage } from "../lib/types"; +import { PostgREST } from "../lib/supabase"; + +export class ChatRoom extends DurableObject { + private postgrest: PostgREST | null = null; + + private getPostgREST(): PostgREST { + if (!this.postgrest) { + this.postgrest = new PostgREST(this.env.SUPABASE_URL, this.env.SUPABASE_SERVICE_ROLE_KEY); + } + return this.postgrest; + } + + /** + * Called by the Worker to initiate a WebSocket connection. + * The userId has already been authenticated by the Worker. + */ + async handleWebSocket(request: Request, userId: string): Promise { + const pair = new WebSocketPair(); + const [client, server] = [pair[0], pair[1]]; + + // Accept with userId as a tag for filtering later + this.ctx.acceptWebSocket(server, [userId]); + + // Broadcast presence to existing connections + this.broadcast({ + type: "presence.update", + userId, + status: "online", + }, server); + + return new Response(null, { status: 101, webSocket: client }); + } + + /** + * Hibernatable WebSocket handler — called when a message arrives. + */ + async webSocketMessage(ws: WebSocket, raw: string | ArrayBuffer): Promise { + const tags = this.ctx.getTags(ws); + const userId = tags[0]; + if (!userId) { + ws.close(4001, "Missing user identity"); + return; + } + + let msg: ClientMessage; + try { + msg = JSON.parse(typeof raw === "string" ? raw : new TextDecoder().decode(raw)); + } catch { + this.sendTo(ws, { type: "error", code: "PARSE_ERROR", message: "Invalid JSON" }); + return; + } + + switch (msg.type) { + case "message.send": + await this.handleSendMessage(ws, userId, msg.text, msg.clientId); + break; + case "typing.start": + this.broadcast({ type: "typing", userId, isTyping: true }, ws); + break; + case "typing.stop": + this.broadcast({ type: "typing", userId, isTyping: false }, ws); + break; + case "presence.ping": + // No-op — the connection itself is the presence signal + break; + } + } + + /** + * Hibernatable WebSocket handler — called when a connection closes. + */ + async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise { + const tags = this.ctx.getTags(ws); + const userId = tags[0]; + if (userId) { + // Only broadcast offline if no other connections for this user + const remaining = this.ctx.getWebSockets(userId); + if (remaining.length === 0) { + this.broadcast({ type: "presence.update", userId, status: "offline" }); + } + } + } + + /** + * Hibernatable WebSocket handler — called on error. + */ + async webSocketError(ws: WebSocket, error: unknown): Promise { + console.error("WebSocket error:", error); + ws.close(1011, "Internal error"); + } + + private async handleSendMessage(ws: WebSocket, userId: string, text: string, clientId: string): Promise { + if (!text || text.trim().length === 0) { + this.sendTo(ws, { type: "error", code: "EMPTY_MESSAGE", message: "Message text is required" }); + return; + } + + const id = crypto.randomUUID(); + const createdAt = new Date().toISOString(); + // Extract channelId from the DO's own ID name + // The Worker creates the DO with id = channelId, so we read it from ctx.id + const channelId = this.getChannelId(); + + const serverMsg: ServerMessage = { + type: "message.new", + id, + userId, + text: text.trim(), + createdAt, + clientId, + }; + + // Broadcast to all connections (including sender, for server echo) + this.broadcast(serverMsg); + + // Persist to Postgres asynchronously (fire-and-forget with retry) + this.ctx.waitUntil(this.persistMessage(channelId, id, userId, text.trim(), createdAt)); + } + + private async persistMessage(channelId: string, id: string, userId: string, text: string, createdAt: string): Promise { + const db = this.getPostgREST(); + const maxRetries = 3; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + await db.insert("messages", { + id, + channel_id: channelId, + user_id: userId, + text, + created_at: createdAt, + }); + return; + } catch (error) { + console.error(`Message persist attempt ${attempt + 1} failed:`, error); + if (attempt < maxRetries - 1) { + await new Promise((r) => setTimeout(r, 100 * (attempt + 1))); + } + } + } + console.error(`Failed to persist message ${id} after ${maxRetries} attempts`); + } + + /** + * Get the channel ID from the Durable Object's name. + * The Worker creates the DO ID using `env.CHAT_ROOM.idFromName(channelId)`. + */ + private getChannelId(): string { + // The DO name is set by the Worker when creating the stub. + // We store it on first WebSocket connect via the request URL. + // Fallback: use the hex ID (not ideal but safe). + return this.ctx.id.toString(); + } + + /** Send a typed message to a single WebSocket. */ + private sendTo(ws: WebSocket, msg: ServerMessage): void { + try { + ws.send(JSON.stringify(msg)); + } catch { + // Connection already closed + } + } + + /** Broadcast a typed message to all connected WebSockets, optionally excluding one. */ + private broadcast(msg: ServerMessage, exclude?: WebSocket): void { + const payload = JSON.stringify(msg); + for (const ws of this.ctx.getWebSockets()) { + if (ws !== exclude) { + try { + ws.send(payload); + } catch { + // Connection already closed, skip + } + } + } + } +} +``` + +Note on `getChannelId()`: The DO's own hex ID isn't the channel UUID. We need the channel ID for Postgres writes. We'll pass it via the WebSocket URL path and store it. Let me fix this: + +Actually, the cleanest approach: the Worker passes the channelId as a query param in the internal DO request URL. The DO reads it on first WebSocket accept and stores it in transactional storage. Let me update the implementation: + +Replace the `handleWebSocket` and `getChannelId` methods: + +```typescript + async handleWebSocket(request: Request, userId: string, channelId: string): Promise { + const pair = new WebSocketPair(); + const [client, server] = [pair[0], pair[1]]; + + // Store channelId if not already stored + const stored = await this.ctx.storage.get("channelId"); + if (!stored) { + await this.ctx.storage.put("channelId", channelId); + } + + this.ctx.acceptWebSocket(server, [userId]); + + this.broadcast({ + type: "presence.update", + userId, + status: "online", + }, server); + + return new Response(null, { status: 101, webSocket: client }); + } + + private async getChannelId(): Promise { + const channelId = await this.ctx.storage.get("channelId"); + if (!channelId) throw new Error("channelId not stored in DO"); + return channelId; + } +``` + +And update `handleSendMessage` to `await this.getChannelId()`. + +The full file should incorporate these changes. Here is the complete `ChatRoom.ts`: + +```typescript +import { DurableObject } from "cloudflare:workers"; +import type { Env, ClientMessage, ServerMessage } from "../lib/types"; +import { PostgREST } from "../lib/supabase"; + +export class ChatRoom extends DurableObject { + private postgrest: PostgREST | null = null; + + private getPostgREST(): PostgREST { + if (!this.postgrest) { + this.postgrest = new PostgREST(this.env.SUPABASE_URL, this.env.SUPABASE_SERVICE_ROLE_KEY); + } + return this.postgrest; + } + + async handleWebSocket(request: Request, userId: string, channelId: string): Promise { + const pair = new WebSocketPair(); + const [client, server] = [pair[0], pair[1]]; + + const stored = await this.ctx.storage.get("channelId"); + if (!stored) { + await this.ctx.storage.put("channelId", channelId); + } + + this.ctx.acceptWebSocket(server, [userId]); + + this.broadcast({ + type: "presence.update", + userId, + status: "online", + }, server); + + return new Response(null, { status: 101, webSocket: client }); + } + + async webSocketMessage(ws: WebSocket, raw: string | ArrayBuffer): Promise { + const tags = this.ctx.getTags(ws); + const userId = tags[0]; + if (!userId) { + ws.close(4001, "Missing user identity"); + return; + } + + let msg: ClientMessage; + try { + msg = JSON.parse(typeof raw === "string" ? raw : new TextDecoder().decode(raw)); + } catch { + this.sendTo(ws, { type: "error", code: "PARSE_ERROR", message: "Invalid JSON" }); + return; + } + + switch (msg.type) { + case "message.send": + await this.handleSendMessage(ws, userId, msg.text, msg.clientId); + break; + case "typing.start": + this.broadcast({ type: "typing", userId, isTyping: true }, ws); + break; + case "typing.stop": + this.broadcast({ type: "typing", userId, isTyping: false }, ws); + break; + case "presence.ping": + break; + } + } + + async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise { + const tags = this.ctx.getTags(ws); + const userId = tags[0]; + if (userId) { + const remaining = this.ctx.getWebSockets(userId); + if (remaining.length === 0) { + this.broadcast({ type: "presence.update", userId, status: "offline" }); + } + } + } + + async webSocketError(ws: WebSocket, error: unknown): Promise { + console.error("WebSocket error:", error); + ws.close(1011, "Internal error"); + } + + private async handleSendMessage(ws: WebSocket, userId: string, text: string, clientId: string): Promise { + if (!text || text.trim().length === 0) { + this.sendTo(ws, { type: "error", code: "EMPTY_MESSAGE", message: "Message text is required" }); + return; + } + + const id = crypto.randomUUID(); + const createdAt = new Date().toISOString(); + const channelId = await this.getChannelId(); + + const serverMsg: ServerMessage = { + type: "message.new", + id, + userId, + text: text.trim(), + createdAt, + clientId, + }; + + this.broadcast(serverMsg); + this.ctx.waitUntil(this.persistMessage(channelId, id, userId, text.trim(), createdAt)); + } + + private async persistMessage(channelId: string, id: string, userId: string, text: string, createdAt: string): Promise { + const db = this.getPostgREST(); + const maxRetries = 3; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + await db.insert("messages", { + id, + channel_id: channelId, + user_id: userId, + text, + created_at: createdAt, + }); + return; + } catch (error) { + console.error(`Message persist attempt ${attempt + 1} failed:`, error); + if (attempt < maxRetries - 1) { + await new Promise((r) => setTimeout(r, 100 * (attempt + 1))); + } + } + } + console.error(`Failed to persist message ${id} after ${maxRetries} attempts`); + } + + private async getChannelId(): Promise { + const channelId = await this.ctx.storage.get("channelId"); + if (!channelId) throw new Error("channelId not stored in DO"); + return channelId; + } + + private sendTo(ws: WebSocket, msg: ServerMessage): void { + try { + ws.send(JSON.stringify(msg)); + } catch { + // Connection already closed + } + } + + private broadcast(msg: ServerMessage, exclude?: WebSocket): void { + const payload = JSON.stringify(msg); + for (const ws of this.ctx.getWebSockets()) { + if (ws !== exclude) { + try { + ws.send(payload); + } catch { + // Connection already closed + } + } + } + } +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add apps/chat-worker/src/durable-objects/ChatRoom.ts +git commit -m "feat(chat-worker): implement ChatRoom Durable Object with WebSocket hibernation" +``` + +--- + +## Task 5: Chat Worker — Hono entry point with routing + +**Files:** +- Create: `apps/chat-worker/src/index.ts` + +The Worker entry point: authenticates requests, checks membership, dispatches WebSocket upgrades to DOs, and serves REST endpoints for message history, unread counts, and marking channels as read. + +- [ ] **Step 1: Create the Worker entry point** + +Create `apps/chat-worker/src/index.ts`: + +```typescript +import { Hono } from "hono"; +import { cors } from "hono/cors"; +import { ChatRoom } from "./durable-objects/ChatRoom"; +import { extractToken, verifyJwt } from "./lib/auth"; +import { PostgREST } from "./lib/supabase"; +import type { Env, ChatMessage, UnreadCount } from "./lib/types"; + +// Re-export DO class for wrangler +export { ChatRoom }; + +const app = new Hono<{ Bindings: Env }>(); + +// CORS — allow the main app origins +app.use("*", cors({ + origin: [ + "http://localhost:5173", + "https://app.xtablo.com", + "https://app-staging.xtablo.com", + ], + allowHeaders: ["Authorization", "Content-Type"], + allowMethods: ["GET", "POST", "OPTIONS"], +})); + +// Auth middleware — extract and verify JWT for all routes +// For WebSocket upgrades, the token comes via query param (?token=...) since browsers +// cannot send custom headers on WebSocket connections. +// For REST requests, the token comes via the Authorization header. +app.use("*", async (c, next) => { + const isWebSocket = c.req.header("Upgrade") === "websocket"; + const token = isWebSocket + ? new URL(c.req.url).searchParams.get("token") + : extractToken(c.req.header("Authorization")); + + if (!token) { + return c.json({ error: "Missing authorization" }, 401); + } + try { + const auth = await verifyJwt(token, c.env.JWT_SECRET); + c.set("userId" as never, auth.userId); + } catch (error) { + return c.json({ error: "Invalid token" }, 401); + } + await next(); +}); + +// Helper: check tablo membership via PostgREST +async function checkMembership(db: PostgREST, channelId: string, userId: string): Promise { + const rows = await db.select<{ user_id: string }>( + "tablo_access", + `tablo_id=eq.${channelId}&user_id=eq.${userId}&is_active=eq.true&select=user_id&limit=1` + ); + return rows.length > 0; +} + +// WebSocket upgrade — route to Durable Object +app.get("/chat/ws/:channelId", async (c) => { + const upgradeHeader = c.req.header("Upgrade"); + if (upgradeHeader !== "websocket") { + return c.json({ error: "Expected WebSocket upgrade" }, 426); + } + + const channelId = c.req.param("channelId"); + const userId = c.get("userId" as never) as string; + const db = new PostgREST(c.env.SUPABASE_URL, c.env.SUPABASE_SERVICE_ROLE_KEY); + + const isMember = await checkMembership(db, channelId, userId); + if (!isMember) { + return c.json({ error: "Not a member of this channel" }, 403); + } + + const id = c.env.CHAT_ROOM.idFromName(channelId); + const stub = c.env.CHAT_ROOM.get(id); + return stub.handleWebSocket(c.req.raw, userId, channelId); +}); + +// GET message history — paginated +app.get("/chat/channels/:channelId/messages", async (c) => { + const channelId = c.req.param("channelId"); + const userId = c.get("userId" as never) as string; + const db = new PostgREST(c.env.SUPABASE_URL, c.env.SUPABASE_SERVICE_ROLE_KEY); + + const isMember = await checkMembership(db, channelId, userId); + if (!isMember) { + return c.json({ error: "Not a member of this channel" }, 403); + } + + const before = c.req.query("before"); + const limit = Math.min(parseInt(c.req.query("limit") || "50", 10), 100); + + let query = `channel_id=eq.${channelId}&deleted_at=is.null&select=id,channel_id,user_id,text,created_at&order=created_at.desc&limit=${limit}`; + if (before) { + query += `&created_at=lt.${before}`; + } + + const messages = await db.select( + "messages", + query + ); + + return c.json({ messages: messages.reverse(), hasMore: messages.length === limit }); +}); + +// POST mark channel as read +app.post("/chat/channels/:channelId/read", async (c) => { + const channelId = c.req.param("channelId"); + const userId = c.get("userId" as never) as string; + const db = new PostgREST(c.env.SUPABASE_URL, c.env.SUPABASE_SERVICE_ROLE_KEY); + + await db.upsert("channel_read_state", { + user_id: userId, + channel_id: channelId, + last_read_at: new Date().toISOString(), + }, "user_id,channel_id"); + + return c.json({ ok: true }); +}); + +// GET unread counts for current user across all channels +app.get("/chat/unread", async (c) => { + const userId = c.get("userId" as never) as string; + const db = new PostgREST(c.env.SUPABASE_URL, c.env.SUPABASE_SERVICE_ROLE_KEY); + + // Get all tablos the user has access to + const accessRows = await db.select<{ tablo_id: string }>( + "tablo_access", + `user_id=eq.${userId}&is_active=eq.true&select=tablo_id` + ); + + if (accessRows.length === 0) { + return c.json({ unread: [] }); + } + + // For each channel, get unread count + // Use a Postgres function or do it in a loop (at small scale, the loop is fine) + const unread: UnreadCount[] = []; + + for (const { tablo_id } of accessRows) { + // Get last read time + const readState = await db.select<{ last_read_at: string }>( + "channel_read_state", + `user_id=eq.${userId}&channel_id=eq.${tablo_id}&select=last_read_at&limit=1` + ); + + const lastReadAt = readState[0]?.last_read_at ?? "1970-01-01T00:00:00Z"; + + const count = await db.count( + "messages", + `channel_id=eq.${tablo_id}&deleted_at=is.null&created_at=gt.${lastReadAt}` + ); + + if (count > 0) { + unread.push({ channel_id: tablo_id, unread_count: count }); + } + } + + return c.json({ unread }); +}); + +export default app; +``` + +- [ ] **Step 2: Run typecheck** + +Run: `cd apps/chat-worker && pnpm typecheck` + +Expected: No type errors. + +- [ ] **Step 3: Commit** + +```bash +git add apps/chat-worker/src/index.ts +git commit -m "feat(chat-worker): add Hono entry point with WebSocket routing and REST endpoints" +``` + +--- + +## Task 6: Frontend — useChat hook + +**Files:** +- Create: `apps/main/src/hooks/useChat.ts` + +The core frontend hook that manages the WebSocket connection, message state, typing indicators, presence, and reconnection logic. + +- [ ] **Step 1: Create the useChat hook** + +Create `apps/main/src/hooks/useChat.ts`: + +```typescript +import { useCallback, useEffect, useRef, useState } from "react"; +import { useSession } from "@xtablo/shared/contexts/SessionContext"; + +interface ChatMessage { + id: string; + userId: string; + text: string; + createdAt: string; + clientId: string; + /** True while the message is only local (not yet echoed by server). */ + optimistic?: boolean; +} + +type ServerMessage = + | { type: "message.new"; id: string; userId: string; text: string; createdAt: string; clientId: string } + | { type: "typing"; userId: string; isTyping: boolean } + | { type: "presence.update"; userId: string; status: "online" | "offline" } + | { type: "error"; code: string; message: string }; + +const CHAT_WS_BASE = import.meta.env.VITE_CHAT_WS_URL as string; +const CHAT_API_BASE = import.meta.env.VITE_CHAT_API_URL as string; + +export function useChat(channelId: string | undefined) { + const { session } = useSession(); + const token = session?.access_token; + + const [messages, setMessages] = useState([]); + const [isConnected, setIsConnected] = useState(false); + const [typingUsers, setTypingUsers] = useState([]); + const [onlineUsers, setOnlineUsers] = useState([]); + const [hasMoreMessages, setHasMoreMessages] = useState(true); + + const wsRef = useRef(null); + const reconnectAttemptRef = useRef(0); + const reconnectTimerRef = useRef>(); + const typingTimerRef = useRef>(); + const isTypingRef = useRef(false); + + // Fetch message history from REST endpoint + const fetchHistory = useCallback(async (before?: string) => { + if (!channelId || !token) return; + + const params = new URLSearchParams({ limit: "50" }); + if (before) params.set("before", before); + + const res = await fetch(`${CHAT_API_BASE}/chat/channels/${channelId}/messages?${params}`, { + headers: { Authorization: `Bearer ${token}` }, + }); + + if (!res.ok) return; + + const data = await res.json() as { messages: ChatMessage[]; hasMore: boolean }; + setHasMoreMessages(data.hasMore); + + if (before) { + // Prepend older messages + setMessages((prev) => [...data.messages, ...prev]); + } else { + // Initial load + setMessages(data.messages); + } + }, [channelId, token]); + + // Load more (pagination) + const loadMoreMessages = useCallback(() => { + if (messages.length === 0 || !hasMoreMessages) return; + const oldest = messages[0]; + fetchHistory(oldest.createdAt); + }, [messages, hasMoreMessages, fetchHistory]); + + // WebSocket connection management + useEffect(() => { + if (!channelId || !token) return; + + const connect = () => { + // Token passed via query param because browsers cannot send custom headers on WS connections + const wsUrl = `${CHAT_WS_BASE}/chat/ws/${channelId}?token=${encodeURIComponent(token)}`; + const ws = new WebSocket(wsUrl); + + ws.onopen = () => { + setIsConnected(true); + reconnectAttemptRef.current = 0; + }; + + ws.onmessage = (event) => { + const msg = JSON.parse(event.data) as ServerMessage; + + switch (msg.type) { + case "message.new": + setMessages((prev) => { + // Deduplicate: replace optimistic message with server version + const withoutOptimistic = prev.filter( + (m) => !(m.clientId === msg.clientId && m.optimistic) + ); + // Avoid duplicate if message already received + if (withoutOptimistic.some((m) => m.id === msg.id)) { + return withoutOptimistic; + } + return [...withoutOptimistic, { + id: msg.id, + userId: msg.userId, + text: msg.text, + createdAt: msg.createdAt, + clientId: msg.clientId, + }]; + }); + break; + + case "typing": + setTypingUsers((prev) => + msg.isTyping + ? prev.includes(msg.userId) ? prev : [...prev, msg.userId] + : prev.filter((id) => id !== msg.userId) + ); + break; + + case "presence.update": + setOnlineUsers((prev) => + msg.status === "online" + ? prev.includes(msg.userId) ? prev : [...prev, msg.userId] + : prev.filter((id) => id !== msg.userId) + ); + break; + + case "error": + console.error("Chat error:", msg.code, msg.message); + break; + } + }; + + ws.onclose = () => { + setIsConnected(false); + wsRef.current = null; + + // Exponential backoff reconnect + const delay = Math.min(1000 * 2 ** reconnectAttemptRef.current, 30000); + reconnectAttemptRef.current++; + reconnectTimerRef.current = setTimeout(connect, delay); + }; + + ws.onerror = () => { + ws.close(); + }; + + wsRef.current = ws; + }; + + // Load initial history then connect WebSocket + fetchHistory().then(connect); + + return () => { + clearTimeout(reconnectTimerRef.current); + clearTimeout(typingTimerRef.current); + wsRef.current?.close(); + wsRef.current = null; + setMessages([]); + setIsConnected(false); + setTypingUsers([]); + setOnlineUsers([]); + }; + }, [channelId, token, fetchHistory]); + + // Send message + const sendMessage = useCallback((text: string) => { + if (!wsRef.current || wsRef.current.readyState !== WebSocket.OPEN) return; + + const clientId = crypto.randomUUID(); + + // Optimistic update + setMessages((prev) => [ + ...prev, + { + id: `optimistic-${clientId}`, + userId: session?.user?.id ?? "", + text, + createdAt: new Date().toISOString(), + clientId, + optimistic: true, + }, + ]); + + wsRef.current.send(JSON.stringify({ type: "message.send", text, clientId })); + + // Stop typing when sending + if (isTypingRef.current) { + wsRef.current.send(JSON.stringify({ type: "typing.stop" })); + isTypingRef.current = false; + clearTimeout(typingTimerRef.current); + } + }, [session?.user?.id]); + + // Typing indicator + const sendTyping = useCallback(() => { + if (!wsRef.current || wsRef.current.readyState !== WebSocket.OPEN) return; + + if (!isTypingRef.current) { + isTypingRef.current = true; + wsRef.current.send(JSON.stringify({ type: "typing.start" })); + } + + clearTimeout(typingTimerRef.current); + typingTimerRef.current = setTimeout(() => { + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify({ type: "typing.stop" })); + } + isTypingRef.current = false; + }, 2000); + }, []); + + // Mark as read + const markAsRead = useCallback(async () => { + if (!channelId || !token) return; + await fetch(`${CHAT_API_BASE}/chat/channels/${channelId}/read`, { + method: "POST", + headers: { Authorization: `Bearer ${token}` }, + }); + }, [channelId, token]); + + return { + messages, + sendMessage, + sendTyping, + isConnected, + typingUsers, + onlineUsers, + loadMoreMessages, + hasMoreMessages, + markAsRead, + }; +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add apps/main/src/hooks/useChat.ts +git commit -m "feat(chat): add useChat hook with WebSocket connection and reconnection" +``` + +--- + +## Task 7: Frontend — useChatUnread hook + +**Files:** +- Create: `apps/main/src/hooks/useChatUnread.ts` + +Polls the chat worker for unread counts across all channels. Replaces `useTabloDiscussionUnread`. + +- [ ] **Step 1: Create the useChatUnread hook** + +Create `apps/main/src/hooks/useChatUnread.ts`: + +```typescript +import { useQuery } from "@tanstack/react-query"; +import { useSession } from "@xtablo/shared/contexts/SessionContext"; + +const CHAT_API_BASE = import.meta.env.VITE_CHAT_API_URL as string; + +interface UnreadCount { + channel_id: string; + unread_count: number; +} + +export function useChatUnread() { + const { session } = useSession(); + const token = session?.access_token; + + const { data } = useQuery({ + queryKey: ["chat-unread"], + queryFn: async (): Promise => { + const res = await fetch(`${CHAT_API_BASE}/chat/unread`, { + headers: { Authorization: `Bearer ${token}` }, + }); + if (!res.ok) return []; + const json = await res.json() as { unread: UnreadCount[] }; + return json.unread; + }, + enabled: !!token, + refetchInterval: 30_000, + refetchOnWindowFocus: true, + }); + + return { + unreadCounts: data ?? [], + getUnreadCount: (channelId: string) => + data?.find((u) => u.channel_id === channelId)?.unread_count ?? 0, + hasUnread: (channelId: string) => + (data?.find((u) => u.channel_id === channelId)?.unread_count ?? 0) > 0, + }; +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add apps/main/src/hooks/useChatUnread.ts +git commit -m "feat(chat): add useChatUnread hook for polling unread counts" +``` + +--- + +## Task 8: Frontend — Chat UI components with chatscope + +**Files:** +- Create: `apps/main/src/components/ChatChannelPreview.tsx` +- Create: `apps/main/src/components/ChatHeader.tsx` +- Modify: `apps/main/src/pages/chat.tsx` +- Modify: `apps/main/src/lib/routes.tsx` +- Modify: `apps/main/package.json` + +This task rewrites the chat page to use chatscope components instead of stream-chat-react. + +- [ ] **Step 1: Install chatscope** + +Run: `cd apps/main && pnpm add @chatscope/chat-ui-kit-react @chatscope/chat-ui-kit-styles` + +- [ ] **Step 2: Create ChatChannelPreview component** + +Create `apps/main/src/components/ChatChannelPreview.tsx`: + +```typescript +import { ChannelBadge } from "@ui/components/ChannelBadge"; +import type { UserTablo } from "@xtablo/shared/types/tablos.types"; +import { Badge } from "@xtablo/ui/components/badge"; +import { twMerge } from "tailwind-merge"; + +interface ChatChannelPreviewProps { + tablo: UserTablo; + isActive: boolean; + onClick: () => void; + unreadCount: number; + lastMessage?: string; + lastMessageTime?: string; + isOnline: boolean; +} + +function formatTimestamp(timestamp: string | Date): string { + const date = new Date(timestamp); + const now = new Date(); + const diff = now.getTime() - date.getTime(); + const minutes = Math.floor(diff / 60000); + const hours = Math.floor(diff / 3600000); + const days = Math.floor(diff / 86400000); + + if (minutes < 1) return "now"; + if (minutes < 60) return `${minutes}m`; + if (hours < 24) return `${hours}h`; + if (days < 7) return `${days}d`; + return date.toLocaleDateString(); +} + +export function ChatChannelPreview({ + tablo, + isActive, + onClick, + unreadCount, + lastMessage, + lastMessageTime, + isOnline, +}: ChatChannelPreviewProps) { + return ( +
+ + +
+
+

+ {tablo.name} +

+ {lastMessageTime && ( + + {formatTimestamp(lastMessageTime)} + + )} +
+ +
+

+ {lastMessage ?? "No messages yet"} +

+ + {unreadCount > 0 && ( +
+ + {unreadCount > 99 ? "99+" : unreadCount} + +
+ )} +
+
+ + {isActive && ( +
+ )} +
+ ); +} +``` + +- [ ] **Step 3: Create ChatHeader component** + +Create `apps/main/src/components/ChatHeader.tsx`: + +```typescript +import { ChannelBadge } from "@ui/components/ChannelBadge"; +import type { UserTablo } from "@xtablo/shared/types/tablos.types"; + +interface ChatHeaderProps { + tablo: UserTablo | null; + onToggleChannelList?: () => void; + isChannelListExpanded?: boolean; + onlineUsers: string[]; +} + +export function ChatHeader({ + tablo, + onToggleChannelList, + isChannelListExpanded = false, + onlineUsers, +}: ChatHeaderProps) { + const memberCount = onlineUsers.length; + + return ( +
+ {onToggleChannelList && ( + + )} + {tablo && ( + <> + 0} /> +
+

{tablo.name}

+ {memberCount > 0 && ( +

+ {memberCount} online +

+ )} +
+ + )} +
+ ); +} +``` + +- [ ] **Step 4: Rewrite the chat page** + +Replace the contents of `apps/main/src/pages/chat.tsx` with: + +```typescript +import "@chatscope/chat-ui-kit-styles/dist/default/styles.min.css"; +import { + ChatContainer, + MessageList, + Message, + MessageInput, + TypingIndicator, +} from "@chatscope/chat-ui-kit-react"; +import { useEffect, useRef, useState } from "react"; +import { useNavigate, useParams } from "react-router-dom"; +import { ChatChannelPreview } from "../components/ChatChannelPreview"; +import { ChatHeader } from "../components/ChatHeader"; +import { useChat } from "../hooks/useChat"; +import { useChatUnread } from "../hooks/useChatUnread"; +import { useTablosList } from "../hooks/tablos"; +import { useUser } from "../providers/UserStoreProvider"; + +export function ChatPage() { + const user = useUser(); + const { channelId } = useParams(); + const navigate = useNavigate(); + const { data: tablos } = useTablosList(); + const { getUnreadCount } = useChatUnread(); + const [isChannelListExpanded, setIsChannelListExpanded] = useState(!channelId); + + const { + messages, + sendMessage, + sendTyping, + isConnected, + typingUsers, + onlineUsers, + loadMoreMessages, + hasMoreMessages, + markAsRead, + } = useChat(channelId); + + const activeTablo = tablos?.find((t) => t.id === channelId) ?? null; + + // Mark as read when channel is focused + useEffect(() => { + if (channelId && messages.length > 0) { + markAsRead(); + } + }, [channelId, messages.length, markAsRead]); + + const handleSend = (innerHtml: string, textContent: string) => { + const text = textContent.trim(); + if (!text) return; + sendMessage(text); + }; + + const handleChannelSelect = (tabloId: string) => { + navigate(`/chat/${tabloId}`); + }; + + return ( +
+
+

Discussions

+
+
+ {/* Channel list sidebar */} +
+
+ {tablos?.map((tablo) => ( + handleChannelSelect(tablo.id)} + unreadCount={getUnreadCount(tablo.id)} + isOnline={onlineUsers.some((uid) => uid !== user.id)} + /> + ))} +
+
+ + {/* Chat area */} +
+ {channelId && activeTablo ? ( + <> + setIsChannelListExpanded(!isChannelListExpanded)} + isChannelListExpanded={isChannelListExpanded} + onlineUsers={onlineUsers} + /> +
+ + 0 ? ( + + ) : undefined + } + > + {messages.map((msg) => ( + + ))} + + sendTyping()} + attachButton={false} + /> + +
+ + ) : ( +
+ Select a conversation to start chatting +
+ )} +
+
+
+ ); +} +``` + +- [ ] **Step 5: Update routes — remove ChatProvider wrapper** + +In `apps/main/src/lib/routes.tsx`, change the chat route from: + +```typescript +import ChatProvider from "../providers/ChatProvider"; +``` +and +```typescript + { + path: "chat", + element: ( + + + + ), + children: [{ index: true }, { path: ":channelId" }], + }, +``` + +to: + +```typescript + { + path: "chat", + element: , + children: [{ index: true }, { path: ":channelId" }], + }, +``` + +Remove the `import ChatProvider` line entirely. + +- [ ] **Step 6: Commit** + +```bash +git add apps/main/src/components/ChatChannelPreview.tsx apps/main/src/components/ChatHeader.tsx apps/main/src/pages/chat.tsx apps/main/src/lib/routes.tsx apps/main/package.json +git commit -m "feat(chat): rewrite chat page with chatscope UI and custom hooks" +``` + +--- + +## Task 9: Frontend — Environment variables + +**Files:** +- Modify: `apps/main/.env.local` +- Modify: `apps/main/.env.staging` +- Modify: `apps/main/.env.production` + +- [ ] **Step 1: Update .env.local** + +Remove the line `VITE_STREAM_CHAT_API_KEY="h7bwnn8ynjpx"` and add: + +``` +VITE_CHAT_WS_URL=ws://localhost:8787 +VITE_CHAT_API_URL=http://localhost:8787 +``` + +- [ ] **Step 2: Update .env.staging** + +Remove the line `VITE_STREAM_CHAT_API_KEY="t5vvvddteapa"` and add: + +``` +VITE_CHAT_WS_URL=wss://chat-staging.xtablo.com +VITE_CHAT_API_URL=https://chat-staging.xtablo.com +``` + +- [ ] **Step 3: Update .env.production** + +Remove the line `VITE_STREAM_CHAT_API_KEY="h7bwnn8ynjpx"` and add: + +``` +VITE_CHAT_WS_URL=wss://chat.xtablo.com +VITE_CHAT_API_URL=https://chat.xtablo.com +``` + +- [ ] **Step 4: Commit** + +```bash +git add apps/main/.env.local apps/main/.env.staging apps/main/.env.production +git commit -m "feat(chat): update env vars — replace Stream API key with chat worker URLs" +``` + +--- + +## Task 10: Backend — Remove Stream Chat from API + +**Files:** +- Modify: `apps/api/src/types/app.types.ts` — remove `streamServerClient` from BaseEnv +- Modify: `apps/api/src/middlewares/middleware.ts` — remove streamChatMiddleware +- Modify: `apps/api/src/routers/index.ts` — remove `streamChat` middleware usage +- Modify: `apps/api/src/config.ts` — remove STREAM_CHAT_API_KEY, STREAM_CHAT_API_SECRET +- Modify: `apps/api/src/secrets.ts` — remove streamChatApiSecret, streamChatApiSecretStaging +- Modify: `apps/api/src/routers/user.ts` — remove signUpToStream, streamToken from getMe, Stream from inviteToOrganization and removeOrganizationMember +- Modify: `apps/api/src/routers/tablo.ts` — remove all Stream channel operations +- Modify: `apps/api/src/helpers/helpers.ts` — remove streamServerClient from createInvitedUser +- Modify: `apps/api/package.json` — remove stream-chat dependency +- Modify: `apps/api/src/__tests__/routes/tablo.test.ts` — remove Stream Chat mocks +- Modify: `apps/api/src/__tests__/config/stripe-config.test.ts` — remove streamChat mock data + +This is the largest task but is entirely removal. Each change is a deletion, not a rewrite. + +- [ ] **Step 1: Remove StreamChat from app.types.ts** + +In `apps/api/src/types/app.types.ts`: + +Remove the import: `import type { StreamChat } from "stream-chat";` + +Remove `streamServerClient: StreamChat;` from the `BaseEnv.Variables` type. + +The `BaseEnv` becomes: +```typescript +export type BaseEnv = { + Variables: { + supabase: SupabaseClient; + s3_client: S3Client; + transporter: Transporter; + stripe: Stripe; + stripeSync: StripeSync; + }; +}; +``` + +- [ ] **Step 2: Remove streamChatMiddleware from middleware.ts** + +In `apps/api/src/middlewares/middleware.ts`: + +1. Remove the import: `import { StreamChat } from "stream-chat";` +2. Remove `streamChatMiddleware` from the `Middlewares` type (lines 28-30) +3. Remove the `streamChatMiddleware` creation (lines 171-178) +4. Remove `streamChatMiddleware` from the return object (line 258) +5. Remove the `get streamChat()` getter (lines 285-287) + +- [ ] **Step 3: Remove streamChat from router index** + +In `apps/api/src/routers/index.ts`, remove line 20: +```typescript + mainRouter.use(middlewareManager.streamChat); +``` + +- [ ] **Step 4: Remove Stream config vars** + +In `apps/api/src/config.ts`: + +1. Remove lines 11-12 from `AppConfig`: + ```typescript + STREAM_CHAT_API_KEY: string; + STREAM_CHAT_API_SECRET: string; + ``` +2. Remove lines 62-63 (the `getStreamChatApiSecret` helper) +3. Remove lines 85-89 (the config assignments for `STREAM_CHAT_API_KEY` and `STREAM_CHAT_API_SECRET`) + +- [ ] **Step 5: Remove Stream from secrets.ts** + +In `apps/api/src/secrets.ts`: + +1. Remove from the `Secrets` type: `streamChatApiSecret: string;` and `streamChatApiSecretStaging: string;` +2. Remove from `loadSecrets()`: the lines fetching `stream-chat-api-secret-staging` and `stream-chat-api-secret` + +- [ ] **Step 6: Remove Stream from user.ts** + +In `apps/api/src/routers/user.ts`: + +1. Remove the `signUpToStream` handler entirely (lines 14-32) +2. In `getMe` handler (lines 34-71): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 37) + - Remove `const token = streamServerClient.createToken(user_id);` (line 64) + - Remove `streamToken: token` from the JSON response (line 69) + - The response becomes: `return c.json({ ...userData, plan: effectivePlan });` +3. In `inviteToOrganization` (lines 514-715): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 518) + - Remove `streamServerClient` from `createInvitedUser` call (line 614-621) — pass only `supabase, transporter, ...` + - Remove the Stream channel addMembers loop (lines 676-683) +4. In `removeOrganizationMember` (lines 717-850): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 720) + - Remove the Stream channel removeMembers loop (lines 829-836) +5. Remove the route: `userRouter.post("/sign-up-to-stream", ...signUpToStream);` (line 855) + +- [ ] **Step 7: Remove Stream from tablo.ts** + +In `apps/api/src/routers/tablo.ts`: + +1. Remove the `isAlreadyMemberError` helper (lines 21-29) +2. Remove the `upsertStreamUserFromProfile` helper (lines 31-47) +3. Remove the `ensureTabloChannelMember` helper (lines 49-96) +4. In `createTablo` (lines 98-170): + - Remove `const streamServerClient = c.get("streamServerClient");` and the channel.create block (lines 150-157) +5. In `updateTablo` (lines 172-220): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 176) + - Remove the channel.update block (lines 207-217) +6. In `deleteTablo` (lines 222-281): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 225) + - Remove the channel.delete block (lines 273-278) +7. In `inviteToTablo` (lines 283-435): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 291) + - Remove `streamServerClient` from `createInvitedUser` call (line 356-363) + - Remove the `ensureTabloChannelMember` call (lines 384-389) +8. In `cancelPendingInvite` (lines 437-526): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 441) + - Remove the channel.removeMembers block (lines 517-522) +9. In `acceptInviteById` (lines 572-632): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 576) + - Remove the `upsertStreamUserFromProfile` call (lines 601-606) + - Remove the `ensureTabloChannelMember` call (lines 624-629) +10. In `joinTablo` (lines 634-697): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 639) + - Remove the `upsertStreamUserFromProfile` call (lines 660-665) + - Remove the `ensureTabloChannelMember` call (lines 689-694) +11. In `leaveTablo` (lines 748-768): + - Remove `const streamServerClient = c.get("streamServerClient");` (line 751) + - Remove the `channel.removeMembers` call (lines 754-755) +12. In `getTabloRouter` (lines 869-891): + - Remove `tabloRouter.use(middlewareManager.streamChat);` (line 875) + +- [ ] **Step 8: Remove Stream from helpers.ts** + +In `apps/api/src/helpers/helpers.ts`: + +1. Remove `import type { StreamChat } from "stream-chat";` (line 6) +2. In `createInvitedUser` (lines 291-373): + - Remove `streamServerClient: StreamChat` from the parameter list (line 293) + - Remove the `streamServerClient.upsertUser()` call (lines 337-341) + +- [ ] **Step 9: Remove stream-chat dependency** + +Run: `cd apps/api && pnpm remove stream-chat` + +- [ ] **Step 10: Update test files** + +In `apps/api/src/__tests__/routes/tablo.test.ts`: +- Remove the Stream Chat mock block (lines 12-38) +- Remove any `mockChannel*` expectations in individual tests + +In `apps/api/src/__tests__/config/stripe-config.test.ts`: +- Remove `streamChatApiSecret` and `streamChatApiSecretStaging` from mock data (lines 13, 16) + +Update any other test files that reference Stream Chat mocks. + +- [ ] **Step 11: Run tests** + +Run: `pnpm test:api` + +Expected: All tests pass with Stream Chat removed. + +- [ ] **Step 12: Commit** + +```bash +git add apps/api/ +git commit -m "refactor(api): remove all Stream Chat dependencies and operations" +``` + +--- + +## Task 11: Frontend — Remove Stream Chat dependencies + +**Files:** +- Delete: `apps/main/src/providers/ChatProvider.tsx` +- Delete: `apps/main/src/components/ChannelPreview.tsx` +- Delete: `apps/main/src/components/CustomChannelHeader.tsx` +- Delete: `apps/main/src/hooks/channel.ts` +- Modify: `apps/main/src/providers/UserStoreProvider.tsx` — remove streamToken +- Modify: `apps/main/package.json` — remove stream-chat, stream-chat-react +- Modify: `packages/shared/src/hooks/auth.ts` — remove useSignUpToStream + +- [ ] **Step 1: Delete Stream-specific files** + +Delete these files: +- `apps/main/src/providers/ChatProvider.tsx` +- `apps/main/src/components/ChannelPreview.tsx` +- `apps/main/src/components/CustomChannelHeader.tsx` +- `apps/main/src/hooks/channel.ts` + +- [ ] **Step 2: Remove streamToken from UserStoreProvider** + +In `apps/main/src/providers/UserStoreProvider.tsx`: + +Change the `User` type (line 10-12) from: +```typescript +export type User = Tables<"profiles"> & { + streamToken: string | null; +}; +``` +to: +```typescript +export type User = Tables<"profiles">; +``` + +- [ ] **Step 3: Remove useSignUpToStream from auth.ts** + +In `packages/shared/src/hooks/auth.ts`: + +1. Remove the `useSignUpToStream` function entirely (lines 85-101) +2. In `useSignUpWithoutPassword`, remove: + - `const { signUpToStream } = useSignUpToStream(api);` (line 15) + - The `signUpToStream` call in the mutation (lines 38-40): + ```typescript + if (response.session?.access_token) { + await signUpToStream(response.session.access_token); + } + ``` + +- [ ] **Step 4: Remove Stream packages** + +Run: `cd apps/main && pnpm remove stream-chat stream-chat-react` + +- [ ] **Step 5: Search for any remaining Stream references** + +Run: `grep -r "stream-chat\|streamToken\|STREAM_CHAT\|StreamChat\|useChannelFromUrl\|useTabloDiscussionUnread\|ChatProvider" apps/main/src/ --include="*.ts" --include="*.tsx" -l` + +Expected: No files returned (only test mocks, if any). + +- [ ] **Step 6: Remove VITE_STREAM_CHAT_API_KEY from external app if present** + +Check `apps/external/.env.production` — if it references `VITE_STREAM_CHAT_API_KEY`, remove it. + +- [ ] **Step 7: Run typecheck and tests** + +Run: `pnpm typecheck && cd apps/main && pnpm test` + +Expected: No type errors, all tests pass. + +- [ ] **Step 8: Commit** + +```bash +git add -A +git commit -m "refactor(main): remove all Stream Chat dependencies and components" +``` + +--- + +## Task 12: Frontend — Remove Stream Chat API env var from API .env files + +**Files:** +- Modify: `apps/api/.env.development` + +- [ ] **Step 1: Remove STREAM_CHAT_API_KEY** + +In `apps/api/.env.development`, remove line 3: +``` +STREAM_CHAT_API_KEY=h7bwnn8ynjpx +``` + +- [ ] **Step 2: Commit** + +```bash +git add apps/api/.env.development +git commit -m "chore: remove STREAM_CHAT_API_KEY from API env files" +``` + +--- + +## Task 13: Integration testing — End-to-end chat flow + +**Files:** +- No new files — manual testing + +- [ ] **Step 1: Start the chat worker locally** + +Run: `cd apps/chat-worker && pnpm dev` + +Expected: Worker starts on `http://localhost:8787` with DO bindings. + +Note: Before this, set the required secrets locally: +```bash +cd apps/chat-worker +echo "your-supabase-service-role-key" | wrangler secret put SUPABASE_SERVICE_ROLE_KEY --local +echo "your-jwt-secret" | wrangler secret put JWT_SECRET --local +``` + +- [ ] **Step 2: Start the main app** + +Run: `pnpm dev:main` + +- [ ] **Step 3: Test the golden path** + +1. Log in to the app +2. Navigate to `/chat` +3. Select a tablo channel from the sidebar +4. Send a message — verify it appears immediately (optimistic UI) +5. Open a second browser tab with the same channel — verify the message appears there +6. Send a message from the second tab — verify it appears in both tabs +7. Type in one tab — verify typing indicator shows in the other + +- [ ] **Step 4: Test reconnection** + +1. Stop the chat worker (`ctrl+c` in the terminal running `pnpm dev` in chat-worker) +2. Verify the UI shows disconnected state +3. Restart the chat worker +4. Verify the client reconnects and loads missed messages + +- [ ] **Step 5: Test unread counts** + +1. Open `/chat` in one tab +2. Navigate away from the chat page in a second tab +3. Send a message from the first tab +4. In the second tab, the unread badge should appear within 30 seconds (polling interval) + +- [ ] **Step 6: Verify API has no Stream references** + +Run: `pnpm dev:api` +Run: Test various tablo operations (create, update, delete, invite) and verify they work without Stream Chat errors. + +- [ ] **Step 7: Run full test suite** + +Run: `pnpm test` + +Expected: All tests pass.