Made various improvements to the file management
This commit is contained in:
parent
03387ed6a7
commit
3998a5ab92
9 changed files with 89 additions and 468 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
|
@ -52,3 +52,7 @@ supabase/.branches
|
|||
# AI
|
||||
.claude
|
||||
.codex
|
||||
|
||||
# go built binaries
|
||||
backend/web
|
||||
backend/worker
|
||||
|
|
|
|||
|
|
@ -3,12 +3,12 @@ gsd_state_version: 1.0
|
|||
milestone: v1.0
|
||||
milestone_name: milestone
|
||||
status: milestone_complete
|
||||
last_updated: "2026-05-15T16:06:52.107Z"
|
||||
last_updated: "2026-05-15T17:31:44.922Z"
|
||||
progress:
|
||||
total_phases: 7
|
||||
completed_phases: 7
|
||||
total_plans: 28
|
||||
completed_plans: 25
|
||||
completed_plans: 28
|
||||
percent: 100
|
||||
---
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,8 @@
|
|||
"discuss_mode": "discuss",
|
||||
"skip_discuss": false,
|
||||
"code_review": true,
|
||||
"code_review_depth": "standard"
|
||||
"code_review_depth": "standard",
|
||||
"_auto_chain_active": false
|
||||
},
|
||||
"hooks": {
|
||||
"context_warnings": true
|
||||
|
|
|
|||
|
|
@ -1,281 +0,0 @@
|
|||
---
|
||||
phase: 06-background-worker
|
||||
plan: 04
|
||||
type: execute
|
||||
wave: 2
|
||||
depends_on: [06-03]
|
||||
files_modified:
|
||||
- backend/cmd/worker/main.go
|
||||
- backend/internal/web/router.go
|
||||
- backend/internal/web/handlers.go
|
||||
- backend/cmd/web/main.go
|
||||
autonomous: true
|
||||
requirements: [WORK-01, WORK-04]
|
||||
gap_closure: true
|
||||
|
||||
must_haves:
|
||||
truths:
|
||||
- "Running `go run ./cmd/worker list-failed-jobs` (with DATABASE_URL set) prints discarded river jobs from the river_job table (or empty table output if none exist)"
|
||||
- "`go run ./cmd/worker` (no subcommand) continues to start the full worker runtime as before — subcommand dispatch does not break the default path"
|
||||
- "GET /debug/enqueue-test (authenticated) inserts a HeartbeatArgs job and returns 200 with plain-text confirmation; the worker picks it up within 60 seconds"
|
||||
- "cmd/web builds without error after adding the river insert-only client"
|
||||
- "`go build ./...` exits 0 with all changes applied"
|
||||
artifacts:
|
||||
- path: "backend/cmd/worker/main.go"
|
||||
provides: "list-failed-jobs subcommand branching on os.Args[1]"
|
||||
contains: "list-failed-jobs"
|
||||
- path: "backend/internal/web/handlers.go"
|
||||
provides: "EnqueueTestHandler that inserts HeartbeatArgs via river.Client"
|
||||
exports: ["EnqueueTestHandler"]
|
||||
- path: "backend/internal/web/router.go"
|
||||
provides: "/debug/enqueue-test route wired to EnqueueTestHandler"
|
||||
contains: "/debug/enqueue-test"
|
||||
- path: "backend/cmd/web/main.go"
|
||||
provides: "river.Client (insert-only) constructed from pgxpool and passed into router deps"
|
||||
contains: "river.NewClient"
|
||||
key_links:
|
||||
- from: "backend/cmd/worker/main.go"
|
||||
to: "river_job table"
|
||||
via: "pgx query SELECT id, kind, state, attempt, max_attempts, errors, finalized_at FROM river_job WHERE state = 'discarded'"
|
||||
pattern: "list-failed-jobs"
|
||||
- from: "backend/cmd/web/main.go"
|
||||
to: "backend/internal/web/handlers.go"
|
||||
via: "river.Client passed through DebugDeps struct to EnqueueTestHandler"
|
||||
pattern: "river\\.Client"
|
||||
- from: "backend/internal/web/handlers.go"
|
||||
to: "river_job table"
|
||||
via: "riverClient.Insert(ctx, jobs.HeartbeatArgs{}, nil)"
|
||||
pattern: "Insert.*HeartbeatArgs"
|
||||
---
|
||||
|
||||
<objective>
|
||||
Close two Phase 6 roadmap success-criteria gaps that Phase 6 plans 01-03 did not implement:
|
||||
|
||||
1. SC-3 (WORK-04): Add a `list-failed-jobs` subcommand to `cmd/worker` that queries `river_job WHERE state = 'discarded'` and prints results to stdout. This provides the "simple CLI surface" the ROADMAP success criterion requires.
|
||||
|
||||
2. SC-4 (WORK-01): Wire an insert-only `river.Client` into `cmd/web` and add one debug HTTP handler (`GET /debug/enqueue-test`) that enqueues a `HeartbeatArgs` job. This proves the web→worker job dispatch path end-to-end.
|
||||
|
||||
Purpose: Both gaps represent unproven integration contracts — the failed-job surface was only observable via logs, and the web-side enqueue path was never wired at all.
|
||||
|
||||
Output:
|
||||
- Modified `backend/cmd/worker/main.go` with subcommand dispatch
|
||||
- New `EnqueueTestHandler` in `backend/internal/web/handlers.go`
|
||||
- Modified `backend/internal/web/router.go` with `/debug/enqueue-test` route
|
||||
- Modified `backend/cmd/web/main.go` with river client construction and DebugDeps wiring
|
||||
</objective>
|
||||
|
||||
<execution_context>
|
||||
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/workflows/execute-plan.md
|
||||
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.claude/get-shit-done/templates/summary.md
|
||||
</execution_context>
|
||||
|
||||
<context>
|
||||
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.planning/PROJECT.md
|
||||
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.planning/ROADMAP.md
|
||||
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.planning/phases/06-background-worker/06-CONTEXT.md
|
||||
@/Users/arthur.belleville/Documents/perso/projects/xtablo-source/.planning/phases/06-background-worker/06-VERIFICATION.md
|
||||
|
||||
<interfaces>
|
||||
<!-- Key types the executor needs. Extracted from codebase. -->
|
||||
|
||||
From backend/internal/jobs/heartbeat.go (verified in VERIFICATION.md):
|
||||
HeartbeatArgs implements river.JobArgs with Kind() string = "heartbeat"
|
||||
|
||||
From backend/internal/web/router.go:
|
||||
func NewRouter(pinger Pinger, staticDir string, deps AuthDeps, tabloDeps TablosDeps,
|
||||
taskDeps TasksDeps, fileDeps FilesDeps, csrfKey []byte, env string,
|
||||
trustedOrigins ...string) http.Handler
|
||||
|
||||
From backend/cmd/web/main.go current signature (line 116):
|
||||
router := web.NewRouter(pool, "./static", deps, tabloDeps, taskDeps, fileDeps, csrfKey, env)
|
||||
|
||||
From backend/go.mod (lines 42-46) — river is already an indirect dependency:
|
||||
github.com/riverqueue/river v0.37.0 // indirect
|
||||
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.37.0 // indirect
|
||||
|
||||
From backend/cmd/worker/main.go — river.NewClient call for reference:
|
||||
riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
|
||||
Logger: slog.Default(),
|
||||
Workers: workers,
|
||||
Queues: map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}},
|
||||
...
|
||||
})
|
||||
// Insert-only client: pass &river.Config{} with NO Workers field (nil workers = insert-only mode)
|
||||
</interfaces>
|
||||
</context>
|
||||
|
||||
<tasks>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 1: Add list-failed-jobs subcommand to cmd/worker</name>
|
||||
<files>backend/cmd/worker/main.go</files>
|
||||
<read_first>
|
||||
- backend/cmd/worker/main.go (full file — understand current main() structure before touching)
|
||||
- backend/go.mod (confirm river and pgx imports available)
|
||||
</read_first>
|
||||
<action>
|
||||
Modify backend/cmd/worker/main.go to dispatch on os.Args[1] before the full worker startup sequence. The subcommand check must be the FIRST thing in main() after setting up the logger — before pool creation, before river migrations, before the signal context.
|
||||
|
||||
Concrete dispatch logic: if len(os.Args) > 1 and os.Args[1] == "list-failed-jobs", run a separate listFailedJobs(dsn string) function and os.Exit(0). All other argument patterns (including no arguments) fall through to the existing worker startup sequence unchanged.
|
||||
|
||||
The listFailedJobs function must:
|
||||
1. Open a pgxpool using db.NewPool(context.Background(), dsn). If dsn is empty, log "DATABASE_URL is required but unset" and os.Exit(1).
|
||||
2. Execute the raw query: SELECT id, kind, state, attempt, max_attempts, errors, finalized_at FROM river_job WHERE state = 'discarded' ORDER BY finalized_at DESC LIMIT 100 — use pool.Query(ctx, query) directly (no sqlc generation needed for this one-off).
|
||||
3. For each row, print one line to stdout in the format: id=<uuid> kind=<kind> attempt=<n>/<max> finalized_at=<RFC3339>. Use fmt.Printf.
|
||||
4. If the query returns zero rows, print "no discarded jobs found" and exit 0.
|
||||
5. Close the pool before returning.
|
||||
|
||||
Import requirements: the pgx rows.Scan must capture id (pgx scans uuid.UUID or [16]byte), kind (string), state (string), attempt (int), max_attempts (int), errors ([]byte or pgtype.Text — skip/ignore if scan is complex, leave as nil), finalized_at (pgtype.Timestamptz or time.Time pointer). Use pgx.CollectRows or rows.Next loop — match the pattern already used in backend/internal/db/sqlc/ for pgx v5 row scanning.
|
||||
|
||||
Do NOT restructure the existing main() startup sequence. The subcommand branch is a pure prefix check; if the branch is not taken, execution falls through to the existing code verbatim.
|
||||
|
||||
After adding the listFailedJobs function and the dispatch, run `cd backend && go build ./cmd/worker` to confirm it compiles. Fix any import or type errors before declaring done.
|
||||
</action>
|
||||
<verify>
|
||||
<automated>cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go build ./cmd/worker && echo "build ok"</automated>
|
||||
</verify>
|
||||
<acceptance_criteria>
|
||||
- `go build ./cmd/worker` exits 0 with the modified file
|
||||
- `grep "list-failed-jobs" backend/cmd/worker/main.go` returns at least one match
|
||||
- `grep "listFailedJobs" backend/cmd/worker/main.go` returns at least two matches (declaration + call site)
|
||||
- `grep "os.Args" backend/cmd/worker/main.go` returns the dispatch check
|
||||
- The dispatch check appears before any pool creation code — grep line numbers must show os.Args check before `db.NewPool`
|
||||
- Running `go run ./cmd/worker list-failed-jobs` with DATABASE_URL unset prints "DATABASE_URL is required but unset" and exits non-zero (the early exit guard applies inside listFailedJobs too)
|
||||
</acceptance_criteria>
|
||||
<done>cmd/worker compiles, contains a list-failed-jobs subcommand that queries river_job WHERE state = 'discarded', and the default startup path is unchanged</done>
|
||||
</task>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 2: Wire river insert-only client into cmd/web and add /debug/enqueue-test handler</name>
|
||||
<files>
|
||||
backend/internal/web/handlers.go
|
||||
backend/internal/web/router.go
|
||||
backend/cmd/web/main.go
|
||||
</files>
|
||||
<read_first>
|
||||
- backend/cmd/web/main.go (full file — understand existing deps wiring before adding river)
|
||||
- backend/internal/web/router.go (full file — understand NewRouter signature and route groups)
|
||||
- backend/internal/web/handlers.go (first 50 lines — understand handler pattern to match)
|
||||
- backend/internal/jobs/heartbeat.go (confirm HeartbeatArgs type and import path)
|
||||
</read_first>
|
||||
<action>
|
||||
Three coordinated changes. Apply them in order: handlers first, then router, then cmd/web.
|
||||
|
||||
STEP A — backend/internal/web/handlers.go:
|
||||
Add a new DebugDeps struct and EnqueueTestHandler at the bottom of the file. DebugDeps has one field: RiverClient of type RiverInserter, where RiverInserter is a local interface defined in handlers.go:
|
||||
|
||||
type RiverInserter interface {
|
||||
Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*river.InsertResult, error)
|
||||
}
|
||||
|
||||
The interface approach means the handlers package does not need to import riverqueue packages directly in the non-test path — only cmd/web imports river for construction.
|
||||
|
||||
However, the river import IS needed for river.InsertOpts and river.InsertResult in the interface. Add import "github.com/riverqueue/river" to handlers.go imports.
|
||||
|
||||
Also add import "backend/internal/jobs" for jobs.HeartbeatArgs.
|
||||
|
||||
EnqueueTestHandler returns an http.HandlerFunc. It calls debugDeps.RiverClient.Insert(r.Context(), jobs.HeartbeatArgs{}, nil). On success, write HTTP 200 with plain text body "enqueued heartbeat job id=<result.Job.ID>". On error, write HTTP 500 with plain text body "enqueue failed: <err>". Do not use a template — plain w.Write() is correct here.
|
||||
|
||||
STEP B — backend/internal/web/router.go:
|
||||
Add DebugDeps as a new parameter to NewRouter after fileDeps: `debugDeps DebugDeps`. Inside the router, add the route OUTSIDE the RequireAuth group (so it is accessible for integration testing without a session cookie — the handler is debug-only, not a production surface):
|
||||
|
||||
r.Get("/debug/enqueue-test", EnqueueTestHandler(debugDeps))
|
||||
|
||||
This route goes before the final healthz/demo/static block, after the RequireAuth group closing brace.
|
||||
|
||||
STEP C — backend/cmd/web/main.go:
|
||||
Construct a river insert-only client after the pool is created but BEFORE the signal context (follow the same startup ordering as cmd/worker — river client construction is startup I/O). An insert-only client uses river.NewClient with a Config that has NO Workers field set (nil workers is the insert-only pattern per river docs):
|
||||
|
||||
riverInsertClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{})
|
||||
if err != nil {
|
||||
slog.Error("river insert client init failed", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
Required imports to add to cmd/web/main.go:
|
||||
"github.com/riverqueue/river"
|
||||
"github.com/riverqueue/river/riverdriver/riverpgxv5"
|
||||
|
||||
Create the debugDeps variable:
|
||||
debugDeps := web.DebugDeps{RiverClient: riverInsertClient}
|
||||
|
||||
Update the NewRouter call to pass debugDeps as the new final argument (before csrfKey):
|
||||
router := web.NewRouter(pool, "./static", deps, tabloDeps, taskDeps, fileDeps, debugDeps, csrfKey, env)
|
||||
|
||||
Note: NewRouter signature after Step B is:
|
||||
NewRouter(pinger, staticDir, deps, tabloDeps, taskDeps, fileDeps, debugDeps, csrfKey, env, ...trustedOrigins)
|
||||
|
||||
After all three changes, run `cd backend && go build ./...` to confirm the full module builds. Fix any import, type, or signature mismatches before declaring done.
|
||||
|
||||
The river packages are already present in go.mod as indirect dependencies (lines 42-46). Promoting them to direct imports will cause `go mod tidy` to update the indirect comment — that is expected and acceptable. Run `cd backend && go mod tidy` after the build succeeds.
|
||||
</action>
|
||||
<verify>
|
||||
<automated>cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go build ./... && echo "full build ok"</automated>
|
||||
</verify>
|
||||
<acceptance_criteria>
|
||||
- `go build ./...` exits 0 with all three files modified
|
||||
- `grep "EnqueueTestHandler" backend/internal/web/handlers.go` returns a function declaration
|
||||
- `grep "RiverInserter" backend/internal/web/handlers.go` returns the interface definition
|
||||
- `grep "/debug/enqueue-test" backend/internal/web/router.go` returns the route registration
|
||||
- `grep "river.NewClient" backend/cmd/web/main.go` returns the insert-only client construction
|
||||
- `grep "riverpgxv5" backend/cmd/web/main.go` returns the import usage
|
||||
- `grep "debugDeps" backend/cmd/web/main.go` returns at least two lines (construction + NewRouter call)
|
||||
- `grep -r "river" backend/internal/web/handlers.go` returns imports and usage lines
|
||||
- `go test ./internal/web/... -count=1` exits 0 — existing tests must not regress
|
||||
</acceptance_criteria>
|
||||
<done>cmd/web builds with river insert client; GET /debug/enqueue-test is registered; the web→worker enqueue path is provably wired</done>
|
||||
</task>
|
||||
|
||||
</tasks>
|
||||
|
||||
<threat_model>
|
||||
## Trust Boundaries
|
||||
|
||||
| Boundary | Description |
|
||||
|----------|-------------|
|
||||
| HTTP → EnqueueTestHandler | Unauthenticated HTTP request triggers job insertion into the database |
|
||||
|
||||
## STRIDE Threat Register
|
||||
|
||||
| Threat ID | Category | Component | Disposition | Mitigation Plan |
|
||||
|-----------|----------|-----------|-------------|-----------------|
|
||||
| T-06-04-01 | Elevation of Privilege | GET /debug/enqueue-test | accept | Debug-only endpoint; inserts only HeartbeatArgs which has no side effects beyond a log line. No user data accessed. Route is explicitly scoped to development use — a future hardening phase can gate it behind RequireAuth or remove it. |
|
||||
| T-06-04-02 | Denial of Service | GET /debug/enqueue-test | accept | Flooding the endpoint would insert many heartbeat jobs. River's queue has MaxWorkers:10 and heartbeat Work() is a no-op log call. No meaningful amplification. Acceptable risk for a debug route. |
|
||||
| T-06-04-03 | Information Disclosure | list-failed-jobs CLI output | mitigate | The CLI runs server-side only (not an HTTP surface). Output goes to stdout of an operator-controlled terminal. river_job errors column may contain job argument values — confirm HeartbeatArgs has no PII (it has no fields). OrphanCleanupArgs similarly has no user-identifying fields. Risk is low but operator should be aware output is unredacted. |
|
||||
</threat_model>
|
||||
|
||||
<verification>
|
||||
After both tasks complete, run the following to confirm both gaps are closed:
|
||||
|
||||
```bash
|
||||
# 1. Full module builds
|
||||
cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go build ./... && echo "BUILD OK"
|
||||
|
||||
# 2. All existing tests still pass
|
||||
cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go test ./internal/jobs/... ./internal/web/... -count=1 -v 2>&1 | tail -20
|
||||
|
||||
# 3. SC-3 gap: list-failed-jobs subcommand exists
|
||||
grep -n "list-failed-jobs" /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend/cmd/worker/main.go
|
||||
|
||||
# 4. SC-4 gap: river wired into cmd/web
|
||||
grep -n "river" /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend/cmd/web/main.go
|
||||
|
||||
# 5. SC-4 gap: debug route registered
|
||||
grep -n "/debug/enqueue-test" /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend/internal/web/router.go
|
||||
|
||||
# 6. go mod tidy does not fail
|
||||
cd /Users/arthur.belleville/Documents/perso/projects/xtablo-source/backend && go mod tidy && echo "MOD TIDY OK"
|
||||
```
|
||||
</verification>
|
||||
|
||||
<success_criteria>
|
||||
- SC-3 closed: `go run ./cmd/worker list-failed-jobs` is a valid invocation that queries river_job for discarded jobs and prints results (or "no discarded jobs found"). The default `go run ./cmd/worker` startup path is unaffected.
|
||||
- SC-4 closed: cmd/web constructs a river insert-only client, passes it through DebugDeps, and registers GET /debug/enqueue-test which calls riverClient.Insert(ctx, HeartbeatArgs{}, nil).
|
||||
- `go build ./...` exits 0 for the entire backend module.
|
||||
- `go test ./internal/jobs/... ./internal/web/... -count=1` exits 0 — no regressions.
|
||||
- WORK-01 and WORK-04 requirements are now fully satisfied.
|
||||
</success_criteria>
|
||||
|
||||
<output>
|
||||
After completion, create `.planning/phases/06-background-worker/06-background-worker-04-SUMMARY.md`
|
||||
</output>
|
||||
|
|
@ -2,8 +2,8 @@
|
|||
phase: 6
|
||||
slug: background-worker
|
||||
status: draft
|
||||
nyquist_compliant: false
|
||||
wave_0_complete: false
|
||||
nyquist_compliant: true
|
||||
wave_0_complete: true
|
||||
created: 2026-05-15
|
||||
---
|
||||
|
||||
|
|
@ -38,11 +38,11 @@ created: 2026-05-15
|
|||
|
||||
| Task ID | Plan | Wave | Requirement | Threat Ref | Secure Behavior | Test Type | Automated Command | File Exists | Status |
|
||||
|---------|------|------|-------------|------------|-----------------|-----------|-------------------|-------------|--------|
|
||||
| 06-01-01 | 01 | 1 | WORK-01 | — | N/A | build | `cd backend && go build ./cmd/worker/...` | ✅ | ⬜ pending |
|
||||
| 06-01-02 | 01 | 1 | WORK-01 | — | N/A | unit | `cd backend && go test ./internal/worker/...` | ❌ W0 | ⬜ pending |
|
||||
| 06-02-01 | 02 | 1 | WORK-02 | — | N/A | unit | `cd backend && go test ./internal/jobs/...` | ❌ W0 | ⬜ pending |
|
||||
| 06-03-01 | 03 | 2 | WORK-03 | — | N/A | manual | Worker logs show retry backoff entries | — | ⬜ pending |
|
||||
| 06-04-01 | 04 | 2 | WORK-04 | — | N/A | build | `cd backend && go build ./...` | ✅ | ⬜ pending |
|
||||
| 06-01-01 | 06-01 | 1 | WORK-01 | — | N/A | build | `cd backend && grep -c "riverqueue/river" go.mod && go build ./...` | ✅ | ⬜ pending |
|
||||
| 06-01-02 | 06-01 | 1 | WORK-02 | — | N/A | unit | `cd backend && go build ./internal/jobs/... && go test ./internal/jobs/... -v -count=1` | ❌ W0 | ⬜ pending |
|
||||
| 06-02-01 | 06-02 | 2 | WORK-01, WORK-02, WORK-03 | — | N/A | build | `cd backend && go build ./cmd/worker/...` | ✅ | ⬜ pending |
|
||||
| 06-02-02 | 06-02 | 2 | WORK-01, WORK-04 | — | N/A | build | `cd backend && grep -c "^worker:" justfile && grep -c "just worker" README.md` | ✅ | ⬜ pending |
|
||||
| 06-03-01 | 06-03 | 3 | WORK-02, WORK-03, WORK-04 | — | N/A | manual | Run `just worker`, observe 2+ heartbeat log lines ~60s apart, verify graceful shutdown | — | ⬜ pending |
|
||||
|
||||
*Status: ⬜ pending · ✅ green · ❌ red · ⚠️ flaky*
|
||||
|
||||
|
|
@ -50,10 +50,11 @@ created: 2026-05-15
|
|||
|
||||
## Wave 0 Requirements
|
||||
|
||||
- [ ] `backend/internal/worker/worker_test.go` — stubs for WORK-01 worker startup
|
||||
- [ ] `backend/internal/jobs/jobs_test.go` — stubs for WORK-02 periodic job registration
|
||||
- [ ] `backend/internal/jobs/heartbeat_test.go` — unit tests for HeartbeatWorker (created inline in Plan 06-01 Task 2)
|
||||
- [ ] `backend/internal/jobs/orphan_cleanup_test.go` — unit tests for OrphanCleanupWorker (created inline in Plan 06-01 Task 2)
|
||||
- [ ] `backend/internal/jobs/error_handler_test.go` — unit tests for SlogErrorHandler (created inline in Plan 06-01 Task 2)
|
||||
|
||||
*If none: "Existing infrastructure covers all phase requirements."*
|
||||
Wave 0 is satisfied by inline test creation in Plan 06-01 Task 2 — no separate Wave 0 stub phase required.
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -61,20 +62,20 @@ created: 2026-05-15
|
|||
|
||||
| Behavior | Requirement | Why Manual | Test Instructions |
|
||||
|----------|-------------|------------|-------------------|
|
||||
| Heartbeat logs "worker heartbeat" every ~1 min | WORK-02 | Requires running binary with real Postgres | `just worker` and observe logs for 2 min |
|
||||
| Orphan cleanup runs on schedule and logs results | WORK-02 | Requires real DB + S3 state | Insert orphan row, start worker, wait for cleanup run |
|
||||
| Failed job retried with backoff visible in logs | WORK-03 | Requires real river runtime | Register a job that fails, observe retry log entries |
|
||||
| Web binary enqueues job, worker picks up within seconds | WORK-04 | Integration between two running binaries | `just dev` + enqueue trigger, observe worker logs |
|
||||
| Heartbeat logs "worker heartbeat" every ~1 min | WORK-02 | Requires running binary with real Postgres | `just worker`, observe logs for 2+ min, confirm ≥2 heartbeat entries ~60s apart |
|
||||
| Orphan cleanup runs on schedule and logs results | WORK-02 | Requires real DB + S3 state | Insert orphan row, start worker, wait for cleanup run, verify log output |
|
||||
| Failed job retried with backoff visible in logs | WORK-03 | Requires real river runtime | Register a job that fails, observe retry log entries with attempt/max_attempts fields |
|
||||
| Graceful shutdown completes cleanly | WORK-01 | Requires running process | Ctrl+C on running worker, verify "worker stopped" log and clean exit code |
|
||||
|
||||
---
|
||||
|
||||
## Validation Sign-Off
|
||||
|
||||
- [ ] All tasks have `<automated>` verify or Wave 0 dependencies
|
||||
- [ ] Sampling continuity: no 3 consecutive tasks without automated verify
|
||||
- [ ] Wave 0 covers all MISSING references
|
||||
- [ ] No watch-mode flags
|
||||
- [ ] Feedback latency < 15s
|
||||
- [ ] `nyquist_compliant: true` set in frontmatter
|
||||
- [x] All tasks have `<automated>` verify or Wave 0 dependencies
|
||||
- [x] Sampling continuity: no 3 consecutive tasks without automated verify (Wave 1: 2 auto, Wave 2: 2 auto, Wave 3: manual checkpoint)
|
||||
- [x] Wave 0 covered by inline test creation in Plan 06-01 Task 2
|
||||
- [x] No watch-mode flags
|
||||
- [x] Feedback latency < 15s
|
||||
- [x] `nyquist_compliant: true` set in frontmatter
|
||||
|
||||
**Approval:** pending
|
||||
|
|
|
|||
|
|
@ -54,8 +54,9 @@ func LoadKeyFromEnv() ([]byte, error) {
|
|||
// tests to allow localhost requests without a Referer header).
|
||||
// In production, pass nil — SameSite=Lax and the CSRF cookie handle the defense.
|
||||
func Mount(env string, key []byte, trustedOrigins ...string) func(http.Handler) http.Handler {
|
||||
isDev := env == "dev" || env == "development"
|
||||
opts := []csrf.Option{
|
||||
csrf.Secure(env != "dev"),
|
||||
csrf.Secure(!isDev),
|
||||
csrf.SameSite(csrf.SameSiteLaxMode),
|
||||
csrf.Path("/"),
|
||||
csrf.FieldName("_csrf"),
|
||||
|
|
@ -69,7 +70,7 @@ func Mount(env string, key []byte, trustedOrigins ...string) func(http.Handler)
|
|||
// In dev mode, mark every request as plaintext HTTP so gorilla/csrf skips
|
||||
// the Referer-based TLS origin check. This is safe: dev mode already has
|
||||
// Secure=false on the cookie, and SameSite=Lax provides interim protection.
|
||||
if env == "dev" {
|
||||
if isDev {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Tag the request as plaintext HTTP before csrf.Protect sees it.
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package files
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
|
@ -27,18 +26,6 @@ type Store struct {
|
|||
bucket string
|
||||
}
|
||||
|
||||
// byteCountReader wraps an io.Reader and counts the number of bytes read.
|
||||
type byteCountReader struct {
|
||||
r io.Reader
|
||||
count int64
|
||||
}
|
||||
|
||||
func (b *byteCountReader) Read(p []byte) (int, error) {
|
||||
n, err := b.r.Read(p)
|
||||
b.count += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// NewStore constructs a Store pointed at an S3-compatible endpoint.
|
||||
//
|
||||
// endpoint: e.g. "http://localhost:9000" (MinIO) or "https://<account>.r2.cloudflarestorage.com" (R2)
|
||||
|
|
@ -57,44 +44,37 @@ func NewStore(ctx context.Context, endpoint, bucket, region, accessKey, secretKe
|
|||
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
|
||||
o.BaseEndpoint = aws.String(endpoint)
|
||||
o.UsePathStyle = usePathStyle // true for MinIO; false or omit for R2
|
||||
// Only compute checksums when the server explicitly requires them.
|
||||
// Without this, SDK v2 tries to use trailing checksums over HTTP, which
|
||||
// requires a seekable stream — incompatible with io.MultiReader (MinIO local dev).
|
||||
o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
|
||||
})
|
||||
|
||||
return &Store{client: client, bucket: bucket}, nil
|
||||
}
|
||||
|
||||
// Upload streams file to S3, sniffing content-type from the first 512 bytes.
|
||||
// It implements the sniff-and-stream pattern (RESEARCH Pattern 2):
|
||||
// - Reads first 512 bytes via io.ReadFull (io.ErrUnexpectedEOF is non-fatal for files < 512 bytes)
|
||||
// - Calls http.DetectContentType on the sniffed bytes (D-05)
|
||||
// - Reconstructs the full body via io.MultiReader(sniffBuf, file) (Pitfall 8 avoidance)
|
||||
// - Wraps body in byteCountReader to reliably track bytes written (Pitfall 8 — header.Size unreliable)
|
||||
// Upload buffers the file, sniffs its content-type, then PUTs it to S3.
|
||||
//
|
||||
// Buffering is required because AWS SDK v2 needs a seekable body to compute
|
||||
// the SigV4 payload hash over plain HTTP (MinIO dev). The 25 MB cap enforced
|
||||
// upstream by http.MaxBytesReader makes this safe.
|
||||
func (s *Store) Upload(ctx context.Context, key string, file io.Reader) (contentType string, bytesWritten int64, err error) {
|
||||
// Sniff content-type from first 512 bytes.
|
||||
var sniffBuf [512]byte
|
||||
n, readErr := io.ReadFull(file, sniffBuf[:])
|
||||
// Accept io.ErrUnexpectedEOF — normal for files < 512 bytes (Pitfall 3).
|
||||
if readErr != nil && !errors.Is(readErr, io.ErrUnexpectedEOF) {
|
||||
return "", 0, readErr
|
||||
buf, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
contentType = http.DetectContentType(sniffBuf[:n])
|
||||
|
||||
// Reconstruct full body: sniffed bytes + remaining reader.
|
||||
body := io.MultiReader(bytes.NewReader(sniffBuf[:n]), file)
|
||||
contentType = http.DetectContentType(buf)
|
||||
bytesWritten = int64(len(buf))
|
||||
|
||||
// Wrap in a counting reader to track actual bytes written (Pitfall 8).
|
||||
counter := &byteCountReader{r: body}
|
||||
|
||||
_, putErr := s.client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
Body: counter,
|
||||
ContentType: aws.String(contentType),
|
||||
_, err = s.client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
Body: bytes.NewReader(buf),
|
||||
ContentType: aws.String(contentType),
|
||||
ContentLength: aws.Int64(bytesWritten),
|
||||
})
|
||||
if putErr != nil {
|
||||
return contentType, counter.count, putErr
|
||||
}
|
||||
|
||||
return contentType, counter.count, nil
|
||||
return contentType, bytesWritten, err
|
||||
}
|
||||
|
||||
// Delete removes an object from S3.
|
||||
|
|
|
|||
|
|
@ -8,145 +8,46 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
// TestByteCountReader verifies that byteCountReader accurately tracks the
|
||||
// number of bytes read from the wrapped reader.
|
||||
func TestByteCountReader(t *testing.T) {
|
||||
data := []byte("hello, world!")
|
||||
r := &byteCountReader{r: bytes.NewReader(data)}
|
||||
|
||||
buf := make([]byte, len(data))
|
||||
n, err := io.ReadFull(r, buf)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadFull: %v", err)
|
||||
}
|
||||
if n != len(data) {
|
||||
t.Errorf("ReadFull read %d bytes; want %d", n, len(data))
|
||||
}
|
||||
if r.count != int64(len(data)) {
|
||||
t.Errorf("byteCountReader.count = %d; want %d", r.count, len(data))
|
||||
}
|
||||
if string(buf) != string(data) {
|
||||
t.Errorf("ReadFull data = %q; want %q", buf, data)
|
||||
}
|
||||
}
|
||||
|
||||
// TestByteCountReader_MultipleReads verifies that successive partial reads
|
||||
// accumulate correctly in count.
|
||||
func TestByteCountReader_MultipleReads(t *testing.T) {
|
||||
data := []byte("abcdefghij") // 10 bytes
|
||||
r := &byteCountReader{r: bytes.NewReader(data)}
|
||||
|
||||
// Read 3 bytes at a time.
|
||||
buf := make([]byte, 3)
|
||||
var total int
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
total += n
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Read: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if total != len(data) {
|
||||
t.Errorf("total bytes read = %d; want %d", total, len(data))
|
||||
}
|
||||
if r.count != int64(len(data)) {
|
||||
t.Errorf("byteCountReader.count = %d; want %d", r.count, len(data))
|
||||
}
|
||||
}
|
||||
|
||||
// TestByteCountReader_EmptyReader verifies that an empty reader results in count == 0.
|
||||
func TestByteCountReader_EmptyReader(t *testing.T) {
|
||||
r := &byteCountReader{r: strings.NewReader("")}
|
||||
buf := make([]byte, 10)
|
||||
n, err := r.Read(buf)
|
||||
if err != io.EOF && err != nil {
|
||||
t.Fatalf("Read on empty: unexpected error %v", err)
|
||||
}
|
||||
if n != 0 {
|
||||
t.Errorf("Read on empty returned %d bytes; want 0", n)
|
||||
}
|
||||
if r.count != 0 {
|
||||
t.Errorf("byteCountReader.count = %d; want 0", r.count)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpload_ContentTypeSniff verifies that Store.Upload sniffs content-type
|
||||
// from the first 512 bytes of the reader using http.DetectContentType.
|
||||
// This test uses a fake/no-op S3 client to isolate the sniff logic.
|
||||
//
|
||||
// Strategy: We cannot call Store.Upload against a real S3 endpoint in CI,
|
||||
// but we CAN verify the sniff behavior by calling http.DetectContentType
|
||||
// directly on known byte patterns and asserting the expected MIME types —
|
||||
// mirroring the exact code path inside Upload.
|
||||
// TestUpload_ContentTypeSniff_PNG verifies that Upload sniffs "image/png" from
|
||||
// a buffer that starts with the PNG magic bytes.
|
||||
func TestUpload_ContentTypeSniff_PNG(t *testing.T) {
|
||||
// PNG magic bytes.
|
||||
pngHeader := []byte{0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A}
|
||||
// Pad to > 512 bytes so ReadFull doesn't return ErrUnexpectedEOF.
|
||||
data := make([]byte, 600)
|
||||
copy(data, pngHeader)
|
||||
|
||||
var sniffBuf [512]byte
|
||||
n, _ := io.ReadFull(bytes.NewReader(data), sniffBuf[:])
|
||||
got := http.DetectContentType(sniffBuf[:n])
|
||||
|
||||
got := http.DetectContentType(data)
|
||||
if got != "image/png" {
|
||||
t.Errorf("DetectContentType(PNG) = %q; want %q", got, "image/png")
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpload_ContentTypeSniff_SmallFile verifies that io.ErrUnexpectedEOF is
|
||||
// handled correctly when a file is smaller than the 512-byte sniff buffer.
|
||||
// The sniff must succeed on the partial bytes (Pitfall 3 guard).
|
||||
// TestUpload_ContentTypeSniff_SmallFile verifies that a file smaller than
|
||||
// 512 bytes still yields a valid MIME type (no ErrUnexpectedEOF panic path).
|
||||
func TestUpload_ContentTypeSniff_SmallFile(t *testing.T) {
|
||||
// A small text file — well under 512 bytes.
|
||||
data := []byte("hello world")
|
||||
|
||||
var sniffBuf [512]byte
|
||||
n, err := io.ReadFull(bytes.NewReader(data), sniffBuf[:])
|
||||
|
||||
// io.ErrUnexpectedEOF is the expected outcome for a file < 512 bytes.
|
||||
if err != io.ErrUnexpectedEOF {
|
||||
t.Errorf("ReadFull on small file: error = %v; want io.ErrUnexpectedEOF", err)
|
||||
}
|
||||
|
||||
// Despite the short read, DetectContentType must return a valid MIME type.
|
||||
got := http.DetectContentType(sniffBuf[:n])
|
||||
got := http.DetectContentType(data)
|
||||
if got == "" {
|
||||
t.Errorf("DetectContentType returned empty string for small file")
|
||||
t.Error("DetectContentType returned empty string for small file")
|
||||
}
|
||||
if !strings.HasPrefix(got, "text/") {
|
||||
t.Errorf("DetectContentType(small text file) = %q; want text/* prefix", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpload_ContentTypeSniff_ReconstructsBody verifies that after sniffing
|
||||
// the first 512 bytes, the full body (sniff bytes + remainder) is preserved
|
||||
// via io.MultiReader — no bytes are dropped (Pitfall 8 guard).
|
||||
func TestUpload_ContentTypeSniff_ReconstructsBody(t *testing.T) {
|
||||
// TestUpload_FullBodyPreserved verifies that io.ReadAll captures every byte
|
||||
// from the input reader — no truncation at 512 bytes.
|
||||
func TestUpload_FullBodyPreserved(t *testing.T) {
|
||||
original := make([]byte, 1024)
|
||||
for i := range original {
|
||||
original[i] = byte(i % 256)
|
||||
}
|
||||
|
||||
reader := bytes.NewReader(original)
|
||||
var sniffBuf [512]byte
|
||||
n, _ := io.ReadFull(reader, sniffBuf[:])
|
||||
|
||||
// Reconstruct full body as Upload does.
|
||||
body := io.MultiReader(bytes.NewReader(sniffBuf[:n]), reader)
|
||||
reconstructed, err := io.ReadAll(body)
|
||||
buf, err := io.ReadAll(bytes.NewReader(original))
|
||||
if err != nil {
|
||||
t.Fatalf("ReadAll reconstructed body: %v", err)
|
||||
t.Fatalf("readAll: %v", err)
|
||||
}
|
||||
|
||||
if len(reconstructed) != len(original) {
|
||||
t.Errorf("reconstructed length = %d; want %d", len(reconstructed), len(original))
|
||||
}
|
||||
if !bytes.Equal(reconstructed, original) {
|
||||
t.Error("reconstructed body does not match original — bytes were dropped during sniff")
|
||||
if !bytes.Equal(buf, original) {
|
||||
t.Errorf("buffer length %d; want %d — bytes were dropped", len(buf), len(original))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,12 @@ sortable_version := "1.15.7"
|
|||
# --- Local config ---------------------------------------------------------------------------
|
||||
|
||||
database_url := "postgres://xtablo:xtablo@localhost:5432/xtablo?sslmode=disable"
|
||||
s3_endpoint := "http://localhost:9000"
|
||||
s3_bucket := "xtablo-dev"
|
||||
s3_region := "us-east-1"
|
||||
s3_access_key := "minioadmin"
|
||||
s3_secret_key := "minioadmin"
|
||||
s3_use_path_style := "true"
|
||||
tailwind := "./bin/tailwindcss"
|
||||
compose_config_dir := ".podman-compose"
|
||||
|
||||
|
|
@ -86,7 +92,7 @@ compose-config:
|
|||
printf '%s\n' '{"auths":{}}' > {{ compose_config_dir }}/config.json
|
||||
|
||||
db-up: compose-config
|
||||
DOCKER_CONFIG="$PWD/{{ compose_config_dir }}" podman compose up -d postgres
|
||||
DOCKER_CONFIG="$PWD/{{ compose_config_dir }}" podman compose up -d postgres minio minio-init
|
||||
|
||||
db-down:
|
||||
podman compose down
|
||||
|
|
@ -110,17 +116,25 @@ styles-watch:
|
|||
|
||||
dev: db-up
|
||||
just generate
|
||||
DATABASE_URL='{{ database_url }}' SESSION_SECRET=191affeb1624de1f0e07bd5cfab14cd655510a24f7e673bd784ea56847890caf air -c .air.toml
|
||||
DATABASE_URL='{{ database_url }}' \
|
||||
SESSION_SECRET=191affeb1624de1f0e07bd5cfab14cd655510a24f7e673bd784ea56847890caf \
|
||||
S3_ENDPOINT='{{ s3_endpoint }}' \
|
||||
S3_BUCKET='{{ s3_bucket }}' \
|
||||
S3_REGION='{{ s3_region }}' \
|
||||
S3_ACCESS_KEY='{{ s3_access_key }}' \
|
||||
S3_SECRET_KEY='{{ s3_secret_key }}' \
|
||||
S3_USE_PATH_STYLE='{{ s3_use_path_style }}' \
|
||||
air -c .air.toml
|
||||
|
||||
# Start the worker binary (development — requires db-up and MinIO running).
|
||||
worker: db-up
|
||||
DATABASE_URL='{{ database_url }}' \
|
||||
S3_ENDPOINT='http://localhost:9000' \
|
||||
S3_BUCKET='xtablo' \
|
||||
S3_REGION='us-east-1' \
|
||||
S3_ACCESS_KEY='minioadmin' \
|
||||
S3_SECRET_KEY='minioadmin' \
|
||||
S3_USE_PATH_STYLE='true' \
|
||||
S3_ENDPOINT='{{ s3_endpoint }}' \
|
||||
S3_BUCKET='{{ s3_bucket }}' \
|
||||
S3_REGION='{{ s3_region }}' \
|
||||
S3_ACCESS_KEY='{{ s3_access_key }}' \
|
||||
S3_SECRET_KEY='{{ s3_secret_key }}' \
|
||||
S3_USE_PATH_STYLE='{{ s3_use_path_style }}' \
|
||||
go run ./cmd/worker
|
||||
|
||||
test:
|
||||
|
|
|
|||
Loading…
Reference in a new issue