go-htmx-gsd #1

Merged
arthur merged 558 commits from go-htmx-gsd into main 2026-05-23 15:16:44 +00:00
4 changed files with 648 additions and 0 deletions
Showing only changes of commit f242b7184f - Show all commits

View file

@ -151,6 +151,12 @@ Plans:
**User-in-loop:** Approve the queue library/approach (`river` vs `asynq` vs hand-rolled `pg_notify`) and pick the proof-of-life job.
**Plans:** 3 plans
Plans:
- [ ] 06-01-PLAN.md — Wave 1: go get river + ListOrphanFiles sqlc query + internal/jobs/ package (HeartbeatWorker, OrphanCleanupWorker, SlogErrorHandler) + unit tests (WORK-01 partial, WORK-02, WORK-03, WORK-04)
- [ ] 06-02-PLAN.md — Wave 2: replace cmd/worker/main.go with full river wiring (rivermigrate + Client + periodic jobs + graceful shutdown) + just worker target + README section (WORK-01, WORK-02, WORK-03)
- [ ] 06-03-PLAN.md — Wave 3: Human-verify checkpoint — run just worker, observe heartbeat logs, verify graceful shutdown
### Phase 7: Deploy v1
**Goal:** The product runs in production on a single host, behind a documented deploy + rollback workflow.
**Mode:** mvp

View file

@ -0,0 +1,258 @@
---
phase: 06-background-worker
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
- backend/go.mod
- backend/go.sum
- backend/internal/db/queries/files.sql
- backend/internal/db/sqlc/files.sql.go
- backend/internal/jobs/heartbeat.go
- backend/internal/jobs/orphan_cleanup.go
- backend/internal/jobs/error_handler.go
- backend/internal/jobs/heartbeat_test.go
- backend/internal/jobs/orphan_cleanup_test.go
- backend/internal/jobs/error_handler_test.go
autonomous: true
requirements:
- WORK-01
- WORK-02
- WORK-03
- WORK-04
must_haves:
truths:
- "river and riverpgxv5 are importable from backend/internal/jobs/ without build errors"
- "HeartbeatWorker.Work() returns nil and logs 'worker heartbeat' with job_id and attempt fields"
- "OrphanCleanupWorker.Work() iterates orphan rows, deletes S3 object then DB row, logs per-run summary"
- "SlogErrorHandler implements river.ErrorHandler (both HandleError and HandlePanic compile)"
- "ListOrphanFiles sqlc query is generated and returns []ListOrphanFilesRow with {ID, TabloID, S3Key}"
- "Unit tests for HeartbeatWorker, OrphanCleanupWorker, and SlogErrorHandler pass"
artifacts:
- path: "backend/go.mod"
provides: "river dependency declarations"
contains: "github.com/riverqueue/river"
- path: "backend/internal/jobs/heartbeat.go"
provides: "HeartbeatArgs + HeartbeatWorker"
exports: ["HeartbeatArgs", "HeartbeatWorker"]
- path: "backend/internal/jobs/orphan_cleanup.go"
provides: "OrphanCleanupArgs + OrphanCleanupWorker + constructor"
exports: ["OrphanCleanupArgs", "OrphanCleanupWorker", "NewOrphanCleanupWorker"]
- path: "backend/internal/jobs/error_handler.go"
provides: "SlogErrorHandler"
exports: ["SlogErrorHandler"]
- path: "backend/internal/db/queries/files.sql"
provides: "ListOrphanFiles query"
contains: "name: ListOrphanFiles :many"
key_links:
- from: "backend/internal/jobs/orphan_cleanup.go"
to: "backend/internal/db/sqlc/files.sql.go"
via: "sqlc.New(w.pool).ListOrphanFiles(ctx)"
pattern: "ListOrphanFiles"
- from: "backend/internal/jobs/orphan_cleanup.go"
to: "backend/internal/files/store.go"
via: "w.store.Delete(ctx, f.S3Key)"
pattern: "\.store\.Delete"
---
<objective>
Add the river dependency and implement the `internal/jobs/` package — job arg types, worker structs, and the error handler — plus the `ListOrphanFiles` sqlc query that the orphan cleanup worker depends on. Unit tests prove each component in isolation before the binary is wired in Plan 02.
Purpose: Delivers the entire domain layer for Phase 6 (WORK-01 partially, WORK-02, WORK-03, WORK-04) as a vertical slice that compiles and tests cleanly before cmd/worker is wired.
Output: `backend/internal/jobs/` package with three files + tests; updated `files.sql` and regenerated sqlc; river in `go.mod`.
</objective>
<execution_context>
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/workflows/execute-plan.md
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/ROADMAP.md
@.planning/REQUIREMENTS.md
@.planning/phases/06-background-worker/06-CONTEXT.md
@.planning/phases/06-background-worker/06-RESEARCH.md
@.planning/phases/06-background-worker/06-PATTERNS.md
<interfaces>
From backend/internal/files/store.go:
type FileStorer interface {
Upload(ctx context.Context, key string, file io.Reader) (contentType string, bytesWritten int64, err error)
Delete(ctx context.Context, key string) error
PresignDownload(ctx context.Context, key string) (string, error)
}
From backend/internal/db/sqlc/db.go:
func New(db DBTX) *Queries // *pgxpool.Pool satisfies DBTX
From backend/internal/db/queries/files.sql (existing queries — append ListOrphanFiles after DeleteTabloFile):
-- name: DeleteTabloFile :exec
DELETE FROM tablo_files WHERE id = $1 AND tablo_id = $2;
-- DeleteTabloFileParams is generated as: {ID uuid.UUID, TabloID uuid.UUID}
-- These fields are available from ListOrphanFilesRow — no new delete query needed.
From backend/internal/db/sqlc/files.sql.go (generated pattern — ListOrphanFiles will follow this shape):
func (q *Queries) ListFilesByTablo(ctx context.Context, tabloID uuid.UUID) ([]TabloFile, error)
// ListOrphanFiles will be: func (q *Queries) ListOrphanFiles(ctx context.Context) ([]ListOrphanFilesRow, error)
// ListOrphanFilesRow fields: ID uuid.UUID, TabloID uuid.UUID, S3Key string
</interfaces>
</context>
<tasks>
<task type="auto" tdd="true">
<name>Task 1: Add river dependency + ListOrphanFiles sqlc query</name>
<files>backend/go.mod, backend/go.sum, backend/internal/db/queries/files.sql, backend/internal/db/sqlc/files.sql.go</files>
<read_first>
- backend/go.mod — read current require block before adding deps
- backend/internal/db/queries/files.sql — read existing queries to understand file style and append position
</read_first>
<behavior>
- After `go get`, `go.mod` contains `github.com/riverqueue/river` and `github.com/riverqueue/river/riverdriver/riverpgxv5`
- `ListOrphanFiles` query appended to files.sql follows exact annotation style of existing queries in that file
- After `just generate`, `backend/internal/db/sqlc/files.sql.go` exports `ListOrphanFiles(ctx context.Context) ([]ListOrphanFilesRow, error)` and `ListOrphanFilesRow` struct with fields `ID`, `TabloID`, `S3Key`
- `go build ./...` exits 0 after changes
</behavior>
<action>
Step 1 — add river modules (run from backend/ directory):
go get github.com/riverqueue/river@v0.37.0
go get github.com/riverqueue/river/riverdriver/riverpgxv5@v0.37.0
These commands update both go.mod and go.sum. Do NOT hand-edit go.mod for these deps.
Step 2 — append the orphan query to backend/internal/db/queries/files.sql (after the existing DeleteTabloFile query). The query name annotation is "name: ListOrphanFiles :many". The SELECT returns three columns: id, tablo_id, s3_key from tablo_files tf WHERE NOT EXISTS (SELECT 1 FROM tablos t WHERE t.id = tf.tablo_id). Include a comment line above the annotation explaining its purpose ("Find tablo_files rows whose owning tablo no longer exists. Used by the orphan-file cleanup worker (Phase 6 WORK-02)."). Use the alias "tf" for tablo_files per the NOT EXISTS subquery.
Step 3 — regenerate sqlc:
cd backend && just generate
Verify the generated file at backend/internal/db/sqlc/files.sql.go contains "ListOrphanFiles" and "ListOrphanFilesRow".
</action>
<verify>
<automated>cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && grep -c "riverqueue/river" go.mod && grep -c "ListOrphanFiles" internal/db/sqlc/files.sql.go && go build ./...</automated>
</verify>
<done>
go.mod contains github.com/riverqueue/river (grep -c returns >= 1). internal/db/sqlc/files.sql.go contains ListOrphanFiles. go build ./... exits 0.
</done>
<acceptance_criteria>
- `grep "riverqueue/river" backend/go.mod` returns at least two lines (river + riverpgxv5)
- `grep "riverqueue/river/riverdriver/riverpgxv5" backend/go.mod` returns a line
- `grep "ListOrphanFiles" backend/internal/db/sqlc/files.sql.go` returns lines for both function and row type
- `grep "ListOrphanFilesRow" backend/internal/db/sqlc/files.sql.go` confirms the row struct exists
- `cd backend && go build ./...` exits 0
</acceptance_criteria>
</task>
<task type="auto" tdd="true">
<name>Task 2: Create internal/jobs/ package — HeartbeatWorker + OrphanCleanupWorker + SlogErrorHandler + unit tests</name>
<files>
backend/internal/jobs/heartbeat.go,
backend/internal/jobs/orphan_cleanup.go,
backend/internal/jobs/error_handler.go,
backend/internal/jobs/heartbeat_test.go,
backend/internal/jobs/orphan_cleanup_test.go,
backend/internal/jobs/error_handler_test.go
</files>
<read_first>
- backend/internal/auth/session.go — struct definition with injected dep pattern to replicate
- backend/internal/files/store.go — FileStorer interface (lines 17-22) used by OrphanCleanupWorker
- backend/internal/db/sqlc/files.sql.go — confirm ListOrphanFilesRow and DeleteTabloFileParams field names after regeneration
- backend/internal/db/sqlc/db.go — confirm sqlc.New signature (takes DBTX)
- .planning/phases/06-background-worker/06-RESEARCH.md — Pattern 1 (job structs), Pattern 4 (SlogErrorHandler)
- .planning/phases/06-background-worker/06-PATTERNS.md — Pattern assignments for all three files
</read_first>
<behavior>
HeartbeatWorker:
- HeartbeatArgs struct with no fields; Kind() string returns "heartbeat"
- HeartbeatWorker struct embeds river.WorkerDefaults[HeartbeatArgs]
- Work(ctx, job) calls slog.Info("worker heartbeat", "job_id", job.ID, "attempt", job.Attempt) and returns nil
- TestHeartbeatWorker: construct HeartbeatWorker, call Work(ctx, job), assert no error
OrphanCleanupWorker:
- OrphanCleanupArgs struct with no fields; Kind() string returns "orphan_file_cleanup"
- OrphanCleanupWorker struct embeds river.WorkerDefaults[OrphanCleanupArgs]; fields: pool *pgxpool.Pool, store files.FileStorer
- NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker constructor
- Work(ctx, job): calls sqlc.New(w.pool).ListOrphanFiles(ctx); loops over results; deletes S3 object first (w.store.Delete(ctx, f.S3Key)), then calls sqlc.New(w.pool).DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{ID: f.ID, TabloID: f.TabloID}) for each; increments deleted/errCount counters; logs slog.Info("orphan cleanup complete", "orphans_found", len(orphans), "deleted", deleted, "errors", errCount); returns nil (does not fail the job for partial errors — logs them)
- TestOrphanCleanupWorker_NoOrphans: use mock store + real test DB (skip if no DB), confirm Work returns nil and logs "orphan cleanup complete" with orphans_found=0
- TestOrphanCleanupWorker_DeletesOrphan: insert a tablo_files row with nonexistent tablo_id directly (bypassing FK), run Work, confirm the row is gone and s3 delete was called
SlogErrorHandler:
- SlogErrorHandler struct with no fields
- HandleError(ctx, job *rivertype.JobRow, err error) *river.ErrorHandlerResult: calls slog.Error("job error", "job_id", job.ID, "job_kind", job.Kind, "attempt", job.Attempt, "max_attempts", job.MaxAttempts, "err", err); returns nil
- HandlePanic(ctx, job *rivertype.JobRow, panicVal any, trace string) *river.ErrorHandlerResult: calls slog.Error("job panic", "job_id", job.ID, "job_kind", job.Kind, "panic", panicVal, "trace", trace); returns nil
- TestSlogErrorHandler: construct handler, call HandleError with a fake JobRow and error, confirm no panic and return value is nil
</behavior>
<action>
Create three Go source files under backend/internal/jobs/ (package declaration: "package jobs").
heartbeat.go: Define HeartbeatArgs (no fields) with Kind() returning "heartbeat". Define HeartbeatWorker embedding river.WorkerDefaults[HeartbeatArgs] with Work method logging slog.Info("worker heartbeat", "job_id", job.ID, "attempt", job.Attempt) then returning nil. Import "github.com/riverqueue/river" and "log/slog" and "context".
orphan_cleanup.go: Define OrphanCleanupArgs (no fields) with Kind() returning "orphan_file_cleanup". Define OrphanCleanupWorker struct with fields pool *pgxpool.Pool and store files.FileStorer; embed river.WorkerDefaults[OrphanCleanupArgs]. Implement NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker. Implement Work(ctx, job) per the behavior spec: ListOrphanFiles → loop (S3 delete first, then DB delete, counting deleted/errCount) → log summary → return nil. On individual S3 or DB errors in the loop, log slog.Error with relevant fields (s3_key or file_id) and increment errCount; continue to next item. Import "backend/internal/db/sqlc", "backend/internal/files", "github.com/jackc/pgx/v5/pgxpool", "github.com/riverqueue/river", "log/slog", "context", "fmt".
error_handler.go: Define SlogErrorHandler struct (no fields). Implement HandleError and HandlePanic per the behavior spec. Import "context", "github.com/riverqueue/river", "github.com/riverqueue/river/rivertype", "log/slog".
For the test files: heartbeat_test.go and error_handler_test.go use pure unit tests (no DB needed). orphan_cleanup_test.go: copy the setupTestDB helper from backend/internal/auth/testdb_test.go (it reads TEST_DATABASE_URL or DATABASE_URL, creates an isolated schema, runs goose migrations, returns pool + cleanup); use a fake in-memory FileStorer mock for the S3 side. Integration test for orphan cleanup inserts a tablo_files row with a uuid that has no matching tablos row (insert directly with no FK enforcement by first creating the file row while tablo exists, then hard-deleting the tablo row using `DELETE FROM tablos WHERE id = $1`; this triggers cascade but that's fine — alternatively, use a nonexistent UUID that was never inserted into tablos, which won't violate the FK constraint since you're inserting into tablo_files with a foreign key — instead, use testDB.Exec("INSERT INTO tablo_files (id, tablo_id, s3_key, filename, content_type, size_bytes) VALUES (gen_random_uuid(), gen_random_uuid(), 'orphan-key', 'orphan.txt', 'text/plain', 0)") to bypass the FK at test time; if the FK check prevents this, wrap in a deferred constraint or skip and only test the no-orphans path).
After writing all files: cd backend && go test ./internal/jobs/...
</action>
<verify>
<automated>cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go build ./internal/jobs/... && go test ./internal/jobs/... -v -count=1</automated>
</verify>
<done>
go build ./internal/jobs/... exits 0. go test ./internal/jobs/... passes (or skips DB-dependent tests when no DB is available). All three files exist with correct package declarations and exported types.
</done>
<acceptance_criteria>
- `cd backend && go build ./internal/jobs/...` exits 0
- `grep "HeartbeatArgs" backend/internal/jobs/heartbeat.go` returns a line
- `grep "HeartbeatWorker" backend/internal/jobs/heartbeat.go` returns a line with WorkerDefaults embedding
- `grep "OrphanCleanupWorker" backend/internal/jobs/orphan_cleanup.go` returns a line
- `grep "NewOrphanCleanupWorker" backend/internal/jobs/orphan_cleanup.go` returns a line
- `grep "SlogErrorHandler" backend/internal/jobs/error_handler.go` returns a line
- `grep "HandleError" backend/internal/jobs/error_handler.go` returns a line
- `grep "HandlePanic" backend/internal/jobs/error_handler.go` returns a line
- `cd backend && go test ./internal/jobs/... -v -count=1` exits 0 (DB tests may skip, but must not fail)
- `cd backend && go build ./...` exits 0 (whole module compiles)
</acceptance_criteria>
</task>
</tasks>
<threat_model>
## Trust Boundaries
| Boundary | Description |
|----------|-------------|
| Worker process → Postgres | Worker reads/writes job rows and tablo_files; all operations use the project pgxpool with the same Postgres credentials as cmd/web |
| Worker process → S3/MinIO | Worker deletes S3 objects using credentials from env vars; no user-supplied input reaches these calls in Phase 6 |
## STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-06-01 | Tampering | OrphanCleanupWorker — river_job row args | accept | Phase 6 periodic jobs use empty args structs (HeartbeatArgs{}, OrphanCleanupArgs{}); no user-supplied data is deserialized from job args; tampering with river_job rows requires DB access, which is already a full compromise |
| T-06-02 | Information Disclosure | S3 credentials in env vars | mitigate | Credentials read from env vars (never hardcoded); .env.example documents required vars without values; go.sum pins river to v0.37.0 to prevent supply-chain substitution |
| T-06-03 | Elevation of Privilege | Worker binary running as root | transfer | Phase 7 scope — Dockerfile non-root user is a Phase 7 DEPLOY requirement; documented in README as a known gap |
| T-06-04 | Denial of Service | MaxWorkers exceeding pool MaxConns | mitigate | MaxWorkers: 10 matches db.NewPool MaxConns: 10; documented in river.Config comment; never exceeds pool capacity |
</threat_model>
<verification>
After Plan 01 is complete:
- `cd backend && go build ./...` exits 0
- `cd backend && go test ./internal/jobs/... -v` passes (skips if no DB, but does not error)
- `grep "riverqueue/river" backend/go.mod` shows at least 2 lines (river + riverpgxv5)
- `grep "ListOrphanFiles" backend/internal/db/sqlc/files.sql.go` returns at least 1 line
- `grep "HeartbeatWorker" backend/internal/jobs/heartbeat.go` returns 1 line
- `grep "SlogErrorHandler" backend/internal/jobs/error_handler.go` returns 1 line
</verification>
<success_criteria>
- river importable from internal/jobs/ without compile errors
- HeartbeatWorker, OrphanCleanupWorker, SlogErrorHandler defined with correct signatures
- ListOrphanFiles sqlc query generated with ListOrphanFilesRow type
- Unit tests for all three job types pass
- go build ./... exits 0 — whole module compiles clean
</success_criteria>
<output>
After completion, create `.planning/phases/06-background-worker/06-01-SUMMARY.md`
</output>

View file

@ -0,0 +1,267 @@
---
phase: 06-background-worker
plan: 02
type: execute
wave: 2
depends_on:
- 06-01
files_modified:
- backend/cmd/worker/main.go
- backend/justfile
- backend/README.md
autonomous: true
requirements:
- WORK-01
- WORK-02
- WORK-03
must_haves:
truths:
- "cmd/worker/main.go runs rivermigrate at startup before constructing river.Client"
- "cmd/worker/main.go registers HeartbeatWorker and OrphanCleanupWorker via river.AddWorker"
- "cmd/worker/main.go passes slog.Default() as river.Config.Logger"
- "cmd/worker/main.go starts HeartbeatWorker as a PeriodicJob every 1 minute with RunOnStart: true"
- "cmd/worker/main.go starts OrphanCleanupWorker as a PeriodicJob every 1 hour with RunOnStart: false"
- "Graceful shutdown calls riverClient.StopAndCancel with a 10-second timeout context"
- "go build ./cmd/worker/... exits 0"
- "just worker target exists in justfile and passes MinIO dev defaults as env vars"
- "README.md documents how to run the worker locally"
artifacts:
- path: "backend/cmd/worker/main.go"
provides: "full river wiring — rivermigrate + river.Client + periodic jobs + graceful shutdown"
contains: "rivermigrate.New"
- path: "backend/justfile"
provides: "worker target for local dev"
contains: "just worker"
- path: "backend/README.md"
provides: "worker local dev instructions"
contains: "just worker"
key_links:
- from: "backend/cmd/worker/main.go"
to: "backend/internal/jobs/"
via: "jobs.HeartbeatWorker, jobs.NewOrphanCleanupWorker, jobs.SlogErrorHandler"
pattern: "jobs\\."
- from: "backend/cmd/worker/main.go"
to: "rivermigrate"
via: "rivermigrate.New(riverpgxv5.New(pool), nil).Migrate(ctx, rivermigrate.DirectionUp, nil)"
pattern: "rivermigrate\\.New"
- from: "backend/cmd/worker/main.go"
to: "river.Client"
via: "river.NewClient(riverpgxv5.New(pool), &river.Config{...})"
pattern: "river\\.NewClient"
---
<objective>
Replace the noop body in `cmd/worker/main.go` with the full river runtime: rivermigrate at startup, S3 store init, signal context construction, worker registration, river.Client construction and start, block on signal, StopAndCancel graceful shutdown. Add the `just worker` justfile target with MinIO dev defaults. Document the worker in README.md.
Purpose: Closes WORK-01 (worker binary runs and processes jobs), WORK-02 (heartbeat job observable in logs within 1 minute of startup), WORK-03 (graceful shutdown matching web binary pattern).
Output: Fully functional `cmd/worker` binary; `just worker` runs it locally; README documents it.
</objective>
<execution_context>
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/workflows/execute-plan.md
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/ROADMAP.md
@.planning/phases/06-background-worker/06-CONTEXT.md
@.planning/phases/06-background-worker/06-RESEARCH.md
@.planning/phases/06-background-worker/06-PATTERNS.md
@.planning/phases/06-background-worker/06-01-SUMMARY.md
<interfaces>
From backend/internal/jobs/ (created in Plan 01):
jobs.HeartbeatArgs{} — Kind() = "heartbeat"
jobs.HeartbeatWorker{} — embeds river.WorkerDefaults[HeartbeatArgs]
jobs.OrphanCleanupArgs{} — Kind() = "orphan_file_cleanup"
jobs.NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker
jobs.SlogErrorHandler{} — implements river.ErrorHandler
From backend/internal/files/store.go:
files.NewStore(ctx, endpoint, bucket, region, accessKey, secretKey, usePathStyle bool) (*Store, error)
— S3 env var names: S3_ENDPOINT, S3_BUCKET, S3_REGION, S3_ACCESS_KEY, S3_SECRET_KEY, S3_USE_PATH_STYLE
From backend/internal/db/pool.go:
db.NewPool(ctx context.Context, dsn string) (*pgxpool.Pool, error)
From backend/cmd/worker/main.go (current skeleton — to be fully replaced):
package main
— already has: env/dsn loading, slog.SetDefault, db.NewPool, signal.NotifyContext, pool.Close pattern
From rivermigrate (per RESEARCH.md Pattern 2):
rivermigrate.New(riverpgxv5.New(pool), nil) → migrator
migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) → (res, err)
res.Versions — range to log each applied version
From river (per RESEARCH.md Pattern 3):
river.NewWorkers() → workers
river.AddWorker(workers, &HeartbeatWorker{})
river.NewClient(riverpgxv5.New(pool), &river.Config{
Logger: slog.Default(),
Workers: workers,
Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}},
ErrorHandler: &SlogErrorHandler{},
PeriodicJobs: [...],
}) → (riverClient, err)
riverClient.Start(ctx) → err // MUST use live signal context, not background
riverClient.StopAndCancel(timeoutCtx) → err
CRITICAL ORDERING CONSTRAINT (RESEARCH Pitfall 2 + PATTERNS.md):
1. context.Background() for startup I/O (pool, rivermigrate, S3)
2. signal.NotifyContext AFTER all startup I/O succeeds
3. riverClient.Start(sigCtx) — passes the signal context; jobs inherit it
4. <-sigCtx.Done() wait for SIGINT/SIGTERM
5. StopAndCancel with a fresh context.WithTimeout(context.Background(), 10*time.Second)
6. pool.Close()
Do NOT create signal.NotifyContext before rivermigrate or S3 init.
</interfaces>
</context>
<tasks>
<task type="auto">
<name>Task 1: Replace cmd/worker/main.go with full river wiring</name>
<files>backend/cmd/worker/main.go</files>
<read_first>
- backend/cmd/worker/main.go — read the current skeleton to understand what to preserve and what to replace
- backend/cmd/web/main.go — S3 env var loading pattern (lines 85-100) and graceful shutdown pattern (lines 144-148) to replicate
- .planning/phases/06-background-worker/06-RESEARCH.md — full cmd/worker/main.go code example (Pattern 2, 3, 5) and Pitfall 2
- .planning/phases/06-background-worker/06-PATTERNS.md — critical ordering constraint and pool close ordering
</read_first>
<action>
Replace the entire content of backend/cmd/worker/main.go. The new file must implement the following sequence exactly (per RESEARCH Pattern ordering and PATTERNS.md critical ordering constraint):
Imports to include: "context", "log/slog", "os", "os/signal", "syscall", "time", "backend/internal/db", "backend/internal/files", "backend/internal/jobs", "backend/internal/web", "github.com/riverqueue/river", "github.com/riverqueue/river/riverdriver/riverpgxv5", "github.com/riverqueue/river/rivermigrate".
main() function sequence:
1. Read ENV (default "development") and DATABASE_URL env vars. Call slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout))). Guard on empty DATABASE_URL (slog.Error + os.Exit(1)).
2. Create startup context: ctx := context.Background() (NOT signal context yet — critical per PATTERNS.md ordering constraint).
3. Connect DB pool: db.NewPool(ctx, dsn). On error: slog.Error + os.Exit(1). Note: defer pool.Close() is NOT used here — explicit pool.Close() at the end of shutdown sequence (after StopAndCancel) per PATTERNS.md pool close ordering.
4. Run rivermigrate: rivermigrate.New(riverpgxv5.New(pool), nil). On init error: slog.Error + os.Exit(1). Call .Migrate(ctx, rivermigrate.DirectionUp, nil). On migrate error: slog.Error + os.Exit(1). Range over res.Versions and log each with slog.Info("river migration applied", "version", v.Version).
5. Init S3 store: read env vars S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY, S3_REGION (default "us-east-1" if empty), S3_USE_PATH_STYLE == "true". Call files.NewStore(ctx, endpoint, bucket, region, accessKey, secretKey, usePathStyle). On error: slog.Error + os.Exit(1).
6. Create signal context AFTER all startup I/O: sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM). defer stop().
7. Register workers: workers := river.NewWorkers(). river.AddWorker(workers, &jobs.HeartbeatWorker{}). river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore)).
8. Construct river client: river.NewClient(riverpgxv5.New(pool), &river.Config{ Logger: slog.Default(), Workers: workers, Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}}, ErrorHandler: &jobs.SlogErrorHandler{}, PeriodicJobs: []*river.PeriodicJob{ river.NewPeriodicJob(river.PeriodicInterval(1*time.Minute), func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil }, &river.PeriodicJobOpts{RunOnStart: true}), river.NewPeriodicJob(river.PeriodicInterval(1*time.Hour), func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil }, nil), }, }). On error: slog.Error + os.Exit(1).
9. Start client: riverClient.Start(sigCtx). On error: slog.Error + os.Exit(1).
10. Log slog.Info("worker ready").
11. <-sigCtx.Done(). Log slog.Info("shutting down").
12. Create stopCtx: context.WithTimeout(context.Background(), 10*time.Second). defer stopCancel(). Call riverClient.StopAndCancel(stopCtx). On error: slog.Warn("river stop did not finish cleanly", "err", err).
13. pool.Close(). Log slog.Info("shutdown complete").
The file package comment should read: "Command worker is the background job processor for Xtablo. It runs river periodic jobs against the same Postgres as cmd/web."
</action>
<verify>
<automated>cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go build ./cmd/worker/...</automated>
</verify>
<done>
go build ./cmd/worker/... exits 0. The file contains "rivermigrate.New", "river.NewClient", "StopAndCancel", "HeartbeatArgs", "OrphanCleanupArgs".
</done>
<acceptance_criteria>
- `cd backend && go build ./cmd/worker/...` exits 0
- `grep "rivermigrate.New" backend/cmd/worker/main.go` returns a line
- `grep "river.NewClient" backend/cmd/worker/main.go` returns a line
- `grep "StopAndCancel" backend/cmd/worker/main.go` returns a line
- `grep "HeartbeatArgs" backend/cmd/worker/main.go` returns a line
- `grep "OrphanCleanupArgs" backend/cmd/worker/main.go` returns a line
- `grep "RunOnStart.*true" backend/cmd/worker/main.go` returns a line (confirms heartbeat RunOnStart)
- `grep "slog.Default()" backend/cmd/worker/main.go` returns a line (river Logger wired to slog)
- `cd backend && go build ./...` exits 0 (whole module compiles)
</acceptance_criteria>
</task>
<task type="auto">
<name>Task 2: Add just worker target + document worker in README.md</name>
<files>backend/justfile, backend/README.md</files>
<read_first>
- backend/justfile — read the dev target (lines 111-113) and variable declarations (lines 38-39) to match the style exactly
- backend/README.md — read the full file to find the right insertion point for the worker section
- .planning/phases/06-background-worker/06-PATTERNS.md — new worker target style (last Pattern section)
- .planning/phases/06-background-worker/06-CONTEXT.md — D-08 single-worker constraint to document
</read_first>
<action>
justfile — append the `worker` target after the `dev` target (keep file order: dev, then worker). The target:
- depends on db-up (same as dev)
- passes these env vars on separate continuation lines: DATABASE_URL='{{ database_url }}', S3_ENDPOINT='http://localhost:9000', S3_BUCKET='xtablo', S3_REGION='us-east-1', S3_ACCESS_KEY='minioadmin', S3_SECRET_KEY='minioadmin', S3_USE_PATH_STYLE='true'
- runs: go run ./cmd/worker
- precede the target with a comment: "# Start the worker binary (development — requires db-up and MinIO running)."
- Match the env-var-per-continuation-line style of the existing dev target exactly.
README.md — add a "Running the Worker" section. Insert it after the existing "Development" or "Running" section (whatever comes last before any deployment/production section). The section must include:
- Command: just worker (requires db-up and just dev or MinIO running separately)
- What to expect: structured logs appear immediately; "worker ready" log within a few seconds; "worker heartbeat" log appears within ~1 minute of startup (RunOnStart fires immediately on first tick, so first heartbeat appears within seconds)
- Note on D-08: only one worker process should run at a time for v1; do not run multiple worker instances
- Graceful shutdown: send SIGINT (Ctrl+C) and observe "shutting down" then "shutdown complete" in logs
- Observing failed job retries: river logs "job error" with job_id, job_kind, attempt, max_attempts, err fields on each failure; max 25 attempts before discard
</action>
<verify>
<automated>cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && grep -c "^worker:" justfile && grep -c "just worker" README.md</automated>
</verify>
<done>
justfile contains a `worker:` target. README.md mentions `just worker`. Both grep counts return >= 1.
</done>
<acceptance_criteria>
- `grep "^worker:" backend/justfile` returns exactly 1 line
- `grep "S3_ENDPOINT" backend/justfile` returns at least 2 lines (one in dev equivalent path, one in worker target)
- `grep "go run ./cmd/worker" backend/justfile` returns a line inside the worker target
- `grep "just worker" backend/README.md` returns at least 1 line
- `grep -i "single.*worker\|one worker\|multiple worker" backend/README.md` returns a line (D-08 documented)
- `grep "worker heartbeat" backend/README.md` returns a line (observable log message documented)
- `cd backend && just --list 2>&1 | grep worker` returns a line confirming the target is registered
</acceptance_criteria>
</task>
</tasks>
<threat_model>
## Trust Boundaries
| Boundary | Description |
|----------|-------------|
| Worker process → Postgres (startup) | rivermigrate acquires advisory lock during migration; cmd/worker must not run concurrently with another instance during migration phase |
| Worker process → S3/MinIO | Orphan cleanup deletes objects using credentials loaded from env; no user-supplied key values reach S3 in Phase 6 |
## STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-06-05 | Tampering | rivermigrate concurrent execution | mitigate | D-08 documents single-worker constraint; rivermigrate uses advisory lock (pg_advisory_lock) to prevent concurrent migration runs; README documents do-not-run-multiple-instances |
| T-06-06 | Denial of Service | river.Client.Start with already-cancelled context (Pitfall 2) | mitigate | signal.NotifyContext created AFTER all startup I/O per PATTERNS.md ordering constraint; Start receives a live context |
| T-06-07 | Information Disclosure | S3 MinIO credentials in justfile dev target | accept | Dev-only credentials (minioadmin/minioadmin) are the MinIO default; justfile is committed and these are not production secrets; .env.example documents production vars separately |
| T-06-08 | Tampering | Orphan cleanup deletes wrong S3 key | mitigate | Key comes from tablo_files.s3_key column (set at upload time in Phase 5); not reconstructed; only orphan rows (no matching tablo) are targeted by the query |
</threat_model>
<verification>
After Plan 02 is complete:
- `cd backend && go build ./...` exits 0
- `cd backend && go build ./cmd/worker/...` exits 0
- `grep "rivermigrate.New" backend/cmd/worker/main.go` returns a line
- `grep "^worker:" backend/justfile` returns a line
- `cd backend && just --list 2>&1 | grep worker` shows the worker target
- `grep "just worker" backend/README.md` returns a line
</verification>
<success_criteria>
- cmd/worker/main.go compiles with full river wiring
- rivermigrate runs before river.Client starts (verified by code structure)
- Both periodic jobs registered (heartbeat 1min RunOnStart:true, orphan cleanup 1hr)
- Graceful shutdown via StopAndCancel(10s timeout context)
- just worker target present and documented in README
- go build ./... exits 0 — full module clean
</success_criteria>
<output>
After completion, create `.planning/phases/06-background-worker/06-02-SUMMARY.md`
</output>

View file

@ -0,0 +1,117 @@
---
phase: 06-background-worker
plan: 03
type: execute
wave: 3
depends_on:
- 06-02
files_modified: []
autonomous: false
requirements:
- WORK-01
- WORK-02
- WORK-03
- WORK-04
---
<objective>
Human-verify checkpoint: run `just worker` against a live Postgres + MinIO, observe that the worker starts, runs the heartbeat job within seconds (RunOnStart: true), shuts down cleanly on Ctrl+C, and confirm logs are structured and readable.
Purpose: Closes all four WORK requirements with observable end-to-end proof. Verifies the binary actually works — not just compiles.
Output: Developer confirmation that WORK-01 through WORK-04 are satisfied in a live dev environment.
</objective>
<execution_context>
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/workflows/execute-plan.md
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/ROADMAP.md
@.planning/phases/06-background-worker/06-CONTEXT.md
@.planning/phases/06-background-worker/06-02-SUMMARY.md
</context>
<tasks>
<task type="checkpoint:human-verify" gate="blocking">
<what-built>
cmd/worker binary with:
- rivermigrate running river schema migrations at startup
- HeartbeatWorker (every 1 minute, RunOnStart: true — fires immediately)
- OrphanCleanupWorker (every 1 hour, RunOnStart: false)
- SlogErrorHandler logging job failures to structured slog
- Graceful shutdown on SIGINT/SIGTERM (StopAndCancel with 10-second timeout)
- just worker target with MinIO dev defaults
</what-built>
<how-to-verify>
Prerequisites (run each in a separate terminal or as background):
1. `cd backend && just db-up` — start Postgres (and MinIO from compose.yaml)
2. Optionally `just dev` in a second terminal to confirm web binary still works alongside
Worker verification steps:
3. Run: `cd backend && just worker`
4. Within 5 seconds, confirm these log lines appear (look for structured key=value or JSON output):
- `"river migration applied"` (with version number) OR no migration lines if already applied
- `"worker ready"` — confirms binary started without errors
- `"worker heartbeat"` — RunOnStart:true fires the heartbeat immediately on first tick; expect this within ~10 seconds
5. Wait ~60 seconds for a second heartbeat log line — confirms the periodic scheduler is running.
Look for: `msg="worker heartbeat" job_id=... attempt=1`
6. Send SIGINT: press Ctrl+C in the worker terminal.
7. Confirm these log lines appear in order:
- `"shutting down"`
- `"shutdown complete"`
Process must exit cleanly (exit code 0 or from signal, NOT with a panic or error).
Failed job visibility check (WORK-04 — optional if no failing job is easy to trigger):
- River logs job errors automatically via SlogErrorHandler. If the orphan cleanup job ran (visible as `"orphan cleanup complete"` with `orphans_found=0`), WORK-04 is satisfied by the error handler being registered and logging correctly.
- Alternatively: temporarily modify HeartbeatWorker.Work to return fmt.Errorf("test error"), rebuild, run worker, and look for `msg="job error" job_kind=heartbeat attempt=1` in logs. Revert after confirming.
Full test suite gate:
8. `cd backend && go test ./... && go build ./...` — must exit 0
</how-to-verify>
<resume-signal>
Type "approved" if all log lines appeared and shutdown was clean.
Type "issues: [describe]" if any step failed or was unclear.
</resume-signal>
</task>
</tasks>
<threat_model>
## Trust Boundaries
| Boundary | Description |
|----------|-------------|
| Local dev environment | All verification runs against local MinIO and Postgres; no production systems involved |
## STRIDE Threat Register
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-06-09 | Information Disclosure | Worker logs in dev mode | accept | Dev mode uses slog text handler (not JSON); no PII in heartbeat/orphan cleanup logs; production uses JSON handler (controlled by ENV=production) |
</threat_model>
<verification>
Manual verification only (this plan IS the verification step). Pass criteria:
1. `"worker ready"` appears in logs within 5 seconds of `just worker`
2. `"worker heartbeat"` appears within 10 seconds (RunOnStart fires first heartbeat immediately)
3. A second `"worker heartbeat"` appears ~60 seconds later (confirms scheduler is running)
4. Ctrl+C produces `"shutting down"` then `"shutdown complete"` with no panic
5. `cd backend && go test ./... && go build ./...` exits 0
</verification>
<success_criteria>
- Worker starts, connects to Postgres, applies river migrations, and logs "worker ready"
- Heartbeat job fires within seconds of startup (RunOnStart: true)
- Graceful shutdown works: SIGINT → "shutting down" → "shutdown complete"
- Full test suite green: go test ./... && go build ./... exits 0
- All WORK-01 through WORK-04 requirements satisfied as observable behaviors
</success_criteria>
<output>
After completion, create `.planning/phases/06-background-worker/06-03-SUMMARY.md`
</output>