feat(chat): add useChat hook with WebSocket connection and reconnection
This commit is contained in:
parent
2811e51109
commit
2833b4b2c1
1 changed files with 230 additions and 0 deletions
230
apps/main/src/hooks/useChat.ts
Normal file
230
apps/main/src/hooks/useChat.ts
Normal file
|
|
@ -0,0 +1,230 @@
|
|||
import { useCallback, useEffect, useRef, useState } from "react";
|
||||
import { useSession } from "@xtablo/shared/contexts/SessionContext";
|
||||
|
||||
interface ChatMessage {
|
||||
id: string;
|
||||
userId: string;
|
||||
text: string;
|
||||
createdAt: string;
|
||||
clientId: string;
|
||||
/** True while the message is only local (not yet echoed by server). */
|
||||
optimistic?: boolean;
|
||||
}
|
||||
|
||||
type ServerMessage =
|
||||
| { type: "message.new"; id: string; userId: string; text: string; createdAt: string; clientId: string }
|
||||
| { type: "typing"; userId: string; isTyping: boolean }
|
||||
| { type: "presence.update"; userId: string; status: "online" | "offline" }
|
||||
| { type: "error"; code: string; message: string };
|
||||
|
||||
const CHAT_WS_BASE = import.meta.env.VITE_CHAT_WS_URL as string;
|
||||
const CHAT_API_BASE = import.meta.env.VITE_CHAT_API_URL as string;
|
||||
|
||||
export function useChat(channelId: string | undefined) {
|
||||
const { session } = useSession();
|
||||
const token = session?.access_token;
|
||||
|
||||
const [messages, setMessages] = useState<ChatMessage[]>([]);
|
||||
const [isConnected, setIsConnected] = useState(false);
|
||||
const [typingUsers, setTypingUsers] = useState<string[]>([]);
|
||||
const [onlineUsers, setOnlineUsers] = useState<string[]>([]);
|
||||
const [hasMoreMessages, setHasMoreMessages] = useState(true);
|
||||
|
||||
const wsRef = useRef<WebSocket | null>(null);
|
||||
const reconnectAttemptRef = useRef(0);
|
||||
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
|
||||
const typingTimerRef = useRef<ReturnType<typeof setTimeout>>();
|
||||
const isTypingRef = useRef(false);
|
||||
|
||||
// Fetch message history from REST endpoint
|
||||
const fetchHistory = useCallback(async (before?: string) => {
|
||||
if (!channelId || !token) return;
|
||||
|
||||
const params = new URLSearchParams({ limit: "50" });
|
||||
if (before) params.set("before", before);
|
||||
|
||||
const res = await fetch(`${CHAT_API_BASE}/chat/channels/${channelId}/messages?${params}`, {
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
});
|
||||
|
||||
if (!res.ok) return;
|
||||
|
||||
const data = await res.json() as { messages: ChatMessage[]; hasMore: boolean };
|
||||
setHasMoreMessages(data.hasMore);
|
||||
|
||||
if (before) {
|
||||
// Prepend older messages
|
||||
setMessages((prev) => [...data.messages, ...prev]);
|
||||
} else {
|
||||
// Initial load
|
||||
setMessages(data.messages);
|
||||
}
|
||||
}, [channelId, token]);
|
||||
|
||||
// Load more (pagination)
|
||||
const loadMoreMessages = useCallback(() => {
|
||||
if (messages.length === 0 || !hasMoreMessages) return;
|
||||
const oldest = messages[0];
|
||||
fetchHistory(oldest.createdAt);
|
||||
}, [messages, hasMoreMessages, fetchHistory]);
|
||||
|
||||
// WebSocket connection management
|
||||
useEffect(() => {
|
||||
if (!channelId || !token) return;
|
||||
|
||||
const connect = () => {
|
||||
// Token passed via query param because browsers cannot send custom headers on WS connections
|
||||
const wsUrl = `${CHAT_WS_BASE}/chat/ws/${channelId}?token=${encodeURIComponent(token)}`;
|
||||
const ws = new WebSocket(wsUrl);
|
||||
|
||||
ws.onopen = () => {
|
||||
setIsConnected(true);
|
||||
reconnectAttemptRef.current = 0;
|
||||
};
|
||||
|
||||
ws.onmessage = (event) => {
|
||||
const msg = JSON.parse(event.data) as ServerMessage;
|
||||
|
||||
switch (msg.type) {
|
||||
case "message.new":
|
||||
setMessages((prev) => {
|
||||
// Deduplicate: replace optimistic message with server version
|
||||
const withoutOptimistic = prev.filter(
|
||||
(m) => !(m.clientId === msg.clientId && m.optimistic)
|
||||
);
|
||||
// Avoid duplicate if message already received
|
||||
if (withoutOptimistic.some((m) => m.id === msg.id)) {
|
||||
return withoutOptimistic;
|
||||
}
|
||||
return [...withoutOptimistic, {
|
||||
id: msg.id,
|
||||
userId: msg.userId,
|
||||
text: msg.text,
|
||||
createdAt: msg.createdAt,
|
||||
clientId: msg.clientId,
|
||||
}];
|
||||
});
|
||||
break;
|
||||
|
||||
case "typing":
|
||||
setTypingUsers((prev) =>
|
||||
msg.isTyping
|
||||
? prev.includes(msg.userId) ? prev : [...prev, msg.userId]
|
||||
: prev.filter((id) => id !== msg.userId)
|
||||
);
|
||||
break;
|
||||
|
||||
case "presence.update":
|
||||
setOnlineUsers((prev) =>
|
||||
msg.status === "online"
|
||||
? prev.includes(msg.userId) ? prev : [...prev, msg.userId]
|
||||
: prev.filter((id) => id !== msg.userId)
|
||||
);
|
||||
break;
|
||||
|
||||
case "error":
|
||||
console.error("Chat error:", msg.code, msg.message);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
setIsConnected(false);
|
||||
wsRef.current = null;
|
||||
|
||||
// Exponential backoff reconnect
|
||||
const delay = Math.min(1000 * 2 ** reconnectAttemptRef.current, 30000);
|
||||
reconnectAttemptRef.current++;
|
||||
reconnectTimerRef.current = setTimeout(connect, delay);
|
||||
};
|
||||
|
||||
ws.onerror = () => {
|
||||
ws.close();
|
||||
};
|
||||
|
||||
wsRef.current = ws;
|
||||
};
|
||||
|
||||
// Load initial history then connect WebSocket
|
||||
fetchHistory().then(connect);
|
||||
|
||||
return () => {
|
||||
clearTimeout(reconnectTimerRef.current);
|
||||
clearTimeout(typingTimerRef.current);
|
||||
wsRef.current?.close();
|
||||
wsRef.current = null;
|
||||
setMessages([]);
|
||||
setIsConnected(false);
|
||||
setTypingUsers([]);
|
||||
setOnlineUsers([]);
|
||||
};
|
||||
}, [channelId, token, fetchHistory]);
|
||||
|
||||
// Send message
|
||||
const sendMessage = useCallback((text: string) => {
|
||||
if (!wsRef.current || wsRef.current.readyState !== WebSocket.OPEN) return;
|
||||
|
||||
const clientId = crypto.randomUUID();
|
||||
|
||||
// Optimistic update
|
||||
setMessages((prev) => [
|
||||
...prev,
|
||||
{
|
||||
id: `optimistic-${clientId}`,
|
||||
userId: session?.user?.id ?? "",
|
||||
text,
|
||||
createdAt: new Date().toISOString(),
|
||||
clientId,
|
||||
optimistic: true,
|
||||
},
|
||||
]);
|
||||
|
||||
wsRef.current.send(JSON.stringify({ type: "message.send", text, clientId }));
|
||||
|
||||
// Stop typing when sending
|
||||
if (isTypingRef.current) {
|
||||
wsRef.current.send(JSON.stringify({ type: "typing.stop" }));
|
||||
isTypingRef.current = false;
|
||||
clearTimeout(typingTimerRef.current);
|
||||
}
|
||||
}, [session?.user?.id]);
|
||||
|
||||
// Typing indicator
|
||||
const sendTyping = useCallback(() => {
|
||||
if (!wsRef.current || wsRef.current.readyState !== WebSocket.OPEN) return;
|
||||
|
||||
if (!isTypingRef.current) {
|
||||
isTypingRef.current = true;
|
||||
wsRef.current.send(JSON.stringify({ type: "typing.start" }));
|
||||
}
|
||||
|
||||
clearTimeout(typingTimerRef.current);
|
||||
typingTimerRef.current = setTimeout(() => {
|
||||
if (wsRef.current?.readyState === WebSocket.OPEN) {
|
||||
wsRef.current.send(JSON.stringify({ type: "typing.stop" }));
|
||||
}
|
||||
isTypingRef.current = false;
|
||||
}, 2000);
|
||||
}, []);
|
||||
|
||||
// Mark as read
|
||||
const markAsRead = useCallback(async () => {
|
||||
if (!channelId || !token) return;
|
||||
await fetch(`${CHAT_API_BASE}/chat/channels/${channelId}/read`, {
|
||||
method: "POST",
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
});
|
||||
}, [channelId, token]);
|
||||
|
||||
return {
|
||||
messages,
|
||||
sendMessage,
|
||||
sendTyping,
|
||||
isConnected,
|
||||
typingUsers,
|
||||
onlineUsers,
|
||||
loadMoreMessages,
|
||||
hasMoreMessages,
|
||||
markAsRead,
|
||||
};
|
||||
}
|
||||
Loading…
Reference in a new issue