diff --git a/.planning/phases/06-background-worker/06-RESEARCH.md b/.planning/phases/06-background-worker/06-RESEARCH.md new file mode 100644 index 0000000..295ffe5 --- /dev/null +++ b/.planning/phases/06-background-worker/06-RESEARCH.md @@ -0,0 +1,753 @@ +# Phase 6: Background Worker - Research + +**Researched:** 2026-05-15 +**Domain:** River (Postgres-native job queue), Go worker binary, periodic jobs, sqlc orphan query +**Confidence:** HIGH + +--- + + +## User Constraints (from CONTEXT.md) + +### Locked Decisions +- **D-01:** Use river (`github.com/riverqueue/river`) as the job queue. Postgres-native, adds one Go dependency, provides built-in retry/backoff and advisory locking. +- **D-02:** River's schema managed via `rivermigrate` run programmatically at worker startup (before the client starts listening). No goose migration needed for river's internal tables. +- **D-03:** Phase 6 uses **periodic jobs only** (`river.PeriodicJob`). Web-side enqueueing deferred. +- **D-04:** Two jobs ship: **Heartbeat** (1 min) + **Orphan-file cleanup** (1 hr). +- **D-05:** Both registered as `river.PeriodicJob` entries at worker startup. +- **D-06:** Failed jobs surfaced via **structured logs only** (no CLI command or admin route). +- **D-07:** Single `river.Client` in `cmd/worker/main.go`, blocks on SIGINT/SIGTERM. +- **D-08:** Single-worker constraint — README documents: do not run multiple worker processes. + +### Claude's Discretion +- Exact log fields emitted by the heartbeat job (e.g. worker uptime, job count). +- Whether the orphan-file cleanup job logs a per-run summary (rows deleted, S3 objects deleted, errors). +- River client configuration details: worker concurrency, max attempts before discard, queue name. +- Whether the orphan detection query uses LEFT JOIN or NOT IN / NOT EXISTS. + +### Deferred Ideas (OUT OF SCOPE) +- Web-side job enqueueing (river client in `cmd/web`). +- Multiple worker instances / leader election. +- Job admin UI or CLI subcommand. 
+- Redis / asynq. + + +--- + + +## Phase Requirements + +| ID | Description | Research Support | +|----|-------------|------------------| +| WORK-01 | `cmd/worker` connects to same Postgres and runs a job queue | D-01/D-02: river + riverpgxv5 + rivermigrate wiring documented below | +| WORK-02 | At least one real job runs end-to-end | D-04: Heartbeat + Orphan-file cleanup; sqlc query pattern documented | +| WORK-03 | Worker has structured logging and graceful shutdown matching the web binary | River Logger field accepts `*slog.Logger`; shutdown pattern documented | +| WORK-04 | Failed jobs retried with backoff and visible in a simple admin/CLI surface | Default 25 retries, `attempts^4` backoff; ErrorHandler wires to slog | + + +--- + +## Summary + +River is a Postgres-native job queue for Go. It stores jobs in `river_job` and related tables inside the application Postgres database, using `LISTEN`/`NOTIFY` for fast pickup and advisory locks for leader election. There is no external broker process. The `riverpgxv5` driver wraps a `*pgxpool.Pool` directly — the project's existing `db.NewPool` result is passed in unchanged. + +River manages its own schema through a separate migration path (`rivermigrate`). The planner must call `migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)` once at worker startup before constructing the `river.Client`. This is a pure code operation — no goose migration file is needed for river's tables. + +Periodic jobs are declared as `[]*river.PeriodicJob` in `river.Config` at construction time. Each entry receives a schedule, a constructor function that returns `river.JobArgs`, and optional opts. River inserts the job into `river_job` on each tick; a registered `Worker[T]` picks it up. The `tablo_files` orphan query uses a `NOT EXISTS` subquery (preferred over `LEFT JOIN IS NULL` for clarity with sqlc) against the `tablos` table, which already exists and has a primary key index. 
+ +**Primary recommendation:** Wire `rivermigrate.New(riverpgxv5.New(pool), nil)` at startup, construct `river.Client` with two `PeriodicJob` entries in `Config`, pass `Logger: slog.Default()` to propagate structured logs, implement `ErrorHandler` to log failures via slog, and hook `riverClient.StopAndCancel` into the existing `signal.NotifyContext` pattern. + +--- + +## Architectural Responsibility Map + +| Capability | Primary Tier | Secondary Tier | Rationale | +|------------|-------------|----------------|-----------| +| Job queue schema migration | Worker binary (startup) | — | rivermigrate runs once at worker boot, owns river_* tables | +| Periodic job scheduling | Worker binary (river scheduler) | — | River's internal scheduler runs inside the same process | +| Heartbeat job execution | Worker binary | — | In-process worker goroutine; writes to slog | +| Orphan-file detection | Worker binary → Postgres | — | sqlc query runs against the DB pool passed to the worker | +| S3 object deletion (orphan) | Worker binary → S3 | — | Reuses `files.Store.Delete` already implemented in Phase 5 | +| Failed job retry | River runtime (automatic) | Worker ErrorHandler | River reschedules; ErrorHandler logs; no external coordination | +| Graceful shutdown | Worker binary | River client Stop/StopAndCancel | Existing `signal.NotifyContext` triggers `riverClient.StopAndCancel` | + +--- + +## Standard Stack + +### Core +| Library | Version | Purpose | Why Standard | +|---------|---------|---------|--------------| +| `github.com/riverqueue/river` | v0.37.0 | Job queue runtime, periodic job scheduler | Locked in D-01; Postgres-native, no broker | +| `github.com/riverqueue/river/riverdriver/riverpgxv5` | v0.37.0 | River driver for pgx/v5 pool | Required adapter; project uses pgx/v5 | + +`rivermigrate` is a **sub-package** of the main `river` module — imported as `github.com/riverqueue/river/rivermigrate`. It does NOT have its own go module path. 
Adding `github.com/riverqueue/river` and `github.com/riverqueue/river/riverdriver/riverpgxv5` to `go.mod` is sufficient. + +[VERIFIED: Go module proxy — `github.com/riverqueue/river` v0.37.0 published 2026-05-11] +[VERIFIED: Go module proxy — `github.com/riverqueue/river/riverdriver/riverpgxv5` v0.37.0 published 2026-05-11] + +**Installation:** +```bash +cd backend +go get github.com/riverqueue/river@v0.37.0 +go get github.com/riverqueue/river/riverdriver/riverpgxv5@v0.37.0 +``` + +--- + +## Architecture Patterns + +### System Architecture Diagram + +``` +cmd/worker/main.go + │ + ├─ db.NewPool(ctx, dsn) ──► pgxpool.Pool + │ │ + ├─ rivermigrate.New( │ + │ riverpgxv5.New(pool), nil) │ + │ .Migrate(ctx, DirectionUp, nil) │ (creates river_* tables) + │ │ + ├─ files.NewStore(...) │ + │ └─► files.FileStorer │ + │ │ + ├─ river.NewWorkers() │ + │ river.AddWorker(workers, │ + │ &HeartbeatWorker{}) │ + │ river.AddWorker(workers, │ + │ &OrphanCleanupWorker{pool, store}) │ + │ │ + ├─ river.NewClient( │ + │ riverpgxv5.New(pool), │ + │ &river.Config{ │ + │ Logger: slog.Default(), │ + │ Workers: workers, │ + │ ErrorHandler: &SlogErrorHandler{}, │ + │ PeriodicJobs: [...], │ + │ Queues: {default: MaxWorkers:10}, │ + │ }) │ + │ │ + ├─ riverClient.Start(ctx) │ + │ │ + ├─ <-ctx.Done() (SIGINT/SIGTERM) │ + │ │ + └─ riverClient.StopAndCancel(stopCtx) │ + pool.Close() +``` + +**River internal flow (per tick):** +``` +River scheduler goroutine + → inserts job row into river_job (Postgres) + → NOTIFY river_jobs channel + → worker goroutine picks up job + → calls Worker[T].Work(ctx, job) + → on success: marks river_job completed + → on error: increments attempts, schedules retry (attempts^4 + jitter) + → on max attempts (25 default): marks discarded +``` + +### Recommended Project Structure +``` +backend/ +├── cmd/worker/ +│ └── main.go # Replace noop body; full river wiring here +├── internal/ +│ ├── jobs/ +│ │ ├── heartbeat.go # HeartbeatArgs + HeartbeatWorker +│ │ └── orphan_cleanup.go # 
OrphanCleanupArgs + OrphanCleanupWorker +│ └── db/ +│ └── queries/ +│ └── files.sql # Add: ListOrphanFiles query +``` + +The `internal/jobs/` package is the natural home for job args + worker structs, following the project's handler/store separation pattern (domain logic in `internal/`, not in `cmd/`). + +### Pattern 1: Job Args + Worker Definition + +```go +// Source: https://riverqueue.com/docs/inserting-and-working-jobs + +// HeartbeatArgs carries no data — heartbeat is purely a proof-of-life tick. +type HeartbeatArgs struct{} + +func (HeartbeatArgs) Kind() string { return "heartbeat" } + +type HeartbeatWorker struct { + river.WorkerDefaults[HeartbeatArgs] +} + +func (w *HeartbeatWorker) Work(ctx context.Context, job *river.Job[HeartbeatArgs]) error { + slog.Info("worker heartbeat", + "job_id", job.ID, + "attempt", job.Attempt, + ) + return nil +} +``` + +```go +// OrphanCleanupArgs carries no data — query runs at execution time. +type OrphanCleanupArgs struct{} + +func (OrphanCleanupArgs) Kind() string { return "orphan_file_cleanup" } + +type OrphanCleanupWorker struct { + river.WorkerDefaults[OrphanCleanupArgs] + pool *pgxpool.Pool + store files.FileStorer +} + +func (w *OrphanCleanupWorker) Work(ctx context.Context, job *river.Job[OrphanCleanupArgs]) error { + orphans, err := sqlc.New(w.pool).ListOrphanFiles(ctx) + if err != nil { + return fmt.Errorf("list orphan files: %w", err) + } + var deleted, errCount int + for _, f := range orphans { + if err := w.store.Delete(ctx, f.S3Key); err != nil { + slog.Error("orphan s3 delete failed", "s3_key", f.S3Key, "err", err) + errCount++ + continue + } + if err := sqlc.New(w.pool).DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{...}); err != nil { + slog.Error("orphan db delete failed", "file_id", f.ID, "err", err) + errCount++ + continue + } + deleted++ + } + slog.Info("orphan cleanup complete", + "orphans_found", len(orphans), + "deleted", deleted, + "errors", errCount, + ) + return nil +} +``` + +### Pattern 2: 
rivermigrate at Startup + +```go +// Source: https://riverqueue.com/docs/migrations (verified Context7) +import ( + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivermigrate" +) + +migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil) +if err != nil { + slog.Error("rivermigrate init failed", "err", err) + os.Exit(1) +} +res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) +if err != nil { + slog.Error("river migration failed", "err", err) + os.Exit(1) +} +for _, v := range res.Versions { + slog.Info("river migration applied", "version", v.Version) +} +``` + +`rivermigrate.DirectionUp` with `nil` opts migrates all the way to the latest river schema version. This is idempotent — already-applied versions are skipped. + +### Pattern 3: river.Client Construction and Start + +```go +// Source: https://riverqueue.com/docs/inserting-and-working-jobs (verified Context7) +workers := river.NewWorkers() +river.AddWorker(workers, &HeartbeatWorker{}) +river.AddWorker(workers, &OrphanCleanupWorker{pool: pool, store: fileStore}) + +riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ + Logger: slog.Default(), // wires river's internal logs to project slog + Workers: workers, + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 10}, + }, + PeriodicJobs: []*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(1*time.Minute), + func() (river.JobArgs, *river.InsertOpts) { + return HeartbeatArgs{}, nil + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + river.NewPeriodicJob( + river.PeriodicInterval(1*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return OrphanCleanupArgs{}, nil + }, + &river.PeriodicJobOpts{RunOnStart: false}, + ), + }, + ErrorHandler: &SlogErrorHandler{}, +}) +if err != nil { + slog.Error("river client init failed", "err", err) + os.Exit(1) +} +if err := riverClient.Start(ctx); err != nil { + slog.Error("river client start failed", 
"err", err) + os.Exit(1) +} +``` + +### Pattern 4: slog-Based ErrorHandler + +River has two ways to integrate with slog: + +1. **`Config.Logger`** — pass `slog.Default()` (or any `*slog.Logger`). River uses it internally for its own operational logs (job picked up, completed, retried, discarded). This is the primary log integration. [VERIFIED: Context7 — `river.Config.Logger` accepts `*slog.Logger`] + +2. **`ErrorHandler`** — optional custom struct for extra telemetry on failures. Use this to emit a slog line with all failure details in a single structured record: + +```go +// Source: https://riverqueue.com/docs/error-handling (verified Context7) +type SlogErrorHandler struct{} + +func (*SlogErrorHandler) HandleError( + ctx context.Context, job *rivertype.JobRow, err error, +) *river.ErrorHandlerResult { + slog.Error("job error", + "job_id", job.ID, + "job_kind", job.Kind, + "attempt", job.Attempt, + "max_attempts", job.MaxAttempts, + "err", err, + ) + return nil // nil = follow default retry schedule +} + +func (*SlogErrorHandler) HandlePanic( + ctx context.Context, job *rivertype.JobRow, panicVal any, trace string, +) *river.ErrorHandlerResult { + slog.Error("job panic", + "job_id", job.ID, + "job_kind", job.Kind, + "panic", panicVal, + "trace", trace, + ) + return nil +} +``` + +### Pattern 5: Graceful Shutdown + +The existing `cmd/worker/main.go` uses `signal.NotifyContext`. River's `StopAndCancel` fits this directly: + +```go +// Source: https://riverqueue.com/docs/graceful-shutdown (verified Context7) +// Simplified version appropriate for single-worker v1 (no two-signal escalation needed). 
+<-ctx.Done() // context cancelled by SIGINT/SIGTERM + +stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) +defer stopCancel() + +if err := riverClient.StopAndCancel(stopCtx); err != nil { + slog.Warn("river stop did not finish cleanly", "err", err) +} +pool.Close() +slog.Info("shutdown complete") +``` + +`StopAndCancel` cancels running job contexts, waits for goroutines to exit, then returns. For periodic jobs that run for a few milliseconds (heartbeat, cleanup) this will always succeed well within 10 seconds. + +`riverClient.Stop(ctx)` waits for in-flight jobs to complete gracefully. `StopAndCancel` cancels their contexts first. For Phase 6's short-lived jobs, both are equivalent. `StopAndCancel` is the safer default. + +### Pattern 6: Orphan-File sqlc Query + +Add to `backend/internal/db/queries/files.sql`: + +```sql +-- name: ListOrphanFiles :many +-- Find tablo_files rows whose owning tablo no longer exists. +-- Used by the orphan-file cleanup worker (Phase 6 WORK-02). +SELECT id, tablo_id, s3_key +FROM tablo_files tf +WHERE NOT EXISTS ( + SELECT 1 FROM tablos t WHERE t.id = tf.tablo_id +); +``` + +**Why NOT EXISTS over LEFT JOIN:** `NOT EXISTS` short-circuits on first match — the planner can use the tablos PK index. `LEFT JOIN ... WHERE t.id IS NULL` produces the same result but is less readable for sqlc annotation. [ASSUMED — both are semantically equivalent; optimizer typically treats them the same] + +`tablo_files.tablo_id` has a FK to `tablos(id)` defined as `ON DELETE CASCADE` (from `0005_files.sql`). This means orphan rows can only exist if the FK was bypassed (e.g., direct SQL delete on `tablos` that bypassed the cascade, or a bug). The query is a safety net — it should normally find zero rows. [VERIFIED: `0005_files.sql` — FK with ON DELETE CASCADE] + +The sqlc return type for this query would be a new struct with just `{ID, TabloID, S3Key}` — add it with `:many` annotation and run `just generate`. 
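To make the expected result of `ListOrphanFiles` concrete, here is an in-memory sketch of the same `NOT EXISTS` semantics. It is not the sqlc-generated code: `fileRow` and `listOrphans` are hypothetical names, and the IDs are plain strings for brevity.

```go
package main

import "fmt"

// fileRow mirrors the three columns selected by ListOrphanFiles.
// The type and field names are illustrative; sqlc generates its own row type.
type fileRow struct {
	ID      string
	TabloID string
	S3Key   string
}

// listOrphans returns the files whose TabloID matches no existing tablo,
// which is the in-memory equivalent of the NOT EXISTS subquery.
func listOrphans(files []fileRow, tabloIDs []string) []fileRow {
	existing := make(map[string]struct{}, len(tabloIDs))
	for _, id := range tabloIDs {
		existing[id] = struct{}{}
	}
	var orphans []fileRow
	for _, f := range files {
		if _, ok := existing[f.TabloID]; !ok {
			orphans = append(orphans, f)
		}
	}
	return orphans
}

func main() {
	tablos := []string{"t1"}
	files := []fileRow{
		{ID: "f1", TabloID: "t1", S3Key: "tablos/t1/a.pdf"},
		{ID: "f2", TabloID: "t-gone", S3Key: "tablos/t-gone/b.pdf"},
	}
	for _, o := range listOrphans(files, tablos) {
		fmt.Println(o.ID, o.S3Key)
	}
}
```

Only `f2` is reported here, because its `TabloID` has no matching tablo. A test for the real query can use the same fixture shape: one file with a live tablo, one without.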
+ +After deleting from S3, delete the DB row using the existing `DeleteTabloFile` query. `DeleteTabloFile` takes `id` and `tablo_id`; both are available from the orphan query result. + +### Anti-Patterns to Avoid + +- **Don't share one `riverpgxv5.New(pool)` driver object between the migrator and the client.** Call `riverpgxv5.New(pool)` once for each: every call returns a fresh driver instance wrapping the same pool, which is fine and correct. +- **Don't call `riverClient.Start(ctx)` with an already-cancelled context.** `Start` must receive a live (non-cancelled) context. If the signal fires before `Start` completes, the client will not start. Use the signal context for the job execution lifecycle (jobs inherit it), and call `Start` before waiting on the done channel. +- **Don't put PgBouncer between the worker's pgxpool and Postgres.** River uses `LISTEN`/`NOTIFY`, which requires session-mode connections. A direct pgxpool (no bouncer) is correct, and the project does not use PgBouncer. [VERIFIED: river docs pgbouncer page] +- **Don't run rivermigrate while the river client is also running.** Call `Migrate` before `NewClient` / `Start`. The migration acquires a lock; running it concurrently with a live client is unsafe. +- **Don't delete the DB row before the S3 object.** Delete from S3 first in the orphan cleanup loop. If the DB delete fails after the S3 delete, the row is still there, so the next run finds it again and retries with nothing left to remove from S3. The reverse order risks a window where the row is gone but the S3 object leaks permanently. 
[ASSUMED — standard convention, not river-specific] + +--- + +## Don't Hand-Roll + +| Problem | Don't Build | Use Instead | Why | +|---------|-------------|-------------|-----| +| Job queue with Postgres | Custom `pg_notify` poller | `github.com/riverqueue/river` | River handles advisory locking, concurrent workers, backoff, discarding, schema | +| Retry with backoff | Custom retry loop with sleep | River's built-in 25-attempt `attempts^4` schedule | Jitter, persistence across restarts, observable state in DB | +| Schema migrations for queue | Goose file for river_job | `rivermigrate.New(...).Migrate(...)` | River owns and evolves its own schema; mixing with goose creates version conflicts | +| Periodic scheduler | `time.Ticker` in a goroutine | `river.PeriodicJob` | River ensures exactly-one-insert even on leader failover; `time.Ticker` double-fires on restart | + +--- + +## Common Pitfalls + +### Pitfall 1: rivermigrate is a sub-package, not a separate module +**What goes wrong:** Developer runs `go get github.com/riverqueue/river/rivermigrate` — this fails because there is no separate module at that path. +**Why it happens:** rivermigrate lives inside the main `river` module at the sub-package path `rivermigrate/`. Adding only the main module is sufficient. +**How to avoid:** `go get github.com/riverqueue/river@v0.37.0` — then import `"github.com/riverqueue/river/rivermigrate"` in code. +[VERIFIED: Go module proxy — no separate module exists for rivermigrate] + +### Pitfall 2: Passing signal context to riverClient.Start() +**What goes wrong:** `riverClient.Start(signalCtx)` returns immediately with a context-cancelled error if the signal fires first, or prevents the client from ever starting if the context is already done. +**Why it happens:** `Start` takes the root context that jobs inherit — if this context is cancelled, all running jobs are cancelled. +**How to avoid:** Pass the signal context to `Start` only while the binary is running normally. 
Cancelling the signal context unblocks `<-ctx.Done()`, after which you call `riverClient.StopAndCancel(timeoutCtx)` with a fresh background-derived timeout context. +[VERIFIED: Context7, river graceful shutdown docs] + +### Pitfall 3: Worker struct without WorkerDefaults embedding +**What goes wrong:** Compile error: `river.AddWorker` requires the worker type to implement the full `river.Worker[T]` interface, including `NextRetry`, `Timeout`, and other optional methods. +**Why it happens:** The `river.WorkerDefaults[T]` embedding is missing. It provides no-op default implementations of all optional interface methods. +**How to avoid:** Every worker struct must embed `river.WorkerDefaults[T]`. +[VERIFIED: Context7, river worker registration docs] + +### Pitfall 4: MaxWorkers too high for single VPS +**What goes wrong:** Worker spins up 100 goroutines competing for Postgres connections. +**Why it happens:** Default examples show `MaxWorkers: 100`, which suits high-throughput services, not a single-VPS periodic worker with 2 jobs. +**How to avoid:** Set `MaxWorkers: 10` for Phase 6. The project has `db.NewPool` with `MaxConns: 10`; river's concurrency should not exceed pool size. +[VERIFIED: `backend/internal/db/pool.go`, MaxConns: 10] + +### Pitfall 5: ON DELETE CASCADE makes orphan query always return 0 rows in normal operation +**What goes wrong:** Developer sets up orphan cleanup, runs it, sees 0 rows, assumes the query is broken. +**Why it happens:** `tablo_files.tablo_id` has `ON DELETE CASCADE`, so when a tablo is deleted, Postgres automatically deletes all its file rows. Orphans can only exist from direct SQL operations that bypass the cascade. +**How to avoid:** This is expected behavior: the job is a safety net. 
Test the query by inserting a `tablo_files` row whose `tablo_id` matches no tablo. The test setup has to bypass the FK to do this (for example by temporarily dropping or disabling the constraint), because a plain `DELETE` on `tablos` cascades and removes the file rows as well. +[VERIFIED: `backend/migrations/0005_files.sql`, ON DELETE CASCADE] + +### Pitfall 6: Running rivermigrate Migrate() against an already-migrated database +**What goes wrong:** Concern about running `Migrate(DirectionUp, nil)` on every startup when river tables already exist. +**Why it happens:** Developers familiar with destructive migration tools assume "up" re-runs applied steps. +**How to avoid:** This is safe and idempotent: river tracks applied migration versions in its own table (`river_migration`). Already-applied versions are skipped. `nil` opts = migrate to latest. +[VERIFIED: Context7, rivermigrate docs: "already-applied versions are skipped"] + +--- + +## Code Examples + +### Full cmd/worker/main.go structure + +```go +// Source: assembled from river docs (Context7) + existing skeleton patterns + +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "backend/internal/db" + "backend/internal/files" + "backend/internal/jobs" + "backend/internal/web" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivermigrate" +) + +func main() { + env := os.Getenv("ENV") + if env == "" { + env = "development" + } + dsn := os.Getenv("DATABASE_URL") + slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout))) + + if dsn == "" { + slog.Error("DATABASE_URL is required but unset") + os.Exit(1) + } + + // Background context for startup (not signal-scoped). + ctx := context.Background() + + pool, err := db.NewPool(ctx, dsn) + if err != nil { + slog.Error("db connect failed", "err", err) + os.Exit(1) + } + defer pool.Close() + + // Step 1: Run river migrations before anything else. 
+ migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil) + if err != nil { + slog.Error("rivermigrate init failed", "err", err) + os.Exit(1) + } + res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) + if err != nil { + slog.Error("river migration failed", "err", err) + os.Exit(1) + } + for _, v := range res.Versions { + slog.Info("river migration applied", "version", v.Version) + } + + // Step 2: Set up S3 store for orphan cleanup. + fileStore, err := files.NewStore(ctx, + os.Getenv("S3_ENDPOINT"), + os.Getenv("S3_BUCKET"), + os.Getenv("S3_REGION"), + os.Getenv("S3_ACCESS_KEY"), + os.Getenv("S3_SECRET_KEY"), + os.Getenv("S3_USE_PATH_STYLE") == "true", + ) + if err != nil { + slog.Error("file store init failed", "err", err) + os.Exit(1) + } + + // Step 3: Construct signal context AFTER all startup I/O. + sigCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + // Step 4: Register workers. + workers := river.NewWorkers() + river.AddWorker(workers, &jobs.HeartbeatWorker{}) + river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore)) + + // Step 5: Construct river client. 
+ riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ + Logger: slog.Default(), + Workers: workers, + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 10}, + }, + ErrorHandler: &jobs.SlogErrorHandler{}, + PeriodicJobs: []*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(1*time.Minute), + func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + river.NewPeriodicJob( + river.PeriodicInterval(1*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil }, + nil, + ), + }, + }) + if err != nil { + slog.Error("river client init failed", "err", err) + os.Exit(1) + } + + if err := riverClient.Start(sigCtx); err != nil { + slog.Error("river client start failed", "err", err) + os.Exit(1) + } + + slog.Info("worker ready") + + <-sigCtx.Done() + slog.Info("shutting down") + + stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer stopCancel() + + if err := riverClient.StopAndCancel(stopCtx); err != nil { + slog.Warn("river stop did not finish cleanly", "err", err) + } + slog.Info("shutdown complete") +} +``` + +### Orphan-file sqlc query + +```sql +-- Source: project pattern (sqlc convention) + LEFT JOIN / NOT EXISTS analysis +-- Add to: backend/internal/db/queries/files.sql + +-- name: ListOrphanFiles :many +SELECT id, tablo_id, s3_key +FROM tablo_files tf +WHERE NOT EXISTS ( + SELECT 1 FROM tablos t WHERE t.id = tf.tablo_id +); +``` + +Run `just generate` after adding this query. The generated function signature will be: +```go +func (q *Queries) ListOrphanFiles(ctx context.Context) ([]ListOrphanFilesRow, error) +``` + +### justfile targets to add + +```just +# Start the worker binary (development — requires db-up first). 
+worker: + DATABASE_URL='{{ database_url }}' \ + S3_ENDPOINT='http://localhost:9000' \ + S3_BUCKET='xtablo' \ + S3_REGION='us-east-1' \ + S3_ACCESS_KEY='minioadmin' \ + S3_SECRET_KEY='minioadmin' \ + S3_USE_PATH_STYLE='true' \ + go run ./cmd/worker + +# Stream worker logs (if running as background process via air or separate terminal). +worker-logs: + @echo "Start worker in a separate terminal: just worker" + @echo "Logs appear in that terminal (structured slog output)." +``` + +--- + +## State of the Art + +| Old Approach | Current Approach | When Changed | Impact | +|--------------|------------------|--------------|--------| +| `time.Ticker` goroutine for periodic work | `river.PeriodicJob` with Postgres-persisted schedule | River v0.1+ | Survives restarts, observable, no double-fire | +| Separate broker (Redis/RabbitMQ) | Postgres-native with river | 2023–2024 | No extra infra, one less operational concern | +| `database/sql` for river driver | `riverpgxv5.New(pgxpool.Pool)` | River v0.1+ | Idiomatic pgx/v5; avoids stdlib adapter overhead | + +--- + +## Assumptions Log + +| # | Claim | Section | Risk if Wrong | +|---|-------|---------|---------------| +| A1 | `NOT EXISTS` and `LEFT JOIN IS NULL` are equivalent in this context and optimizer treats them similarly | Orphan query pattern | Wrong result: neither — both produce correct results; performance difference is negligible at v1 scale | +| A2 | S3 delete before DB delete is the safer ordering for orphan cleanup | Anti-patterns | Low: worst case on partial failure is a re-detectable orphan on next run | + +--- + +## Open Questions + +1. **S3 env var names for worker** + - What we know: Phase 5 wired S3 env vars in `cmd/web`. Worker needs the same vars. + - What's unclear: Whether the planner reuses the same env var names or introduces `WORKER_S3_*` variants. + - Recommendation: Reuse the same env var names (`S3_ENDPOINT`, `S3_BUCKET`, etc.) — both binaries share the same `.env.example` pattern. + +2. 
**DeleteTabloFile query takes (id, tablo_id) — orphan rows have no owning tablo** + - What we know: `DeleteTabloFile` has signature `WHERE id = $1 AND tablo_id = $2`. + - What's unclear: For a true orphan (tablo deleted bypassing cascade), the `tablo_id` column still holds the old UUID even though the tablo row is gone. `$2` matches the stored value, so the DELETE still works. + - Recommendation: No change needed — the query works correctly. Planner should note this in task comments. + +--- + +## Environment Availability + +| Dependency | Required By | Available | Version | Fallback | +|------------|------------|-----------|---------|----------| +| Go 1.26 | worker binary build | ✓ | 1.26.1 (go.mod) | — | +| PostgreSQL | rivermigrate, river queue | ✓ | via compose.yaml | — | +| MinIO (S3) | orphan-file cleanup job | ✓ | via compose.yaml (Phase 5) | — | +| `just` | justfile targets | ✓ | (existing justfile in use) | — | +| `sqlc` | `just generate` | ✓ | v1.31.1 (pinned in justfile) | — | + +[VERIFIED: `backend/go.mod` — `go 1.26.1`] +[VERIFIED: `backend/compose.yaml` — MinIO present from Phase 5 (per CONTEXT.md canonical refs)] +[VERIFIED: `backend/justfile` — sqlc_version = "v1.31.1", `just generate` target exists] + +--- + +## Validation Architecture + +### Test Framework +| Property | Value | +|----------|-------| +| Framework | Go testing + `go test ./...` | +| Config file | None — standard Go test runner | +| Quick run command | `go test ./internal/jobs/...` | +| Full suite command | `just test` (`go test ./...`) | + +### Phase Requirements → Test Map + +| Req ID | Behavior | Test Type | Automated Command | File Exists? | +|--------|----------|-----------|-------------------|-------------| +| WORK-01 | rivermigrate runs without error against test DB | integration | `go test ./cmd/worker/... -run TestWorkerMigrate` | ❌ Wave 0 | +| WORK-01 | river.Client starts and reaches ready state | integration | `go test ./cmd/worker/... 
-run TestWorkerReady` | ❌ Wave 0 | +| WORK-02 | HeartbeatWorker.Work() returns nil and logs | unit | `go test ./internal/jobs/... -run TestHeartbeatWorker` | ❌ Wave 0 | +| WORK-02 | OrphanCleanupWorker.Work() deletes orphan row + S3 object | unit | `go test ./internal/jobs/... -run TestOrphanCleanupWorker` | ❌ Wave 0 | +| WORK-02 | ListOrphanFiles query returns rows with deleted tablos | integration | `go test ./internal/db/... -run TestListOrphanFiles` | ❌ Wave 0 | +| WORK-03 | Worker shuts down cleanly on context cancel | unit | `go test ./internal/jobs/... -run TestGracefulShutdown` | ❌ Wave 0 | +| WORK-04 | SlogErrorHandler logs error fields correctly | unit | `go test ./internal/jobs/... -run TestSlogErrorHandler` | ❌ Wave 0 | + +**Recommended v1 approach:** For Phase 6, unit tests for individual `Worker[T].Work()` methods are the primary test surface. Use `rivertest.NewWorker(t, riverpgxv5.New(nil), &river.Config{}, worker)` for isolated worker tests without a real DB connection. Integration tests for rivermigrate require a real Postgres connection (same pattern as existing `testdb_test.go` in `internal/web`). 
+ +### Sampling Rate +- **Per task commit:** `go test ./internal/jobs/...` +- **Per wave merge:** `just test` +- **Phase gate:** Full suite green before `/gsd-verify-work` + +### Wave 0 Gaps +- [ ] `backend/internal/jobs/heartbeat_test.go` — covers WORK-02 HeartbeatWorker +- [ ] `backend/internal/jobs/orphan_cleanup_test.go` — covers WORK-02 OrphanCleanupWorker +- [ ] `backend/internal/jobs/error_handler_test.go` — covers WORK-04 SlogErrorHandler +- [ ] `backend/internal/db/sqlc/files_test.go` (or extension of existing) — covers WORK-02 ListOrphanFiles query + +--- + +## Security Domain + +### Applicable ASVS Categories + +| ASVS Category | Applies | Standard Control | +|---------------|---------|-----------------| +| V2 Authentication | no | Worker has no HTTP surface | +| V3 Session Management | no | No sessions | +| V4 Access Control | no | No HTTP routes | +| V5 Input Validation | partial | Job args are Go structs deserialized from Postgres JSON — no user-supplied input in Phase 6 periodic jobs | +| V6 Cryptography | no | No crypto operations | + +### Known Threat Patterns for Worker/Queue Stack + +| Pattern | STRIDE | Standard Mitigation | +|---------|--------|---------------------| +| Job args injection via tampered Postgres rows | Tampering | Not applicable in Phase 6 — periodic jobs have empty args structs; no user-supplied data | +| S3 credential leak via worker env vars | Information Disclosure | Use same `.env.example` pattern; never commit secrets | +| Worker running as root in container | Elevation of Privilege | Dockerfile (Phase 7 scope) should use non-root user | +| Advisory lock not respected by second worker instance | Tampering | D-08: document single-worker constraint; river's locking provides partial protection | + +--- + +## Sources + +### Primary (HIGH confidence) +- `/websites/riverqueue` (Context7) — PeriodicJob config, riverpgxv5.New, rivermigrate.New, ErrorHandler interface, Logger field, graceful shutdown, AddWorker, WorkerDefaults, 
retry defaults +- `/riverqueue/river` (Context7) — rivermigrate import path, MigrateResult, WorkFunc pattern +- Go module proxy `proxy.golang.org` — verified river v0.37.0 and riverpgxv5 v0.37.0 published 2026-05-11 +- `backend/migrations/0005_files.sql` — FK definition and ON DELETE CASCADE +- `backend/internal/db/pool.go` — MaxConns: 10 +- `backend/internal/files/store.go` — Delete(ctx, key) method signature +- `backend/cmd/worker/main.go` — existing skeleton patterns +- `backend/justfile` — existing targets and env var conventions + +### Secondary (MEDIUM confidence) +- `backend/internal/web/slog.go` — `web.NewSlogHandler` signature (used to confirm slog integration pattern) +- `backend/internal/db/sqlc/db.go` — DBTX interface and sqlc.New pattern (confirms orphan query can use `sqlc.New(pool)`) + +--- + +## Metadata + +**Confidence breakdown:** +- Standard stack: HIGH — verified via Go module proxy (v0.37.0, 2026-05-11) +- River API patterns: HIGH — verified via Context7 official docs (multiple queries) +- Architecture: HIGH — derived directly from existing codebase patterns +- Orphan query: HIGH — derived from confirmed schema (0005_files.sql, 0003_tablos.sql) +- Pitfalls: HIGH — verified against official docs where applicable; A1/A2 are minor ASSUMED items + +**Research date:** 2026-05-15 +**Valid until:** 2026-06-15 (river is actively maintained; verify before pinning if delayed)