xtablo-source/backend/internal/web/handlers_discussion.go
Arthur Belleville 96a58ef0ea
fix(17): flush HTTP response before SSE publish to fix own-message alignment race
The server was publishing to the SSE broker before writing the HTMX response,
causing a race: if the SSE event (IsOwn=false, left-aligned) arrived at the
browser before HTMX appended the response (IsOwn=true, right-aligned), the
SSE path won and messageExists() then blocked the correct HTMX append.

Fix: write and flush the HTMX response first, then publish to SSE. This ensures
the sender's own message lands in the DOM right-aligned before the SSE event fires.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-05-17 12:26:13 +02:00

227 lines
7.7 KiB
Go

package web
import (
"bytes"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"backend/internal/db/sqlc"
"backend/templates"
"github.com/google/uuid"
"github.com/gorilla/csrf"
"github.com/jackc/pgx/v5/pgtype"
)
type DiscussionDeps struct {
Queries *sqlc.Queries
Realtime DiscussionRealtime
}
func loadDiscussionTabData(w http.ResponseWriter, r *http.Request, q *sqlc.Queries, tablo sqlc.Tablo, currentUserID uuid.UUID) (templates.DiscussionTabData, bool) {
rows, err := q.ListDiscussionMessagesByTablo(r.Context(), tablo.ID)
if err != nil {
slog.Default().Error("discussion: ListDiscussionMessagesByTablo failed", "tablo_id", tablo.ID, "err", err)
http.Error(w, "internal server error", http.StatusInternalServerError)
return templates.DiscussionTabData{}, false
}
data := templates.DiscussionTabData{Messages: templates.DiscussionMessagesFromRows(rows, currentUserID)}
return data, true
}
func markDiscussionRead(r *http.Request, q *sqlc.Queries, tablo sqlc.Tablo, userID uuid.UUID, data templates.DiscussionTabData) {
if len(data.Messages) == 0 {
return
}
last := data.Messages[len(data.Messages)-1]
if _, err := q.UpsertDiscussionReadState(r.Context(), sqlc.UpsertDiscussionReadStateParams{
TabloID: tablo.ID,
UserID: userID,
LastReadMessageID: pgtype.UUID{Bytes: last.ID, Valid: true},
}); err != nil {
slog.Default().Warn("discussion: UpsertDiscussionReadState failed", "tablo_id", tablo.ID, "err", err)
}
}
func TabloDiscussionTabHandler(deps DiscussionDeps) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
tablo, user, ok := loadOwnedTablo(w, r, TablosDeps{Queries: deps.Queries})
if !ok {
return
}
data, ok := loadDiscussionTabData(w, r, deps.Queries, tablo, user.ID)
if !ok {
return
}
markDiscussionRead(r, deps.Queries, tablo, user.ID, data)
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if r.Header.Get("HX-Request") == "true" {
_ = templates.DiscussionTabFragment(tablo, data, templates.DiscussionForm{}, templates.DiscussionErrors{}, csrf.Token(r)).Render(r.Context(), w)
return
}
discussionSidebarTablos, sidebarErr := deps.Queries.ListTablosByUser(r.Context(), user.ID)
if sidebarErr != nil {
slog.Default().Error("discussion: ListTablosByUser failed", "user_id", user.ID, "err", sidebarErr)
discussionSidebarTablos = []sqlc.Tablo{}
}
if discussionSidebarTablos == nil {
discussionSidebarTablos = []sqlc.Tablo{}
}
_ = templates.TabloDetailPage(user, csrf.Token(r), "", discussionSidebarTablos, tablo, nil, nil, templates.EtapeTaskCounts{}, templates.EtapeFilter{}, nil, templates.EventsCalendar{}, data, "discussion").Render(r.Context(), w)
}
}
func DiscussionMessageCreateHandler(deps DiscussionDeps) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
tablo, user, ok := loadOwnedTablo(w, r, TablosDeps{Queries: deps.Queries})
if !ok {
return
}
body := strings.TrimSpace(r.PostFormValue("body"))
form := templates.DiscussionForm{Body: r.PostFormValue("body")}
var errs templates.DiscussionErrors
if body == "" {
errs.Body = "Message is required."
} else if len([]rune(body)) > templates.DiscussionMaxBodyLength {
errs.Body = "Message is too long."
}
if errs.Body != "" {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("HX-Retarget", "form[action$='/discussion/messages']")
w.Header().Set("HX-Reswap", "outerHTML")
w.WriteHeader(http.StatusUnprocessableEntity)
_ = templates.DiscussionComposer(tablo, form, errs, csrf.Token(r)).Render(r.Context(), w)
return
}
msg, err := deps.Queries.CreateDiscussionMessage(r.Context(), sqlc.CreateDiscussionMessageParams{
TabloID: tablo.ID,
AuthorUserID: user.ID,
Body: body,
})
if err != nil {
slog.Default().Error("discussion create: CreateDiscussionMessage failed", "tablo_id", tablo.ID, "err", err)
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("HX-Retarget", "form[action$='/discussion/messages']")
w.Header().Set("HX-Reswap", "outerHTML")
w.WriteHeader(http.StatusInternalServerError)
errs.General = "Message could not be sent. Please try again."
_ = templates.DiscussionComposer(tablo, form, errs, csrf.Token(r)).Render(r.Context(), w)
return
}
row, err := deps.Queries.GetDiscussionMessageWithAuthor(r.Context(), sqlc.GetDiscussionMessageWithAuthorParams{
ID: msg.ID,
TabloID: tablo.ID,
})
if err != nil {
slog.Default().Error("discussion create: GetDiscussionMessageWithAuthor failed", "tablo_id", tablo.ID, "message_id", msg.ID, "err", err)
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
message := templates.DiscussionMessageFromRow(row, user.ID)
data := templates.DiscussionTabData{Messages: []templates.DiscussionMessageView{message}}
markDiscussionRead(r, deps.Queries, tablo, user.ID, data)
// Send HTTP response before publishing to SSE so the HTMX append (IsOwn=true)
// lands in the DOM before the SSE event fires. This prevents the race where SSE
// wins, appends IsOwn=false, and messageExists() then blocks the HTMX append.
if r.Header.Get("HX-Request") == "true" {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_ = templates.DiscussionMessageRow(message).Render(r.Context(), w)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
if deps.Realtime != nil {
// SSE recipients are never the author — always render as IsOwn: false.
sseMessage := message
sseMessage.IsOwn = false
html, err := renderDiscussionMessageHTML(r, sseMessage)
if err != nil {
slog.Default().Warn("discussion create: render realtime message failed", "tablo_id", tablo.ID, "message_id", msg.ID, "err", err)
} else {
deps.Realtime.Publish(DiscussionEvent{
TabloID: tablo.ID,
MessageID: msg.ID,
AuthorUserID: user.ID,
MessageHTML: html,
RefreshUnread: true,
})
}
}
if r.Header.Get("HX-Request") != "true" {
http.Redirect(w, r, templates.DiscussionURL(tablo.ID), http.StatusSeeOther)
}
}
}
func DiscussionStreamHandler(deps DiscussionDeps) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
tablo, _, ok := loadOwnedTablo(w, r, TablosDeps{Queries: deps.Queries})
if !ok {
return
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
if deps.Realtime == nil {
http.Error(w, "streaming unavailable", http.StatusServiceUnavailable)
return
}
events, unsubscribe := deps.Realtime.Subscribe(r.Context(), tablo.ID)
defer unsubscribe()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
_, _ = fmt.Fprint(w, ": connected\n\n")
flusher.Flush()
keepalive := time.NewTicker(25 * time.Second)
defer keepalive.Stop()
for {
select {
case <-r.Context().Done():
return
case <-keepalive.C:
_, _ = fmt.Fprint(w, ": keepalive\n\n")
flusher.Flush()
case event, ok := <-events:
if !ok {
return
}
if err := writeDiscussionEvent(w, event); err != nil {
return
}
flusher.Flush()
}
}
}
}
func renderDiscussionMessageHTML(r *http.Request, message templates.DiscussionMessageView) (string, error) {
var buf bytes.Buffer
if err := templates.DiscussionMessageRow(message).Render(r.Context(), &buf); err != nil {
return "", err
}
return buf.String(), nil
}
func writeDiscussionEvent(w http.ResponseWriter, event DiscussionEvent) error {
payload, err := json.Marshal(event)
if err != nil {
return err
}
_, err = fmt.Fprintf(w, "event: discussion-message\ndata: %s\n\n", payload)
return err
}