diff --git a/backend/cmd/web/main.go b/backend/cmd/web/main.go index 4192493..ec2ed56 100644 --- a/backend/cmd/web/main.go +++ b/backend/cmd/web/main.go @@ -149,7 +149,7 @@ func main() { fileDeps := web.FilesDeps{Queries: q, Files: filesStore, MaxUploadMB: maxUploadMB} etapeDeps := web.EtapesDeps{Queries: q} eventDeps := web.EventsDeps{Queries: q} - discussionDeps := web.DiscussionDeps{Queries: q} + discussionDeps := web.DiscussionDeps{Queries: q, Realtime: web.NewDiscussionBroker()} planningDeps := web.PlanningDeps{Queries: q} // D-09: pass the embedded static FS — binary has zero runtime file dependencies. diff --git a/backend/internal/web/discussion_broker.go b/backend/internal/web/discussion_broker.go new file mode 100644 index 0000000..f2f2680 --- /dev/null +++ b/backend/internal/web/discussion_broker.go @@ -0,0 +1,84 @@ +package web + +import ( + "context" + "sync" + + "github.com/google/uuid" +) + +type DiscussionEvent struct { + TabloID uuid.UUID `json:"tabloId"` + MessageID uuid.UUID `json:"messageId"` + AuthorUserID uuid.UUID `json:"authorUserId"` + MessageHTML string `json:"messageHtml"` + RefreshUnread bool `json:"refreshUnread"` +} + +type DiscussionRealtime interface { + Subscribe(ctx context.Context, tabloID uuid.UUID) (<-chan DiscussionEvent, func()) + Publish(event DiscussionEvent) +} + +type DiscussionBroker struct { + mu sync.Mutex + nextID int + subscribers map[uuid.UUID]map[int]chan DiscussionEvent +} + +func NewDiscussionBroker() *DiscussionBroker { + return &DiscussionBroker{subscribers: make(map[uuid.UUID]map[int]chan DiscussionEvent)} +} + +func (b *DiscussionBroker) Subscribe(ctx context.Context, tabloID uuid.UUID) (<-chan DiscussionEvent, func()) { + ch := make(chan DiscussionEvent, 8) + + b.mu.Lock() + b.nextID++ + id := b.nextID + if b.subscribers[tabloID] == nil { + b.subscribers[tabloID] = make(map[int]chan DiscussionEvent) + } + b.subscribers[tabloID][id] = ch + b.mu.Unlock() + + var once sync.Once + unsubscribe := func() { + once.Do(func() { + b.mu.Lock() + if subscribers := b.subscribers[tabloID]; subscribers != nil { + delete(subscribers, id) + if len(subscribers) == 0 { + delete(b.subscribers, tabloID) + } + } + b.mu.Unlock() + close(ch) + }) + } + + go func() { + <-ctx.Done() + unsubscribe() + }() + + return ch, unsubscribe +} + +func (b *DiscussionBroker) Publish(event DiscussionEvent) { + b.mu.Lock() + defer b.mu.Unlock() + + for _, ch := range b.subscribers[event.TabloID] { + select { + case ch <- event: + default: + } + } +} + +func (b *DiscussionBroker) SubscriberCount(tabloID uuid.UUID) int { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.subscribers[tabloID]) +} diff --git a/backend/internal/web/handlers_discussion.go b/backend/internal/web/handlers_discussion.go index 65248c8..5dc6aa3 100644 --- a/backend/internal/web/handlers_discussion.go +++ b/backend/internal/web/handlers_discussion.go @@ -1,9 +1,13 @@ package web import ( + "bytes" + "encoding/json" + "fmt" "log/slog" "net/http" "strings" + "time" "backend/internal/db/sqlc" "backend/templates" @@ -14,7 +18,8 @@ import ( ) type DiscussionDeps struct { - Queries *sqlc.Queries + Queries *sqlc.Queries + Realtime DiscussionRealtime } func loadDiscussionTabData(w http.ResponseWriter, r *http.Request, q *sqlc.Queries, tablo sqlc.Tablo) (templates.DiscussionTabData, bool) { @@ -107,12 +112,92 @@ func DiscussionMessageCreateHandler(deps DiscussionDeps) http.HandlerFunc { } data := templates.DiscussionTabData{Messages: []templates.DiscussionMessageView{templates.DiscussionMessageFromRow(row)}} markDiscussionRead(r, deps.Queries, tablo, user.ID, data) + message := templates.DiscussionMessageFromRow(row) + if deps.Realtime != nil { + html, err := renderDiscussionMessageHTML(r, message) + if err != nil { + slog.Default().Warn("discussion create: render realtime message failed", "tablo_id", tablo.ID, "message_id", msg.ID, "err", err) + } else { + deps.Realtime.Publish(DiscussionEvent{ + TabloID: tablo.ID, + MessageID: msg.ID, + AuthorUserID: user.ID, + MessageHTML: html, + RefreshUnread: true, + }) + } + } if r.Header.Get("HX-Request") == "true" { w.Header().Set("Content-Type", "text/html; charset=utf-8") - _ = templates.DiscussionMessageRow(templates.DiscussionMessageFromRow(row)).Render(r.Context(), w) + _ = templates.DiscussionMessageRow(message).Render(r.Context(), w) return } http.Redirect(w, r, templates.DiscussionURL(tablo.ID), http.StatusSeeOther) } } + +func DiscussionStreamHandler(deps DiscussionDeps) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + tablo, _, ok := loadOwnedTablo(w, r, TablosDeps{Queries: deps.Queries}) + if !ok { + return + } + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + if deps.Realtime == nil { + http.Error(w, "streaming unavailable", http.StatusServiceUnavailable) + return + } + + events, unsubscribe := deps.Realtime.Subscribe(r.Context(), tablo.ID) + defer unsubscribe() + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + _, _ = fmt.Fprint(w, ": connected\n\n") + flusher.Flush() + + keepalive := time.NewTicker(25 * time.Second) + defer keepalive.Stop() + for { + select { + case <-r.Context().Done(): + return + case <-keepalive.C: + _, _ = fmt.Fprint(w, ": keepalive\n\n") + flusher.Flush() + case event, ok := <-events: + if !ok { + return + } + if err := writeDiscussionEvent(w, event); err != nil { + return + } + flusher.Flush() + } + } + } +} + +func renderDiscussionMessageHTML(r *http.Request, message templates.DiscussionMessageView) (string, error) { + var buf bytes.Buffer + if err := templates.DiscussionMessageRow(message).Render(r.Context(), &buf); err != nil { + return "", err + } + return buf.String(), nil +} + +func writeDiscussionEvent(w http.ResponseWriter, event DiscussionEvent) error { + payload, err := json.Marshal(event) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "event: discussion-message\ndata: %s\n\n", payload) + return err +} diff --git a/backend/internal/web/router.go b/backend/internal/web/router.go index 3bcf778..53b3cec 100644 --- a/backend/internal/web/router.go +++ b/backend/internal/web/router.go @@ -50,6 +50,9 @@ type Pinger interface { // a Referer header). In production, pass no extra args — leave empty. func NewRouter(pinger Pinger, staticFS fs.FS, deps AuthDeps, tabloDeps TablosDeps, taskDeps TasksDeps, etapeDeps EtapesDeps, eventDeps EventsDeps, discussionDeps DiscussionDeps, planningDeps PlanningDeps, fileDeps FilesDeps, csrfKey []byte, env string, trustedOrigins ...string) (http.Handler, error) { r := chi.NewRouter() + if discussionDeps.Realtime == nil { + discussionDeps.Realtime = NewDiscussionBroker() + } r.Use(RequestIDMiddleware) r.Use(chimw.RealIP) r.Use(SlogLoggerMiddleware(slog.Default())) @@ -126,6 +129,7 @@ func NewRouter(pinger Pinger, staticFS fs.FS, deps AuthDeps, tabloDeps TablosDep r.Post("/tablos/{id}/events/{event_id}/delete", EventDeleteHandler(eventDeps)) // Discussion tab and message routes — static discussion segment before later parametric child routes. r.Get("/tablos/{id}/discussion", TabloDiscussionTabHandler(discussionDeps)) + r.Get("/tablos/{id}/discussion/stream", DiscussionStreamHandler(discussionDeps)) r.Post("/tablos/{id}/discussion/messages", DiscussionMessageCreateHandler(discussionDeps)) // Parametric task routes — must come after static task segments. r.Get("/tablos/{id}/tasks/{task_id}/show", TaskShowHandler(taskDeps)) diff --git a/backend/static/discussion-sse.js b/backend/static/discussion-sse.js new file mode 100644 index 0000000..475f440 --- /dev/null +++ b/backend/static/discussion-sse.js @@ -0,0 +1,55 @@ +(function () { + function messageExists(messageId) { + return Boolean(document.querySelector('[data-message-id="' + CSS.escape(messageId) + '"]')); + } + + function ensureMessageList(container) { + var messages = container.querySelector("#discussion-messages"); + if (!messages) return null; + + var list = messages.querySelector(".divide-y"); + if (list) return list; + + messages.innerHTML = ""; + list = document.createElement("div"); + list.className = "divide-y divide-slate-100"; + messages.appendChild(list); + return list; + } + + function appendMessage(container, event) { + if (!event || !event.messageId || !event.messageHtml || messageExists(event.messageId)) { + return; + } + var list = ensureMessageList(container); + if (!list) return; + + var template = document.createElement("template"); + template.innerHTML = event.messageHtml.trim(); + list.appendChild(template.content); + } + + function connectDiscussion(container) { + if (!container || container.dataset.discussionStreamConnected === "true") return; + + var streamURL = container.dataset.discussionStreamUrl; + if (!streamURL || !window.EventSource) return; + + container.dataset.discussionStreamConnected = "true"; + var source = new EventSource(streamURL); + source.addEventListener("discussion-message", function (message) { + try { + appendMessage(container, JSON.parse(message.data)); + } catch (_) { + return; + } + }); + } + + function connectDiscussionStreams() { + document.querySelectorAll("[data-discussion-stream-url]").forEach(connectDiscussion); + } + + document.addEventListener("DOMContentLoaded", connectDiscussionStreams); + document.body.addEventListener("htmx:afterSwap", connectDiscussionStreams); +})(); diff --git a/backend/templates/discussion.templ b/backend/templates/discussion.templ index c5a1518..e868e37 100644 --- a/backend/templates/discussion.templ +++ b/backend/templates/discussion.templ @@ -8,7 +8,7 @@ import ( ) templ DiscussionTabFragment(tablo sqlc.Tablo, data DiscussionTabData, form DiscussionForm, errs DiscussionErrors, csrfToken string) { -
+

Discussion

diff --git a/backend/templates/discussion_forms.go b/backend/templates/discussion_forms.go index e1c509d..758c405 100644 --- a/backend/templates/discussion_forms.go +++ b/backend/templates/discussion_forms.go @@ -48,6 +48,10 @@ func DiscussionURL(tabloID uuid.UUID) string { return "/tablos/" + tabloID.String() + "/discussion" } +func DiscussionStreamURL(tabloID uuid.UUID) string { + return "/tablos/" + tabloID.String() + "/discussion/stream" +} + func DiscussionMessagesFromRows(rows []sqlc.ListDiscussionMessagesByTabloRow) []DiscussionMessageView { messages := make([]DiscussionMessageView, 0, len(rows)) for _, row := range rows { diff --git a/backend/templates/layout.templ b/backend/templates/layout.templ index 2b5f1c9..fa00081 100644 --- a/backend/templates/layout.templ +++ b/backend/templates/layout.templ @@ -53,6 +53,7 @@ templ Layout(title string, user *auth.User, csrfToken string) { + }