feat(06-02): replace cmd/worker skeleton with full river wiring
- rivermigrate at startup (idempotent, before client construction)
- S3 store init from env vars (S3_ENDPOINT/S3_BUCKET/S3_ACCESS_KEY/S3_SECRET_KEY/S3_REGION/S3_USE_PATH_STYLE)
- signal.NotifyContext created AFTER all startup I/O (PATTERNS.md critical ordering)
- HeartbeatWorker + OrphanCleanupWorker registered via river.AddWorker
- river.Client with slog.Default() Logger, SlogErrorHandler, MaxWorkers:10
- HeartbeatArgs periodic every 1 min (RunOnStart:true), OrphanCleanupArgs every 1 hr
- StopAndCancel(10s timeout) on shutdown; pool.Close after StopAndCancel
This commit is contained in:
parent
94ac095dee
commit
6e70478417
1 changed file with 105 additions and 8 deletions
|
|
@@ -1,7 +1,5 @@
|
||||||
// Command worker is the Phase 1 worker skeleton (CONTEXT D-03). It boots,
|
// Command worker is the background job processor for Xtablo. It runs river
|
||||||
// opens a pgxpool, logs "worker ready", and blocks on SIGINT/SIGTERM until
|
// periodic jobs against the same Postgres as cmd/web.
|
||||||
// shutdown. Phase 6 replaces this file with the real job runtime — keep it
|
|
||||||
// minimal until then.
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
@@ -10,9 +8,16 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"backend/internal/db"
|
"backend/internal/db"
|
||||||
|
"backend/internal/files"
|
||||||
|
"backend/internal/jobs"
|
||||||
"backend/internal/web"
|
"backend/internal/web"
|
||||||
|
|
||||||
|
"github.com/riverqueue/river"
|
||||||
|
"github.com/riverqueue/river/riverdriver/riverpgxv5"
|
||||||
|
"github.com/riverqueue/river/rivermigrate"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
@@ -22,6 +27,7 @@ func main() {
|
||||||
}
|
}
|
||||||
dsn := os.Getenv("DATABASE_URL")
|
dsn := os.Getenv("DATABASE_URL")
|
||||||
|
|
||||||
|
// Logger first so even fatal-on-missing-DSN paths produce structured output.
|
||||||
slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout)))
|
slog.SetDefault(slog.New(web.NewSlogHandler(env, os.Stdout)))
|
||||||
|
|
||||||
if dsn == "" {
|
if dsn == "" {
|
||||||
|
|
@@ -29,20 +35,111 @@ func main() {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
// Use a plain background context for all startup I/O (pool connect,
|
||||||
defer stop()
|
// rivermigrate, S3 init). The signal context is created AFTER startup
|
||||||
|
// completes — river.Client.Start must receive a live (non-cancelled) context
|
||||||
|
// (RESEARCH Pitfall 2 + PATTERNS.md critical ordering constraint).
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Step 1: Connect DB pool.
|
||||||
|
// Note: pool.Close() is called explicitly at the end of the shutdown
|
||||||
|
// sequence (after StopAndCancel) — not via defer — to preserve ordering
|
||||||
|
// per PATTERNS.md pool close ordering.
|
||||||
pool, err := db.NewPool(ctx, dsn)
|
pool, err := db.NewPool(ctx, dsn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("db connect failed", "err", err)
|
slog.Error("db connect failed", "err", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load-bearing signal per D-03 — verification scripts grep for this.
|
// Step 2: Run river migrations before constructing the client (D-02).
|
||||||
|
// rivermigrate is idempotent — already-applied versions are skipped.
|
||||||
|
migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("rivermigrate init failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("river migration failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
for _, v := range res.Versions {
|
||||||
|
slog.Info("river migration applied", "version", v.Version)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3: Init S3 store for the orphan-file cleanup job.
|
||||||
|
s3Endpoint := os.Getenv("S3_ENDPOINT")
|
||||||
|
s3Bucket := os.Getenv("S3_BUCKET")
|
||||||
|
s3AccessKey := os.Getenv("S3_ACCESS_KEY")
|
||||||
|
s3SecretKey := os.Getenv("S3_SECRET_KEY")
|
||||||
|
s3Region := os.Getenv("S3_REGION")
|
||||||
|
if s3Region == "" {
|
||||||
|
s3Region = "us-east-1"
|
||||||
|
}
|
||||||
|
s3UsePathStyle := os.Getenv("S3_USE_PATH_STYLE") == "true"
|
||||||
|
|
||||||
|
fileStore, err := files.NewStore(ctx, s3Endpoint, s3Bucket, s3Region, s3AccessKey, s3SecretKey, s3UsePathStyle)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("file store init failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 4: Construct signal context AFTER all startup I/O (critical ordering).
|
||||||
|
sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
// Step 5: Register workers (D-05).
|
||||||
|
workers := river.NewWorkers()
|
||||||
|
river.AddWorker(workers, &jobs.HeartbeatWorker{})
|
||||||
|
river.AddWorker(workers, jobs.NewOrphanCleanupWorker(pool, fileStore))
|
||||||
|
|
||||||
|
// Step 6: Construct river client with periodic jobs (D-03, D-07).
|
||||||
|
riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
|
||||||
|
Logger: slog.Default(),
|
||||||
|
Workers: workers,
|
||||||
|
Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}},
|
||||||
|
ErrorHandler: &jobs.SlogErrorHandler{},
|
||||||
|
PeriodicJobs: []*river.PeriodicJob{
|
||||||
|
// Heartbeat: every 1 minute, fires immediately on start (RunOnStart: true).
|
||||||
|
river.NewPeriodicJob(
|
||||||
|
river.PeriodicInterval(1*time.Minute),
|
||||||
|
func() (river.JobArgs, *river.InsertOpts) { return jobs.HeartbeatArgs{}, nil },
|
||||||
|
&river.PeriodicJobOpts{RunOnStart: true},
|
||||||
|
),
|
||||||
|
// Orphan cleanup: every 1 hour, does NOT fire on start.
|
||||||
|
river.NewPeriodicJob(
|
||||||
|
river.PeriodicInterval(1*time.Hour),
|
||||||
|
func() (river.JobArgs, *river.InsertOpts) { return jobs.OrphanCleanupArgs{}, nil },
|
||||||
|
nil,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("river client init failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 7: Start the river client with the signal context.
|
||||||
|
// Jobs inherit this context; when SIGINT/SIGTERM fires the context is
|
||||||
|
// cancelled and running jobs are notified to stop cleanly.
|
||||||
|
if err := riverClient.Start(sigCtx); err != nil {
|
||||||
|
slog.Error("river client start failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
slog.Info("worker ready")
|
slog.Info("worker ready")
|
||||||
|
|
||||||
<-ctx.Done()
|
<-sigCtx.Done()
|
||||||
slog.Info("shutting down")
|
slog.Info("shutting down")
|
||||||
|
|
||||||
|
// Step 8: Graceful shutdown — give in-flight jobs up to 10 seconds to finish.
|
||||||
|
stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer stopCancel()
|
||||||
|
if err := riverClient.StopAndCancel(stopCtx); err != nil {
|
||||||
|
slog.Warn("river stop did not finish cleanly", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close pool AFTER StopAndCancel returns (PATTERNS.md pool close ordering).
|
||||||
pool.Close()
|
pool.Close()
|
||||||
slog.Info("shutdown complete")
|
slog.Info("shutdown complete")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue