- rivermigrate at startup (idempotent, before client construction)
- S3 store init from env vars (S3_ENDPOINT/S3_BUCKET/S3_ACCESS_KEY/S3_SECRET_KEY/S3_REGION/S3_USE_PATH_STYLE; S3_REGION defaults to us-east-1)
- signal.NotifyContext created AFTER all startup I/O (PATTERNS.md critical ordering)
- HeartbeatWorker + OrphanCleanupWorker registered via river.AddWorker
- river.Client with slog.Default() Logger, SlogErrorHandler, MaxWorkers: 10 on the default queue
- HeartbeatArgs periodic every 1 min (RunOnStart: true); OrphanCleanupArgs every 1 hr (not run on start)
- StopAndCancel (10s timeout) on shutdown; pool.Close after StopAndCancel
145 lines
4.6 KiB
Go
145 lines
4.6 KiB
Go
// Command worker is the background job processor for Xtablo. It runs river
// workers and periodic jobs against the same Postgres database as cmd/web.
package main
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"backend/internal/db"
|
|
"backend/internal/files"
|
|
"backend/internal/jobs"
|
|
"backend/internal/web"
|
|
|
|
"github.com/riverqueue/river"
|
|
"github.com/riverqueue/river/riverdriver/riverpgxv5"
|
|
"github.com/riverqueue/river/rivermigrate"
|
|
)
|
|
|
|
func main() {
|
|
env := os.Getenv("ENV")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
dsn := os.Getenv("DATABASE_URL")
|
|
|
|
// Logger first so even fatal-on-missing-DSN paths produce structured output.
|
|
slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout)))
|
|
|
|
if dsn == "" {
|
|
slog.Error("DATABASE_URL is required but unset")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Use a plain background context for all startup I/O (pool connect,
|
|
// rivermigrate, S3 init). The signal context is created AFTER startup
|
|
// completes — river.Client.Start must receive a live (non-cancelled) context
|
|
// (RESEARCH Pitfall 2 + PATTERNS.md critical ordering constraint).
|
|
ctx := context.Background()
|
|
|
|
// Step 1: Connect DB pool.
|
|
// Note: pool.Close() is called explicitly at the end of the shutdown
|
|
// sequence (after StopAndCancel) — not via defer — to preserve ordering
|
|
// per PATTERNS.md pool close ordering.
|
|
pool, err := db.NewPool(ctx, dsn)
|
|
if err != nil {
|
|
slog.Error("db connect failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Step 2: Run river migrations before constructing the client (D-02).
|
|
// rivermigrate is idempotent — already-applied versions are skipped.
|
|
migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil)
|
|
if err != nil {
|
|
slog.Error("rivermigrate init failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)
|
|
if err != nil {
|
|
slog.Error("river migration failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
for _, v := range res.Versions {
|
|
slog.Info("river migration applied", "version", v.Version)
|
|
}
|
|
|
|
// Step 3: Init S3 store for the orphan-file cleanup job.
|
|
s3Endpoint := os.Getenv("S3_ENDPOINT")
|
|
s3Bucket := os.Getenv("S3_BUCKET")
|
|
s3AccessKey := os.Getenv("S3_ACCESS_KEY")
|
|
s3SecretKey := os.Getenv("S3_SECRET_KEY")
|
|
s3Region := os.Getenv("S3_REGION")
|
|
if s3Region == "" {
|
|
s3Region = "us-east-1"
|
|
}
|
|
s3UsePathStyle := os.Getenv("S3_USE_PATH_STYLE") == "true"
|
|
|
|
fileStore, err := files.NewStore(ctx, s3Endpoint, s3Bucket, s3Region, s3AccessKey, s3SecretKey, s3UsePathStyle)
|
|
if err != nil {
|
|
slog.Error("file store init failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Step 4: Construct signal context AFTER all startup I/O (critical ordering).
|
|
sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
// Step 5: Register workers (D-05).
|
|
workers := river.NewWorkers()
|
|
river.AddWorker(workers, &jobs.HeartbeatWorker{})
|
|
river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore))
|
|
|
|
// Step 6: Construct river client with periodic jobs (D-03, D-07).
|
|
riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
|
|
Logger: slog.Default(),
|
|
Workers: workers,
|
|
Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}},
|
|
ErrorHandler: &jobs.SlogErrorHandler{},
|
|
PeriodicJobs: []*river.PeriodicJob{
|
|
// Heartbeat: every 1 minute, fires immediately on start (RunOnStart: true).
|
|
river.NewPeriodicJob(
|
|
river.PeriodicInterval(1*time.Minute),
|
|
func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil },
|
|
&river.PeriodicJobOpts{RunOnStart: true},
|
|
),
|
|
// Orphan cleanup: every 1 hour, does NOT fire on start.
|
|
river.NewPeriodicJob(
|
|
river.PeriodicInterval(1*time.Hour),
|
|
func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil },
|
|
nil,
|
|
),
|
|
},
|
|
})
|
|
if err != nil {
|
|
slog.Error("river client init failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Step 7: Start the river client with the signal context.
|
|
// Jobs inherit this context; when SIGINT/SIGTERM fires the context is
|
|
// cancelled and running jobs are notified to stop cleanly.
|
|
if err := riverClient.Start(sigCtx); err != nil {
|
|
slog.Error("river client start failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
slog.Info("worker ready")
|
|
|
|
<-sigCtx.Done()
|
|
slog.Info("shutting down")
|
|
|
|
// Step 8: Graceful shutdown — give in-flight jobs up to 10 seconds to finish.
|
|
stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer stopCancel()
|
|
if err := riverClient.StopAndCancel(stopCtx); err != nil {
|
|
slog.Warn("river stop did not finish cleanly", "err", err)
|
|
}
|
|
|
|
// Close pool AFTER StopAndCancel returns (PATTERNS.md pool close ordering).
|
|
pool.Close()
|
|
slog.Info("shutdown complete")
|
|
}
|