---
phase: 01-foundation
plan: 05
type: execute
wave: 2
depends_on:
  - 01-01-PLAN.md
files_modified:
  - internal/queue/waq.go
  - internal/queue/waq_test.go
  - internal/queue/worker.go
  - cmd/hwlab/main.go
autonomous: true
requirements:
  - NB-05
must_haves:
  truths:
    - "WAQ connects to DragonFlyDB at redis://:nUq/IfoIQJf/kouckKHRQOk7vV0NwCuI@10.5.0.10:6379"
    - "Enqueue() pushes a serialized PendingOp onto the hwlab:netbox:pending_ops LIST"
    - "RunWorker() BLPOP-blocks and processes available operations"
    - "DragonFlyDB unavailability does not crash the binary; WAQ degrades gracefully"
    - "Dequeue returns the oldest operation first (FIFO via RPUSH/BLPOP pattern)"
  artifacts:
    - path: "internal/queue/waq.go"
      provides: "WAQ type: Enqueue, Dequeue, Len (DragonFlyDB-backed write-ahead queue)"
      exports: ["WAQ", "NewWAQ", "PendingOp", "Enqueue", "Dequeue", "Len"]
    - path: "internal/queue/worker.go"
      provides: "RunWorker goroutine: BLPOP retry loop, fixed-interval backoff (retryInterval) on connection failure"
      exports: ["RunWorker"]
  key_links:
    - from: "internal/queue/waq.go"
      to: "redis://:nUq/IfoIQJf/kouckKHRQOk7vV0NwCuI@10.5.0.10:6379"
      via: "go-redis v9 ParseURL + NewClient"
      pattern: "redis.ParseURL|ParseURL"
    - from: "internal/queue/worker.go"
      to: "internal/queue/waq.go"
      via: "BLPOP + processOp callback"
      pattern: "BLPOP|RunWorker"
    - from: "cmd/hwlab/main.go"
      to: "internal/queue/worker.go"
      via: "go waq.RunWorker(ctx)"
      pattern: "RunWorker"
---

Implement the DragonFlyDB write-ahead queue (WAQ): an enqueue path that buffers failed or deferred NetBox operations during downtime, and a worker goroutine that retries them once connectivity is restored.

Purpose: NetBox may be temporarily unavailable (container restart, network blip). The WAQ keeps inventory operations from being lost during such outages: they are buffered in DragonFlyDB and retried automatically. NB-05 is the sole requirement for this plan.

Output: `internal/queue` package with WAQ and worker, wired into the main binary.

@$HOME/.claude/get-shit-done/workflows/execute-plan.md
@$HOME/.claude/get-shit-done/templates/summary.md
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.env
@.planning/phases/01-foundation/01-RESEARCH.md
@.planning/phases/01-foundation/01-01-SUMMARY.md

URL: redis://:nUq/IfoIQJf/kouckKHRQOk7vV0NwCuI@10.5.0.10:6379

Queue key: "hwlab:netbox:pending_ops" (Redis LIST)

Operations: RPUSH (enqueue), BLPOP (dequeue + block), LLEN (queue depth)

From internal/config/config.go:

```go
type Config struct {
	DragonflyURL            string // "redis://:PASSWORD@10.5.0.10:6379"
	WAQRetryIntervalSeconds int    // default 30
	WAQMaxAttempts          int    // default 5
}
```

go-redis v9 pattern (verified in RESEARCH.md):

```go
import "github.com/redis/go-redis/v9"

opt, err := redis.ParseURL(redisURL) // parses redis:// URL including password
client := redis.NewClient(opt)
err = client.Ping(ctx).Err() // connectivity check

// Enqueue: RPUSH appends to right (FIFO with BLPOP from left)
err = client.RPush(ctx, key, data).Err()

// Dequeue (blocking): BLPOP pops from left, blocks up to timeout
result, err := client.BLPop(ctx, 5*time.Second, key).Result()
// result[0] = key name, result[1] = value

// Queue depth
n, err := client.LLen(ctx, key).Result()
```

Task 1: Write-ahead queue core (Enqueue, Dequeue, Len)

internal/queue/waq.go, internal/queue/waq_test.go

- /home/mikkel/homelabby/.env (HWLAB_DRAGONFLY_URL — verify it's redis://:nUq/IfoIQJf/kouckKHRQOk7vV0NwCuI@10.5.0.10:6379)
- /home/mikkel/homelabby/.planning/phases/01-foundation/01-RESEARCH.md (Pattern 5: DragonFlyDB Write-Ahead Queue, lines 266-320)
- /home/mikkel/homelabby/internal/config/config.go (DragonflyURL field)

- Test 1: NewWAQ with invalid URL returns error
- Test 2: NewWAQ with valid URL but unreachable server returns error on Ping
- Test 3 (INTEGRATION — skip if DragonFlyDB unreachable): Enqueue() + Len() = 1
- Test 4 (INTEGRATION): Enqueue(op1), Enqueue(op2), Dequeue() returns op1 first (FIFO)
- Test 5 (INTEGRATION): Len() returns 0 after all ops
dequeued
- Test 6: PendingOp marshals/unmarshals to/from JSON correctly (ID, Type, Payload preserved)

Create `internal/queue/waq.go`:

```go
package queue

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

const queueKey = "hwlab:netbox:pending_ops"

// PendingOp represents a queued NetBox operation.
// Serialized as JSON in DragonFlyDB LIST.
type PendingOp struct {
	ID        string          `json:"id"`      // UUID
	Type      string          `json:"type"`    // "create_device", "patch_custom_fields", "sync_tags", etc.
	Payload   json.RawMessage `json:"payload"` // operation-specific data
	CreatedAt time.Time       `json:"created_at"`
	Attempts  int             `json:"attempts"` // retry count
}

// NewPendingOp creates a new PendingOp with a generated ID and current timestamp.
func NewPendingOp(opType string, payload interface{}) (PendingOp, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return PendingOp{}, fmt.Errorf("marshal payload: %w", err)
	}
	return PendingOp{
		ID:        uuid.New().String(),
		Type:      opType,
		Payload:   json.RawMessage(data),
		CreatedAt: time.Now().UTC(),
		Attempts:  0,
	}, nil
}

// WAQ is a write-ahead queue backed by DragonFlyDB (Redis-compatible).
// Operations are stored as JSON in a Redis LIST using RPUSH/BLPOP for FIFO ordering.
type WAQ struct {
	rdb *redis.Client
}

// NewWAQ creates a new WAQ connected to the given Redis/DragonFlyDB URL.
// Returns error if the URL is invalid or the server is unreachable.
func NewWAQ(redisURL string) (*WAQ, error) {
	opt, err := redis.ParseURL(redisURL)
	if err != nil {
		return nil, fmt.Errorf("parse dragonfly url: %w", err)
	}
	client := redis.NewClient(opt)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := client.Ping(ctx).Err(); err != nil {
		client.Close()
		// Report only host:port; redisURL embeds the password and must not reach the logs.
		return nil, fmt.Errorf("dragonfly unreachable at %s: %w", opt.Addr, err)
	}
	log.Printf("WAQ connected to DragonFlyDB")
	return &WAQ{rdb: client}, nil
}

// Enqueue pushes a PendingOp onto the right end of the queue (FIFO).
func (q *WAQ) Enqueue(ctx context.Context, op PendingOp) error {
	data, err := json.Marshal(op)
	if err != nil {
		return fmt.Errorf("marshal op: %w", err)
	}
	return q.rdb.RPush(ctx, queueKey, data).Err()
}

// Dequeue pops the oldest operation, blocking for up to timeout.
// Returns nil, nil if the timeout elapses with no item (BLPOP reports this as redis.Nil).
// Any other error (connection loss, cancelled context, bad JSON) is returned wrapped.
func (q *WAQ) Dequeue(ctx context.Context, timeout time.Duration) (*PendingOp, error) {
	result, err := q.rdb.BLPop(ctx, timeout, queueKey).Result()
	if err == redis.Nil {
		return nil, nil // timeout: no items
	}
	if err != nil {
		return nil, fmt.Errorf("blpop: %w", err)
	}
	// result[0] is the key name, result[1] is the serialized op.
	var op PendingOp
	if err := json.Unmarshal([]byte(result[1]), &op); err != nil {
		return nil, fmt.Errorf("unmarshal op: %w", err)
	}
	return &op, nil
}

// Len returns the current number of pending operations in the queue.
func (q *WAQ) Len(ctx context.Context) (int64, error) {
	n, err := q.rdb.LLen(ctx, queueKey).Result()
	if err != nil {
		return 0, fmt.Errorf("llen: %w", err)
	}
	return n, nil
}

// Close releases the DragonFlyDB connection.
func (q *WAQ) Close() error {
	return q.rdb.Close()
}
```

Create `internal/queue/waq_test.go`:

```go
package queue_test

import (
	"context"
	"encoding/json"
	"os"
	"testing"
	"time"

	"git.georgsen.dk/hwlab/internal/queue"
)

func TestPendingOpJSON(t *testing.T) {
	payload := map[string]string{"device_id": "42", "hw_id": "HW-00001"}
	op, err := queue.NewPendingOp("create_device", payload)
	if err != nil {
		t.Fatalf("NewPendingOp: %v", err)
	}
	if op.ID == "" {
		t.Error("ID should be a UUID")
	}
	if op.Type != "create_device" {
		t.Errorf("Type: want create_device, got %s", op.Type)
	}
	// Round-trip JSON
	data, _ := json.Marshal(op)
	var op2 queue.PendingOp
	if err := json.Unmarshal(data, &op2); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	if op2.ID != op.ID {
		t.Errorf("ID mismatch after round-trip: %s != %s", op2.ID, op.ID)
	}
}

func TestNewWAQInvalidURL(t *testing.T) {
	_, err := queue.NewWAQ("not-a-redis-url")
	if err == nil {
		t.Error("expected error for invalid URL")
	}
}

// dragonflyURL returns the DragonFlyDB URL from HWLAB_DRAGONFLY_URL, falling back
// to the homelab default. It does not skip by itself; callers skip when NewWAQ fails.
func dragonflyURL(t *testing.T) string {
	t.Helper()
	url := os.Getenv("HWLAB_DRAGONFLY_URL")
	if url == "" {
		url = "redis://:nUq/IfoIQJf/kouckKHRQOk7vV0NwCuI@10.5.0.10:6379"
	}
	return url
}

func TestWAQEnqueueDequeue(t *testing.T) {
	waq, err := queue.NewWAQ(dragonflyURL(t))
	if err != nil {
		t.Skipf("DragonFlyDB unavailable: %v", err)
	}
	defer waq.Close()
	ctx := context.Background()

	// Note: in a real test suite, use a test-specific queue key.
	// For now the shared key is used without cleanup, so this test assumes the
	// queue is empty at start; leftover items would be dequeued first.
	op, _ := queue.NewPendingOp("test_op", map[string]string{"test": "value"})
	if err := waq.Enqueue(ctx, op); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	n, err := waq.Len(ctx)
	if err != nil {
		t.Fatalf("Len: %v", err)
	}
	if n < 1 {
		t.Error("expected at least 1 item after enqueue")
	}
	got, err := waq.Dequeue(ctx, 2*time.Second)
	if err != nil {
		t.Fatalf("Dequeue: %v", err)
	}
	if got == nil {
		t.Fatal("expected op from dequeue, got nil")
	}
	if got.ID != op.ID {
		t.Errorf("dequeued op ID mismatch: want %s, got %s", op.ID, got.ID)
	}
}
```

cd /home/mikkel/homelabby && go test ./internal/queue/... -v -run "TestPendingOpJSON|TestNewWAQInvalidURL"

- `go test ./internal/queue/... -run "TestPendingOpJSON|TestNewWAQInvalidURL"` passes (unit tests, no DragonFlyDB needed)
- `grep "hwlab:netbox:pending_ops" internal/queue/waq.go` returns the queueKey const
- `grep "RPush" internal/queue/waq.go` returns the Enqueue implementation
- `grep "BLPop" internal/queue/waq.go` returns the Dequeue implementation
- `grep "ParseURL" internal/queue/waq.go` returns the redis.ParseURL call
- `go build ./internal/queue/...` exits 0
- If DragonFlyDB reachable: `go test ./internal/queue/... -v -run TestWAQEnqueueDequeue` passes

WAQ core implemented. Unit tests pass. FIFO enqueue/dequeue via RPUSH/BLPOP. Integration test skips gracefully when DragonFlyDB unreachable.

Task 2: WAQ retry worker + wire into main binary

internal/queue/worker.go, cmd/hwlab/main.go

- /home/mikkel/homelabby/internal/queue/waq.go (WAQ type, PendingOp, Dequeue method)
- /home/mikkel/homelabby/cmd/hwlab/main.go (current main.go to understand wiring pattern)
- /home/mikkel/homelabby/internal/config/config.go (WAQRetryIntervalSeconds, DragonflyURL)

1. Create `internal/queue/worker.go`:

```go
package queue

import (
	"context"
	"log"
	"time"
)

// OpHandler is a function that processes a single dequeued operation.
// Returns nil on success, error if the operation should be re-queued.
type OpHandler func(ctx context.Context, op PendingOp) error

// RunWorker runs a blocking BLPOP loop processing ops from the queue.
// It calls handler for each dequeued op. If handler returns an error,
// the op is re-enqueued with incremented Attempts count.
// Ops that reach maxAttempts are dropped with a log warning.
//
// On DragonFlyDB connection loss, RunWorker waits retryInterval (a fixed delay)
// before trying again. Call with a cancellable context to stop the worker cleanly.
func (q *WAQ) RunWorker(ctx context.Context, handler OpHandler, maxAttempts int, retryInterval time.Duration) {
	log.Printf("WAQ worker started (maxAttempts=%d, retryInterval=%s)", maxAttempts, retryInterval)
	for {
		select {
		case <-ctx.Done():
			log.Printf("WAQ worker stopping: %v", ctx.Err())
			return
		default:
		}

		op, err := q.Dequeue(ctx, 5*time.Second)
		if err != nil {
			// A cancelled context also surfaces as a dequeue error; treat it as shutdown.
			if ctx.Err() != nil {
				log.Printf("WAQ worker stopping: %v", ctx.Err())
				return
			}
			// Connection error: back off before retrying
			log.Printf("WAQ dequeue error: %v; backing off %s", err, retryInterval)
			select {
			case <-ctx.Done():
				return
			case <-time.After(retryInterval):
			}
			continue
		}
		if op == nil {
			// Timeout with no items: loop immediately (BLPOP already waited 5s)
			continue
		}

		// Process the operation
		if err := handler(ctx, *op); err != nil {
			op.Attempts++
			if op.Attempts >= maxAttempts {
				log.Printf("WAQ: dropping op %s (type=%s) after %d failed attempts: %v", op.ID, op.Type, op.Attempts, err)
				continue
			}
			// Re-enqueue for retry
			log.Printf("WAQ: re-enqueuing op %s (type=%s, attempt=%d): %v", op.ID, op.Type, op.Attempts, err)
			if enqErr := q.Enqueue(ctx, *op); enqErr != nil {
				log.Printf("WAQ: failed to re-enqueue op %s: %v", op.ID, enqErr)
			}
		}
	}
}

// NoOpHandler is a placeholder op handler for Phase 1.
// Phase 2 will replace this with a real NetBox retry handler.
// It logs the operation and returns nil (success) so ops drain from the queue.
func NoOpHandler(ctx context.Context, op PendingOp) error {
	log.Printf("WAQ [noop]: processing op %s (type=%s, attempts=%d)", op.ID, op.Type, op.Attempts)
	return nil
}
```

2.
Update `cmd/hwlab/main.go` to wire the WAQ:

Read the current main.go first, then add WAQ initialization and the worker goroutine. The WAQ initialization must be non-fatal: if DragonFlyDB is unavailable, the binary still starts and serves HTTP. The WAQ is degraded, not required.

Updated main.go pattern:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os/signal"
	"syscall"
	"time"

	"git.georgsen.dk/hwlab/internal/api"
	"git.georgsen.dk/hwlab/internal/config"
	"git.georgsen.dk/hwlab/internal/queue"
)

func main() {
	cfg, err := config.Load()
	if err != nil {
		log.Fatalf("config: %v", err)
	}

	// Context for graceful shutdown
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Start write-ahead queue worker (non-fatal if DragonFlyDB unavailable)
	waq, err := queue.NewWAQ(cfg.DragonflyURL)
	if err != nil {
		log.Printf("WARNING: WAQ unavailable (%v); NetBox operations will not be queued during downtime", err)
	} else {
		retryInterval := time.Duration(cfg.WAQRetryIntervalSeconds) * time.Second
		go waq.RunWorker(ctx, queue.NoOpHandler, cfg.WAQMaxAttempts, retryInterval)
		defer waq.Close()
		log.Printf("WAQ worker started")
	}

	router := api.NewRouter()
	addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
	log.Printf("HWLab starting on %s", addr)
	srv := &http.Server{Addr: addr, Handler: router}
	go func() {
		// ErrServerClosed is the normal result of Shutdown, not a failure.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("server: %v", err)
		}
	}()

	// Wait for shutdown signal
	<-ctx.Done()
	log.Println("Shutting down...")
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Printf("HTTP shutdown: %v", err)
	}
	log.Println("Shutdown complete")
}
```

cd /home/mikkel/homelabby && go build ./cmd/hwlab/... && go vet ./...
- `go build ./cmd/hwlab/...` exits 0
- `go vet ./...` exits 0
- `grep "RunWorker" cmd/hwlab/main.go` returns the goroutine call
- `grep "NoOpHandler" internal/queue/worker.go` returns the placeholder handler
- `grep "maxAttempts" internal/queue/worker.go` returns the op drop condition
- `grep "WARNING: WAQ unavailable" cmd/hwlab/main.go` returns the non-fatal degraded path
- Binary starts without panic when DragonFlyDB is available
- Binary starts with WARNING log (not fatal) when DragonFlyDB is unavailable
- `go test ./...` remains green (no regressions from main.go changes)

WAQ worker implemented with fixed-interval backoff (retryInterval), max attempts drop, and NoOpHandler placeholder. main.go updated with graceful shutdown and non-fatal WAQ initialization. Full `go test ./...` still green.

## Trust Boundaries

| Boundary | Description |
|----------|-------------|
| internal/queue → DragonFlyDB | Redis protocol over TCP to 10.5.0.10:6379; password in URL |
| PendingOp.Payload | JSON RawMessage from NetBox operation context; validated by op handler |

## STRIDE Threat Register

| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-05-01 | Information Disclosure | DragonFlyDB password in HWLAB_DRAGONFLY_URL | accept | Private homelab LAN (10.5.0.x); password is in .env which is .gitignored; no external access |
| T-05-02 | Tampering | PendingOp re-enqueue on failure | accept | Ops re-enqueued only by the worker itself; no external write path to the queue in Phase 1 |
| T-05-03 | Denial of Service | Queue accumulation if handler always fails | mitigate | maxAttempts drop logic: ops dropped after cfg.WAQMaxAttempts (default 5) failures; prevents unbounded queue growth |
| T-05-04 | Denial of Service | WAQ worker tight-loop on connection loss | mitigate | retryInterval backoff (default 30s) prevents hammering DragonFlyDB on reconnect |

After both tasks complete:

- `go test ./...` passes (all
packages, no regressions)
- `go build ./cmd/hwlab/...` exits 0
- `go vet ./...` exits 0
- If DragonFlyDB reachable: `go test ./internal/queue/... -v` shows TestWAQEnqueueDequeue PASS
- Start binary and verify it serves health endpoint: `./hwlab & sleep 1; curl http://localhost:8080/api/health; kill %1`

1. WAQ core: Enqueue, Dequeue, Len all working against DragonFlyDB (integration test), with that test skipping gracefully when DragonFlyDB is unreachable; unit tests pass either way
2. Worker: BLPOP loop with backoff, max attempts drop, context cancellation
3. main.go: Non-fatal WAQ init, with a WARNING log instead of a panic when DragonFlyDB is unavailable
4. Graceful shutdown: SIGINT triggers orderly HTTP server and WAQ worker shutdown
5. Full test suite `go test ./...` green

After completion, create `.planning/phases/01-foundation/01-05-SUMMARY.md` with:

- Whether DragonFlyDB integration tests ran or skipped
- Any DragonFlyDB/go-redis v9 compatibility notes (DragonFlyDB is Redis-compatible but may have minor differences)
- Final `go test ./...` output
- Files created/modified