From a1c2828dc4fbfecc7ad5d33104dd5433cb2ca453 Mon Sep 17 00:00:00 2001 From: Arthur Belleville Date: Fri, 15 May 2026 16:34:08 +0200 Subject: [PATCH] feat(06-01): implement internal/jobs package with workers and error handler - HeartbeatArgs + HeartbeatWorker (logs slog.Info on each tick) - OrphanCleanupArgs + OrphanCleanupWorker (S3 delete then DB delete loop) - NewOrphanCleanupWorker constructor with pool + FileStorer injection - SlogErrorHandler implementing river.ErrorHandler (HandleError + HandlePanic) - fileQuerier interface for test injection without real DB - Unit tests: 7 tests pass (pure mock-based, no DB required) - go build ./... exits 0 --- backend/internal/jobs/error_handler.go | 39 +++++++ backend/internal/jobs/error_handler_test.go | 35 ++++++ backend/internal/jobs/heartbeat.go | 26 +++++ backend/internal/jobs/heartbeat_test.go | 29 +++++ backend/internal/jobs/orphan_cleanup.go | 93 +++++++++++++++ backend/internal/jobs/orphan_cleanup_test.go | 115 +++++++++++++++++++ 6 files changed, 337 insertions(+) create mode 100644 backend/internal/jobs/error_handler.go create mode 100644 backend/internal/jobs/error_handler_test.go create mode 100644 backend/internal/jobs/heartbeat.go create mode 100644 backend/internal/jobs/heartbeat_test.go create mode 100644 backend/internal/jobs/orphan_cleanup.go create mode 100644 backend/internal/jobs/orphan_cleanup_test.go diff --git a/backend/internal/jobs/error_handler.go b/backend/internal/jobs/error_handler.go new file mode 100644 index 0000000..c684e57 --- /dev/null +++ b/backend/internal/jobs/error_handler.go @@ -0,0 +1,39 @@ +package jobs + +import ( + "context" + "log/slog" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/rivertype" +) + +// SlogErrorHandler logs job errors and panics via the structured slog logger. +// Returning nil from HandleError and HandlePanic instructs river to follow its +// default retry schedule. +type SlogErrorHandler struct{} + +func (*SlogErrorHandler) HandleError( + ctx context.Context, job *rivertype.JobRow, err error, +) *river.ErrorHandlerResult { + slog.Error("job error", + "job_id", job.ID, + "job_kind", job.Kind, + "attempt", job.Attempt, + "max_attempts", job.MaxAttempts, + "err", err, + ) + return nil +} + +func (*SlogErrorHandler) HandlePanic( + ctx context.Context, job *rivertype.JobRow, panicVal any, trace string, +) *river.ErrorHandlerResult { + slog.Error("job panic", + "job_id", job.ID, + "job_kind", job.Kind, + "panic", panicVal, + "trace", trace, + ) + return nil +} diff --git a/backend/internal/jobs/error_handler_test.go b/backend/internal/jobs/error_handler_test.go new file mode 100644 index 0000000..62fb1e2 --- /dev/null +++ b/backend/internal/jobs/error_handler_test.go @@ -0,0 +1,35 @@ +package jobs + +import ( + "context" + "errors" + "testing" + + "github.com/riverqueue/river/rivertype" +) + +func TestSlogErrorHandler_HandleError(t *testing.T) { + h := &SlogErrorHandler{} + job := &rivertype.JobRow{ + ID: 1, + Kind: "test_job", + Attempt: 1, + MaxAttempts: 3, + } + result := h.HandleError(context.Background(), job, errors.New("test error")) + if result != nil { + t.Errorf("HandleError returned %v, want nil", result) + } +} + +func TestSlogErrorHandler_HandlePanic(t *testing.T) { + h := &SlogErrorHandler{} + job := &rivertype.JobRow{ + ID: 2, + Kind: "test_job", + } + result := h.HandlePanic(context.Background(), job, "panic value", "goroutine 1 ...") + if result != nil { + t.Errorf("HandlePanic returned %v, want nil", result) + } +} diff --git a/backend/internal/jobs/heartbeat.go b/backend/internal/jobs/heartbeat.go new file mode 100644 index 0000000..600a652 --- /dev/null +++ b/backend/internal/jobs/heartbeat.go @@ -0,0 +1,26 @@ +package jobs + +import ( + "context" + "log/slog" + + "github.com/riverqueue/river" +) + +// HeartbeatArgs carries no data — heartbeat is purely a proof-of-life tick. +type HeartbeatArgs struct{} + +func (HeartbeatArgs) Kind() string { return "heartbeat" } + +// HeartbeatWorker logs a heartbeat message on each execution. +type HeartbeatWorker struct { + river.WorkerDefaults[HeartbeatArgs] +} + +func (w *HeartbeatWorker) Work(ctx context.Context, job *river.Job[HeartbeatArgs]) error { + slog.Info("worker heartbeat", + "job_id", job.ID, + "attempt", job.Attempt, + ) + return nil +} diff --git a/backend/internal/jobs/heartbeat_test.go b/backend/internal/jobs/heartbeat_test.go new file mode 100644 index 0000000..2319d3d --- /dev/null +++ b/backend/internal/jobs/heartbeat_test.go @@ -0,0 +1,29 @@ +package jobs + +import ( + "context" + "testing" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/rivertype" +) + +func TestHeartbeatWorker(t *testing.T) { + w := &HeartbeatWorker{} + job := &river.Job[HeartbeatArgs]{ + JobRow: &rivertype.JobRow{ + ID: 42, + Attempt: 1, + }, + Args: HeartbeatArgs{}, + } + if err := w.Work(context.Background(), job); err != nil { + t.Fatalf("HeartbeatWorker.Work returned unexpected error: %v", err) + } +} + +func TestHeartbeatArgs_Kind(t *testing.T) { + if got := (HeartbeatArgs{}).Kind(); got != "heartbeat" { + t.Errorf("HeartbeatArgs.Kind() = %q, want %q", got, "heartbeat") + } +} diff --git a/backend/internal/jobs/orphan_cleanup.go b/backend/internal/jobs/orphan_cleanup.go new file mode 100644 index 0000000..fbf328d --- /dev/null +++ b/backend/internal/jobs/orphan_cleanup.go @@ -0,0 +1,93 @@ +package jobs + +import ( + "context" + "fmt" + "log/slog" + + "backend/internal/db/sqlc" + "backend/internal/files" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/riverqueue/river" +) + +// OrphanCleanupArgs carries no data — the worker queries orphans dynamically. +type OrphanCleanupArgs struct{} + +func (OrphanCleanupArgs) Kind() string { return "orphan_file_cleanup" } + +// fileQuerier is the minimal DB interface used by OrphanCleanupWorker. +// sqlc.Queries satisfies this interface; tests inject a mock. +type fileQuerier interface { + ListOrphanFiles(ctx context.Context) ([]sqlc.ListOrphanFilesRow, error) + DeleteTabloFile(ctx context.Context, arg sqlc.DeleteTabloFileParams) error +} + +// OrphanCleanupWorker deletes S3 objects and their DB rows for tablo_files +// whose owning tablo no longer exists. +type OrphanCleanupWorker struct { + river.WorkerDefaults[OrphanCleanupArgs] + + pool *pgxpool.Pool + store files.FileStorer + querier fileQuerier // nil in production; set by NewOrphanCleanupWorker for testability +} + +// NewOrphanCleanupWorker constructs an OrphanCleanupWorker with the given pool +// and S3 store. The fileQuerier is wired from the pool automatically. +func NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker { + return &OrphanCleanupWorker{ + pool: pool, + store: store, + querier: sqlc.New(pool), + } +} + +// Work iterates orphan tablo_files rows, deletes each S3 object first, then +// removes the DB row. Partial errors are logged and counted; the job itself +// always returns nil so river does not retry a partial run. +func (w *OrphanCleanupWorker) Work(ctx context.Context, job *river.Job[OrphanCleanupArgs]) error { + q := w.querier + if q == nil { + q = sqlc.New(w.pool) + } + + orphans, err := q.ListOrphanFiles(ctx) + if err != nil { + return fmt.Errorf("list orphan files: %w", err) + } + + var deleted, errCount int + for _, f := range orphans { + if err := w.store.Delete(ctx, f.S3Key); err != nil { + slog.Error("orphan cleanup: s3 delete failed", + "s3_key", f.S3Key, + "err", err, + ) + errCount++ + continue + } + + if err := q.DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{ + ID: f.ID, + TabloID: f.TabloID, + }); err != nil { + slog.Error("orphan cleanup: db delete failed", + "file_id", f.ID, + "err", err, + ) + errCount++ + continue + } + + deleted++ + } + + slog.Info("orphan cleanup complete", + "orphans_found", len(orphans), + "deleted", deleted, + "errors", errCount, + ) + return nil +} diff --git a/backend/internal/jobs/orphan_cleanup_test.go b/backend/internal/jobs/orphan_cleanup_test.go new file mode 100644 index 0000000..1d3bf6c --- /dev/null +++ b/backend/internal/jobs/orphan_cleanup_test.go @@ -0,0 +1,115 @@ +package jobs + +import ( + "context" + "errors" + "io" + "testing" + + "backend/internal/db/sqlc" + + "github.com/google/uuid" + "github.com/riverqueue/river" + "github.com/riverqueue/river/rivertype" +) + +// mockQuerier implements fileQuerier for unit tests. +type mockQuerier struct { + orphans []sqlc.ListOrphanFilesRow + listErr error + deleteFileCalls []sqlc.DeleteTabloFileParams + deleteFileErr error +} + +func (m *mockQuerier) ListOrphanFiles(_ context.Context) ([]sqlc.ListOrphanFilesRow, error) { + return m.orphans, m.listErr +} + +func (m *mockQuerier) DeleteTabloFile(_ context.Context, arg sqlc.DeleteTabloFileParams) error { + m.deleteFileCalls = append(m.deleteFileCalls, arg) + return m.deleteFileErr +} + +// mockFileStorer implements files.FileStorer for unit tests. +type mockFileStorer struct { + deletedKeys []string + deleteErr error +} + +func (m *mockFileStorer) Delete(_ context.Context, key string) error { + m.deletedKeys = append(m.deletedKeys, key) + return m.deleteErr +} + +func (m *mockFileStorer) Upload(_ context.Context, _ string, _ io.Reader) (string, int64, error) { + return "", 0, errors.New("not implemented") +} + +func (m *mockFileStorer) PresignDownload(_ context.Context, _ string) (string, error) { + return "", errors.New("not implemented") +} + +func makeWorkerJob() *river.Job[OrphanCleanupArgs] { + return &river.Job[OrphanCleanupArgs]{ + JobRow: &rivertype.JobRow{ID: 1, Attempt: 1}, + Args: OrphanCleanupArgs{}, + } +} + +func TestOrphanCleanupWorker_NoOrphans(t *testing.T) { + q := &mockQuerier{} + store := &mockFileStorer{} + w := &OrphanCleanupWorker{ + store: store, + querier: q, + } + + if err := w.Work(context.Background(), makeWorkerJob()); err != nil { + t.Fatalf("Work returned unexpected error: %v", err) + } + if len(store.deletedKeys) != 0 { + t.Errorf("expected 0 S3 deletes, got %d", len(store.deletedKeys)) + } + if len(q.deleteFileCalls) != 0 { + t.Errorf("expected 0 DB deletes, got %d", len(q.deleteFileCalls)) + } +} + +func TestOrphanCleanupWorker_DeletesOrphan(t *testing.T) { + fileID := uuid.New() + tabloID := uuid.New() + q := &mockQuerier{ + orphans: []sqlc.ListOrphanFilesRow{ + {ID: fileID, TabloID: tabloID, S3Key: "orphan-key"}, + }, + } + store := &mockFileStorer{} + w := &OrphanCleanupWorker{ + store: store, + querier: q, + } + + if err := w.Work(context.Background(), makeWorkerJob()); err != nil { + t.Fatalf("Work returned unexpected error: %v", err) + } + + // Assert S3 Delete was called with the correct key. + if len(store.deletedKeys) != 1 || store.deletedKeys[0] != "orphan-key" { + t.Errorf("expected S3 Delete called with %q, got %v", "orphan-key", store.deletedKeys) + } + + // Assert DB DeleteTabloFile was called with matching ID and TabloID. + if len(q.deleteFileCalls) != 1 { + t.Fatalf("expected 1 DB delete call, got %d", len(q.deleteFileCalls)) + } + got := q.deleteFileCalls[0] + if got.ID != fileID || got.TabloID != tabloID { + t.Errorf("DeleteTabloFile called with {%v, %v}, want {%v, %v}", got.ID, got.TabloID, fileID, tabloID) + } +} + +func TestOrphanCleanupArgs_Kind(t *testing.T) { + if got := (OrphanCleanupArgs{}).Kind(); got != "orphan_file_cleanup" { + t.Errorf("OrphanCleanupArgs.Kind() = %q, want %q", got, "orphan_file_cleanup") + } +}