feat(06-01): implement internal/jobs package with workers and error handler
- HeartbeatArgs + HeartbeatWorker (logs slog.Info on each tick) - OrphanCleanupArgs + OrphanCleanupWorker (S3 delete then DB delete loop) - NewOrphanCleanupWorker constructor with pool + FileStorer injection - SlogErrorHandler implementing river.ErrorHandler (HandleError + HandlePanic) - fileQuerier interface for test injection without real DB - Unit tests: 7 tests pass (pure mock-based, no DB required) - go build ./... exits 0
This commit is contained in:
parent
62e5e3eb60
commit
a1c2828dc4
6 changed files with 337 additions and 0 deletions
39
backend/internal/jobs/error_handler.go
Normal file
39
backend/internal/jobs/error_handler.go
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/riverqueue/river"
|
||||
"github.com/riverqueue/river/rivertype"
|
||||
)
|
||||
|
||||
// SlogErrorHandler logs job errors and panics via the structured slog logger.
|
||||
// Returning nil from HandleError and HandlePanic instructs river to follow its
|
||||
// default retry schedule.
|
||||
type SlogErrorHandler struct{}
|
||||
|
||||
func (*SlogErrorHandler) HandleError(
|
||||
ctx context.Context, job *rivertype.JobRow, err error,
|
||||
) *river.ErrorHandlerResult {
|
||||
slog.Error("job error",
|
||||
"job_id", job.ID,
|
||||
"job_kind", job.Kind,
|
||||
"attempt", job.Attempt,
|
||||
"max_attempts", job.MaxAttempts,
|
||||
"err", err,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*SlogErrorHandler) HandlePanic(
|
||||
ctx context.Context, job *rivertype.JobRow, panicVal any, trace string,
|
||||
) *river.ErrorHandlerResult {
|
||||
slog.Error("job panic",
|
||||
"job_id", job.ID,
|
||||
"job_kind", job.Kind,
|
||||
"panic", panicVal,
|
||||
"trace", trace,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
35
backend/internal/jobs/error_handler_test.go
Normal file
35
backend/internal/jobs/error_handler_test.go
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/riverqueue/river/rivertype"
|
||||
)
|
||||
|
||||
func TestSlogErrorHandler_HandleError(t *testing.T) {
|
||||
h := &SlogErrorHandler{}
|
||||
job := &rivertype.JobRow{
|
||||
ID: 1,
|
||||
Kind: "test_job",
|
||||
Attempt: 1,
|
||||
MaxAttempts: 3,
|
||||
}
|
||||
result := h.HandleError(context.Background(), job, errors.New("test error"))
|
||||
if result != nil {
|
||||
t.Errorf("HandleError returned %v, want nil", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSlogErrorHandler_HandlePanic(t *testing.T) {
|
||||
h := &SlogErrorHandler{}
|
||||
job := &rivertype.JobRow{
|
||||
ID: 2,
|
||||
Kind: "test_job",
|
||||
}
|
||||
result := h.HandlePanic(context.Background(), job, "panic value", "goroutine 1 ...")
|
||||
if result != nil {
|
||||
t.Errorf("HandlePanic returned %v, want nil", result)
|
||||
}
|
||||
}
|
||||
26
backend/internal/jobs/heartbeat.go
Normal file
26
backend/internal/jobs/heartbeat.go
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/riverqueue/river"
|
||||
)
|
||||
|
||||
// HeartbeatArgs carries no data — heartbeat is purely a proof-of-life tick.
|
||||
type HeartbeatArgs struct{}
|
||||
|
||||
func (HeartbeatArgs) Kind() string { return "heartbeat" }
|
||||
|
||||
// HeartbeatWorker logs a heartbeat message on each execution.
|
||||
type HeartbeatWorker struct {
|
||||
river.WorkerDefaults[HeartbeatArgs]
|
||||
}
|
||||
|
||||
func (w *HeartbeatWorker) Work(ctx context.Context, job *river.Job[HeartbeatArgs]) error {
|
||||
slog.Info("worker heartbeat",
|
||||
"job_id", job.ID,
|
||||
"attempt", job.Attempt,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
29
backend/internal/jobs/heartbeat_test.go
Normal file
29
backend/internal/jobs/heartbeat_test.go
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/riverqueue/river"
|
||||
"github.com/riverqueue/river/rivertype"
|
||||
)
|
||||
|
||||
func TestHeartbeatWorker(t *testing.T) {
|
||||
w := &HeartbeatWorker{}
|
||||
job := &river.Job[HeartbeatArgs]{
|
||||
JobRow: &rivertype.JobRow{
|
||||
ID: 42,
|
||||
Attempt: 1,
|
||||
},
|
||||
Args: HeartbeatArgs{},
|
||||
}
|
||||
if err := w.Work(context.Background(), job); err != nil {
|
||||
t.Fatalf("HeartbeatWorker.Work returned unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHeartbeatArgs_Kind(t *testing.T) {
|
||||
if got := (HeartbeatArgs{}).Kind(); got != "heartbeat" {
|
||||
t.Errorf("HeartbeatArgs.Kind() = %q, want %q", got, "heartbeat")
|
||||
}
|
||||
}
|
||||
93
backend/internal/jobs/orphan_cleanup.go
Normal file
93
backend/internal/jobs/orphan_cleanup.go
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"backend/internal/db/sqlc"
|
||||
"backend/internal/files"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/riverqueue/river"
|
||||
)
|
||||
|
||||
// OrphanCleanupArgs carries no data — the worker queries orphans dynamically.
|
||||
type OrphanCleanupArgs struct{}
|
||||
|
||||
func (OrphanCleanupArgs) Kind() string { return "orphan_file_cleanup" }
|
||||
|
||||
// fileQuerier is the minimal DB interface used by OrphanCleanupWorker.
|
||||
// sqlc.Queries satisfies this interface; tests inject a mock.
|
||||
type fileQuerier interface {
|
||||
ListOrphanFiles(ctx context.Context) ([]sqlc.ListOrphanFilesRow, error)
|
||||
DeleteTabloFile(ctx context.Context, arg sqlc.DeleteTabloFileParams) error
|
||||
}
|
||||
|
||||
// OrphanCleanupWorker deletes S3 objects and their DB rows for tablo_files
|
||||
// whose owning tablo no longer exists.
|
||||
type OrphanCleanupWorker struct {
|
||||
river.WorkerDefaults[OrphanCleanupArgs]
|
||||
|
||||
pool *pgxpool.Pool
|
||||
store files.FileStorer
|
||||
querier fileQuerier // nil in production; set by NewOrphanCleanupWorker for testability
|
||||
}
|
||||
|
||||
// NewOrphanCleanupWorker constructs an OrphanCleanupWorker with the given pool
|
||||
// and S3 store. The fileQuerier is wired from the pool automatically.
|
||||
func NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker {
|
||||
return &OrphanCleanupWorker{
|
||||
pool: pool,
|
||||
store: store,
|
||||
querier: sqlc.New(pool),
|
||||
}
|
||||
}
|
||||
|
||||
// Work iterates orphan tablo_files rows, deletes each S3 object first, then
|
||||
// removes the DB row. Partial errors are logged and counted; the job itself
|
||||
// always returns nil so river does not retry a partial run.
|
||||
func (w *OrphanCleanupWorker) Work(ctx context.Context, job *river.Job[OrphanCleanupArgs]) error {
|
||||
q := w.querier
|
||||
if q == nil {
|
||||
q = sqlc.New(w.pool)
|
||||
}
|
||||
|
||||
orphans, err := q.ListOrphanFiles(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list orphan files: %w", err)
|
||||
}
|
||||
|
||||
var deleted, errCount int
|
||||
for _, f := range orphans {
|
||||
if err := w.store.Delete(ctx, f.S3Key); err != nil {
|
||||
slog.Error("orphan cleanup: s3 delete failed",
|
||||
"s3_key", f.S3Key,
|
||||
"err", err,
|
||||
)
|
||||
errCount++
|
||||
continue
|
||||
}
|
||||
|
||||
if err := q.DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{
|
||||
ID: f.ID,
|
||||
TabloID: f.TabloID,
|
||||
}); err != nil {
|
||||
slog.Error("orphan cleanup: db delete failed",
|
||||
"file_id", f.ID,
|
||||
"err", err,
|
||||
)
|
||||
errCount++
|
||||
continue
|
||||
}
|
||||
|
||||
deleted++
|
||||
}
|
||||
|
||||
slog.Info("orphan cleanup complete",
|
||||
"orphans_found", len(orphans),
|
||||
"deleted", deleted,
|
||||
"errors", errCount,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
115
backend/internal/jobs/orphan_cleanup_test.go
Normal file
115
backend/internal/jobs/orphan_cleanup_test.go
Normal file
|
|
@ -0,0 +1,115 @@
|
|||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"backend/internal/db/sqlc"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/riverqueue/river"
|
||||
"github.com/riverqueue/river/rivertype"
|
||||
)
|
||||
|
||||
// mockQuerier implements fileQuerier for unit tests.
|
||||
type mockQuerier struct {
|
||||
orphans []sqlc.ListOrphanFilesRow
|
||||
listErr error
|
||||
deleteFileCalls []sqlc.DeleteTabloFileParams
|
||||
deleteFileErr error
|
||||
}
|
||||
|
||||
func (m *mockQuerier) ListOrphanFiles(_ context.Context) ([]sqlc.ListOrphanFilesRow, error) {
|
||||
return m.orphans, m.listErr
|
||||
}
|
||||
|
||||
func (m *mockQuerier) DeleteTabloFile(_ context.Context, arg sqlc.DeleteTabloFileParams) error {
|
||||
m.deleteFileCalls = append(m.deleteFileCalls, arg)
|
||||
return m.deleteFileErr
|
||||
}
|
||||
|
||||
// mockFileStorer implements files.FileStorer for unit tests.
|
||||
type mockFileStorer struct {
|
||||
deletedKeys []string
|
||||
deleteErr error
|
||||
}
|
||||
|
||||
func (m *mockFileStorer) Delete(_ context.Context, key string) error {
|
||||
m.deletedKeys = append(m.deletedKeys, key)
|
||||
return m.deleteErr
|
||||
}
|
||||
|
||||
func (m *mockFileStorer) Upload(_ context.Context, _ string, _ io.Reader) (string, int64, error) {
|
||||
return "", 0, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (m *mockFileStorer) PresignDownload(_ context.Context, _ string) (string, error) {
|
||||
return "", errors.New("not implemented")
|
||||
}
|
||||
|
||||
func makeWorkerJob() *river.Job[OrphanCleanupArgs] {
|
||||
return &river.Job[OrphanCleanupArgs]{
|
||||
JobRow: &rivertype.JobRow{ID: 1, Attempt: 1},
|
||||
Args: OrphanCleanupArgs{},
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrphanCleanupWorker_NoOrphans(t *testing.T) {
|
||||
q := &mockQuerier{}
|
||||
store := &mockFileStorer{}
|
||||
w := &OrphanCleanupWorker{
|
||||
store: store,
|
||||
querier: q,
|
||||
}
|
||||
|
||||
if err := w.Work(context.Background(), makeWorkerJob()); err != nil {
|
||||
t.Fatalf("Work returned unexpected error: %v", err)
|
||||
}
|
||||
if len(store.deletedKeys) != 0 {
|
||||
t.Errorf("expected 0 S3 deletes, got %d", len(store.deletedKeys))
|
||||
}
|
||||
if len(q.deleteFileCalls) != 0 {
|
||||
t.Errorf("expected 0 DB deletes, got %d", len(q.deleteFileCalls))
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrphanCleanupWorker_DeletesOrphan(t *testing.T) {
|
||||
fileID := uuid.New()
|
||||
tabloID := uuid.New()
|
||||
q := &mockQuerier{
|
||||
orphans: []sqlc.ListOrphanFilesRow{
|
||||
{ID: fileID, TabloID: tabloID, S3Key: "orphan-key"},
|
||||
},
|
||||
}
|
||||
store := &mockFileStorer{}
|
||||
w := &OrphanCleanupWorker{
|
||||
store: store,
|
||||
querier: q,
|
||||
}
|
||||
|
||||
if err := w.Work(context.Background(), makeWorkerJob()); err != nil {
|
||||
t.Fatalf("Work returned unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Assert S3 Delete was called with the correct key.
|
||||
if len(store.deletedKeys) != 1 || store.deletedKeys[0] != "orphan-key" {
|
||||
t.Errorf("expected S3 Delete called with %q, got %v", "orphan-key", store.deletedKeys)
|
||||
}
|
||||
|
||||
// Assert DB DeleteTabloFile was called with matching ID and TabloID.
|
||||
if len(q.deleteFileCalls) != 1 {
|
||||
t.Fatalf("expected 1 DB delete call, got %d", len(q.deleteFileCalls))
|
||||
}
|
||||
got := q.deleteFileCalls[0]
|
||||
if got.ID != fileID || got.TabloID != tabloID {
|
||||
t.Errorf("DeleteTabloFile called with {%v, %v}, want {%v, %v}", got.ID, got.TabloID, fileID, tabloID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrphanCleanupArgs_Kind(t *testing.T) {
|
||||
if got := (OrphanCleanupArgs{}).Kind(); got != "orphan_file_cleanup" {
|
||||
t.Errorf("OrphanCleanupArgs.Kind() = %q, want %q", got, "orphan_file_cleanup")
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue