feat(12-03): add discussion SSE stream

This commit is contained in:
Arthur Belleville 2026-05-16 10:18:33 +02:00
parent c6dcb680bd
commit d15c3748e4
No known key found for this signature in database
8 changed files with 237 additions and 4 deletions

View file

@ -149,7 +149,7 @@ func main() {
fileDeps := web.FilesDeps{Queries: q, Files: filesStore, MaxUploadMB: maxUploadMB}
etapeDeps := web.EtapesDeps{Queries: q}
eventDeps := web.EventsDeps{Queries: q}
discussionDeps := web.DiscussionDeps{Queries: q}
discussionDeps := web.DiscussionDeps{Queries: q, Realtime: web.NewDiscussionBroker()}
planningDeps := web.PlanningDeps{Queries: q}
// D-09: pass the embedded static FS — binary has zero runtime file dependencies.

View file

@ -0,0 +1,84 @@
package web
import (
"context"
"sync"
"github.com/google/uuid"
)
type DiscussionEvent struct {
TabloID uuid.UUID `json:"tabloId"`
MessageID uuid.UUID `json:"messageId"`
AuthorUserID uuid.UUID `json:"authorUserId"`
MessageHTML string `json:"messageHtml"`
RefreshUnread bool `json:"refreshUnread"`
}
type DiscussionRealtime interface {
Subscribe(ctx context.Context, tabloID uuid.UUID) (<-chan DiscussionEvent, func())
Publish(event DiscussionEvent)
}
type DiscussionBroker struct {
mu sync.Mutex
nextID int
subscribers map[uuid.UUID]map[int]chan DiscussionEvent
}
func NewDiscussionBroker() *DiscussionBroker {
return &DiscussionBroker{subscribers: make(map[uuid.UUID]map[int]chan DiscussionEvent)}
}
func (b *DiscussionBroker) Subscribe(ctx context.Context, tabloID uuid.UUID) (<-chan DiscussionEvent, func()) {
ch := make(chan DiscussionEvent, 8)
b.mu.Lock()
b.nextID++
id := b.nextID
if b.subscribers[tabloID] == nil {
b.subscribers[tabloID] = make(map[int]chan DiscussionEvent)
}
b.subscribers[tabloID][id] = ch
b.mu.Unlock()
var once sync.Once
unsubscribe := func() {
once.Do(func() {
b.mu.Lock()
if subscribers := b.subscribers[tabloID]; subscribers != nil {
delete(subscribers, id)
if len(subscribers) == 0 {
delete(b.subscribers, tabloID)
}
}
b.mu.Unlock()
close(ch)
})
}
go func() {
<-ctx.Done()
unsubscribe()
}()
return ch, unsubscribe
}
func (b *DiscussionBroker) Publish(event DiscussionEvent) {
b.mu.Lock()
defer b.mu.Unlock()
for _, ch := range b.subscribers[event.TabloID] {
select {
case ch <- event:
default:
}
}
}
func (b *DiscussionBroker) SubscriberCount(tabloID uuid.UUID) int {
b.mu.Lock()
defer b.mu.Unlock()
return len(b.subscribers[tabloID])
}

View file

@ -1,9 +1,13 @@
package web
import (
"bytes"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"backend/internal/db/sqlc"
"backend/templates"
@ -15,6 +19,7 @@ import (
type DiscussionDeps struct {
Queries *sqlc.Queries
Realtime DiscussionRealtime
}
func loadDiscussionTabData(w http.ResponseWriter, r *http.Request, q *sqlc.Queries, tablo sqlc.Tablo) (templates.DiscussionTabData, bool) {
@ -107,12 +112,92 @@ func DiscussionMessageCreateHandler(deps DiscussionDeps) http.HandlerFunc {
}
data := templates.DiscussionTabData{Messages: []templates.DiscussionMessageView{templates.DiscussionMessageFromRow(row)}}
markDiscussionRead(r, deps.Queries, tablo, user.ID, data)
message := templates.DiscussionMessageFromRow(row)
if deps.Realtime != nil {
html, err := renderDiscussionMessageHTML(r, message)
if err != nil {
slog.Default().Warn("discussion create: render realtime message failed", "tablo_id", tablo.ID, "message_id", msg.ID, "err", err)
} else {
deps.Realtime.Publish(DiscussionEvent{
TabloID: tablo.ID,
MessageID: msg.ID,
AuthorUserID: user.ID,
MessageHTML: html,
RefreshUnread: true,
})
}
}
if r.Header.Get("HX-Request") == "true" {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_ = templates.DiscussionMessageRow(templates.DiscussionMessageFromRow(row)).Render(r.Context(), w)
_ = templates.DiscussionMessageRow(message).Render(r.Context(), w)
return
}
http.Redirect(w, r, templates.DiscussionURL(tablo.ID), http.StatusSeeOther)
}
}
func DiscussionStreamHandler(deps DiscussionDeps) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
tablo, _, ok := loadOwnedTablo(w, r, TablosDeps{Queries: deps.Queries})
if !ok {
return
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
if deps.Realtime == nil {
http.Error(w, "streaming unavailable", http.StatusServiceUnavailable)
return
}
events, unsubscribe := deps.Realtime.Subscribe(r.Context(), tablo.ID)
defer unsubscribe()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
_, _ = fmt.Fprint(w, ": connected\n\n")
flusher.Flush()
keepalive := time.NewTicker(25 * time.Second)
defer keepalive.Stop()
for {
select {
case <-r.Context().Done():
return
case <-keepalive.C:
_, _ = fmt.Fprint(w, ": keepalive\n\n")
flusher.Flush()
case event, ok := <-events:
if !ok {
return
}
if err := writeDiscussionEvent(w, event); err != nil {
return
}
flusher.Flush()
}
}
}
}
func renderDiscussionMessageHTML(r *http.Request, message templates.DiscussionMessageView) (string, error) {
var buf bytes.Buffer
if err := templates.DiscussionMessageRow(message).Render(r.Context(), &buf); err != nil {
return "", err
}
return buf.String(), nil
}
func writeDiscussionEvent(w http.ResponseWriter, event DiscussionEvent) error {
payload, err := json.Marshal(event)
if err != nil {
return err
}
_, err = fmt.Fprintf(w, "event: discussion-message\ndata: %s\n\n", payload)
return err
}

View file

@ -50,6 +50,9 @@ type Pinger interface {
// a Referer header). In production, pass no extra args — leave empty.
func NewRouter(pinger Pinger, staticFS fs.FS, deps AuthDeps, tabloDeps TablosDeps, taskDeps TasksDeps, etapeDeps EtapesDeps, eventDeps EventsDeps, discussionDeps DiscussionDeps, planningDeps PlanningDeps, fileDeps FilesDeps, csrfKey []byte, env string, trustedOrigins ...string) (http.Handler, error) {
r := chi.NewRouter()
if discussionDeps.Realtime == nil {
discussionDeps.Realtime = NewDiscussionBroker()
}
r.Use(RequestIDMiddleware)
r.Use(chimw.RealIP)
r.Use(SlogLoggerMiddleware(slog.Default()))
@ -126,6 +129,7 @@ func NewRouter(pinger Pinger, staticFS fs.FS, deps AuthDeps, tabloDeps TablosDep
r.Post("/tablos/{id}/events/{event_id}/delete", EventDeleteHandler(eventDeps))
// Discussion tab and message routes — static discussion segment before later parametric child routes.
r.Get("/tablos/{id}/discussion", TabloDiscussionTabHandler(discussionDeps))
r.Get("/tablos/{id}/discussion/stream", DiscussionStreamHandler(discussionDeps))
r.Post("/tablos/{id}/discussion/messages", DiscussionMessageCreateHandler(discussionDeps))
// Parametric task routes — must come after static task segments.
r.Get("/tablos/{id}/tasks/{task_id}/show", TaskShowHandler(taskDeps))

View file

@ -0,0 +1,55 @@
(function () {
function messageExists(messageId) {
return Boolean(document.querySelector('[data-message-id="' + CSS.escape(messageId) + '"]'));
}
function ensureMessageList(container) {
var messages = container.querySelector("#discussion-messages");
if (!messages) return null;
var list = messages.querySelector(".divide-y");
if (list) return list;
messages.innerHTML = "";
list = document.createElement("div");
list.className = "divide-y divide-slate-100";
messages.appendChild(list);
return list;
}
function appendMessage(container, event) {
if (!event || !event.messageId || !event.messageHtml || messageExists(event.messageId)) {
return;
}
var list = ensureMessageList(container);
if (!list) return;
var template = document.createElement("template");
template.innerHTML = event.messageHtml.trim();
list.appendChild(template.content);
}
function connectDiscussion(container) {
if (!container || container.dataset.discussionStreamConnected === "true") return;
var streamURL = container.dataset.discussionStreamUrl;
if (!streamURL || !window.EventSource) return;
container.dataset.discussionStreamConnected = "true";
var source = new EventSource(streamURL);
source.addEventListener("discussion-message", function (message) {
try {
appendMessage(container, JSON.parse(message.data));
} catch (_) {
return;
}
});
}
function connectDiscussionStreams() {
document.querySelectorAll("[data-discussion-stream-url]").forEach(connectDiscussion);
}
document.addEventListener("DOMContentLoaded", connectDiscussionStreams);
document.body.addEventListener("htmx:afterSwap", connectDiscussionStreams);
})();

View file

@ -8,7 +8,7 @@ import (
)
templ DiscussionTabFragment(tablo sqlc.Tablo, data DiscussionTabData, form DiscussionForm, errs DiscussionErrors, csrfToken string) {
<div id="discussion-tab" class="space-y-6">
<div id="discussion-tab" class="space-y-6" data-discussion-stream-url={ DiscussionStreamURL(tablo.ID) }>
<div class="flex flex-wrap items-start justify-between gap-3">
<div>
<h2 class="text-2xl font-semibold leading-tight text-slate-900">Discussion</h2>

View file

@ -48,6 +48,10 @@ func DiscussionURL(tabloID uuid.UUID) string {
return "/tablos/" + tabloID.String() + "/discussion"
}
func DiscussionStreamURL(tabloID uuid.UUID) string {
return "/tablos/" + tabloID.String() + "/discussion/stream"
}
func DiscussionMessagesFromRows(rows []sqlc.ListDiscussionMessagesByTabloRow) []DiscussionMessageView {
messages := make([]DiscussionMessageView, 0, len(rows))
for _, row := range rows {

View file

@ -53,6 +53,7 @@ templ Layout(title string, user *auth.User, csrfToken string) {
</footer>
<script src="/static/htmx.min.js" defer></script>
<script src="/static/sortable.min.js" defer></script>
<script src="/static/discussion-sse.js" defer></script>
</body>
</html>
}