- HeartbeatArgs + HeartbeatWorker (logs slog.Info on each tick)
- OrphanCleanupArgs + OrphanCleanupWorker (S3 delete, then DB delete, in a loop)
- NewOrphanCleanupWorker constructor with pool + FileStorer injection
- SlogErrorHandler implementing river.ErrorHandler (HandleError + HandlePanic)
- fileQuerier interface for test injection without a real DB
- Unit tests: 7 tests pass (pure mock-based, no DB required)
- go build ./... exits 0
93 lines
2.4 KiB
Go
93 lines
2.4 KiB
Go
package jobs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
|
|
"backend/internal/db/sqlc"
|
|
"backend/internal/files"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/riverqueue/river"
|
|
)
|
|
|
|
// OrphanCleanupArgs is the argument payload of the orphan cleanup job. It is
// intentionally empty: the worker queries the orphans dynamically.
type OrphanCleanupArgs struct{}

// Kind returns the job kind name for OrphanCleanupArgs.
func (OrphanCleanupArgs) Kind() string {
	const kind = "orphan_file_cleanup"
	return kind
}
|
|
|
|
// fileQuerier is the minimal DB interface used by OrphanCleanupWorker.
|
|
// sqlc.Queries satisfies this interface; tests inject a mock.
|
|
type fileQuerier interface {
|
|
ListOrphanFiles(ctx context.Context) ([]sqlc.ListOrphanFilesRow, error)
|
|
DeleteTabloFile(ctx context.Context, arg sqlc.DeleteTabloFileParams) error
|
|
}
|
|
|
|
// OrphanCleanupWorker deletes S3 objects and their DB rows for tablo_files
|
|
// whose owning tablo no longer exists.
|
|
type OrphanCleanupWorker struct {
|
|
river.WorkerDefaults[OrphanCleanupArgs]
|
|
|
|
pool *pgxpool.Pool
|
|
store files.FileStorer
|
|
querier fileQuerier // nil in production; set by NewOrphanCleanupWorker for testability
|
|
}
|
|
|
|
// NewOrphanCleanupWorker constructs an OrphanCleanupWorker with the given pool
|
|
// and S3 store. The fileQuerier is wired from the pool automatically.
|
|
func NewOrphanCleanupWorker(pool *pgxpool.Pool, store files.FileStorer) *OrphanCleanupWorker {
|
|
return &OrphanCleanupWorker{
|
|
pool: pool,
|
|
store: store,
|
|
querier: sqlc.New(pool),
|
|
}
|
|
}
|
|
|
|
// Work iterates orphan tablo_files rows, deletes each S3 object first, then
|
|
// removes the DB row. Partial errors are logged and counted; the job itself
|
|
// always returns nil so river does not retry a partial run.
|
|
func (w *OrphanCleanupWorker) Work(ctx context.Context, job *river.Job[OrphanCleanupArgs]) error {
|
|
q := w.querier
|
|
if q == nil {
|
|
q = sqlc.New(w.pool)
|
|
}
|
|
|
|
orphans, err := q.ListOrphanFiles(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("list orphan files: %w", err)
|
|
}
|
|
|
|
var deleted, errCount int
|
|
for _, f := range orphans {
|
|
if err := w.store.Delete(ctx, f.S3Key); err != nil {
|
|
slog.Error("orphan cleanup: s3 delete failed",
|
|
"s3_key", f.S3Key,
|
|
"err", err,
|
|
)
|
|
errCount++
|
|
continue
|
|
}
|
|
|
|
if err := q.DeleteTabloFile(ctx, sqlc.DeleteTabloFileParams{
|
|
ID: f.ID,
|
|
TabloID: f.TabloID,
|
|
}); err != nil {
|
|
slog.Error("orphan cleanup: db delete failed",
|
|
"file_id", f.ID,
|
|
"err", err,
|
|
)
|
|
errCount++
|
|
continue
|
|
}
|
|
|
|
deleted++
|
|
}
|
|
|
|
slog.Info("orphan cleanup complete",
|
|
"orphans_found", len(orphans),
|
|
"deleted", deleted,
|
|
"errors", errCount,
|
|
)
|
|
return nil
|
|
}
|