feat(06-01): implement internal/jobs package with workers and error handler
- HeartbeatArgs + HeartbeatWorker (logs slog.Info on each tick) - OrphanCleanupArgs + OrphanCleanupWorker (S3 delete then DB delete loop) - NewOrphanCleanupWorker constructor with pool + FileStorer injection - SlogErrorHandler implementing river.ErrorHandler (HandleError + HandlePanic) - fileQuerier interface for test injection without real DB - Unit tests: 7 tests pass (pure mock-based, no DB required) - go build ./... exits 0
This commit is contained in:
parent
62e5e3eb60
commit
a1c2828dc4
6 changed files with 337 additions and 0 deletions
39
backend/internal/jobs/error_handler.go
Normal file
39
backend/internal/jobs/error_handler.go
Normal file
|
|
@ -0,0 +1,39 @@
|
||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/riverqueue/river"
|
||||||
|
"github.com/riverqueue/river/rivertype"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SlogErrorHandler logs job errors and panics via the structured slog logger.
|
||||||
|
// Returning nil from HandleError and HandlePanic instructs river to follow its
|
||||||
|
// default retry schedule.
|
||||||
|
type SlogErrorHandler struct{}
|
||||||
|
|
||||||
|
func (*SlogErrorHandler) HandleError(
|
||||||
|
ctx context.Context, job *rivertype.JobRow, err error,
|
||||||
|
) *river.ErrorHandlerResult {
|
||||||
|
slog.Error("job error",
|
||||||
|
"job_id", job.ID,
|
||||||
|
"job_kind", job.Kind,
|
||||||
|
"attempt", job.Attempt,
|
||||||
|
"max_attempts", job.MaxAttempts,
|
||||||
|
"err", err,
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*SlogErrorHandler) HandlePanic(
|
||||||
|
ctx context.Context, job *rivertype.JobRow, panicVal any, trace string,
|
||||||
|
) *river.ErrorHandlerResult {
|
||||||
|
slog.Error("job panic",
|
||||||
|
"job_id", job.ID,
|
||||||
|
"job_kind", job.Kind,
|
||||||
|
"panic", panicVal,
|
||||||
|
"trace", trace,
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
35
backend/internal/jobs/error_handler_test.go
Normal file
35
backend/internal/jobs/error_handler_test.go
Normal file
|
|
@ -0,0 +1,35 @@
|
||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/riverqueue/river/rivertype"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSlogErrorHandler_HandleError(t *testing.T) {
|
||||||
|
h := &SlogErrorHandler{}
|
||||||
|
job := &rivertype.JobRow{
|
||||||
|
ID: 1,
|
||||||
|
Kind: "test_job",
|
||||||
|
Attempt: 1,
|
||||||
|
MaxAttempts: 3,
|
||||||
|
}
|
||||||
|
result := h.HandleError(context.Background(), job, errors.New("test error"))
|
||||||
|
if result != nil {
|
||||||
|
t.Errorf("HandleError returned %v, want nil", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlogErrorHandler_HandlePanic(t *testing.T) {
|
||||||
|
h := &SlogErrorHandler{}
|
||||||
|
job := &rivertype.JobRow{
|
||||||
|
ID: 2,
|
||||||
|
Kind: "test_job",
|
||||||
|
}
|
||||||
|
result := h.HandlePanic(context.Background(), job, "panic value", "goroutine 1 ...")
|
||||||
|
if result != nil {
|
||||||
|
t.Errorf("HandlePanic returned %v, want nil", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
26
backend/internal/jobs/heartbeat.go
Normal file
26
backend/internal/jobs/heartbeat.go
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/riverqueue/river"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HeartbeatArgs carries no data — heartbeat is purely a proof-of-life tick.
|
||||||
|
type HeartbeatArgs struct{}
|
||||||
|
|
||||||
|
func (HeartbeatArgs) Kind() string { return "heartbeat" }
|
||||||
|
|
||||||
|
// HeartbeatWorker logs a heartbeat message on each execution.
|
||||||
|
type HeartbeatWorker struct {
|
||||||
|
river.WorkerDefaults[HeartbeatArgs]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *HeartbeatWorker) Work(ctx context.Context, job *river.Job[HeartbeatArgs]) error {
|
||||||
|
slog.Info("worker heartbeat",
|
||||||
|
"job_id", job.ID,
|
||||||
|
"attempt", job.Attempt,
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
29
backend/internal/jobs/heartbeat_test.go
Normal file
29
backend/internal/jobs/heartbeat_test.go
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/riverqueue/river"
|
||||||
|
"github.com/riverqueue/river/rivertype"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHeartbeatWorker(t *testing.T) {
|
||||||
|
w := &HeartbeatWorker{}
|
||||||
|
job := &river.Job[HeartbeatArgs]{
|
||||||
|
JobRow: &rivertype.JobRow{
|
||||||
|
ID: 42,
|
||||||
|
Attempt: 1,
|
||||||
|
},
|
||||||
|
Args: HeartbeatArgs{},
|
||||||
|
}
|
||||||
|
if err := w.Work(context.Background(), job); err != nil {
|
||||||
|
t.Fatalf("HeartbeatWorker.Work returned unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatArgs_Kind(t *testing.T) {
|
||||||
|
if got := (HeartbeatArgs{}).Kind(); got != "heartbeat" {
|
||||||
|
t.Errorf("HeartbeatArgs.Kind() = %q, want %q", got, "heartbeat")
|
||||||
|
}
|
||||||
|
}
|
||||||
93
backend/internal/jobs/orphan_cleanup.go
Normal file
93
backend/internal/jobs/orphan_cleanup.go
Normal file
|
|
@ -0,0 +1,93 @@
|
||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
|
"backend/internal/db/sqlc"
|
||||||
|
"backend/internal/files"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
"github.com/riverqueue/river"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OrphanCleanupArgs carries no data — the worker queries orphans dynamically.
|
||||||
|
type OrphanCleanupArgs struct{}
|
||||||
|
|
||||||
|
func (OrphanCleanupArgs) Kind() string { return "orphan_file_cleanup" }
|
||||||
|
|
||||||
|
// fileQuerier is the minimal DB interface used by OrphanCleanupWorker.
|
||||||
|
// sqlc.Queries satisfies this interface; tests inject a mock.
|
||||||
|
type fileQuerier interface {
|
||||||
|
ListOrphanFiles(ctx context.Context) ([]sqlc.ListOrphanFilesRow, error)
|
||||||
|
DeleteTabloFile(ctx context.Context, arg sqlc.DeleteTabloFileParams) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// OrphanCleanupWorker deletes S3 objects and their DB rows for tablo_files
|
||||||
|
// whose owning tablo no longer exists.
|
||||||
|
type OrphanCleanupWorker struct {
|
||||||
|
river.WorkerDefaults[OrphanCleanupArgs]
|
||||||
|
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
store files.FileStorer
|
||||||
|
querier fileQuerier // nil in production; set by NewOrphanCleanupWorker for testability
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewOrphanCleanupWorker constructs an OrphanCleanupWorker with the given pool
|
||||||
|
// and S3 store. The fileQuerier is wired from the pool automatically.
|
||||||
|
func NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker {
|
||||||
|
return &OrphanCleanupWorker{
|
||||||
|
pool: pool,
|
||||||
|
store: store,
|
||||||
|
querier: sqlc.New(pool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Work iterates orphan tablo_files rows, deletes each S3 object first, then
|
||||||
|
// removes the DB row. Partial errors are logged and counted; the job itself
|
||||||
|
// always returns nil so river does not retry a partial run.
|
||||||
|
func (w *OrphanCleanupWorker) Work(ctx context.Context, job *river.Job[OrphanCleanupArgs]) error {
|
||||||
|
q := w.querier
|
||||||
|
if q == nil {
|
||||||
|
q = sqlc.New(w.pool)
|
||||||
|
}
|
||||||
|
|
||||||
|
orphans, err := q.ListOrphanFiles(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("list orphan files: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var deleted, errCount int
|
||||||
|
for _, f := range orphans {
|
||||||
|
if err := w.store.Delete(ctx, f.S3Key); err != nil {
|
||||||
|
slog.Error("orphan cleanup: s3 delete failed",
|
||||||
|
"s3_key", f.S3Key,
|
||||||
|
"err", err,
|
||||||
|
)
|
||||||
|
errCount++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := q.DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{
|
||||||
|
ID: f.ID,
|
||||||
|
TabloID: f.TabloID,
|
||||||
|
}); err != nil {
|
||||||
|
slog.Error("orphan cleanup: db delete failed",
|
||||||
|
"file_id", f.ID,
|
||||||
|
"err", err,
|
||||||
|
)
|
||||||
|
errCount++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
deleted++
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("orphan cleanup complete",
|
||||||
|
"orphans_found", len(orphans),
|
||||||
|
"deleted", deleted,
|
||||||
|
"errors", errCount,
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
115
backend/internal/jobs/orphan_cleanup_test.go
Normal file
115
backend/internal/jobs/orphan_cleanup_test.go
Normal file
|
|
@ -0,0 +1,115 @@
|
||||||
|
package jobs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"backend/internal/db/sqlc"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/riverqueue/river"
|
||||||
|
"github.com/riverqueue/river/rivertype"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mockQuerier implements fileQuerier for unit tests.
|
||||||
|
type mockQuerier struct {
|
||||||
|
orphans []sqlc.ListOrphanFilesRow
|
||||||
|
listErr error
|
||||||
|
deleteFileCalls []sqlc.DeleteTabloFileParams
|
||||||
|
deleteFileErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockQuerier) ListOrphanFiles(_ context.Context) ([]sqlc.ListOrphanFilesRow, error) {
|
||||||
|
return m.orphans, m.listErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockQuerier) DeleteTabloFile(_ context.Context, arg sqlc.DeleteTabloFileParams) error {
|
||||||
|
m.deleteFileCalls = append(m.deleteFileCalls, arg)
|
||||||
|
return m.deleteFileErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// mockFileStorer implements files.FileStorer for unit tests.
|
||||||
|
type mockFileStorer struct {
|
||||||
|
deletedKeys []string
|
||||||
|
deleteErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockFileStorer) Delete(_ context.Context, key string) error {
|
||||||
|
m.deletedKeys = append(m.deletedKeys, key)
|
||||||
|
return m.deleteErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockFileStorer) Upload(_ context.Context, _ string, _ io.Reader) (string, int64, error) {
|
||||||
|
return "", 0, errors.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockFileStorer) PresignDownload(_ context.Context, _ string) (string, error) {
|
||||||
|
return "", errors.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeWorkerJob() *river.Job[OrphanCleanupArgs] {
|
||||||
|
return &river.Job[OrphanCleanupArgs]{
|
||||||
|
JobRow: &rivertype.JobRow{ID: 1, Attempt: 1},
|
||||||
|
Args: OrphanCleanupArgs{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOrphanCleanupWorker_NoOrphans(t *testing.T) {
|
||||||
|
q := &mockQuerier{}
|
||||||
|
store := &mockFileStorer{}
|
||||||
|
w := &OrphanCleanupWorker{
|
||||||
|
store: store,
|
||||||
|
querier: q,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.Work(context.Background(), makeWorkerJob()); err != nil {
|
||||||
|
t.Fatalf("Work returned unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(store.deletedKeys) != 0 {
|
||||||
|
t.Errorf("expected 0 S3 deletes, got %d", len(store.deletedKeys))
|
||||||
|
}
|
||||||
|
if len(q.deleteFileCalls) != 0 {
|
||||||
|
t.Errorf("expected 0 DB deletes, got %d", len(q.deleteFileCalls))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOrphanCleanupWorker_DeletesOrphan(t *testing.T) {
|
||||||
|
fileID := uuid.New()
|
||||||
|
tabloID := uuid.New()
|
||||||
|
q := &mockQuerier{
|
||||||
|
orphans: []sqlc.ListOrphanFilesRow{
|
||||||
|
{ID: fileID, TabloID: tabloID, S3Key: "orphan-key"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
store := &mockFileStorer{}
|
||||||
|
w := &OrphanCleanupWorker{
|
||||||
|
store: store,
|
||||||
|
querier: q,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.Work(context.Background(), makeWorkerJob()); err != nil {
|
||||||
|
t.Fatalf("Work returned unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert S3 Delete was called with the correct key.
|
||||||
|
if len(store.deletedKeys) != 1 || store.deletedKeys[0] != "orphan-key" {
|
||||||
|
t.Errorf("expected S3 Delete called with %q, got %v", "orphan-key", store.deletedKeys)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert DB DeleteTabloFile was called with matching ID and TabloID.
|
||||||
|
if len(q.deleteFileCalls) != 1 {
|
||||||
|
t.Fatalf("expected 1 DB delete call, got %d", len(q.deleteFileCalls))
|
||||||
|
}
|
||||||
|
got := q.deleteFileCalls[0]
|
||||||
|
if got.ID != fileID || got.TabloID != tabloID {
|
||||||
|
t.Errorf("DeleteTabloFile called with {%v, %v}, want {%v, %v}", got.ID, got.TabloID, fileID, tabloID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOrphanCleanupArgs_Kind(t *testing.T) {
|
||||||
|
if got := (OrphanCleanupArgs{}).Kind(); got != "orphan_file_cleanup" {
|
||||||
|
t.Errorf("OrphanCleanupArgs.Kind() = %q, want %q", got, "orphan_file_cleanup")
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue