feat(06-02): replace cmd/worker skeleton with full river wiring

- rivermigrate at startup (idempotent, before client construction)
- S3 store init from env vars (S3_ENDPOINT/S3_BUCKET/S3_ACCESS_KEY/S3_SECRET_KEY/S3_REGION/S3_USE_PATH_STYLE)
- signal.NotifyContext created AFTER all startup I/O (PATTERNS.md critical ordering)
- HeartbeatWorker + OrphanCleanupWorker registered via river.AddWorker
- river.Client with slog.Default() Logger, SlogErrorHandler, MaxWorkers:10
- HeartbeatArgs periodic every 1 min (RunOnStart:true), OrphanCleanupArgs every 1 hr
- StopAndCancel(10s timeout) on shutdown; pool.Close after StopAndCancel
This commit is contained in:
Arthur Belleville 2026-05-15 16:37:20 +02:00
parent 94ac095dee
commit 6e70478417
No known key found for this signature in database

View file

@ -1,7 +1,5 @@
// Command worker is the Phase 1 worker skeleton (CONTEXT D-03). It boots,
// opens a pgxpool, logs "worker ready", and blocks on SIGINT/SIGTERM until
// shutdown. Phase 6 replaces this file with the real job runtime — keep it
// minimal until then.
// Command worker is the background job processor for Xtablo. It runs river
// periodic jobs against the same Postgres as cmd/web.
package main
import (
@ -10,9 +8,16 @@ import (
"os"
"os/signal"
"syscall"
"time"
"backend/internal/db"
"backend/internal/files"
"backend/internal/jobs"
"backend/internal/web"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
func main() {
@ -22,6 +27,7 @@ func main() {
}
dsn := os.Getenv("DATABASE_URL")
// Logger first so even fatal-on-missing-DSN paths produce structured output.
slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout)))
if dsn == "" {
@ -29,20 +35,111 @@ func main() {
os.Exit(1)
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Use a plain background context for all startup I/O (pool connect,
// rivermigrate, S3 init). The signal context is created AFTER startup
// completes — river.Client.Start must receive a live (non-cancelled) context
// (RESEARCH Pitfall 2 + PATTERNS.md critical ordering constraint).
ctx := context.Background()
// Step 1: Connect DB pool.
// Note: pool.Close() is called explicitly at the end of the shutdown
// sequence (after StopAndCancel) — not via defer — to preserve ordering
// per PATTERNS.md pool close ordering.
pool, err := db.NewPool(ctx, dsn)
if err != nil {
slog.Error("db connect failed", "err", err)
os.Exit(1)
}
// Load-bearing signal per D-03 — verification scripts grep for this.
// Step 2: Run river migrations before constructing the client (D-02).
// rivermigrate is idempotent — already-applied versions are skipped.
migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil)
if err != nil {
slog.Error("rivermigrate init failed", "err", err)
os.Exit(1)
}
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)
if err != nil {
slog.Error("river migration failed", "err", err)
os.Exit(1)
}
for _, v := range res.Versions {
slog.Info("river migration applied", "version", v.Version)
}
// Step 3: Init S3 store for the orphan-file cleanup job.
s3Endpoint := os.Getenv("S3_ENDPOINT")
s3Bucket := os.Getenv("S3_BUCKET")
s3AccessKey := os.Getenv("S3_ACCESS_KEY")
s3SecretKey := os.Getenv("S3_SECRET_KEY")
s3Region := os.Getenv("S3_REGION")
if s3Region == "" {
s3Region = "us-east-1"
}
s3UsePathStyle := os.Getenv("S3_USE_PATH_STYLE") == "true"
fileStore, err := files.NewStore(ctx, s3Endpoint, s3Bucket, s3Region, s3AccessKey, s3SecretKey, s3UsePathStyle)
if err != nil {
slog.Error("file store init failed", "err", err)
os.Exit(1)
}
// Step 4: Construct signal context AFTER all startup I/O (critical ordering).
sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Step 5: Register workers (D-05).
workers := river.NewWorkers()
river.AddWorker(workers, &jobs.HeartbeatWorker{})
river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore))
// Step 6: Construct river client with periodic jobs (D-03, D-07).
riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
Logger: slog.Default(),
Workers: workers,
Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}},
ErrorHandler: &jobs.SlogErrorHandler{},
PeriodicJobs: []*river.PeriodicJob{
// Heartbeat: every 1 minute, fires immediately on start (RunOnStart: true).
river.NewPeriodicJob(
river.PeriodicInterval(1*time.Minute),
func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil },
&river.PeriodicJobOpts{RunOnStart: true},
),
// Orphan cleanup: every 1 hour, does NOT fire on start.
river.NewPeriodicJob(
river.PeriodicInterval(1*time.Hour),
func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil },
nil,
),
},
})
if err != nil {
slog.Error("river client init failed", "err", err)
os.Exit(1)
}
// Step 7: Start the river client with the signal context.
// Jobs inherit this context; when SIGINT/SIGTERM fires the context is
// cancelled and running jobs are notified to stop cleanly.
if err := riverClient.Start(sigCtx); err != nil {
slog.Error("river client start failed", "err", err)
os.Exit(1)
}
slog.Info("worker ready")
<-ctx.Done()
<-sigCtx.Done()
slog.Info("shutting down")
// Step 8: Graceful shutdown — give in-flight jobs up to 10 seconds to finish.
stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer stopCancel()
if err := riverClient.StopAndCancel(stopCtx); err != nil {
slog.Warn("river stop did not finish cleanly", "err", err)
}
// Close pool AFTER StopAndCancel returns (PATTERNS.md pool close ordering).
pool.Close()
slog.Info("shutdown complete")
}