// Command worker is the background job processor for Xtablo. It runs river // periodic jobs against the same Postgres as cmd/web. package main import ( "context" "log/slog" "os" "os/signal" "syscall" "time" "backend/internal/db" "backend/internal/files" "backend/internal/jobs" "backend/internal/web" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivermigrate" ) func main() { env := os.Getenv("ENV") if env == "" { env = "development" } dsn := os.Getenv("DATABASE_URL") // Logger first so even fatal-on-missing-DSN paths produce structured output. slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout))) if dsn == "" { slog.Error("DATABASE_URL is required but unset") os.Exit(1) } // Use a plain background context for all startup I/O (pool connect, // rivermigrate, S3 init). The signal context is created AFTER startup // completes — river.Client.Start must receive a live (non-cancelled) context // (RESEARCH Pitfall 2 + PATTERNS.md critical ordering constraint). ctx := context.Background() // Step 1: Connect DB pool. // Note: pool.Close() is called explicitly at the end of the shutdown // sequence (after StopAndCancel) — not via defer — to preserve ordering // per PATTERNS.md pool close ordering. pool, err := db.NewPool(ctx, dsn) if err != nil { slog.Error("db connect failed", "err", err) os.Exit(1) } // Step 2: Run river migrations before constructing the client (D-02). // rivermigrate is idempotent — already-applied versions are skipped. migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil) if err != nil { slog.Error("rivermigrate init failed", "err", err) os.Exit(1) } res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) if err != nil { slog.Error("river migration failed", "err", err) os.Exit(1) } for _, v := range res.Versions { slog.Info("river migration applied", "version", v.Version) } // Step 3: Init S3 store for the orphan-file cleanup job. s3Endpoint := os.Getenv("S3_ENDPOINT") s3Bucket := os.Getenv("S3_BUCKET") s3AccessKey := os.Getenv("S3_ACCESS_KEY") s3SecretKey := os.Getenv("S3_SECRET_KEY") s3Region := os.Getenv("S3_REGION") if s3Region == "" { s3Region = "us-east-1" } s3UsePathStyle := os.Getenv("S3_USE_PATH_STYLE") == "true" fileStore, err := files.NewStore(ctx, s3Endpoint, s3Bucket, s3Region, s3AccessKey, s3SecretKey, s3UsePathStyle) if err != nil { slog.Error("file store init failed", "err", err) os.Exit(1) } // Step 4: Construct signal context AFTER all startup I/O (critical ordering). sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() // Step 5: Register workers (D-05). workers := river.NewWorkers() river.AddWorker(workers, &jobs.HeartbeatWorker{}) river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore)) // Step 6: Construct river client with periodic jobs (D-03, D-07). riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ Logger: slog.Default(), Workers: workers, Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}}, ErrorHandler: &jobs.SlogErrorHandler{}, PeriodicJobs: []*river.PeriodicJob{ // Heartbeat: every 1 minute, fires immediately on start (RunOnStart: true). river.NewPeriodicJob( river.PeriodicInterval(1*time.Minute), func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil }, &river.PeriodicJobOpts{RunOnStart: true}, ), // Orphan cleanup: every 1 hour, does NOT fire on start. river.NewPeriodicJob( river.PeriodicInterval(1*time.Hour), func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil }, nil, ), }, }) if err != nil { slog.Error("river client init failed", "err", err) os.Exit(1) } // Step 7: Start the river client with the signal context. // Jobs inherit this context; when SIGINT/SIGTERM fires the context is // cancelled and running jobs are notified to stop cleanly. if err := riverClient.Start(sigCtx); err != nil { slog.Error("river client start failed", "err", err) os.Exit(1) } slog.Info("worker ready") <-sigCtx.Done() slog.Info("shutting down") // Step 8: Graceful shutdown — give in-flight jobs up to 10 seconds to finish. stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) defer stopCancel() if err := riverClient.StopAndCancel(stopCtx); err != nil { slog.Warn("river stop did not finish cleanly", "err", err) } // Close pool AFTER StopAndCancel returns (PATTERNS.md pool close ordering). pool.Close() slog.Info("shutdown complete") }