From 6e704784179e713830277d7c774643c139083d38 Mon Sep 17 00:00:00 2001 From: Arthur Belleville Date: Fri, 15 May 2026 16:37:20 +0200 Subject: [PATCH] feat(06-02): replace cmd/worker skeleton with full river wiring - rivermigrate at startup (idempotent, before client construction) - S3 store init from env vars (S3_ENDPOINT/S3_BUCKET/S3_ACCESS_KEY/S3_SECRET_KEY/S3_REGION/S3_USE_PATH_STYLE) - signal.NotifyContext created AFTER all startup I/O (PATTERNS.md critical ordering) - HeartbeatWorker + OrphanCleanupWorker registered via river.AddWorker - river.Client with slog.Default() Logger, SlogErrorHandler, MaxWorkers:10 - HeartbeatArgs periodic every 1 min (RunOnStart:true), OrphanCleanupArgs every 1 hr - StopAndCancel(10s timeout) on shutdown; pool.Close after StopAndCancel --- backend/cmd/worker/main.go | 113 ++++++++++++++++++++++++++++++++++--- 1 file changed, 105 insertions(+), 8 deletions(-) diff --git a/backend/cmd/worker/main.go b/backend/cmd/worker/main.go index ecaefd5..1538d14 100644 --- a/backend/cmd/worker/main.go +++ b/backend/cmd/worker/main.go @@ -1,7 +1,5 @@ -// Command worker is the Phase 1 worker skeleton (CONTEXT D-03). It boots, -// opens a pgxpool, logs "worker ready", and blocks on SIGINT/SIGTERM until -// shutdown. Phase 6 replaces this file with the real job runtime — keep it -// minimal until then. +// Command worker is the background job processor for Xtablo. It runs river +// periodic jobs against the same Postgres as cmd/web. package main import ( @@ -10,9 +8,16 @@ import ( "os" "os/signal" "syscall" + "time" "backend/internal/db" + "backend/internal/files" + "backend/internal/jobs" "backend/internal/web" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivermigrate" ) func main() { @@ -22,6 +27,7 @@ func main() { } dsn := os.Getenv("DATABASE_URL") + // Logger first so even fatal-on-missing-DSN paths produce structured output. slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout))) if dsn == "" { @@ -29,20 +35,111 @@ func main() { os.Exit(1) } - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stop() + // Use a plain background context for all startup I/O (pool connect, + // rivermigrate, S3 init). The signal context is created AFTER startup + // completes — river.Client.Start must receive a live (non-cancelled) context + // (RESEARCH Pitfall 2 + PATTERNS.md critical ordering constraint). + ctx := context.Background() + // Step 1: Connect DB pool. + // Note: pool.Close() is called explicitly at the end of the shutdown + // sequence (after StopAndCancel) — not via defer — to preserve ordering + // per PATTERNS.md pool close ordering. pool, err := db.NewPool(ctx, dsn) if err != nil { slog.Error("db connect failed", "err", err) os.Exit(1) } - // Load-bearing signal per D-03 — verification scripts grep for this. + // Step 2: Run river migrations before constructing the client (D-02). + // rivermigrate is idempotent — already-applied versions are skipped. + migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil) + if err != nil { + slog.Error("rivermigrate init failed", "err", err) + os.Exit(1) + } + res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) + if err != nil { + slog.Error("river migration failed", "err", err) + os.Exit(1) + } + for _, v := range res.Versions { + slog.Info("river migration applied", "version", v.Version) + } + + // Step 3: Init S3 store for the orphan-file cleanup job. + s3Endpoint := os.Getenv("S3_ENDPOINT") + s3Bucket := os.Getenv("S3_BUCKET") + s3AccessKey := os.Getenv("S3_ACCESS_KEY") + s3SecretKey := os.Getenv("S3_SECRET_KEY") + s3Region := os.Getenv("S3_REGION") + if s3Region == "" { + s3Region = "us-east-1" + } + s3UsePathStyle := os.Getenv("S3_USE_PATH_STYLE") == "true" + + fileStore, err := files.NewStore(ctx, s3Endpoint, s3Bucket, s3Region, s3AccessKey, s3SecretKey, s3UsePathStyle) + if err != nil { + slog.Error("file store init failed", "err", err) + os.Exit(1) + } + + // Step 4: Construct signal context AFTER all startup I/O (critical ordering). + sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + // Step 5: Register workers (D-05). + workers := river.NewWorkers() + river.AddWorker(workers, &jobs.HeartbeatWorker{}) + river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore)) + + // Step 6: Construct river client with periodic jobs (D-03, D-07). + riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ + Logger: slog.Default(), + Workers: workers, + Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}}, + ErrorHandler: &jobs.SlogErrorHandler{}, + PeriodicJobs: []*river.PeriodicJob{ + // Heartbeat: every 1 minute, fires immediately on start (RunOnStart: true). + river.NewPeriodicJob( + river.PeriodicInterval(1*time.Minute), + func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + // Orphan cleanup: every 1 hour, does NOT fire on start. + river.NewPeriodicJob( + river.PeriodicInterval(1*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil }, + nil, + ), + }, + }) + if err != nil { + slog.Error("river client init failed", "err", err) + os.Exit(1) + } + + // Step 7: Start the river client with the signal context. + // Jobs inherit this context; when SIGINT/SIGTERM fires the context is + // cancelled and running jobs are notified to stop cleanly. + if err := riverClient.Start(sigCtx); err != nil { + slog.Error("river client start failed", "err", err) + os.Exit(1) + } + slog.Info("worker ready") - <-ctx.Done() + <-sigCtx.Done() slog.Info("shutting down") + + // Step 8: Graceful shutdown — give in-flight jobs up to 10 seconds to finish. + stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer stopCancel() + if err := riverClient.StopAndCancel(stopCtx); err != nil { + slog.Warn("river stop did not finish cleanly", "err", err) + } + + // Close pool AFTER StopAndCancel returns (PATTERNS.md pool close ordering). pool.Close() slog.Info("shutdown complete") }