docs(06): research phase background-worker

This commit is contained in:
Arthur Belleville 2026-05-15 16:12:11 +02:00
parent d8a130cd01
commit 27cecc6701
No known key found for this signature in database

View file

@ -0,0 +1,753 @@
# Phase 6: Background Worker - Research
**Researched:** 2026-05-15
**Domain:** River (Postgres-native job queue), Go worker binary, periodic jobs, sqlc orphan query
**Confidence:** HIGH
---
<user_constraints>
## User Constraints (from CONTEXT.md)
### Locked Decisions
- **D-01:** Use river (`github.com/riverqueue/river`) as the job queue. Postgres-native, adds one Go dependency, provides built-in retry/backoff and advisory locking.
- **D-02:** River's schema managed via `rivermigrate` run programmatically at worker startup (before the client starts listening). No goose migration needed for river's internal tables.
- **D-03:** Phase 6 uses **periodic jobs only** (`river.PeriodicJob`). Web-side enqueueing deferred.
- **D-04:** Two jobs ship: **Heartbeat** (1 min) + **Orphan-file cleanup** (1 hr).
- **D-05:** Both registered as `river.PeriodicJob` entries at worker startup.
- **D-06:** Failed jobs surfaced via **structured logs only** (no CLI command or admin route).
- **D-07:** Single `river.Client` in `cmd/worker/main.go`, blocks on SIGINT/SIGTERM.
- **D-08:** Single-worker constraint — README documents: do not run multiple worker processes.
### Claude's Discretion
- Exact log fields emitted by the heartbeat job (e.g. worker uptime, job count).
- Whether the orphan-file cleanup job logs a per-run summary (rows deleted, S3 objects deleted, errors).
- River client configuration details: worker concurrency, max attempts before discard, queue name.
- Whether the orphan detection query uses LEFT JOIN or NOT IN / NOT EXISTS.
### Deferred Ideas (OUT OF SCOPE)
- Web-side job enqueueing (river client in `cmd/web`).
- Multiple worker instances / leader election.
- Job admin UI or CLI subcommand.
- Redis / asynq.
</user_constraints>
---
<phase_requirements>
## Phase Requirements
| ID | Description | Research Support |
|----|-------------|------------------|
| WORK-01 | `cmd/worker` connects to same Postgres and runs a job queue | D-01/D-02: river + riverpgxv5 + rivermigrate wiring documented below |
| WORK-02 | At least one real job runs end-to-end | D-04: Heartbeat + Orphan-file cleanup; sqlc query pattern documented |
| WORK-03 | Worker has structured logging and graceful shutdown matching the web binary | River Logger field accepts `*slog.Logger`; shutdown pattern documented |
| WORK-04 | Failed jobs retried with backoff and visible in a simple admin/CLI surface | Default 25 retries, `attempts^4` backoff; ErrorHandler wires to slog |
</phase_requirements>
---
## Summary
River is a Postgres-native job queue for Go. It stores jobs in `river_job` and related tables inside the application Postgres database, using `LISTEN`/`NOTIFY` for fast pickup and advisory locks for leader election. There is no external broker process. The `riverpgxv5` driver wraps a `*pgxpool.Pool` directly — the project's existing `db.NewPool` result is passed in unchanged.
River manages its own schema through a separate migration path (`rivermigrate`). The planner must call `migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)` once at worker startup before constructing the `river.Client`. This is a pure code operation — no goose migration file is needed for river's tables.
Periodic jobs are declared as `[]*river.PeriodicJob` in `river.Config` at construction time. Each entry receives a schedule, a constructor function that returns `river.JobArgs`, and optional opts. River inserts the job into `river_job` on each tick; a registered `Worker[T]` picks it up. The `tablo_files` orphan query uses a `NOT EXISTS` subquery (preferred over `LEFT JOIN IS NULL` for clarity with sqlc) against the `tablos` table, which already exists and has a primary key index.
**Primary recommendation:** Wire `rivermigrate.New(riverpgxv5.New(pool), nil)` at startup, construct `river.Client` with two `PeriodicJob` entries in `Config`, pass `Logger: slog.Default()` to propagate structured logs, implement `ErrorHandler` to log failures via slog, and hook `riverClient.StopAndCancel` into the existing `signal.NotifyContext` pattern.
---
## Architectural Responsibility Map
| Capability | Primary Tier | Secondary Tier | Rationale |
|------------|-------------|----------------|-----------|
| Job queue schema migration | Worker binary (startup) | — | rivermigrate runs once at worker boot, owns river_* tables |
| Periodic job scheduling | Worker binary (river scheduler) | — | River's internal scheduler runs inside the same process |
| Heartbeat job execution | Worker binary | — | In-process worker goroutine; writes to slog |
| Orphan-file detection | Worker binary → Postgres | — | sqlc query runs against the DB pool passed to the worker |
| S3 object deletion (orphan) | Worker binary → S3 | — | Reuses `files.Store.Delete` already implemented in Phase 5 |
| Failed job retry | River runtime (automatic) | Worker ErrorHandler | River reschedules; ErrorHandler logs; no external coordination |
| Graceful shutdown | Worker binary | River client Stop/StopAndCancel | Existing `signal.NotifyContext` triggers `riverClient.StopAndCancel` |
---
## Standard Stack
### Core
| Library | Version | Purpose | Why Standard |
|---------|---------|---------|--------------|
| `github.com/riverqueue/river` | v0.37.0 | Job queue runtime, periodic job scheduler | Locked in D-01; Postgres-native, no broker |
| `github.com/riverqueue/river/riverdriver/riverpgxv5` | v0.37.0 | River driver for pgx/v5 pool | Required adapter; project uses pgx/v5 |
`rivermigrate` is a **sub-package** of the main `river` module — imported as `github.com/riverqueue/river/rivermigrate`. It does NOT have its own go module path. Adding `github.com/riverqueue/river` and `github.com/riverqueue/river/riverdriver/riverpgxv5` to `go.mod` is sufficient.
[VERIFIED: Go module proxy — `github.com/riverqueue/river` v0.37.0 published 2026-05-11]
[VERIFIED: Go module proxy — `github.com/riverqueue/river/riverdriver/riverpgxv5` v0.37.0 published 2026-05-11]
**Installation:**
```bash
cd backend
go get github.com/riverqueue/river@v0.37.0
go get github.com/riverqueue/river/riverdriver/riverpgxv5@v0.37.0
```
---
## Architecture Patterns
### System Architecture Diagram
```
cmd/worker/main.go
├─ db.NewPool(ctx, dsn) ──► pgxpool.Pool
│ │
├─ rivermigrate.New( │
│ riverpgxv5.New(pool), nil) │
│ .Migrate(ctx, DirectionUp, nil) │ (creates river_* tables)
│ │
├─ files.NewStore(...) │
│ └─► files.FileStorer │
│ │
├─ river.NewWorkers() │
│ river.AddWorker(workers, │
&HeartbeatWorker{}) │
│ river.AddWorker(workers, │
&OrphanCleanupWorker{pool, store}) │
│ │
├─ river.NewClient( │
│ riverpgxv5.New(pool), │
&river.Config{ │
│ Logger: slog.Default(), │
│ Workers: workers, │
│ ErrorHandler: &SlogErrorHandler{}, │
│ PeriodicJobs: [...], │
│ Queues: {default: MaxWorkers:10}, │
│ }) │
│ │
├─ riverClient.Start(ctx) │
│ │
├─ <-ctx.Done() (SIGINT/SIGTERM)
│ │
└─ riverClient.StopAndCancel(stopCtx) │
pool.Close()
```
**River internal flow (per tick):**
```
River scheduler goroutine
→ inserts job row into river_job (Postgres)
→ NOTIFY river_jobs channel
→ worker goroutine picks up job
→ calls Worker[T].Work(ctx, job)
→ on success: marks river_job completed
→ on error: increments attempts, schedules retry (attempts^4 + jitter)
→ on max attempts (25 default): marks discarded
```
### Recommended Project Structure
```
backend/
├── cmd/worker/
│ └── main.go # Replace noop body; full river wiring here
├── internal/
│ ├── jobs/
│ │ ├── heartbeat.go # HeartbeatArgs + HeartbeatWorker
│ │ └── orphan_cleanup.go # OrphanCleanupArgs + OrphanCleanupWorker
│ └── db/
│ └── queries/
│ └── files.sql # Add: ListOrphanFiles query
```
The `internal/jobs/` package is the natural home for job args + worker structs, following the project's handler/store separation pattern (domain logic in `internal/<domain>`, not in `cmd/`).
### Pattern 1: Job Args + Worker Definition
```go
// Source: https://riverqueue.com/docs/inserting-and-working-jobs
// HeartbeatArgs carries no data — heartbeat is purely a proof-of-life tick.
type HeartbeatArgs struct{}
func (HeartbeatArgs) Kind() string { return "heartbeat" }
type HeartbeatWorker struct {
river.WorkerDefaults[HeartbeatArgs]
}
func (w *HeartbeatWorker) Work(ctx context.Context, job *river.Job[HeartbeatArgs]) error {
slog.Info("worker heartbeat",
"job_id", job.ID,
"attempt", job.Attempt,
)
return nil
}
```
```go
// OrphanCleanupArgs carries no data — query runs at execution time.
type OrphanCleanupArgs struct{}
func (OrphanCleanupArgs) Kind() string { return "orphan_file_cleanup" }
type OrphanCleanupWorker struct {
river.WorkerDefaults[OrphanCleanupArgs]
pool *pgxpool.Pool
store files.FileStorer
}
func (w *OrphanCleanupWorker) Work(ctx context.Context, job *river.Job[OrphanCleanupArgs]) error {
orphans, err := sqlc.New(w.pool).ListOrphanFiles(ctx)
if err != nil {
return fmt.Errorf("list orphan files: %w", err)
}
var deleted, errCount int
for _, f := range orphans {
if err := w.store.Delete(ctx, f.S3Key); err != nil {
slog.Error("orphan s3 delete failed", "s3_key", f.S3Key, "err", err)
errCount++
continue
}
if err := sqlc.New(w.pool).DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{...}); err != nil {
slog.Error("orphan db delete failed", "file_id", f.ID, "err", err)
errCount++
continue
}
deleted++
}
slog.Info("orphan cleanup complete",
"orphans_found", len(orphans),
"deleted", deleted,
"errors", errCount,
)
return nil
}
```
### Pattern 2: rivermigrate at Startup
```go
// Source: https://riverqueue.com/docs/migrations (verified Context7)
import (
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil)
if err != nil {
slog.Error("rivermigrate init failed", "err", err)
os.Exit(1)
}
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)
if err != nil {
slog.Error("river migration failed", "err", err)
os.Exit(1)
}
for _, v := range res.Versions {
slog.Info("river migration applied", "version", v.Version)
}
```
`rivermigrate.DirectionUp` with `nil` opts migrates all the way to the latest river schema version. This is idempotent — already-applied versions are skipped.
### Pattern 3: river.Client Construction and Start
```go
// Source: https://riverqueue.com/docs/inserting-and-working-jobs (verified Context7)
workers := river.NewWorkers()
river.AddWorker(workers, &HeartbeatWorker{})
river.AddWorker(workers, &OrphanCleanupWorker{pool: pool, store: fileStore})
riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
Logger: slog.Default(), // wires river's internal logs to project slog
Workers: workers,
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 10},
},
PeriodicJobs: []*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(1*time.Minute),
func() (river.JobArgs, *river.InsertOpts) {
return HeartbeatArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true},
),
river.NewPeriodicJob(
river.PeriodicInterval(1*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return OrphanCleanupArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: false},
),
},
ErrorHandler: &SlogErrorHandler{},
})
if err != nil {
slog.Error("river client init failed", "err", err)
os.Exit(1)
}
if err := riverClient.Start(ctx); err != nil {
slog.Error("river client start failed", "err", err)
os.Exit(1)
}
```
### Pattern 4: slog-Based ErrorHandler
River has two ways to integrate with slog:
1. **`Config.Logger`** — pass `slog.Default()` (or any `*slog.Logger`). River uses it internally for its own operational logs (job picked up, completed, retried, discarded). This is the primary log integration. [VERIFIED: Context7 — `river.Config.Logger` accepts `*slog.Logger`]
2. **`ErrorHandler`** — optional custom struct for extra telemetry on failures. Use this to emit a slog line with all failure details in a single structured record:
```go
// Source: https://riverqueue.com/docs/error-handling (verified Context7)
type SlogErrorHandler struct{}
func (*SlogErrorHandler) HandleError(
ctx context.Context, job *rivertype.JobRow, err error,
) *river.ErrorHandlerResult {
slog.Error("job error",
"job_id", job.ID,
"job_kind", job.Kind,
"attempt", job.Attempt,
"max_attempts", job.MaxAttempts,
"err", err,
)
return nil // nil = follow default retry schedule
}
func (*SlogErrorHandler) HandlePanic(
ctx context.Context, job *rivertype.JobRow, panicVal any, trace string,
) *river.ErrorHandlerResult {
slog.Error("job panic",
"job_id", job.ID,
"job_kind", job.Kind,
"panic", panicVal,
"trace", trace,
)
return nil
}
```
### Pattern 5: Graceful Shutdown
The existing `cmd/worker/main.go` uses `signal.NotifyContext`. River's `StopAndCancel` fits this directly:
```go
// Source: https://riverqueue.com/docs/graceful-shutdown (verified Context7)
// Simplified version appropriate for single-worker v1 (no two-signal escalation needed).
<-ctx.Done() // context cancelled by SIGINT/SIGTERM
stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer stopCancel()
if err := riverClient.StopAndCancel(stopCtx); err != nil {
slog.Warn("river stop did not finish cleanly", "err", err)
}
pool.Close()
slog.Info("shutdown complete")
```
`StopAndCancel` cancels running job contexts, waits for goroutines to exit, then returns. For periodic jobs that run for a few milliseconds (heartbeat, cleanup) this will always succeed well within 10 seconds.
`riverClient.Stop(ctx)` waits for in-flight jobs to complete gracefully. `StopAndCancel` cancels their contexts first. For Phase 6's short-lived jobs, both are equivalent. `StopAndCancel` is the safer default.
### Pattern 6: Orphan-File sqlc Query
Add to `backend/internal/db/queries/files.sql`:
```sql
-- name: ListOrphanFiles :many
-- Find tablo_files rows whose owning tablo no longer exists.
-- Used by the orphan-file cleanup worker (Phase 6 WORK-02).
SELECT id, tablo_id, s3_key
FROM tablo_files tf
WHERE NOT EXISTS (
SELECT 1 FROM tablos t WHERE t.id = tf.tablo_id
);
```
**Why NOT EXISTS over LEFT JOIN:** `NOT EXISTS` short-circuits on first match — the planner can use the tablos PK index. `LEFT JOIN ... WHERE t.id IS NULL` produces the same result but is less readable for sqlc annotation. [ASSUMED — both are semantically equivalent; optimizer typically treats them the same]
`tablo_files.tablo_id` has a FK to `tablos(id)` defined as `ON DELETE CASCADE` (from `0005_files.sql`). This means orphan rows can only exist if the FK was bypassed (e.g., direct SQL delete on `tablos` that bypassed the cascade, or a bug). The query is a safety net — it should normally find zero rows. [VERIFIED: `0005_files.sql` — FK with ON DELETE CASCADE]
The sqlc return type for this query would be a new struct with just `{ID, TabloID, S3Key}` — add it with `:many` annotation and run `just generate`.
After deleting from S3, delete the DB row using the existing `DeleteTabloFile` query. Note: `DeleteTabloFile` takes `id` and `tablo_id` — both are available from the orphan query result.
### Anti-Patterns to Avoid
- **Don't share the `riverpgxv5.New(pool)` driver between the migrator and the client.** Each call to `riverpgxv5.New(pool)` returns a fresh driver instance wrapping the same pool — this is fine and correct. Do not try to reuse the driver object.
- **Don't call `riverClient.Start(ctx)` with the signal-cancelled context.** `Start` must receive a live (non-cancelled) context. If the signal fires before `Start` completes, the client will not start. Use the signal context for job execution lifecycle (jobs inherit it), but start before awaiting the done channel.
- **Don't wrap the pgxpool in PgBouncer for the worker.** River uses `LISTEN`/`NOTIFY` which requires session-mode connections. Direct pgxpool (no bouncer) is correct — the project does not use PgBouncer. [VERIFIED: river docs pgbouncer page]
- **Don't run rivermigrate while the river client is also running.** Call `Migrate` before `NewClient` / `Start`. The migration acquires a lock; running it concurrently with a live client is unsafe.
- **Don't delete S3 object before DB row.** Delete S3 first in the orphan cleanup loop — if DB delete fails after S3 delete, the row is re-queued on next run but the S3 object is gone (orphan re-found on next run but nothing to delete). Reverse order risks a window where the row is gone but the S3 object leaks permanently. [ASSUMED — standard convention, not river-specific]
---
## Don't Hand-Roll
| Problem | Don't Build | Use Instead | Why |
|---------|-------------|-------------|-----|
| Job queue with Postgres | Custom `pg_notify` poller | `github.com/riverqueue/river` | River handles advisory locking, concurrent workers, backoff, discarding, schema |
| Retry with backoff | Custom retry loop with sleep | River's built-in 25-attempt `attempts^4` schedule | Jitter, persistence across restarts, observable state in DB |
| Schema migrations for queue | Goose file for river_job | `rivermigrate.New(...).Migrate(...)` | River owns and evolves its own schema; mixing with goose creates version conflicts |
| Periodic scheduler | `time.Ticker` in a goroutine | `river.PeriodicJob` | River ensures exactly-one-insert even on leader failover; `time.Ticker` double-fires on restart |
---
## Common Pitfalls
### Pitfall 1: rivermigrate is a sub-package, not a separate module
**What goes wrong:** Developer runs `go get github.com/riverqueue/river/rivermigrate` — this fails because there is no separate module at that path.
**Why it happens:** rivermigrate lives inside the main `river` module at the sub-package path `rivermigrate/`. Adding only the main module is sufficient.
**How to avoid:** `go get github.com/riverqueue/river@v0.37.0` — then import `"github.com/riverqueue/river/rivermigrate"` in code.
[VERIFIED: Go module proxy — no separate module exists for rivermigrate]
### Pitfall 2: Passing signal context to riverClient.Start()
**What goes wrong:** `riverClient.Start(signalCtx)` returns immediately with a context-cancelled error if the signal fires first, or prevents the client from ever starting if the context is already done.
**Why it happens:** `Start` takes the root context that jobs inherit — if this context is cancelled, all running jobs are cancelled.
**How to avoid:** Pass the signal context to `Start` only while the binary is running normally. The signal context cancellation triggers `<-ctx.Done()`, after which you call `riverClient.StopAndCancel(timeoutCtx)` with a fresh background-derived timeout context.
[VERIFIED: Context7 — river graceful shutdown docs]
### Pitfall 3: Worker struct without WorkerDefaults embedding
**What goes wrong:** Compiler error or runtime panic — `river.AddWorker` requires the worker type to implement the full `river.Worker[T]` interface including `NextRetry`, `Timeout`, and other optional methods.
**Why it happens:** Missing `river.WorkerDefaults[T]` embedding — it provides no-op default implementations of all optional interface methods.
**How to avoid:** Every worker struct must embed `river.WorkerDefaults[T]`.
[VERIFIED: Context7 — river worker registration docs]
### Pitfall 4: MaxWorkers too high for single VPS
**What goes wrong:** Worker spins up 100 goroutines competing for Postgres connections.
**Why it happens:** Default examples show `MaxWorkers: 100` — this is for high-throughput services, not a single-VPS periodic worker with 2 jobs.
**How to avoid:** Set `MaxWorkers: 10` for Phase 6. The project has `db.NewPool` with `MaxConns: 10`; river's concurrency should not exceed pool size.
[VERIFIED: `backend/internal/db/pool.go` — MaxConns: 10]
### Pitfall 5: ON DELETE CASCADE makes orphan query always return 0 rows in normal operation
**What goes wrong:** Developer sets up orphan cleanup, runs it, sees 0 rows, assumes the query is broken.
**Why it happens:** `tablo_files.tablo_id` has `ON DELETE CASCADE` — when a tablo is deleted, all its file rows are deleted automatically by Postgres. Orphans can only exist from direct SQL operations that bypass the cascade.
**How to avoid:** This is expected behavior — the job is a safety net. Test the query by inserting a `tablo_files` row manually with a nonexistent `tablo_id` (bypassing FK in test setup with `-- no FK check` or by inserting a tablo, inserting a file, then deleting the tablo via raw DELETE that bypasses cascade).
[VERIFIED: `backend/migrations/0005_files.sql` — ON DELETE CASCADE]
### Pitfall 6: Running rivermigrate Migrate() while another instance is already migrated
**What goes wrong:** Concern about running `Migrate(DirectionUp, nil)` on every startup when river tables already exist.
**Why it happens:** Developers familiar with destructive migration tools assume "up" re-runs applied steps.
**How to avoid:** This is safe and idempotent — river tracks applied migration versions in its own table (`river_migration`). Already-applied versions are skipped. `nil` opts = migrate to latest.
[VERIFIED: Context7 — rivermigrate docs: "already-applied versions are skipped"]
---
## Code Examples
### Full cmd/worker/main.go structure
```go
// Source: assembled from river docs (Context7) + existing skeleton patterns
package main
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"backend/internal/db"
"backend/internal/files"
"backend/internal/jobs"
"backend/internal/web"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
func main() {
env := os.Getenv("ENV")
if env == "" {
env = "development"
}
dsn := os.Getenv("DATABASE_URL")
slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout)))
if dsn == "" {
slog.Error("DATABASE_URL is required but unset")
os.Exit(1)
}
// Background context for startup (not signal-scoped).
ctx := context.Background()
pool, err := db.NewPool(ctx, dsn)
if err != nil {
slog.Error("db connect failed", "err", err)
os.Exit(1)
}
defer pool.Close()
// Step 1: Run river migrations before anything else.
migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil)
if err != nil {
slog.Error("rivermigrate init failed", "err", err)
os.Exit(1)
}
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)
if err != nil {
slog.Error("river migration failed", "err", err)
os.Exit(1)
}
for _, v := range res.Versions {
slog.Info("river migration applied", "version", v.Version)
}
// Step 2: Set up S3 store for orphan cleanup.
fileStore, err := files.NewStore(ctx,
os.Getenv("S3_ENDPOINT"),
os.Getenv("S3_BUCKET"),
os.Getenv("S3_REGION"),
os.Getenv("S3_ACCESS_KEY"),
os.Getenv("S3_SECRET_KEY"),
os.Getenv("S3_USE_PATH_STYLE") == "true",
)
if err != nil {
slog.Error("file store init failed", "err", err)
os.Exit(1)
}
// Step 3: Construct signal context AFTER all startup I/O.
sigCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Step 4: Register workers.
workers := river.NewWorkers()
river.AddWorker(workers, &jobs.HeartbeatWorker{})
river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore))
// Step 5: Construct river client.
riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
Logger: slog.Default(),
Workers: workers,
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 10},
},
ErrorHandler: &jobs.SlogErrorHandler{},
PeriodicJobs: []*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(1*time.Minute),
func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil },
&river.PeriodicJobOpts{RunOnStart: true},
),
river.NewPeriodicJob(
river.PeriodicInterval(1*time.Hour),
func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil },
nil,
),
},
})
if err != nil {
slog.Error("river client init failed", "err", err)
os.Exit(1)
}
if err := riverClient.Start(sigCtx); err != nil {
slog.Error("river client start failed", "err", err)
os.Exit(1)
}
slog.Info("worker ready")
<-sigCtx.Done()
slog.Info("shutting down")
stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer stopCancel()
if err := riverClient.StopAndCancel(stopCtx); err != nil {
slog.Warn("river stop did not finish cleanly", "err", err)
}
slog.Info("shutdown complete")
}
```
### Orphan-file sqlc query
```sql
-- Source: project pattern (sqlc convention) + LEFT JOIN / NOT EXISTS analysis
-- Add to: backend/internal/db/queries/files.sql
-- name: ListOrphanFiles :many
SELECT id, tablo_id, s3_key
FROM tablo_files tf
WHERE NOT EXISTS (
SELECT 1 FROM tablos t WHERE t.id = tf.tablo_id
);
```
Run `just generate` after adding this query. The generated function signature will be:
```go
func (q *Queries) ListOrphanFiles(ctx context.Context) ([]ListOrphanFilesRow, error)
```
### justfile targets to add
```just
# Start the worker binary (development — requires db-up first).
worker:
DATABASE_URL='{{ database_url }}' \
S3_ENDPOINT='http://localhost:9000' \
S3_BUCKET='xtablo' \
S3_REGION='us-east-1' \
S3_ACCESS_KEY='minioadmin' \
S3_SECRET_KEY='minioadmin' \
S3_USE_PATH_STYLE='true' \
go run ./cmd/worker
# Stream worker logs (if running as background process via air or separate terminal).
worker-logs:
@echo "Start worker in a separate terminal: just worker"
@echo "Logs appear in that terminal (structured slog output)."
```
---
## State of the Art
| Old Approach | Current Approach | When Changed | Impact |
|--------------|------------------|--------------|--------|
| `time.Ticker` goroutine for periodic work | `river.PeriodicJob` with Postgres-persisted schedule | River v0.1+ | Survives restarts, observable, no double-fire |
| Separate broker (Redis/RabbitMQ) | Postgres-native with river | 20232024 | No extra infra, one less operational concern |
| `database/sql` for river driver | `riverpgxv5.New(pgxpool.Pool)` | River v0.1+ | Idiomatic pgx/v5; avoids stdlib adapter overhead |
---
## Assumptions Log
| # | Claim | Section | Risk if Wrong |
|---|-------|---------|---------------|
| A1 | `NOT EXISTS` and `LEFT JOIN IS NULL` are equivalent in this context and optimizer treats them similarly | Orphan query pattern | Wrong result: neither — both produce correct results; performance difference is negligible at v1 scale |
| A2 | S3 delete before DB delete is the safer ordering for orphan cleanup | Anti-patterns | Low: worst case on partial failure is a re-detectable orphan on next run |
---
## Open Questions
1. **S3 env var names for worker**
- What we know: Phase 5 wired S3 env vars in `cmd/web`. Worker needs the same vars.
- What's unclear: Whether the planner reuses the same env var names or introduces `WORKER_S3_*` variants.
- Recommendation: Reuse the same env var names (`S3_ENDPOINT`, `S3_BUCKET`, etc.) — both binaries share the same `.env.example` pattern.
2. **DeleteTabloFile query takes (id, tablo_id) — orphan rows have no owning tablo**
- What we know: `DeleteTabloFile` has signature `WHERE id = $1 AND tablo_id = $2`.
- What's unclear: For a true orphan (tablo deleted bypassing cascade), the `tablo_id` column still holds the old UUID even though the tablo row is gone. `$2` matches the stored value, so the DELETE still works.
- Recommendation: No change needed — the query works correctly. Planner should note this in task comments.
---
## Environment Availability
| Dependency | Required By | Available | Version | Fallback |
|------------|------------|-----------|---------|----------|
| Go 1.26 | worker binary build | ✓ | 1.26.1 (go.mod) | — |
| PostgreSQL | rivermigrate, river queue | ✓ | via compose.yaml | — |
| MinIO (S3) | orphan-file cleanup job | ✓ | via compose.yaml (Phase 5) | — |
| `just` | justfile targets | ✓ | (existing justfile in use) | — |
| `sqlc` | `just generate` | ✓ | v1.31.1 (pinned in justfile) | — |
[VERIFIED: `backend/go.mod``go 1.26.1`]
[VERIFIED: `backend/compose.yaml` — MinIO present from Phase 5 (per CONTEXT.md canonical refs)]
[VERIFIED: `backend/justfile` — sqlc_version = "v1.31.1", `just generate` target exists]
---
## Validation Architecture
### Test Framework
| Property | Value |
|----------|-------|
| Framework | Go testing + `go test ./...` |
| Config file | None — standard Go test runner |
| Quick run command | `go test ./internal/jobs/...` |
| Full suite command | `just test` (`go test ./...`) |
### Phase Requirements → Test Map
| Req ID | Behavior | Test Type | Automated Command | File Exists? |
|--------|----------|-----------|-------------------|-------------|
| WORK-01 | rivermigrate runs without error against test DB | integration | `go test ./cmd/worker/... -run TestWorkerMigrate` | ❌ Wave 0 |
| WORK-01 | river.Client starts and reaches ready state | integration | `go test ./cmd/worker/... -run TestWorkerReady` | ❌ Wave 0 |
| WORK-02 | HeartbeatWorker.Work() returns nil and logs | unit | `go test ./internal/jobs/... -run TestHeartbeatWorker` | ❌ Wave 0 |
| WORK-02 | OrphanCleanupWorker.Work() deletes orphan row + S3 object | unit | `go test ./internal/jobs/... -run TestOrphanCleanupWorker` | ❌ Wave 0 |
| WORK-02 | ListOrphanFiles query returns rows with deleted tablos | integration | `go test ./internal/db/... -run TestListOrphanFiles` | ❌ Wave 0 |
| WORK-03 | Worker shuts down cleanly on context cancel | unit | `go test ./internal/jobs/... -run TestGracefulShutdown` | ❌ Wave 0 |
| WORK-04 | SlogErrorHandler logs error fields correctly | unit | `go test ./internal/jobs/... -run TestSlogErrorHandler` | ❌ Wave 0 |
**Recommended v1 approach:** For Phase 6, unit tests for individual `Worker[T].Work()` methods are the primary test surface. Use `rivertest.NewWorker(t, riverpgxv5.New(nil), &river.Config{}, worker)` for isolated worker tests without a real DB connection. Integration tests for rivermigrate require a real Postgres connection (same pattern as existing `testdb_test.go` in `internal/web`).
### Sampling Rate
- **Per task commit:** `go test ./internal/jobs/...`
- **Per wave merge:** `just test`
- **Phase gate:** Full suite green before `/gsd-verify-work`
### Wave 0 Gaps
- [ ] `backend/internal/jobs/heartbeat_test.go` — covers WORK-02 HeartbeatWorker
- [ ] `backend/internal/jobs/orphan_cleanup_test.go` — covers WORK-02 OrphanCleanupWorker
- [ ] `backend/internal/jobs/error_handler_test.go` — covers WORK-04 SlogErrorHandler
- [ ] `backend/internal/db/sqlc/files_test.go` (or extension of existing) — covers WORK-02 ListOrphanFiles query
---
## Security Domain
### Applicable ASVS Categories
| ASVS Category | Applies | Standard Control |
|---------------|---------|-----------------|
| V2 Authentication | no | Worker has no HTTP surface |
| V3 Session Management | no | No sessions |
| V4 Access Control | no | No HTTP routes |
| V5 Input Validation | partial | Job args are Go structs deserialized from Postgres JSON — no user-supplied input in Phase 6 periodic jobs |
| V6 Cryptography | no | No crypto operations |
### Known Threat Patterns for Worker/Queue Stack
| Pattern | STRIDE | Standard Mitigation |
|---------|--------|---------------------|
| Job args injection via tampered Postgres rows | Tampering | Not applicable in Phase 6 — periodic jobs have empty args structs; no user-supplied data |
| S3 credential leak via worker env vars | Information Disclosure | Use same `.env.example` pattern; never commit secrets |
| Worker running as root in container | Elevation of Privilege | Dockerfile (Phase 7 scope) should use non-root user |
| Advisory lock not respected by second worker instance | Tampering | D-08: document single-worker constraint; river's locking provides partial protection |
---
## Sources
### Primary (HIGH confidence)
- `/websites/riverqueue` (Context7) — PeriodicJob config, riverpgxv5.New, rivermigrate.New, ErrorHandler interface, Logger field, graceful shutdown, AddWorker, WorkerDefaults, retry defaults
- `/riverqueue/river` (Context7) — rivermigrate import path, MigrateResult, WorkFunc pattern
- Go module proxy `proxy.golang.org` — verified river v0.37.0 and riverpgxv5 v0.37.0 published 2026-05-11
- `backend/migrations/0005_files.sql` — FK definition and ON DELETE CASCADE
- `backend/internal/db/pool.go` — MaxConns: 10
- `backend/internal/files/store.go` — Delete(ctx, key) method signature
- `backend/cmd/worker/main.go` — existing skeleton patterns
- `backend/justfile` — existing targets and env var conventions
### Secondary (MEDIUM confidence)
- `backend/internal/web/slog.go``web.NewSlogHandler` signature (used to confirm slog integration pattern)
- `backend/internal/db/sqlc/db.go` — DBTX interface and sqlc.New pattern (confirms orphan query can use `sqlc.New(pool)`)
---
## Metadata
**Confidence breakdown:**
- Standard stack: HIGH — verified via Go module proxy (v0.37.0, 2026-05-11)
- River API patterns: HIGH — verified via Context7 official docs (multiple queries)
- Architecture: HIGH — derived directly from existing codebase patterns
- Orphan query: HIGH — derived from confirmed schema (0005_files.sql, 0003_tablos.sql)
- Pitfalls: HIGH — verified against official docs where applicable; A1/A2 are minor ASSUMED items
**Research date:** 2026-05-15
**Valid until:** 2026-06-15 (river is actively maintained; verify before pinning if delayed)