From f242b7184f962b0acabcc864179ca798dcf0a885 Mon Sep 17 00:00:00 2001 From: Arthur Belleville Date: Fri, 15 May 2026 16:20:11 +0200 Subject: [PATCH] docs(06): create phase 6 background worker plans Three plans: Wave 1 adds river dependency + internal/jobs package with unit tests; Wave 2 wires cmd/worker/main.go with full river runtime + justfile target + README; Wave 3 is human-verify checkpoint. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .planning/ROADMAP.md | 6 + .../phases/06-background-worker/06-01-PLAN.md | 258 +++++++++++++++++ .../phases/06-background-worker/06-02-PLAN.md | 267 ++++++++++++++++++ .../phases/06-background-worker/06-03-PLAN.md | 117 ++++++++ 4 files changed, 648 insertions(+) create mode 100644 .planning/phases/06-background-worker/06-01-PLAN.md create mode 100644 .planning/phases/06-background-worker/06-02-PLAN.md create mode 100644 .planning/phases/06-background-worker/06-03-PLAN.md diff --git a/.planning/ROADMAP.md b/.planning/ROADMAP.md index dac1ef8..6529eca 100644 --- a/.planning/ROADMAP.md +++ b/.planning/ROADMAP.md @@ -151,6 +151,12 @@ Plans: **User-in-loop:** Approve the queue library/approach (`river` vs `asynq` vs hand-rolled `pg_notify`) and pick the proof-of-life job. +**Plans:** 3 plans +Plans: +- [ ] 06-01-PLAN.md — Wave 1: go get river + ListOrphanFiles sqlc query + internal/jobs/ package (HeartbeatWorker, OrphanCleanupWorker, SlogErrorHandler) + unit tests (WORK-01 partial, WORK-02, WORK-03, WORK-04) +- [ ] 06-02-PLAN.md — Wave 2: replace cmd/worker/main.go with full river wiring (rivermigrate + Client + periodic jobs + graceful shutdown) + just worker target + README section (WORK-01, WORK-02, WORK-03) +- [ ] 06-03-PLAN.md — Wave 3: Human-verify checkpoint — run just worker, observe heartbeat logs, verify graceful shutdown + ### Phase 7: Deploy v1 **Goal:** The product runs in production on a single host, behind a documented deploy + rollback workflow. **Mode:** mvp diff --git a/.planning/phases/06-background-worker/06-01-PLAN.md b/.planning/phases/06-background-worker/06-01-PLAN.md new file mode 100644 index 0000000..9e8e984 --- /dev/null +++ b/.planning/phases/06-background-worker/06-01-PLAN.md @@ -0,0 +1,258 @@ +--- +phase: 06-background-worker +plan: 01 +type: execute +wave: 1 +depends_on: [] +files_modified: + - backend/go.mod + - backend/go.sum + - backend/internal/db/queries/files.sql + - backend/internal/db/sqlc/files.sql.go + - backend/internal/jobs/heartbeat.go + - backend/internal/jobs/orphan_cleanup.go + - backend/internal/jobs/error_handler.go + - backend/internal/jobs/heartbeat_test.go + - backend/internal/jobs/orphan_cleanup_test.go + - backend/internal/jobs/error_handler_test.go +autonomous: true +requirements: + - WORK-01 + - WORK-02 + - WORK-03 + - WORK-04 + +must_haves: + truths: + - "river and riverpgxv5 are importable from backend/internal/jobs/ without build errors" + - "HeartbeatWorker.Work() returns nil and logs 'worker heartbeat' with job_id and attempt fields" + - "OrphanCleanupWorker.Work() iterates orphan rows, deletes S3 object then DB row, logs per-run summary" + - "SlogErrorHandler implements river.ErrorHandler (both HandleError and HandlePanic compile)" + - "ListOrphanFiles sqlc query is generated and returns []ListOrphanFilesRow with {ID, TabloID, S3Key}" + - "Unit tests for HeartbeatWorker, OrphanCleanupWorker, and SlogErrorHandler pass" + artifacts: + - path: "backend/go.mod" + provides: "river dependency declarations" + contains: "github.com/riverqueue/river" + - path: "backend/internal/jobs/heartbeat.go" + provides: "HeartbeatArgs + HeartbeatWorker" + exports: ["HeartbeatArgs", "HeartbeatWorker"] + - path: "backend/internal/jobs/orphan_cleanup.go" + provides: "OrphanCleanupArgs + OrphanCleanupWorker + constructor" + exports: ["OrphanCleanupArgs", "OrphanCleanupWorker", "NewOrphanCleanupWorker"] + - path: "backend/internal/jobs/error_handler.go" + provides: "SlogErrorHandler" + exports: ["SlogErrorHandler"] + - path: "backend/internal/db/queries/files.sql" + provides: "ListOrphanFiles query" + contains: "name: ListOrphanFiles :many" + key_links: + - from: "backend/internal/jobs/orphan_cleanup.go" + to: "backend/internal/db/sqlc/files.sql.go" + via: "sqlc.New(w.pool).ListOrphanFiles(ctx)" + pattern: "ListOrphanFiles" + - from: "backend/internal/jobs/orphan_cleanup.go" + to: "backend/internal/files/store.go" + via: "w.store.Delete(ctx, f.S3Key)" + pattern: "\.store\.Delete" +--- + + +Add the river dependency and implement the `internal/jobs/` package — job arg types, worker structs, and the error handler — plus the `ListOrphanFiles` sqlc query that the orphan cleanup worker depends on. Unit tests prove each component in isolation before the binary is wired in Plan 02. + +Purpose: Delivers the entire domain layer for Phase 6 (WORK-01 partially, WORK-02, WORK-03, WORK-04) as a vertical slice that compiles and tests cleanly before cmd/worker is wired. +Output: `backend/internal/jobs/` package with three files + tests; updated `files.sql` and regenerated sqlc; river in `go.mod`. + + + +@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/workflows/execute-plan.md +@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/templates/summary.md + + + +@.planning/ROADMAP.md +@.planning/REQUIREMENTS.md +@.planning/phases/06-background-worker/06-CONTEXT.md +@.planning/phases/06-background-worker/06-RESEARCH.md +@.planning/phases/06-background-worker/06-PATTERNS.md + + +From backend/internal/files/store.go: + type FileStorer interface { + Upload(ctx context.Context, key string, file io.Reader) (contentType string, bytesWritten int64, err error) + Delete(ctx context.Context, key string) error + PresignDownload(ctx context.Context, key string) (string, error) + } + +From backend/internal/db/sqlc/db.go: + func New(db DBTX) *Queries // *pgxpool.Pool satisfies DBTX + +From backend/internal/db/queries/files.sql (existing queries — append ListOrphanFiles after DeleteTabloFile): + -- name: DeleteTabloFile :exec + DELETE FROM tablo_files WHERE id = $1 AND tablo_id = $2; + + -- DeleteTabloFileParams is generated as: {ID uuid.UUID, TabloID uuid.UUID} + -- These fields are available from ListOrphanFilesRow — no new delete query needed. + +From backend/internal/db/sqlc/files.sql.go (generated pattern — ListOrphanFiles will follow this shape): + func (q *Queries) ListFilesByTablo(ctx context.Context, tabloID uuid.UUID) ([]TabloFile, error) + // ListOrphanFiles will be: func (q *Queries) ListOrphanFiles(ctx context.Context) ([]ListOrphanFilesRow, error) + // ListOrphanFilesRow fields: ID uuid.UUID, TabloID uuid.UUID, S3Key string + + + + + + + Task 1: Add river dependency + ListOrphanFiles sqlc query + backend/go.mod, backend/go.sum, backend/internal/db/queries/files.sql, backend/internal/db/sqlc/files.sql.go + + - backend/go.mod — read current require block before adding deps + - backend/internal/db/queries/files.sql — read existing queries to understand file style and append position + + + - After `go get`, `go.mod` contains `github.com/riverqueue/river` and `github.com/riverqueue/river/riverdriver/riverpgxv5` + - `ListOrphanFiles` query appended to files.sql follows exact annotation style of existing queries in that file + - After `just generate`, `backend/internal/db/sqlc/files.sql.go` exports `ListOrphanFiles(ctx context.Context) ([]ListOrphanFilesRow, error)` and `ListOrphanFilesRow` struct with fields `ID`, `TabloID`, `S3Key` + - `go build ./...` exits 0 after changes + + + Step 1 — add river modules (run from backend/ directory): + go get github.com/riverqueue/river@v0.37.0 + go get github.com/riverqueue/river/riverdriver/riverpgxv5@v0.37.0 + These commands update both go.mod and go.sum. Do NOT hand-edit go.mod for these deps. + + Step 2 — append the orphan query to backend/internal/db/queries/files.sql (after the existing DeleteTabloFile query). The query name annotation is "name: ListOrphanFiles :many". The SELECT returns three columns: id, tablo_id, s3_key from tablo_files tf WHERE NOT EXISTS (SELECT 1 FROM tablos t WHERE t.id = tf.tablo_id). Include a comment line above the annotation explaining its purpose ("Find tablo_files rows whose owning tablo no longer exists. Used by the orphan-file cleanup worker (Phase 6 WORK-02)."). Use the alias "tf" for tablo_files per the NOT EXISTS subquery. + + Step 3 — regenerate sqlc: + cd backend && just generate + Verify the generated file at backend/internal/db/sqlc/files.sql.go contains "ListOrphanFiles" and "ListOrphanFilesRow". + + + cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && grep -c "riverqueue/river" go.mod && grep -c "ListOrphanFiles" internal/db/sqlc/files.sql.go && go build ./... + + + go.mod contains github.com/riverqueue/river (grep -c returns >= 1). internal/db/sqlc/files.sql.go contains ListOrphanFiles. go build ./... exits 0. + + + - `grep "riverqueue/river" backend/go.mod` returns at least two lines (river + riverpgxv5) + - `grep "riverqueue/river/riverdriver/riverpgxv5" backend/go.mod` returns a line + - `grep "ListOrphanFiles" backend/internal/db/sqlc/files.sql.go` returns lines for both function and row type + - `grep "ListOrphanFilesRow" backend/internal/db/sqlc/files.sql.go` confirms the row struct exists + - `cd backend && go build ./...` exits 0 + + + + + Task 2: Create internal/jobs/ package — HeartbeatWorker + OrphanCleanupWorker + SlogErrorHandler + unit tests + + backend/internal/jobs/heartbeat.go, + backend/internal/jobs/orphan_cleanup.go, + backend/internal/jobs/error_handler.go, + backend/internal/jobs/heartbeat_test.go, + backend/internal/jobs/orphan_cleanup_test.go, + backend/internal/jobs/error_handler_test.go + + + - backend/internal/auth/session.go — struct definition with injected dep pattern to replicate + - backend/internal/files/store.go — FileStorer interface (lines 17-22) used by OrphanCleanupWorker + - backend/internal/db/sqlc/files.sql.go — confirm ListOrphanFilesRow and DeleteTabloFileParams field names after regeneration + - backend/internal/db/sqlc/db.go — confirm sqlc.New signature (takes DBTX) + - .planning/phases/06-background-worker/06-RESEARCH.md — Pattern 1 (job structs), Pattern 4 (SlogErrorHandler) + - .planning/phases/06-background-worker/06-PATTERNS.md — Pattern assignments for all three files + + + HeartbeatWorker: + - HeartbeatArgs struct with no fields; Kind() string returns "heartbeat" + - HeartbeatWorker struct embeds river.WorkerDefaults[HeartbeatArgs] + - Work(ctx, job) calls slog.Info("worker heartbeat", "job_id", job.ID, "attempt", job.Attempt) and returns nil + - TestHeartbeatWorker: construct HeartbeatWorker, call Work(ctx, job), assert no error + + OrphanCleanupWorker: + - OrphanCleanupArgs struct with no fields; Kind() string returns "orphan_file_cleanup" + - OrphanCleanupWorker struct embeds river.WorkerDefaults[OrphanCleanupArgs]; fields: pool *pgxpool.Pool, store files.FileStorer + - NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker constructor + - Work(ctx, job): calls sqlc.New(w.pool).ListOrphanFiles(ctx); loops over results; deletes S3 object first (w.store.Delete(ctx, f.S3Key)), then calls sqlc.New(w.pool).DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{ID: f.ID, TabloID: f.TabloID}) for each; increments deleted/errCount counters; logs slog.Info("orphan cleanup complete", "orphans_found", len(orphans), "deleted", deleted, "errors", errCount); returns nil (does not fail the job for partial errors — logs them) + - TestOrphanCleanupWorker_NoOrphans: use mock store + real test DB (skip if no DB), confirm Work returns nil and logs "orphan cleanup complete" with orphans_found=0 + - TestOrphanCleanupWorker_DeletesOrphan: insert a tablo_files row with nonexistent tablo_id directly (bypassing FK), run Work, confirm the row is gone and s3 delete was called + + SlogErrorHandler: + - SlogErrorHandler struct with no fields + - HandleError(ctx, job *rivertype.JobRow, err error) *river.ErrorHandlerResult: calls slog.Error("job error", "job_id", job.ID, "job_kind", job.Kind, "attempt", job.Attempt, "max_attempts", job.MaxAttempts, "err", err); returns nil + - HandlePanic(ctx, job *rivertype.JobRow, panicVal any, trace string) *river.ErrorHandlerResult: calls slog.Error("job panic", "job_id", job.ID, "job_kind", job.Kind, "panic", panicVal, "trace", trace); returns nil + - TestSlogErrorHandler: construct handler, call HandleError with a fake JobRow and error, confirm no panic and return value is nil + + + Create three Go source files under backend/internal/jobs/ (package declaration: "package jobs"). + + heartbeat.go: Define HeartbeatArgs (no fields) with Kind() returning "heartbeat". Define HeartbeatWorker embedding river.WorkerDefaults[HeartbeatArgs] with Work method logging slog.Info("worker heartbeat", "job_id", job.ID, "attempt", job.Attempt) then returning nil. Import "github.com/riverqueue/river" and "log/slog" and "context". + + orphan_cleanup.go: Define OrphanCleanupArgs (no fields) with Kind() returning "orphan_file_cleanup". Define OrphanCleanupWorker struct with fields pool *pgxpool.Pool and store files.FileStorer; embed river.WorkerDefaults[OrphanCleanupArgs]. Implement NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker. Implement Work(ctx, job) per the behavior spec: ListOrphanFiles → loop (S3 delete first, then DB delete, counting deleted/errCount) → log summary → return nil. On individual S3 or DB errors in the loop, log slog.Error with relevant fields (s3_key or file_id) and increment errCount; continue to next item. Import "backend/internal/db/sqlc", "backend/internal/files", "github.com/jackc/pgx/v5/pgxpool", "github.com/riverqueue/river", "log/slog", "context", "fmt". + + error_handler.go: Define SlogErrorHandler struct (no fields). Implement HandleError and HandlePanic per the behavior spec. Import "context", "github.com/riverqueue/river", "github.com/riverqueue/river/rivertype", "log/slog". + + For the test files: heartbeat_test.go and error_handler_test.go use pure unit tests (no DB needed). orphan_cleanup_test.go: copy the setupTestDB helper from backend/internal/auth/testdb_test.go (it reads TEST_DATABASE_URL or DATABASE_URL, creates an isolated schema, runs goose migrations, returns pool + cleanup); use a fake in-memory FileStorer mock for the S3 side. Integration test for orphan cleanup inserts a tablo_files row with a uuid that has no matching tablos row (insert directly with no FK enforcement by first creating the file row while tablo exists, then hard-deleting the tablo row using `DELETE FROM tablos WHERE id = $1`; this triggers cascade but that's fine — alternatively, use a nonexistent UUID that was never inserted into tablos, which won't violate the FK constraint since you're inserting into tablo_files with a foreign key — instead, use testDB.Exec("INSERT INTO tablo_files (id, tablo_id, s3_key, filename, content_type, size_bytes) VALUES (gen_random_uuid(), gen_random_uuid(), 'orphan-key', 'orphan.txt', 'text/plain', 0)") to bypass the FK at test time; if the FK check prevents this, wrap in a deferred constraint or skip and only test the no-orphans path). + + After writing all files: cd backend && go test ./internal/jobs/... + + + cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go build ./internal/jobs/... && go test ./internal/jobs/... -v -count=1 + + + go build ./internal/jobs/... exits 0. go test ./internal/jobs/... passes (or skips DB-dependent tests when no DB is available). All three files exist with correct package declarations and exported types. + + + - `cd backend && go build ./internal/jobs/...` exits 0 + - `grep "HeartbeatArgs" backend/internal/jobs/heartbeat.go` returns a line + - `grep "HeartbeatWorker" backend/internal/jobs/heartbeat.go` returns a line with WorkerDefaults embedding + - `grep "OrphanCleanupWorker" backend/internal/jobs/orphan_cleanup.go` returns a line + - `grep "NewOrphanCleanupWorker" backend/internal/jobs/orphan_cleanup.go` returns a line + - `grep "SlogErrorHandler" backend/internal/jobs/error_handler.go` returns a line + - `grep "HandleError" backend/internal/jobs/error_handler.go` returns a line + - `grep "HandlePanic" backend/internal/jobs/error_handler.go` returns a line + - `cd backend && go test ./internal/jobs/... -v -count=1` exits 0 (DB tests may skip, but must not fail) + - `cd backend && go build ./...` exits 0 (whole module compiles) + + + + + + +## Trust Boundaries + +| Boundary | Description | +|----------|-------------| +| Worker process → Postgres | Worker reads/writes job rows and tablo_files; all operations use the project pgxpool with the same Postgres credentials as cmd/web | +| Worker process → S3/MinIO | Worker deletes S3 objects using credentials from env vars; no user-supplied input reaches these calls in Phase 6 | + +## STRIDE Threat Register + +| Threat ID | Category | Component | Disposition | Mitigation Plan | +|-----------|----------|-----------|-------------|-----------------| +| T-06-01 | Tampering | OrphanCleanupWorker — river_job row args | accept | Phase 6 periodic jobs use empty args structs (HeartbeatArgs{}, OrphanCleanupArgs{}); no user-supplied data is deserialized from job args; tampering with river_job rows requires DB access, which is already a full compromise | +| T-06-02 | Information Disclosure | S3 credentials in env vars | mitigate | Credentials read from env vars (never hardcoded); .env.example documents required vars without values; go.sum pins river to v0.37.0 to prevent supply-chain substitution | +| T-06-03 | Elevation of Privilege | Worker binary running as root | transfer | Phase 7 scope — Dockerfile non-root user is a Phase 7 DEPLOY requirement; documented in README as a known gap | +| T-06-04 | Denial of Service | MaxWorkers exceeding pool MaxConns | mitigate | MaxWorkers: 10 matches db.NewPool MaxConns: 10; documented in river.Config comment; never exceeds pool capacity | + + + +After Plan 01 is complete: +- `cd backend && go build ./...` exits 0 +- `cd backend && go test ./internal/jobs/... -v` passes (skips if no DB, but does not error) +- `grep "riverqueue/river" backend/go.mod` shows at least 2 lines (river + riverpgxv5) +- `grep "ListOrphanFiles" backend/internal/db/sqlc/files.sql.go` returns at least 1 line +- `grep "HeartbeatWorker" backend/internal/jobs/heartbeat.go` returns 1 line +- `grep "SlogErrorHandler" backend/internal/jobs/error_handler.go` returns 1 line + + + +- river importable from internal/jobs/ without compile errors +- HeartbeatWorker, OrphanCleanupWorker, SlogErrorHandler defined with correct signatures +- ListOrphanFiles sqlc query generated with ListOrphanFilesRow type +- Unit tests for all three job types pass +- go build ./... exits 0 — whole module compiles clean + + + +After completion, create `.planning/phases/06-background-worker/06-01-SUMMARY.md` + diff --git a/.planning/phases/06-background-worker/06-02-PLAN.md b/.planning/phases/06-background-worker/06-02-PLAN.md new file mode 100644 index 0000000..4ec472a --- /dev/null +++ b/.planning/phases/06-background-worker/06-02-PLAN.md @@ -0,0 +1,267 @@ +--- +phase: 06-background-worker +plan: 02 +type: execute +wave: 2 +depends_on: + - 06-01 +files_modified: + - backend/cmd/worker/main.go + - backend/justfile + - backend/README.md +autonomous: true +requirements: + - WORK-01 + - WORK-02 + - WORK-03 + +must_haves: + truths: + - "cmd/worker/main.go runs rivermigrate at startup before constructing river.Client" + - "cmd/worker/main.go registers HeartbeatWorker and OrphanCleanupWorker via river.AddWorker" + - "cmd/worker/main.go passes slog.Default() as river.Config.Logger" + - "cmd/worker/main.go starts HeartbeatWorker as a PeriodicJob every 1 minute with RunOnStart: true" + - "cmd/worker/main.go starts OrphanCleanupWorker as a PeriodicJob every 1 hour with RunOnStart: false" + - "Graceful shutdown calls riverClient.StopAndCancel with a 10-second timeout context" + - "go build ./cmd/worker/... exits 0" + - "just worker target exists in justfile and passes MinIO dev defaults as env vars" + - "README.md documents how to run the worker locally" + artifacts: + - path: "backend/cmd/worker/main.go" + provides: "full river wiring — rivermigrate + river.Client + periodic jobs + graceful shutdown" + contains: "rivermigrate.New" + - path: "backend/justfile" + provides: "worker target for local dev" + contains: "just worker" + - path: "backend/README.md" + provides: "worker local dev instructions" + contains: "just worker" + key_links: + - from: "backend/cmd/worker/main.go" + to: "backend/internal/jobs/" + via: "jobs.HeartbeatWorker, jobs.NewOrphanCleanupWorker, jobs.SlogErrorHandler" + pattern: "jobs\\." + - from: "backend/cmd/worker/main.go" + to: "rivermigrate" + via: "rivermigrate.New(riverpgxv5.New(pool), nil).Migrate(ctx, rivermigrate.DirectionUp, nil)" + pattern: "rivermigrate\\.New" + - from: "backend/cmd/worker/main.go" + to: "river.Client" + via: "river.NewClient(riverpgxv5.New(pool), &river.Config{...})" + pattern: "river\\.NewClient" +--- + + +Replace the noop body in `cmd/worker/main.go` with the full river runtime: rivermigrate at startup, S3 store init, signal context construction, worker registration, river.Client construction and start, block on signal, StopAndCancel graceful shutdown. Add the `just worker` justfile target with MinIO dev defaults. Document the worker in README.md. + +Purpose: Closes WORK-01 (worker binary runs and processes jobs), WORK-02 (heartbeat job observable in logs within 1 minute of startup), WORK-03 (graceful shutdown matching web binary pattern). +Output: Fully functional `cmd/worker` binary; `just worker` runs it locally; README documents it. + + + +@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/workflows/execute-plan.md +@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/templates/summary.md + + + +@.planning/ROADMAP.md +@.planning/phases/06-background-worker/06-CONTEXT.md +@.planning/phases/06-background-worker/06-RESEARCH.md +@.planning/phases/06-background-worker/06-PATTERNS.md +@.planning/phases/06-background-worker/06-01-SUMMARY.md + + +From backend/internal/jobs/ (created in Plan 01): + jobs.HeartbeatArgs{} — Kind() = "heartbeat" + jobs.HeartbeatWorker{} — embeds river.WorkerDefaults[HeartbeatArgs] + jobs.OrphanCleanupArgs{} — Kind() = "orphan_file_cleanup" + jobs.NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker + jobs.SlogErrorHandler{} — implements river.ErrorHandler + +From backend/internal/files/store.go: + files.NewStore(ctx, endpoint, bucket, region, accessKey, secretKey, usePathStyle bool) (*Store, error) + — S3 env var names: S3_ENDPOINT, S3_BUCKET, S3_REGION, S3_ACCESS_KEY, S3_SECRET_KEY, S3_USE_PATH_STYLE + +From backend/internal/db/pool.go: + db.NewPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) + +From backend/cmd/worker/main.go (current skeleton — to be fully replaced): + package main + — already has: env/dsn loading, slog.SetDefault, db.NewPool, signal.NotifyContext, pool.Close pattern + +From rivermigrate (per RESEARCH.md Pattern 2): + rivermigrate.New(riverpgxv5.New(pool), nil) → migrator + migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) → (res, err) + res.Versions — range to log each applied version + +From river (per RESEARCH.md Pattern 3): + river.NewWorkers() → workers + river.AddWorker(workers, &HeartbeatWorker{}) + river.NewClient(riverpgxv5.New(pool), &river.Config{ + Logger: slog.Default(), + Workers: workers, + Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}}, + ErrorHandler: &SlogErrorHandler{}, + PeriodicJobs: [...], + }) → (riverClient, err) + riverClient.Start(ctx) → err // MUST use live signal context, not background + riverClient.StopAndCancel(timeoutCtx) → err + +CRITICAL ORDERING CONSTRAINT (RESEARCH Pitfall 2 + PATTERNS.md): + 1. context.Background() for startup I/O (pool, rivermigrate, S3) + 2. signal.NotifyContext AFTER all startup I/O succeeds + 3. riverClient.Start(sigCtx) — passes the signal context; jobs inherit it + 4. <-sigCtx.Done() — wait for SIGINT/SIGTERM + 5. StopAndCancel with a fresh context.WithTimeout(context.Background(), 10*time.Second) + 6. pool.Close() + Do NOT create signal.NotifyContext before rivermigrate or S3 init. + + + + + + + Task 1: Replace cmd/worker/main.go with full river wiring + backend/cmd/worker/main.go + + - backend/cmd/worker/main.go — read the current skeleton to understand what to preserve and what to replace + - backend/cmd/web/main.go — S3 env var loading pattern (lines 85-100) and graceful shutdown pattern (lines 144-148) to replicate + - .planning/phases/06-background-worker/06-RESEARCH.md — full cmd/worker/main.go code example (Pattern 2, 3, 5) and Pitfall 2 + - .planning/phases/06-background-worker/06-PATTERNS.md — critical ordering constraint and pool close ordering + + + Replace the entire content of backend/cmd/worker/main.go. The new file must implement the following sequence exactly (per RESEARCH Pattern ordering and PATTERNS.md critical ordering constraint): + + Imports to include: "context", "log/slog", "os", "os/signal", "syscall", "time", "backend/internal/db", "backend/internal/files", "backend/internal/jobs", "backend/internal/web", "github.com/riverqueue/river", "github.com/riverqueue/river/riverdriver/riverpgxv5", "github.com/riverqueue/river/rivermigrate". + + main() function sequence: + 1. Read ENV (default "development") and DATABASE_URL env vars. Call slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout))). Guard on empty DATABASE_URL (slog.Error + os.Exit(1)). + + 2. Create startup context: ctx := context.Background() (NOT signal context yet — critical per PATTERNS.md ordering constraint). + + 3. Connect DB pool: db.NewPool(ctx, dsn). On error: slog.Error + os.Exit(1). Note: defer pool.Close() is NOT used here — explicit pool.Close() at the end of shutdown sequence (after StopAndCancel) per PATTERNS.md pool close ordering. + + 4. Run rivermigrate: rivermigrate.New(riverpgxv5.New(pool), nil). On init error: slog.Error + os.Exit(1). Call .Migrate(ctx, rivermigrate.DirectionUp, nil). On migrate error: slog.Error + os.Exit(1). Range over res.Versions and log each with slog.Info("river migration applied", "version", v.Version). + + 5. Init S3 store: read env vars S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY, S3_REGION (default "us-east-1" if empty), S3_USE_PATH_STYLE == "true". Call files.NewStore(ctx, endpoint, bucket, region, accessKey, secretKey, usePathStyle). On error: slog.Error + os.Exit(1). + + 6. Create signal context AFTER all startup I/O: sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM). defer stop(). + + 7. Register workers: workers := river.NewWorkers(). river.AddWorker(workers, &jobs.HeartbeatWorker{}). river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore)). + + 8. Construct river client: river.NewClient(riverpgxv5.New(pool), &river.Config{ Logger: slog.Default(), Workers: workers, Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}}, ErrorHandler: &jobs.SlogErrorHandler{}, PeriodicJobs: []*river.PeriodicJob{ river.NewPeriodicJob(river.PeriodicInterval(1*time.Minute), func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil }, &river.PeriodicJobOpts{RunOnStart: true}), river.NewPeriodicJob(river.PeriodicInterval(1*time.Hour), func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil }, nil), }, }). On error: slog.Error + os.Exit(1). + + 9. Start client: riverClient.Start(sigCtx). On error: slog.Error + os.Exit(1). + + 10. Log slog.Info("worker ready"). + + 11. <-sigCtx.Done(). Log slog.Info("shutting down"). + + 12. Create stopCtx: context.WithTimeout(context.Background(), 10*time.Second). defer stopCancel(). Call riverClient.StopAndCancel(stopCtx). On error: slog.Warn("river stop did not finish cleanly", "err", err). + + 13. pool.Close(). Log slog.Info("shutdown complete"). + + The file package comment should read: "Command worker is the background job processor for Xtablo. It runs river periodic jobs against the same Postgres as cmd/web." + + + cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go build ./cmd/worker/... + + + go build ./cmd/worker/... exits 0. The file contains "rivermigrate.New", "river.NewClient", "StopAndCancel", "HeartbeatArgs", "OrphanCleanupArgs". + + + - `cd backend && go build ./cmd/worker/...` exits 0 + - `grep "rivermigrate.New" backend/cmd/worker/main.go` returns a line + - `grep "river.NewClient" backend/cmd/worker/main.go` returns a line + - `grep "StopAndCancel" backend/cmd/worker/main.go` returns a line + - `grep "HeartbeatArgs" backend/cmd/worker/main.go` returns a line + - `grep "OrphanCleanupArgs" backend/cmd/worker/main.go` returns a line + - `grep "RunOnStart.*true" backend/cmd/worker/main.go` returns a line (confirms heartbeat RunOnStart) + - `grep "slog.Default()" backend/cmd/worker/main.go` returns a line (river Logger wired to slog) + - `cd backend && go build ./...` exits 0 (whole module compiles) + + + + + Task 2: Add just worker target + document worker in README.md + backend/justfile, backend/README.md + + - backend/justfile — read the dev target (lines 111-113) and variable declarations (lines 38-39) to match the style exactly + - backend/README.md — read the full file to find the right insertion point for the worker section + - .planning/phases/06-background-worker/06-PATTERNS.md — new worker target style (last Pattern section) + - .planning/phases/06-background-worker/06-CONTEXT.md — D-08 single-worker constraint to document + + + justfile — append the `worker` target after the `dev` target (keep file order: dev, then worker). The target: + - depends on db-up (same as dev) + - passes these env vars on separate continuation lines: DATABASE_URL='{{ database_url }}', S3_ENDPOINT='http://localhost:9000', S3_BUCKET='xtablo', S3_REGION='us-east-1', S3_ACCESS_KEY='minioadmin', S3_SECRET_KEY='minioadmin', S3_USE_PATH_STYLE='true' + - runs: go run ./cmd/worker + - precede the target with a comment: "# Start the worker binary (development — requires db-up and MinIO running)." + - Match the env-var-per-continuation-line style of the existing dev target exactly. + + README.md — add a "Running the Worker" section. Insert it after the existing "Development" or "Running" section (whatever comes last before any deployment/production section). The section must include: + - Command: just worker (requires db-up and just dev or MinIO running separately) + - What to expect: structured logs appear immediately; "worker ready" log within a few seconds; "worker heartbeat" log appears within ~1 minute of startup (RunOnStart fires immediately on first tick, so first heartbeat appears within seconds) + - Note on D-08: only one worker process should run at a time for v1; do not run multiple worker instances + - Graceful shutdown: send SIGINT (Ctrl+C) and observe "shutting down" then "shutdown complete" in logs + - Observing failed job retries: river logs "job error" with job_id, job_kind, attempt, max_attempts, err fields on each failure; max 25 attempts before discard + + + cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && grep -c "^worker:" justfile && grep -c "just worker" README.md + + + justfile contains a `worker:` target. README.md mentions `just worker`. Both grep counts return >= 1. + + + - `grep "^worker:" backend/justfile` returns exactly 1 line + - `grep "S3_ENDPOINT" backend/justfile` returns at least 2 lines (one in dev equivalent path, one in worker target) + - `grep "go run ./cmd/worker" backend/justfile` returns a line inside the worker target + - `grep "just worker" backend/README.md` returns at least 1 line + - `grep -i "single.*worker\|one worker\|multiple worker" backend/README.md` returns a line (D-08 documented) + - `grep "worker heartbeat" backend/README.md` returns a line (observable log message documented) + - `cd backend && just --list 2>&1 | grep worker` returns a line confirming the target is registered + + + + + + +## Trust Boundaries + +| Boundary | Description | +|----------|-------------| +| Worker process → Postgres (startup) | rivermigrate acquires advisory lock during migration; cmd/worker must not run concurrently with another instance during migration phase | +| Worker process → S3/MinIO | Orphan cleanup deletes objects using credentials loaded from env; no user-supplied key values reach S3 in Phase 6 | + +## STRIDE Threat Register + +| Threat ID | Category | Component | Disposition | Mitigation Plan | +|-----------|----------|-----------|-------------|-----------------| +| T-06-05 | Tampering | rivermigrate concurrent execution | mitigate | D-08 documents single-worker constraint; rivermigrate uses advisory lock (pg_advisory_lock) to prevent concurrent migration runs; README documents do-not-run-multiple-instances | +| T-06-06 | Denial of Service | river.Client.Start with already-cancelled context (Pitfall 2) | mitigate | signal.NotifyContext created AFTER all startup I/O per PATTERNS.md ordering constraint; Start receives a live context | +| T-06-07 | Information Disclosure | S3 MinIO credentials in justfile dev target | accept | Dev-only credentials (minioadmin/minioadmin) are the MinIO default; justfile is committed and these are not production secrets; .env.example documents production vars separately | +| T-06-08 | Tampering | Orphan cleanup deletes wrong S3 key | mitigate | Key comes from tablo_files.s3_key column (set at upload time in Phase 5); not reconstructed; only orphan rows (no matching tablo) are targeted by the query | + + + +After Plan 02 is complete: +- `cd backend && go build ./...` exits 0 +- `cd backend && go build ./cmd/worker/...` exits 0 +- `grep "rivermigrate.New" backend/cmd/worker/main.go` returns a line +- `grep "^worker:" backend/justfile` returns a line +- `cd backend && just --list 2>&1 | grep worker` shows the worker target +- `grep "just worker" backend/README.md` returns a line + + + +- cmd/worker/main.go compiles with full river wiring +- rivermigrate runs before river.Client starts (verified by code structure) +- Both periodic jobs registered (heartbeat 1min RunOnStart:true, orphan cleanup 1hr) +- Graceful shutdown via StopAndCancel(10s timeout context) +- just worker target present and documented in README +- go build ./... exits 0 — full module clean + + + +After completion, create `.planning/phases/06-background-worker/06-02-SUMMARY.md` + diff --git a/.planning/phases/06-background-worker/06-03-PLAN.md b/.planning/phases/06-background-worker/06-03-PLAN.md new file mode 100644 index 0000000..94c96d9 --- /dev/null +++ b/.planning/phases/06-background-worker/06-03-PLAN.md @@ -0,0 +1,117 @@ +--- +phase: 06-background-worker +plan: 03 +type: execute +wave: 3 +depends_on: + - 06-02 +files_modified: [] +autonomous: false +requirements: + - WORK-01 + - WORK-02 + - WORK-03 + - WORK-04 +--- + + +Human-verify checkpoint: run `just worker` against a live Postgres + MinIO, observe that the worker starts, runs the heartbeat job within seconds (RunOnStart: true), shuts down cleanly on Ctrl+C, and confirm logs are structured and readable. + +Purpose: Closes all four WORK requirements with observable end-to-end proof. Verifies the binary actually works — not just compiles. +Output: Developer confirmation that WORK-01 through WORK-04 are satisfied in a live dev environment. + + + +@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/workflows/execute-plan.md +@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/templates/summary.md + + + +@.planning/ROADMAP.md +@.planning/phases/06-background-worker/06-CONTEXT.md +@.planning/phases/06-background-worker/06-02-SUMMARY.md + + + + + + + cmd/worker binary with: + - rivermigrate running river schema migrations at startup + - HeartbeatWorker (every 1 minute, RunOnStart: true — fires immediately) + - OrphanCleanupWorker (every 1 hour, RunOnStart: false) + - SlogErrorHandler logging job failures to structured slog + - Graceful shutdown on SIGINT/SIGTERM (StopAndCancel with 10-second timeout) + - just worker target with MinIO dev defaults + + + Prerequisites (run each in a separate terminal or as background): + 1. `cd backend && just db-up` — start Postgres (and MinIO from compose.yaml) + 2. Optionally `just dev` in a second terminal to confirm web binary still works alongside + + Worker verification steps: + 3. Run: `cd backend && just worker` + 4. Within 5 seconds, confirm these log lines appear (look for structured key=value or JSON output): + - `"river migration applied"` (with version number) OR no migration lines if already applied + - `"worker ready"` — confirms binary started without errors + - `"worker heartbeat"` — RunOnStart:true fires the heartbeat immediately on first tick; expect this within ~10 seconds + + 5. Wait ~60 seconds for a second heartbeat log line — confirms the periodic scheduler is running. + Look for: `msg="worker heartbeat" job_id=... attempt=1` + + 6. Send SIGINT: press Ctrl+C in the worker terminal. + 7. Confirm these log lines appear in order: + - `"shutting down"` + - `"shutdown complete"` + Process must exit cleanly (exit code 0 or from signal, NOT with a panic or error). + + Failed job visibility check (WORK-04 — optional if no failing job is easy to trigger): + - River logs job errors automatically via SlogErrorHandler. If the orphan cleanup job ran (visible as `"orphan cleanup complete"` with `orphans_found=0`), WORK-04 is satisfied by the error handler being registered and logging correctly. + - Alternatively: temporarily modify HeartbeatWorker.Work to return fmt.Errorf("test error"), rebuild, run worker, and look for `msg="job error" job_kind=heartbeat attempt=1` in logs. Revert after confirming. + + Full test suite gate: + 8. `cd backend && go test ./... && go build ./...` — must exit 0 + + + Type "approved" if all log lines appeared and shutdown was clean. + Type "issues: [describe]" if any step failed or was unclear. + + + + + + +## Trust Boundaries + +| Boundary | Description | +|----------|-------------| +| Local dev environment | All verification runs against local MinIO and Postgres; no production systems involved | + +## STRIDE Threat Register + +| Threat ID | Category | Component | Disposition | Mitigation Plan | +|-----------|----------|-----------|-------------|-----------------| +| T-06-09 | Information Disclosure | Worker logs in dev mode | accept | Dev mode uses slog text handler (not JSON); no PII in heartbeat/orphan cleanup logs; production uses JSON handler (controlled by ENV=production) | + + + +Manual verification only (this plan IS the verification step). Pass criteria: + +1. `"worker ready"` appears in logs within 5 seconds of `just worker` +2. `"worker heartbeat"` appears within 10 seconds (RunOnStart fires first heartbeat immediately) +3. A second `"worker heartbeat"` appears ~60 seconds later (confirms scheduler is running) +4. Ctrl+C produces `"shutting down"` then `"shutdown complete"` with no panic +5. `cd backend && go test ./... && go build ./...` exits 0 + + + +- Worker starts, connects to Postgres, applies river migrations, and logs "worker ready" +- Heartbeat job fires within seconds of startup (RunOnStart: true) +- Graceful shutdown works: SIGINT → "shutting down" → "shutdown complete" +- Full test suite green: go test ./... && go build ./... exits 0 +- All WORK-01 through WORK-04 requirements satisfied as observable behaviors + + + +After completion, create `.planning/phases/06-background-worker/06-03-SUMMARY.md` +