xtablo-source/.planning/phases/06-background-worker/06-PATTERNS.md
2026-05-15 19:58:05 +02:00


Phase 6: Background Worker - Pattern Map

Mapped: 2026-05-15
Files analyzed: 6 (new/modified)
Analogs found: 6 / 6

File Classification

| New/Modified File | Role | Data Flow | Closest Analog | Match Quality |
|---|---|---|---|---|
| backend/cmd/worker/main.go | entrypoint | event-driven (startup + shutdown) | backend/cmd/web/main.go | role-match (same pattern, different runtime) |
| backend/internal/jobs/heartbeat.go | worker | event-driven (periodic) | backend/internal/auth/session.go (struct with injected deps) | partial (same struct/interface idiom) |
| backend/internal/jobs/orphan_cleanup.go | worker | batch (DB query + S3 delete loop) | backend/internal/files/store.go (S3 ops) + backend/internal/auth/session.go | partial (combines two analogs) |
| backend/internal/db/queries/files.sql | query | CRUD (read orphans) | backend/internal/db/queries/files.sql lines 1-18 (existing queries in same file) | exact |
| backend/go.mod | config | | backend/go.mod (existing file, add deps) | exact |
| backend/justfile | config | | backend/justfile (existing file, add worker target) | exact |

Pattern Assignments

backend/cmd/worker/main.go (entrypoint, event-driven)

Analog: backend/cmd/worker/main.go (existing skeleton) + backend/cmd/web/main.go

Existing skeleton to replace (backend/cmd/worker/main.go lines 1-48):

// Command worker is the Phase 1 worker skeleton (CONTEXT D-03). It boots,
// opens a pgxpool, logs "worker ready", and blocks on SIGINT/SIGTERM until
// shutdown. Phase 6 replaces this file with the real job runtime — keep it
// minimal until then.
package main

import (
    "context"
    "log/slog"
    "os"
    "os/signal"
    "syscall"

    "backend/internal/db"
    "backend/internal/web"
)

func main() {
    env := os.Getenv("ENV")
    if env == "" {
        env = "development"
    }
    dsn := os.Getenv("DATABASE_URL")

    slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout)))

    if dsn == "" {
        slog.Error("DATABASE_URL is required but unset")
        os.Exit(1)
    }

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    pool, err := db.NewPool(ctx, dsn)
    if err != nil {
        slog.Error("db connect failed", "err", err)
        os.Exit(1)
    }

    // Load-bearing signal per D-03 — verification scripts grep for this.
    slog.Info("worker ready")

    <-ctx.Done()
    slog.Info("shutting down")
    pool.Close()
    slog.Info("shutdown complete")
}

Startup env + logging pattern (from backend/cmd/web/main.go lines 28-41):

env := os.Getenv("ENV")
if env == "" {
    env = "development"
}
dsn := os.Getenv("DATABASE_URL")
slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout)))
if dsn == "" {
    slog.Error("DATABASE_URL is required but unset")
    os.Exit(1)
}

S3 env var loading pattern (from backend/cmd/web/main.go lines 85-100):

s3Endpoint := os.Getenv("S3_ENDPOINT")
s3Bucket := os.Getenv("S3_BUCKET")
s3AccessKey := os.Getenv("S3_ACCESS_KEY")
s3SecretKey := os.Getenv("S3_SECRET_KEY")
s3Region := os.Getenv("S3_REGION")
if s3Region == "" {
    s3Region = "us-east-1"
}
s3UsePathStyle := os.Getenv("S3_USE_PATH_STYLE") == "true"

Graceful shutdown timeout pattern (from backend/cmd/web/main.go lines 144-148):

shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
    slog.Error("shutdown error", "err", err)
}

Critical ordering constraint: In cmd/web/main.go the signal.NotifyContext is created BEFORE S3 init. For the worker, create a plain context.Background() for startup I/O (pool connect, rivermigrate, S3 init), then create signal.NotifyContext AFTER — because river.Client.Start requires a live (non-cancelled) context and must not receive a context that could already be done. See RESEARCH Pitfall 2.
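The ordering above can be sketched with the standard library only. This is an illustration under stated assumptions, not the worker's real wiring: `starter`, `fakeClient`, and `boot` are hypothetical names, and `fakeClient` merely models the constraint described above (refusing to start on a context that is already done) rather than calling River.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
)

// starter is a hypothetical stand-in for the one client method used here.
type starter interface {
	Start(ctx context.Context) error
}

// fakeClient models the constraint from the text: it refuses to start on a
// context that is already cancelled. It is not River's implementation.
type fakeClient struct{}

func (fakeClient) Start(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("start refused: %w", err)
	}
	return nil
}

// boot runs startup I/O on a plain background context, and only afterwards
// installs the signal handler and starts the client on that fresh context.
func boot(c starter, initIO func(context.Context) error) (context.Context, context.CancelFunc, error) {
	// 1. Startup I/O (pool connect, migrations, S3 init) cannot be interrupted
	//    by a signal context that does not exist yet.
	if err := initIO(context.Background()); err != nil {
		return nil, nil, err
	}
	// 2. Create the signal-aware context after startup I/O has finished.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	// 3. Start the client on a context that is known to be live.
	if err := c.Start(ctx); err != nil {
		stop()
		return nil, nil, err
	}
	return ctx, stop, nil
}

// bootOK reports whether the ordered startup succeeds.
func bootOK() bool {
	_, stop, err := boot(fakeClient{}, func(context.Context) error { return nil })
	if err != nil {
		return false
	}
	stop()
	return true
}

// startOnCancelledOK reports whether Start succeeds on a cancelled context.
func startOnCancelledOK() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return fakeClient{}.Start(ctx) == nil
}

func main() {
	fmt.Println("ordered boot succeeds:", bootOK())
	fmt.Println("start on cancelled context succeeds:", startOnCancelledOK())
}
```

In the real entrypoint the `initIO` step would cover db.NewPool, the rivermigrate run, and S3 client construction; those calls are left out here so the sketch stays dependency-free.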

Pool close ordering (from backend/cmd/web/main.go line 155 — after shutdown, not via defer):

pool.Close()
slog.Info("shutdown complete")

backend/internal/jobs/heartbeat.go (worker, event-driven)

Analog: backend/internal/auth/session.go (struct definition) + RESEARCH Pattern 1

Package declaration pattern (from backend/internal/auth/session.go lines 1-2):

package auth
// → use: package jobs

Struct with embedded defaults + Work method (from RESEARCH Pattern 1 — no codebase analog exists yet):

// HeartbeatArgs carries no data — heartbeat is purely a proof-of-life tick.
type HeartbeatArgs struct{}

func (HeartbeatArgs) Kind() string { return "heartbeat" }

type HeartbeatWorker struct {
    river.WorkerDefaults[HeartbeatArgs]
}

func (w *HeartbeatWorker) Work(ctx context.Context, job *river.Job[HeartbeatArgs]) error {
    slog.Info("worker heartbeat",
        "job_id", job.ID,
        "attempt", job.Attempt,
    )
    return nil
}

Pitfall to avoid: Every worker struct MUST embed river.WorkerDefaults[T] — without it, river.AddWorker will fail to compile (no-op method implementations are missing). See RESEARCH Pitfall 3.
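Why the embedded defaults matter can be shown with a simplified model. The `Worker` interface and `Defaults` type below are hypothetical reductions written for this note, not River's definitions; they only illustrate how Go method promotion from an embedded struct completes an interface.

```go
package main

import (
	"fmt"
	"time"
)

// Worker is a hypothetical, reduced model of a job-worker interface:
// one required method (Work) plus optional hooks.
type Worker[T any] interface {
	Timeout(args T) time.Duration
	NextRetry(args T) time.Time
	Work(args T) error
}

// Defaults supplies no-op implementations of the optional hooks.
type Defaults[T any] struct{}

func (Defaults[T]) Timeout(T) time.Duration { return 0 }
func (Defaults[T]) NextRetry(T) time.Time   { return time.Time{} }

// HeartbeatArgs carries no data, as in the pattern above.
type HeartbeatArgs struct{}

// HeartbeatWorker only writes Work; Timeout and NextRetry are promoted
// from the embedded Defaults value.
type HeartbeatWorker struct {
	Defaults[HeartbeatArgs]
}

func (*HeartbeatWorker) Work(HeartbeatArgs) error { return nil }

// Compile-time proof that the embedding completes the interface.
// Removing the embedded field makes this line fail to compile.
var _ Worker[HeartbeatArgs] = (*HeartbeatWorker)(nil)

// defaultTimeout accepts only values that satisfy Worker[T].
func defaultTimeout[T any](w Worker[T], args T) time.Duration {
	return w.Timeout(args)
}

func main() {
	d := defaultTimeout[HeartbeatArgs](&HeartbeatWorker{}, HeartbeatArgs{})
	fmt.Println("default timeout:", d)
}
```

The `var _ Worker[...] = ...` assertion is a cheap way to surface a missing embed at the declaration site instead of at the registration call.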


backend/internal/jobs/orphan_cleanup.go (worker, batch)

Analog: backend/internal/files/store.go (S3 delete pattern) + backend/internal/auth/session.go (struct with injected dep)

Injected dependency struct pattern (from backend/internal/auth/session.go lines 26-29):

type Store struct {
    q   *sqlc.Queries
    now func() time.Time // injectable for testing
}

For OrphanCleanupWorker, inject pool *pgxpool.Pool and store files.FileStorer (use the interface, not the concrete type — enables test injection).

FileStorer interface (from backend/internal/files/store.go lines 17-22):

type FileStorer interface {
    Upload(ctx context.Context, key string, file io.Reader) (contentType string, bytesWritten int64, err error)
    Delete(ctx context.Context, key string) error
    PresignDownload(ctx context.Context, key string) (string, error)
}

The OrphanCleanupWorker takes files.FileStorer (not *files.Store) for testability.

S3 delete call (from backend/internal/files/store.go lines 101-107):

func (s *Store) Delete(ctx context.Context, key string) error {
    _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
    })
    return err
}

In the orphan cleanup worker, call w.store.Delete(ctx, f.S3Key) directly — no reconstruction of the key needed; tablo_files.s3_key stores the full key.

sqlc query call pattern (from backend/internal/db/sqlc/files.sql.go lines 96-121, the :many scan loop):

func (q *Queries) ListFilesByTablo(ctx context.Context, tabloID uuid.UUID) ([]TabloFile, error) {
    rows, err := q.db.Query(ctx, listFilesByTablo, tabloID)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var items []TabloFile
    for rows.Next() {
        var i TabloFile
        if err := rows.Scan(...); err != nil {
            return nil, err
        }
        items = append(items, i)
    }
    if err := rows.Err(); err != nil {
        return nil, err
    }
    return items, nil
}

ListOrphanFiles will follow the same :many pattern. The generated function takes ctx context.Context only (no params — the query has no parameters).

sqlc.New call pattern (from backend/internal/db/sqlc/db.go lines 20-22):

func New(db DBTX) *Queries {
    return &Queries{db: db}
}

DBTX is satisfied by *pgxpool.Pool — call sqlc.New(w.pool) inside Work().

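Delete ordering (RESEARCH Anti-Patterns): Delete the S3 object FIRST, then the DB row. The reverse order risks a permanent S3 leak: if the row delete succeeds and the S3 delete then fails, the row that would have surfaced the object on the next run is gone, so the S3 delete is never retried.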
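A minimal sketch of the S3-first loop, using in-memory fakes so it runs without Postgres or S3. The names `Deleter`, `RowDeleter`, `OrphanFile`, and `cleanup` are assumptions made for this example, IDs are plain strings rather than uuid.UUID, and `RowDeleter` stands in for the sqlc DeleteTabloFile call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Deleter is the subset of files.FileStorer that the loop needs.
type Deleter interface {
	Delete(ctx context.Context, key string) error
}

// RowDeleter is a hypothetical stand-in for the sqlc row delete.
type RowDeleter interface {
	DeleteRow(ctx context.Context, id string) error
}

// OrphanFile mirrors the columns selected by ListOrphanFiles (simplified).
type OrphanFile struct {
	ID    string
	S3Key string
}

// cleanup deletes each orphan's S3 object first and its DB row second.
// When the S3 delete fails, the row stays so the next run finds it again.
func cleanup(ctx context.Context, store Deleter, rows RowDeleter, orphans []OrphanFile) (int, error) {
	var errs []error
	deleted := 0
	for _, f := range orphans {
		if err := store.Delete(ctx, f.S3Key); err != nil {
			errs = append(errs, fmt.Errorf("s3 delete %s: %w", f.S3Key, err))
			continue // keep the row; it is the only record of this object
		}
		if err := rows.DeleteRow(ctx, f.ID); err != nil {
			errs = append(errs, fmt.Errorf("row delete %s: %w", f.ID, err))
			continue
		}
		deleted++
	}
	return deleted, errors.Join(errs...)
}

// fakeStore fails for exactly one key.
type fakeStore struct{ failKey string }

func (s fakeStore) Delete(_ context.Context, key string) error {
	if key == s.failKey {
		return errors.New("simulated S3 outage")
	}
	return nil
}

// fakeRows tracks which row IDs still exist.
type fakeRows map[string]bool

func (r fakeRows) DeleteRow(_ context.Context, id string) error {
	delete(r, id)
	return nil
}

// runDemo returns the deleted count, the rows left, and whether an error occurred.
func runDemo() (int, int, bool) {
	store := fakeStore{failKey: "key-2"}
	rows := fakeRows{"1": true, "2": true, "3": true}
	orphans := []OrphanFile{
		{ID: "1", S3Key: "key-1"},
		{ID: "2", S3Key: "key-2"},
		{ID: "3", S3Key: "key-3"},
	}
	deleted, err := cleanup(context.Background(), store, rows, orphans)
	return deleted, len(rows), err != nil
}

func main() {
	deleted, left, failed := runDemo()
	fmt.Println(deleted, left, failed) // prints: 2 1 true
}
```

Returning the joined error while still processing the remaining files is one possible design; whether the real worker should fail the whole job on the first error instead is a separate decision that this sketch does not settle.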


backend/internal/jobs/error_handler.go (utility, request-response)

Analog: No direct codebase analog — first ErrorHandler implementation.

SlogErrorHandler struct pattern (from RESEARCH Pattern 4):

type SlogErrorHandler struct{}

func (*SlogErrorHandler) HandleError(
    ctx context.Context, job *rivertype.JobRow, err error,
) *river.ErrorHandlerResult {
    slog.Error("job error",
        "job_id", job.ID,
        "job_kind", job.Kind,
        "attempt", job.Attempt,
        "max_attempts", job.MaxAttempts,
        "err", err,
    )
    return nil // nil = follow default retry schedule
}

func (*SlogErrorHandler) HandlePanic(
    ctx context.Context, job *rivertype.JobRow, panicVal any, trace string,
) *river.ErrorHandlerResult {
    slog.Error("job panic",
        "job_id", job.ID,
        "job_kind", job.Kind,
        "panic", panicVal,
        "trace", trace,
    )
    return nil
}

Can live in backend/internal/jobs/error_handler.go or be inlined into a jobs.go file — either works. Keeping it in jobs/ keeps all river-related types together.


backend/internal/db/queries/files.sql (query, CRUD)

Analog: backend/internal/db/queries/files.sql (same file — extend it)

Existing query style (from backend/internal/db/queries/files.sql lines 1-18):

-- name: InsertTabloFile :one
INSERT INTO tablo_files (tablo_id, s3_key, filename, content_type, size_bytes)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, tablo_id, s3_key, filename, content_type, size_bytes, created_at;

-- name: DeleteTabloFile :exec
DELETE FROM tablo_files WHERE id = $1 AND tablo_id = $2;

New query to append — follow exact same comment + annotation style:

-- name: ListOrphanFiles :many
-- Find tablo_files rows whose owning tablo no longer exists.
-- Used by the orphan-file cleanup worker (Phase 6 WORK-02).
SELECT id, tablo_id, s3_key
FROM tablo_files tf
WHERE NOT EXISTS (
    SELECT 1 FROM tablos t WHERE t.id = tf.tablo_id
);

Expected generated signature (after just generate):

func (q *Queries) ListOrphanFiles(ctx context.Context) ([]ListOrphanFilesRow, error)

Where ListOrphanFilesRow will have fields: ID uuid.UUID, TabloID uuid.UUID, S3Key string.

Note on DeleteTabloFile for orphan cleanup: DeleteTabloFile takes DeleteTabloFileParams{ID, TabloID}. For orphan rows, the tablo_id column still holds the original UUID even though the tablo row is gone — the DELETE WHERE clause will still match and succeed. No new delete query needed.


backend/go.mod (config — add river dependencies)

Analog: backend/go.mod (existing file — add to require block)

Current require block pattern (from backend/go.mod lines 5-14):

require (
    github.com/a-h/templ v0.3.1020
    github.com/go-chi/chi/v5 v5.2.5
    github.com/google/uuid v1.6.0
    github.com/gorilla/csrf v1.7.3
    github.com/jackc/pgx/v5 v5.9.2
    github.com/pressly/goose/v3 v3.27.1
    golang.org/x/crypto v0.51.0
    golang.org/x/time v0.15.0
)

Additions needed (run go get — do NOT hand-edit; go get updates both require and go.sum):

cd backend
go get github.com/riverqueue/river@v0.37.0
go get github.com/riverqueue/river/riverdriver/riverpgxv5@v0.37.0

rivermigrate is a sub-package of github.com/riverqueue/river — no separate go get needed. Import it as "github.com/riverqueue/river/rivermigrate" in code once the module is present.


backend/justfile (config — add worker target)

Analog: backend/justfile (existing file — extend it)

Existing target style (from backend/justfile lines 111-113, the dev target):

dev: db-up
    just generate
    DATABASE_URL='{{ database_url }}' SESSION_SECRET=... air -c .air.toml

Existing variable pattern (from backend/justfile lines 38-39):

database_url := "postgres://xtablo:xtablo@localhost:5432/xtablo?sslmode=disable"

New worker target to add — follows same env-var-per-line pattern as dev:

# Start the worker binary (development — requires db-up first).
worker: db-up
    DATABASE_URL='{{ database_url }}' \
    S3_ENDPOINT='http://localhost:9000' \
    S3_BUCKET='xtablo' \
    S3_REGION='us-east-1' \
    S3_ACCESS_KEY='minioadmin' \
    S3_SECRET_KEY='minioadmin' \
    S3_USE_PATH_STYLE='true' \
    go run ./cmd/worker

Note on build target (from backend/justfile lines 124-126, which already builds the worker):

build:
    just generate
    go build -o bin/web ./cmd/web
    go build -o bin/worker ./cmd/worker

The build target already compiles ./cmd/worker — no change needed there.


Shared Patterns

Structured Logging

Source: backend/internal/web/slog.go lines 17-23
Apply to: All internal/jobs/*.go files (use slog.Info / slog.Error with key-value pairs)

func NewSlogHandler(env string, w io.Writer) slog.Handler {
    opts := &slog.HandlerOptions{Level: slog.LevelInfo}
    if env == "production" {
        return slog.NewJSONHandler(w, opts)
    }
    return slog.NewTextHandler(w, opts)
}

Worker calls slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout))) — same as cmd/web/main.go line 41. River's Config.Logger: slog.Default() then picks up this handler.

DB Pool Construction

Source: backend/internal/db/pool.go lines 15-23
Apply to: cmd/worker/main.go (already used in skeleton)

func NewPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) {
    cfg, err := pgxpool.ParseConfig(dsn)
    if err != nil {
        return nil, err
    }
    cfg.MaxConns = 10
    cfg.MinConns = 1
    return pgxpool.NewWithConfig(ctx, cfg)
}

MaxConns: 10 matches the river config MaxWorkers: 10 — do not exceed pool size with worker concurrency.
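One way to keep the two numbers from drifting apart is to derive the worker count from the pool limit rather than hard-coding both. The helper below is a hypothetical sketch; `workerLimit` does not exist in the codebase, and it only encodes the rule stated above.

```go
package main

import "fmt"

// workerLimit caps the requested job concurrency at the pool's connection
// limit and never returns less than 1. maxConns is int32 to match the type
// of pgxpool's MaxConns setting.
func workerLimit(requested int, maxConns int32) int {
	limit := int(maxConns)
	if limit < 1 {
		limit = 1
	}
	if requested < 1 {
		return 1
	}
	if requested > limit {
		return limit
	}
	return requested
}

func main() {
	fmt.Println(workerLimit(10, 10)) // 10: matches the pool exactly
	fmt.Println(workerLimit(25, 10)) // 10: clamped to the pool size
	fmt.Println(workerLimit(4, 10))  // 4: below the limit, unchanged
}
```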

sqlc.New(pool) Pattern

Source: backend/internal/db/sqlc/db.go lines 20-22
Apply to: internal/jobs/orphan_cleanup.go Work method

func New(db DBTX) *Queries {
    return &Queries{db: db}
}

*pgxpool.Pool satisfies DBTX. Call sqlc.New(w.pool) inside Work(ctx, job) at each invocation — no need to cache the *Queries on the struct.

Test DB Setup (Integration Tests)

Source: backend/internal/auth/testdb_test.go lines 40-140
Apply to: Any integration tests in internal/jobs/ that need a real Postgres connection

func setupTestDB(t *testing.T) (*pgxpool.Pool, func()) {
    t.Helper()
    dsn := os.Getenv("TEST_DATABASE_URL")
    if dsn == "" {
        dsn = os.Getenv("DATABASE_URL")
    }
    if dsn == "" {
        t.Skip("TEST_DATABASE_URL (or DATABASE_URL) not set — integration test skipped")
        return nil, nil
    }
    // ... creates isolated schema, runs goose migrations, returns pool + cleanup
}

Copy this helper into backend/internal/jobs/testdb_test.go (or a shared internal/testutil/ if it grows). Integration tests for ListOrphanFiles need the app migrations to be applied first.


No Analog Found

All files have close analogs. The only genuinely new patterns are the river-specific wiring (rivermigrate, river.Client, river.PeriodicJob, river.WorkerDefaults) — those come from RESEARCH.md (verified against official docs), not the codebase.

| File | Role | Data Flow | Reason |
|---|---|---|---|
| backend/internal/jobs/ (new package) | worker | event-driven | No background job package exists yet; worker struct pattern borrowed from internal/auth |

Key Constraints Extracted from Codebase

  1. db.NewPool MaxConns is 10 (backend/internal/db/pool.go line 20) — river MaxWorkers must not exceed 10.
  2. files.FileStorer interface (backend/internal/files/store.go lines 17-22) — OrphanCleanupWorker takes the interface, not *files.Store, for testability.
  3. sqlc.New(pool) takes DBTX; *pgxpool.Pool satisfies it, so no adapter is needed.
  4. signal.NotifyContext is already the shutdown idiom in both cmd/web/main.go and the existing worker skeleton — river's shutdown hooks in with StopAndCancel(timeoutCtx) using a fresh context.Background()-derived timeout context.
  5. just generate must run after files.sql change — sqlc regenerates backend/internal/db/sqlc/files.sql.go to add ListOrphanFiles and ListOrphanFilesRow.
  6. S3 env var names are the same as in cmd/web: S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY, S3_REGION, S3_USE_PATH_STYLE. The worker's just worker target passes MinIO defaults matching cmd/web's conventions.
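Constraint 4 asks for a fresh context.Background()-derived timeout at shutdown. The sketch below shows why, with a hypothetical `stopper` interface and a `slowClient` fake in place of the River client: by the time shutdown runs, the signal context is already cancelled, so a timeout derived from it expires immediately.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stopper is a hypothetical stand-in for the client's shutdown method.
type stopper interface {
	StopAndCancel(ctx context.Context) error
}

// slowClient simulates a client whose jobs need `drain` to wind down.
type slowClient struct{ drain time.Duration }

func (c slowClient) StopAndCancel(ctx context.Context) error {
	select {
	case <-time.After(c.drain):
		return nil // jobs finished within the allowed time
	case <-ctx.Done():
		return ctx.Err() // ran out of time (or never had any)
	}
}

// shutdown gives the client up to timeout to stop, with the deadline
// derived from parent.
func shutdown(parent context.Context, c stopper, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return c.StopAndCancel(ctx)
}

// demo compares a fresh parent with the already-cancelled signal context.
func demo() (freshOK, reusedOK bool) {
	sigCtx, cancel := context.WithCancel(context.Background())
	cancel() // simulates SIGTERM having fired

	c := slowClient{drain: 10 * time.Millisecond}
	freshOK = shutdown(context.Background(), c, time.Second) == nil
	reusedOK = shutdown(sigCtx, c, time.Second) == nil
	return freshOK, reusedOK
}

func main() {
	freshOK, reusedOK := demo()
	fmt.Println("fresh parent drains cleanly:", freshOK)
	fmt.Println("cancelled parent drains cleanly:", reusedOK)
}
```

After the stop call returns, the entrypoint would close the pool explicitly, matching the pool close ordering shown for cmd/web.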

Metadata

Analog search scope: backend/cmd/, backend/internal/, backend/justfile, backend/go.mod
Files scanned: 12
Pattern extraction date: 2026-05-15