xtablo-source/backend/internal/jobs/orphan_cleanup.go
Arthur Belleville a1c2828dc4
feat(06-01): implement internal/jobs package with workers and error handler
- HeartbeatArgs + HeartbeatWorker (logs slog.Info on each tick)
- OrphanCleanupArgs + OrphanCleanupWorker (S3 delete then DB delete loop)
- NewOrphanCleanupWorker constructor with pool + FileStorer injection
- SlogErrorHandler implementing river.ErrorHandler (HandleError + HandlePanic)
- fileQuerier interface for test injection without real DB
- Unit tests: 7 tests pass (pure mock-based, no DB required)
- go build ./... exits 0
2026-05-15 16:34:08 +02:00

93 lines
2.4 KiB
Go

package jobs
import (
"context"
"fmt"
"log/slog"
"backend/internal/db/sqlc"
"backend/internal/files"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
)
// OrphanCleanupArgs is the (empty) argument payload of the orphan file
// cleanup job. It has no fields because the worker looks up the orphans
// itself at run time.
type OrphanCleanupArgs struct{}

// Kind returns the string that identifies this job type.
func (OrphanCleanupArgs) Kind() string {
	return "orphan_file_cleanup"
}
// fileQuerier is the narrow slice of the database layer that
// OrphanCleanupWorker depends on. The production implementation is the
// value returned by sqlc.New; tests substitute a mock so no real database
// is needed.
type fileQuerier interface {
	// DeleteTabloFile removes the tablo_files row identified by arg.
	DeleteTabloFile(ctx context.Context, arg sqlc.DeleteTabloFileParams) error

	// ListOrphanFiles returns the rows the cleanup job should process.
	ListOrphanFiles(ctx context.Context) ([]sqlc.ListOrphanFilesRow, error)
}
// OrphanCleanupWorker deletes S3 objects and their DB rows for tablo_files
// whose owning tablo no longer exists.
//
// Build it with NewOrphanCleanupWorker. A worker created as a bare struct
// literal without a querier still works as long as pool is set, because Work
// then creates a pool-backed querier on demand.
type OrphanCleanupWorker struct {
river.WorkerDefaults[OrphanCleanupArgs]
pool *pgxpool.Pool
store files.FileStorer
querier fileQuerier // set by NewOrphanCleanupWorker; tests may inject a mock; if nil, Work falls back to sqlc.New(pool)
}
// NewOrphanCleanupWorker returns a worker that uses store for object
// deletion and a sqlc querier built on top of pool for database access.
// Callers only supply the pool and the store; the querier is derived here.
func NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker {
	worker := new(OrphanCleanupWorker)
	worker.pool = pool
	worker.store = store
	worker.querier = sqlc.New(pool)
	return worker
}
// Work lists the orphan tablo_files rows and, for each one, deletes the S3
// object first and then the DB row.
//
// Error handling:
//   - If listing the orphans fails, the wrapped error is returned (nothing
//     has been deleted at that point).
//   - If deleting an individual S3 object or DB row fails, the failure is
//     logged and counted and the loop moves on; such partial failures do not
//     fail the job, so Work returns nil and river does not retry a partial run.
//   - If ctx is cancelled or times out while iterating, the loop stops early
//     instead of issuing a doomed S3/DB call (and an error log) for every
//     remaining orphan. This is treated like any other partial run: the
//     summary is logged and nil is returned.
//
// The job argument is unused; OrphanCleanupArgs carries no data.
func (w *OrphanCleanupWorker) Work(ctx context.Context, job *river.Job[OrphanCleanupArgs]) error {
	q := w.querier
	if q == nil {
		// Worker was built without the constructor: use a pool-backed querier.
		q = sqlc.New(w.pool)
	}

	orphans, err := q.ListOrphanFiles(ctx)
	if err != nil {
		return fmt.Errorf("list orphan files: %w", err)
	}

	var deleted, errCount int
	for i, f := range orphans {
		// Bail out once the context is done; every further call would fail
		// with the same context error.
		if ctxErr := ctx.Err(); ctxErr != nil {
			slog.Warn("orphan cleanup: interrupted",
				"remaining", len(orphans)-i,
				"err", ctxErr,
			)
			break
		}

		// Object first: the DB row is only removed once its S3 object is
		// gone, so a failed S3 delete leaves the row in place.
		if err := w.store.Delete(ctx, f.S3Key); err != nil {
			slog.Error("orphan cleanup: s3 delete failed",
				"s3_key", f.S3Key,
				"err", err,
			)
			errCount++
			continue
		}

		if err := q.DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{
			ID:      f.ID,
			TabloID: f.TabloID,
		}); err != nil {
			slog.Error("orphan cleanup: db delete failed",
				"file_id", f.ID,
				"err", err,
			)
			errCount++
			continue
		}

		deleted++
	}

	slog.Info("orphan cleanup complete",
		"orphans_found", len(orphans),
		"deleted", deleted,
		"errors", errCount,
	)
	return nil
}