diff --git a/.planning/phases/06-background-worker/06-PATTERNS.md b/.planning/phases/06-background-worker/06-PATTERNS.md new file mode 100644 index 0000000..0f7b6db --- /dev/null +++ b/.planning/phases/06-background-worker/06-PATTERNS.md @@ -0,0 +1,458 @@ +# Phase 6: Background Worker - Pattern Map + +**Mapped:** 2026-05-15 +**Files analyzed:** 6 (new/modified) +**Analogs found:** 6 / 6 + +## File Classification + +| New/Modified File | Role | Data Flow | Closest Analog | Match Quality | +|-------------------|------|-----------|----------------|---------------| +| `backend/cmd/worker/main.go` | entrypoint | event-driven (startup + shutdown) | `backend/cmd/web/main.go` | role-match (same pattern, different runtime) | +| `backend/internal/jobs/heartbeat.go` | worker | event-driven (periodic) | `backend/internal/auth/session.go` (struct with injected deps) | partial (same struct/interface idiom) | +| `backend/internal/jobs/orphan_cleanup.go` | worker | batch (DB query + S3 delete loop) | `backend/internal/files/store.go` (S3 ops) + `backend/internal/auth/session.go` | partial (combines two analogs) | +| `backend/internal/db/queries/files.sql` | query | CRUD (read orphans) | `backend/internal/db/queries/files.sql` lines 1–18 (existing queries in same file) | exact | +| `backend/go.mod` | config | — | `backend/go.mod` (existing file, add deps) | exact | +| `backend/justfile` | config | — | `backend/justfile` (existing file, add `worker` target) | exact | + +--- + +## Pattern Assignments + +### `backend/cmd/worker/main.go` (entrypoint, event-driven) + +**Analog:** `backend/cmd/worker/main.go` (existing skeleton) + `backend/cmd/web/main.go` + +**Existing skeleton to replace** (`backend/cmd/worker/main.go` lines 1–48): +```go +// Command worker is the Phase 1 worker skeleton (CONTEXT D-03). It boots, +// opens a pgxpool, logs "worker ready", and blocks on SIGINT/SIGTERM until +// shutdown. Phase 6 replaces this file with the real job runtime — keep it +// minimal until then. +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + + "backend/internal/db" + "backend/internal/web" +) + +func main() { + env := os.Getenv("ENV") + if env == "" { + env = "development" + } + dsn := os.Getenv("DATABASE_URL") + + slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout))) + + if dsn == "" { + slog.Error("DATABASE_URL is required but unset") + os.Exit(1) + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + pool, err := db.NewPool(ctx, dsn) + if err != nil { + slog.Error("db connect failed", "err", err) + os.Exit(1) + } + + // Load-bearing signal per D-03 — verification scripts grep for this. + slog.Info("worker ready") + + <-ctx.Done() + slog.Info("shutting down") + pool.Close() + slog.Info("shutdown complete") +} +``` + +**Startup env + logging pattern** (from `backend/cmd/web/main.go` lines 28–41): +```go +env := os.Getenv("ENV") +if env == "" { + env = "development" +} +dsn := os.Getenv("DATABASE_URL") +slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout))) +if dsn == "" { + slog.Error("DATABASE_URL is required but unset") + os.Exit(1) +} +``` + +**S3 env var loading pattern** (from `backend/cmd/web/main.go` lines 85–100): +```go +s3Endpoint := os.Getenv("S3_ENDPOINT") +s3Bucket := os.Getenv("S3_BUCKET") +s3AccessKey := os.Getenv("S3_ACCESS_KEY") +s3SecretKey := os.Getenv("S3_SECRET_KEY") +s3Region := os.Getenv("S3_REGION") +if s3Region == "" { + s3Region = "us-east-1" +} +s3UsePathStyle := os.Getenv("S3_USE_PATH_STYLE") == "true" +``` + +**Graceful shutdown timeout pattern** (from `backend/cmd/web/main.go` lines 144–148): +```go +shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +defer cancel() +if err := srv.Shutdown(shutdownCtx); err != nil { + slog.Error("shutdown error", "err", err) +} +``` + +**Critical ordering constraint:** In `cmd/web/main.go` the `signal.NotifyContext` is created BEFORE S3 init. For the worker, create a plain `context.Background()` for startup I/O (pool connect, rivermigrate, S3 init), then create `signal.NotifyContext` AFTER — because `river.Client.Start` requires a live (non-cancelled) context and must not receive a context that could already be done. See RESEARCH Pitfall 2. + +**Pool close ordering** (from `backend/cmd/web/main.go` line 155 — after shutdown, not via defer): +```go +pool.Close() +slog.Info("shutdown complete") +``` + +--- + +### `backend/internal/jobs/heartbeat.go` (worker, event-driven) + +**Analog:** `backend/internal/auth/session.go` (struct definition) + RESEARCH Pattern 1 + +**Package declaration pattern** (from `backend/internal/auth/session.go` lines 1–2): +```go +package auth +// → use: package jobs +``` + +**Struct with embedded defaults + Work method** (from RESEARCH Pattern 1 — no codebase analog exists yet): +```go +// HeartbeatArgs carries no data — heartbeat is purely a proof-of-life tick. +type HeartbeatArgs struct{} + +func (HeartbeatArgs) Kind() string { return "heartbeat" } + +type HeartbeatWorker struct { + river.WorkerDefaults[HeartbeatArgs] +} + +func (w *HeartbeatWorker) Work(ctx context.Context, job *river.Job[HeartbeatArgs]) error { + slog.Info("worker heartbeat", + "job_id", job.ID, + "attempt", job.Attempt, + ) + return nil +} +``` + +**Pitfall to avoid:** Every worker struct MUST embed `river.WorkerDefaults[T]` — without it, `river.AddWorker` will fail to compile (no-op method implementations are missing). See RESEARCH Pitfall 3. + +--- + +### `backend/internal/jobs/orphan_cleanup.go` (worker, batch) + +**Analog:** `backend/internal/files/store.go` (S3 delete pattern) + `backend/internal/auth/session.go` (struct with injected dep) + +**Injected dependency struct pattern** (from `backend/internal/auth/session.go` lines 26–29): +```go +type Store struct { + q *sqlc.Queries + now func() time.Time // injectable for testing +} +``` +For `OrphanCleanupWorker`, inject `pool *pgxpool.Pool` and `store files.FileStorer` (use the interface, not the concrete type — enables test injection). + +**FileStorer interface** (from `backend/internal/files/store.go` lines 17–22): +```go +type FileStorer interface { + Upload(ctx context.Context, key string, file io.Reader) (contentType string, bytesWritten int64, err error) + Delete(ctx context.Context, key string) error + PresignDownload(ctx context.Context, key string) (string, error) +} +``` +The `OrphanCleanupWorker` takes `files.FileStorer` (not `*files.Store`) for testability. + +**S3 delete call** (from `backend/internal/files/store.go` lines 101–107): +```go +func (s *Store) Delete(ctx context.Context, key string) error { + _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + }) + return err +} +``` +In the orphan cleanup worker, call `w.store.Delete(ctx, f.S3Key)` directly — no reconstruction of the key needed; `tablo_files.s3_key` stores the full key. + +**sqlc query call pattern** (from `backend/internal/db/sqlc/files.sql.go` lines 96–121 — `:many` scan loop): +```go +func (q *Queries) ListFilesByTablo(ctx context.Context, tabloID uuid.UUID) ([]TabloFile, error) { + rows, err := q.db.Query(ctx, listFilesByTablo, tabloID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []TabloFile + for rows.Next() { + var i TabloFile + if err := rows.Scan(...); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} +``` +`ListOrphanFiles` will follow the same `:many` pattern. The generated function takes `ctx context.Context` only (no params — the query has no parameters). + +**sqlc.New call pattern** (from `backend/internal/db/sqlc/db.go` lines 20–22): +```go +func New(db DBTX) *Queries { + return &Queries{db: db} +} +``` +`DBTX` is satisfied by `*pgxpool.Pool` — call `sqlc.New(w.pool)` inside `Work()`. + +**Delete ordering** (RESEARCH Anti-Patterns): Delete S3 object FIRST, then DB row. Reverse order risks a permanent S3 leak if DB delete succeeds but S3 delete was never attempted on retry. + +--- + +### `backend/internal/jobs/error_handler.go` (utility, request-response) + +**Analog:** No direct codebase analog — first ErrorHandler implementation. + +**SlogErrorHandler struct pattern** (from RESEARCH Pattern 4): +```go +type SlogErrorHandler struct{} + +func (*SlogErrorHandler) HandleError( + ctx context.Context, job *rivertype.JobRow, err error, +) *river.ErrorHandlerResult { + slog.Error("job error", + "job_id", job.ID, + "job_kind", job.Kind, + "attempt", job.Attempt, + "max_attempts", job.MaxAttempts, + "err", err, + ) + return nil // nil = follow default retry schedule +} + +func (*SlogErrorHandler) HandlePanic( + ctx context.Context, job *rivertype.JobRow, panicVal any, trace string, +) *river.ErrorHandlerResult { + slog.Error("job panic", + "job_id", job.ID, + "job_kind", job.Kind, + "panic", panicVal, + "trace", trace, + ) + return nil +} +``` +Can live in `backend/internal/jobs/error_handler.go` or be inlined into a `jobs.go` file — either works. Keeping it in `jobs/` keeps all river-related types together. + +--- + +### `backend/internal/db/queries/files.sql` (query, CRUD) + +**Analog:** `backend/internal/db/queries/files.sql` (same file — extend it) + +**Existing query style** (from `backend/internal/db/queries/files.sql` lines 1–18): +```sql +-- name: InsertTabloFile :one +INSERT INTO tablo_files (tablo_id, s3_key, filename, content_type, size_bytes) +VALUES ($1, $2, $3, $4, $5) +RETURNING id, tablo_id, s3_key, filename, content_type, size_bytes, created_at; + +-- name: DeleteTabloFile :exec +DELETE FROM tablo_files WHERE id = $1 AND tablo_id = $2; +``` + +**New query to append** — follow exact same comment + annotation style: +```sql +-- name: ListOrphanFiles :many +-- Find tablo_files rows whose owning tablo no longer exists. +-- Used by the orphan-file cleanup worker (Phase 6 WORK-02). +SELECT id, tablo_id, s3_key +FROM tablo_files tf +WHERE NOT EXISTS ( + SELECT 1 FROM tablos t WHERE t.id = tf.tablo_id +); +``` + +**Expected generated signature** (after `just generate`): +```go +func (q *Queries) ListOrphanFiles(ctx context.Context) ([]ListOrphanFilesRow, error) +``` +Where `ListOrphanFilesRow` will have fields: `ID uuid.UUID`, `TabloID uuid.UUID`, `S3Key string`. + +**Note on DeleteTabloFile for orphan cleanup:** `DeleteTabloFile` takes `DeleteTabloFileParams{ID, TabloID}`. For orphan rows, the `tablo_id` column still holds the original UUID even though the tablo row is gone — the DELETE WHERE clause will still match and succeed. No new delete query needed. + +--- + +### `backend/go.mod` (config — add river dependencies) + +**Analog:** `backend/go.mod` (existing file — add to `require` block) + +**Current require block pattern** (from `backend/go.mod` lines 5–14): +``` +require ( + github.com/a-h/templ v0.3.1020 + github.com/go-chi/chi/v5 v5.2.5 + github.com/google/uuid v1.6.0 + github.com/gorilla/csrf v1.7.3 + github.com/jackc/pgx/v5 v5.9.2 + github.com/pressly/goose/v3 v3.27.1 + golang.org/x/crypto v0.51.0 + golang.org/x/time v0.15.0 +) +``` + +**Additions needed** (run `go get` — do NOT hand-edit; `go get` updates both `require` and `go.sum`): +```bash +cd backend +go get github.com/riverqueue/river@v0.37.0 +go get github.com/riverqueue/river/riverdriver/riverpgxv5@v0.37.0 +``` +`rivermigrate` is a sub-package of `github.com/riverqueue/river` — no separate `go get` needed. Import it as `"github.com/riverqueue/river/rivermigrate"` in code once the module is present. + +--- + +### `backend/justfile` (config — add `worker` target) + +**Analog:** `backend/justfile` (existing file — extend it) + +**Existing target style** (from `backend/justfile` lines 111–113 — `dev` target): +```just +dev: db-up + just generate + DATABASE_URL='{{ database_url }}' SESSION_SECRET=... air -c .air.toml +``` + +**Existing variable pattern** (from `backend/justfile` lines 38–39): +```just +database_url := "postgres://xtablo:xtablo@localhost:5432/xtablo?sslmode=disable" +``` + +**New `worker` target to add** — follows same env-var-per-line pattern as `dev`: +```just +# Start the worker binary (development — requires db-up first). +worker: db-up + DATABASE_URL='{{ database_url }}' \ + S3_ENDPOINT='http://localhost:9000' \ + S3_BUCKET='xtablo' \ + S3_REGION='us-east-1' \ + S3_ACCESS_KEY='minioadmin' \ + S3_SECRET_KEY='minioadmin' \ + S3_USE_PATH_STYLE='true' \ + go run ./cmd/worker +``` + +**Note on `build` target** (from `backend/justfile` lines 124–126 — already builds worker): +```just +build: + just generate + go build -o bin/web ./cmd/web + go build -o bin/worker ./cmd/worker +``` +The `build` target already compiles `./cmd/worker` — no change needed there. + +--- + +## Shared Patterns + +### Structured Logging +**Source:** `backend/internal/web/slog.go` lines 17–23 +**Apply to:** All `internal/jobs/*.go` files (use `slog.Info` / `slog.Error` with key-value pairs) +```go +func NewSlogHandler(env string, w io.Writer) slog.Handler { + opts := &slog.HandlerOptions{Level: slog.LevelInfo} + if env == "production" { + return slog.NewJSONHandler(w, opts) + } + return slog.NewTextHandler(w, opts) +} +``` +Worker calls `slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout)))` — same as `cmd/web/main.go` line 41. River's `Config.Logger: slog.Default()` then picks up this handler. + +### DB Pool Construction +**Source:** `backend/internal/db/pool.go` lines 15–23 +**Apply to:** `cmd/worker/main.go` (already used in skeleton) +```go +func NewPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) { + cfg, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, err + } + cfg.MaxConns = 10 + cfg.MinConns = 1 + return pgxpool.NewWithConfig(ctx, cfg) +} +``` +`MaxConns: 10` matches the river config `MaxWorkers: 10` — do not exceed pool size with worker concurrency. + +### sqlc.New(pool) Pattern +**Source:** `backend/internal/db/sqlc/db.go` lines 20–22 +**Apply to:** `internal/jobs/orphan_cleanup.go` Work method +```go +func New(db DBTX) *Queries { + return &Queries{db: db} +} +``` +`*pgxpool.Pool` satisfies `DBTX`. Call `sqlc.New(w.pool)` inside `Work(ctx, job)` at each invocation — no need to cache the `*Queries` on the struct. + +### Test DB Setup (Integration Tests) +**Source:** `backend/internal/auth/testdb_test.go` lines 40–140 +**Apply to:** Any integration tests in `internal/jobs/` that need a real Postgres connection +```go +func setupTestDB(t *testing.T) (*pgxpool.Pool, func()) { + t.Helper() + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + dsn = os.Getenv("DATABASE_URL") + } + if dsn == "" { + t.Skip("TEST_DATABASE_URL (or DATABASE_URL) not set — integration test skipped") + return nil, nil + } + // ... creates isolated schema, runs goose migrations, returns pool + cleanup +} +``` +Copy this helper into `backend/internal/jobs/testdb_test.go` (or a shared `internal/testutil/` if it grows). Integration tests for `ListOrphanFiles` need the app migrations to be applied first. + +--- + +## No Analog Found + +All files have close analogs. The only genuinely new patterns are the river-specific wiring (`rivermigrate`, `river.Client`, `river.PeriodicJob`, `river.WorkerDefaults`) — those come from RESEARCH.md (verified against official docs), not the codebase. + +| File | Role | Data Flow | Reason | +|------|------|-----------|--------| +| `backend/internal/jobs/` (new package) | worker | event-driven | No background job package exists yet; worker struct pattern borrowed from `internal/auth` | + +--- + +## Key Constraints Extracted from Codebase + +1. **`db.NewPool` MaxConns is 10** (`backend/internal/db/pool.go` line 20) — river `MaxWorkers` must not exceed 10. +2. **`files.FileStorer` interface** (`backend/internal/files/store.go` lines 17–22) — `OrphanCleanupWorker` takes the interface, not `*files.Store`, for testability. +3. **`sqlc.New(pool)` takes `DBTX`** — `*pgxpool.Pool` satisfies it; no adapter needed. +4. **`signal.NotifyContext` is already the shutdown idiom** in both `cmd/web/main.go` and the existing worker skeleton — river's shutdown hooks in with `StopAndCancel(timeoutCtx)` using a fresh `context.Background()`-derived timeout context. +5. **`just generate` must run after `files.sql` change** — sqlc regenerates `backend/internal/db/sqlc/files.sql.go` to add `ListOrphanFiles` and `ListOrphanFilesRow`. +6. **S3 env var names are the same as in `cmd/web`** — `S3_ENDPOINT`, `S3_BUCKET`, `S3_ACCESS_KEY`, `S3_SECRET_KEY`, `S3_REGION`, `S3_USE_PATH_STYLE`. The worker's `just worker` target passes MinIO defaults matching `cmd/web`'s conventions. + +## Metadata + +**Analog search scope:** `backend/cmd/`, `backend/internal/`, `backend/justfile`, `backend/go.mod` +**Files scanned:** 12 +**Pattern extraction date:** 2026-05-15