feat(06-01): implement internal/jobs package with workers and error handler

- HeartbeatArgs + HeartbeatWorker (logs slog.Info on each tick)
- OrphanCleanupArgs + OrphanCleanupWorker (S3 delete then DB delete loop)
- NewOrphanCleanupWorker constructor with pool + FileStorer injection
- SlogErrorHandler implementing river.ErrorHandler (HandleError + HandlePanic)
- fileQuerier interface for test injection without real DB
- Unit tests: 7 tests pass (pure mock-based, no DB required)
- go build ./... exits 0
This commit is contained in:
Arthur Belleville 2026-05-15 16:34:08 +02:00
parent 62e5e3eb60
commit a1c2828dc4
No known key found for this signature in database
6 changed files with 337 additions and 0 deletions

View file

@ -0,0 +1,39 @@
package jobs
import (
"context"
"log/slog"
"github.com/riverqueue/river"
"github.com/riverqueue/river/rivertype"
)
// SlogErrorHandler logs job errors and panics via the structured slog logger.
// Returning nil from HandleError and HandlePanic instructs river to follow its
// default retry schedule.
type SlogErrorHandler struct{}
func (*SlogErrorHandler) HandleError(
ctx context.Context, job *rivertype.JobRow, err error,
) *river.ErrorHandlerResult {
slog.Error("job error",
"job_id", job.ID,
"job_kind", job.Kind,
"attempt", job.Attempt,
"max_attempts", job.MaxAttempts,
"err", err,
)
return nil
}
func (*SlogErrorHandler) HandlePanic(
ctx context.Context, job *rivertype.JobRow, panicVal any, trace string,
) *river.ErrorHandlerResult {
slog.Error("job panic",
"job_id", job.ID,
"job_kind", job.Kind,
"panic", panicVal,
"trace", trace,
)
return nil
}

View file

@ -0,0 +1,35 @@
package jobs
import (
"context"
"errors"
"testing"
"github.com/riverqueue/river/rivertype"
)
func TestSlogErrorHandler_HandleError(t *testing.T) {
h := &SlogErrorHandler{}
job := &rivertype.JobRow{
ID: 1,
Kind: "test_job",
Attempt: 1,
MaxAttempts: 3,
}
result := h.HandleError(context.Background(), job, errors.New("test error"))
if result != nil {
t.Errorf("HandleError returned %v, want nil", result)
}
}
func TestSlogErrorHandler_HandlePanic(t *testing.T) {
h := &SlogErrorHandler{}
job := &rivertype.JobRow{
ID: 2,
Kind: "test_job",
}
result := h.HandlePanic(context.Background(), job, "panic value", "goroutine 1 ...")
if result != nil {
t.Errorf("HandlePanic returned %v, want nil", result)
}
}

View file

@ -0,0 +1,26 @@
package jobs
import (
"context"
"log/slog"
"github.com/riverqueue/river"
)
// HeartbeatArgs carries no data — heartbeat is purely a proof-of-life tick.
type HeartbeatArgs struct{}
func (HeartbeatArgs) Kind() string { return "heartbeat" }
// HeartbeatWorker logs a heartbeat message on each execution.
type HeartbeatWorker struct {
river.WorkerDefaults[HeartbeatArgs]
}
func (w *HeartbeatWorker) Work(ctx context.Context, job *river.Job[HeartbeatArgs]) error {
slog.Info("worker heartbeat",
"job_id", job.ID,
"attempt", job.Attempt,
)
return nil
}

View file

@ -0,0 +1,29 @@
package jobs
import (
"context"
"testing"
"github.com/riverqueue/river"
"github.com/riverqueue/river/rivertype"
)
func TestHeartbeatWorker(t *testing.T) {
w := &HeartbeatWorker{}
job := &river.Job[HeartbeatArgs]{
JobRow: &rivertype.JobRow{
ID: 42,
Attempt: 1,
},
Args: HeartbeatArgs{},
}
if err := w.Work(context.Background(), job); err != nil {
t.Fatalf("HeartbeatWorker.Work returned unexpected error: %v", err)
}
}
func TestHeartbeatArgs_Kind(t *testing.T) {
if got := (HeartbeatArgs{}).Kind(); got != "heartbeat" {
t.Errorf("HeartbeatArgs.Kind() = %q, want %q", got, "heartbeat")
}
}

View file

@ -0,0 +1,93 @@
package jobs
import (
"context"
"fmt"
"log/slog"
"backend/internal/db/sqlc"
"backend/internal/files"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
)
// OrphanCleanupArgs carries no data — the worker queries orphans dynamically.
type OrphanCleanupArgs struct{}
func (OrphanCleanupArgs) Kind() string { return "orphan_file_cleanup" }
// fileQuerier is the minimal DB interface used by OrphanCleanupWorker.
// sqlc.Queries satisfies this interface; tests inject a mock.
type fileQuerier interface {
ListOrphanFiles(ctx context.Context) ([]sqlc.ListOrphanFilesRow, error)
DeleteTabloFile(ctx context.Context, arg sqlc.DeleteTabloFileParams) error
}
// OrphanCleanupWorker deletes S3 objects and their DB rows for tablo_files
// whose owning tablo no longer exists.
type OrphanCleanupWorker struct {
river.WorkerDefaults[OrphanCleanupArgs]
pool *pgxpool.Pool
store files.FileStorer
querier fileQuerier // nil in production; set by NewOrphanCleanupWorker for testability
}
// NewOrphanCleanupWorker constructs an OrphanCleanupWorker with the given pool
// and S3 store. The fileQuerier is wired from the pool automatically.
func NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker {
return &OrphanCleanupWorker{
pool: pool,
store: store,
querier: sqlc.New(pool),
}
}
// Work iterates orphan tablo_files rows, deletes each S3 object first, then
// removes the DB row. Partial errors are logged and counted; the job itself
// always returns nil so river does not retry a partial run.
func (w *OrphanCleanupWorker) Work(ctx context.Context, job *river.Job[OrphanCleanupArgs]) error {
q := w.querier
if q == nil {
q = sqlc.New(w.pool)
}
orphans, err := q.ListOrphanFiles(ctx)
if err != nil {
return fmt.Errorf("list orphan files: %w", err)
}
var deleted, errCount int
for _, f := range orphans {
if err := w.store.Delete(ctx, f.S3Key); err != nil {
slog.Error("orphan cleanup: s3 delete failed",
"s3_key", f.S3Key,
"err", err,
)
errCount++
continue
}
if err := q.DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{
ID: f.ID,
TabloID: f.TabloID,
}); err != nil {
slog.Error("orphan cleanup: db delete failed",
"file_id", f.ID,
"err", err,
)
errCount++
continue
}
deleted++
}
slog.Info("orphan cleanup complete",
"orphans_found", len(orphans),
"deleted", deleted,
"errors", errCount,
)
return nil
}

View file

@ -0,0 +1,115 @@
package jobs
import (
"context"
"errors"
"io"
"testing"
"backend/internal/db/sqlc"
"github.com/google/uuid"
"github.com/riverqueue/river"
"github.com/riverqueue/river/rivertype"
)
// mockQuerier implements fileQuerier for unit tests.
type mockQuerier struct {
orphans []sqlc.ListOrphanFilesRow
listErr error
deleteFileCalls []sqlc.DeleteTabloFileParams
deleteFileErr error
}
func (m *mockQuerier) ListOrphanFiles(_ context.Context) ([]sqlc.ListOrphanFilesRow, error) {
return m.orphans, m.listErr
}
func (m *mockQuerier) DeleteTabloFile(_ context.Context, arg sqlc.DeleteTabloFileParams) error {
m.deleteFileCalls = append(m.deleteFileCalls, arg)
return m.deleteFileErr
}
// mockFileStorer implements files.FileStorer for unit tests.
type mockFileStorer struct {
deletedKeys []string
deleteErr error
}
func (m *mockFileStorer) Delete(_ context.Context, key string) error {
m.deletedKeys = append(m.deletedKeys, key)
return m.deleteErr
}
func (m *mockFileStorer) Upload(_ context.Context, _ string, _ io.Reader) (string, int64, error) {
return "", 0, errors.New("not implemented")
}
func (m *mockFileStorer) PresignDownload(_ context.Context, _ string) (string, error) {
return "", errors.New("not implemented")
}
func makeWorkerJob() *river.Job[OrphanCleanupArgs] {
return &river.Job[OrphanCleanupArgs]{
JobRow: &rivertype.JobRow{ID: 1, Attempt: 1},
Args: OrphanCleanupArgs{},
}
}
func TestOrphanCleanupWorker_NoOrphans(t *testing.T) {
q := &mockQuerier{}
store := &mockFileStorer{}
w := &OrphanCleanupWorker{
store: store,
querier: q,
}
if err := w.Work(context.Background(), makeWorkerJob()); err != nil {
t.Fatalf("Work returned unexpected error: %v", err)
}
if len(store.deletedKeys) != 0 {
t.Errorf("expected 0 S3 deletes, got %d", len(store.deletedKeys))
}
if len(q.deleteFileCalls) != 0 {
t.Errorf("expected 0 DB deletes, got %d", len(q.deleteFileCalls))
}
}
func TestOrphanCleanupWorker_DeletesOrphan(t *testing.T) {
fileID := uuid.New()
tabloID := uuid.New()
q := &mockQuerier{
orphans: []sqlc.ListOrphanFilesRow{
{ID: fileID, TabloID: tabloID, S3Key: "orphan-key"},
},
}
store := &mockFileStorer{}
w := &OrphanCleanupWorker{
store: store,
querier: q,
}
if err := w.Work(context.Background(), makeWorkerJob()); err != nil {
t.Fatalf("Work returned unexpected error: %v", err)
}
// Assert S3 Delete was called with the correct key.
if len(store.deletedKeys) != 1 || store.deletedKeys[0] != "orphan-key" {
t.Errorf("expected S3 Delete called with %q, got %v", "orphan-key", store.deletedKeys)
}
// Assert DB DeleteTabloFile was called with matching ID and TabloID.
if len(q.deleteFileCalls) != 1 {
t.Fatalf("expected 1 DB delete call, got %d", len(q.deleteFileCalls))
}
got := q.deleteFileCalls[0]
if got.ID != fileID || got.TabloID != tabloID {
t.Errorf("DeleteTabloFile called with {%v, %v}, want {%v, %v}", got.ID, got.TabloID, fileID, tabloID)
}
}
func TestOrphanCleanupArgs_Kind(t *testing.T) {
if got := (OrphanCleanupArgs{}).Kind(); got != "orphan_file_cleanup" {
t.Errorf("OrphanCleanupArgs.Kind() = %q, want %q", got, "orphan_file_cleanup")
}
}