diff --git a/cmd/hwlab/main.go b/cmd/hwlab/main.go index d606c8a..ede209c 100644 --- a/cmd/hwlab/main.go +++ b/cmd/hwlab/main.go @@ -1,14 +1,19 @@ package main import ( + "context" "fmt" "io/fs" "log" "net/http" + "os/signal" + "syscall" + "time" hwlab "git.georgsen.dk/hwlab" "git.georgsen.dk/hwlab/internal/api" "git.georgsen.dk/hwlab/internal/config" + "git.georgsen.dk/hwlab/internal/queue" ) func main() { @@ -22,10 +27,39 @@ func main() { log.Fatalf("embed: %v", err) } + // Context for graceful shutdown + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + // Start write-ahead queue worker (non-fatal if DragonFlyDB unavailable) + waq, err := queue.NewWAQ(cfg.DragonflyURL) + if err != nil { + log.Printf("WARNING: WAQ unavailable (%v) — NetBox operations will not be queued during downtime", err) + } else { + retryInterval := time.Duration(cfg.WAQRetryIntervalSeconds) * time.Second + go waq.RunWorker(ctx, queue.NoOpHandler, cfg.WAQMaxAttempts, retryInterval) + defer waq.Close() + log.Printf("WAQ worker started") + } + router := api.NewRouter(staticFS) addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) log.Printf("HWLab starting on %s", addr) - if err := http.ListenAndServe(addr, router); err != nil { - log.Fatalf("server: %v", err) + + srv := &http.Server{Addr: addr, Handler: router} + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("server: %v", err) + } + }() + + // Wait for shutdown signal + <-ctx.Done() + log.Println("Shutting down...") + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("server shutdown: %v", err) } + log.Println("Shutdown complete") } diff --git a/internal/queue/worker.go b/internal/queue/worker.go new file mode 100644 index 0000000..cd61cf4 --- /dev/null +++ b/internal/queue/worker.go @@ -0,0 +1,70 @@ +package queue + +import ( + "context" + "log" + "time" +) + +// OpHandler is a function that processes a single dequeued operation. +// Returns nil on success, error if the operation should be re-queued. +type OpHandler func(ctx context.Context, op PendingOp) error + +// RunWorker runs a blocking BLPOP loop processing ops from the queue. +// It calls handler for each dequeued op. If handler returns an error, +// the op is re-enqueued with incremented Attempts count. +// Ops that exceed maxAttempts are dropped with a log warning. +// +// On DragonFlyDB connection loss, RunWorker backs off and retries connection. +// Call with a cancellable context to stop the worker cleanly. +func (q *WAQ) RunWorker(ctx context.Context, handler OpHandler, maxAttempts int, retryInterval time.Duration) { + log.Printf("WAQ worker started (maxAttempts=%d, retryInterval=%s)", maxAttempts, retryInterval) + for { + select { + case <-ctx.Done(): + log.Printf("WAQ worker stopping: %v", ctx.Err()) + return + default: + } + + op, err := q.Dequeue(ctx, 5*time.Second) + if err != nil { + // Connection error — back off before retrying + log.Printf("WAQ dequeue error: %v — backing off %s", err, retryInterval) + select { + case <-ctx.Done(): + return + case <-time.After(retryInterval): + } + continue + } + if op == nil { + // Timeout with no items — loop immediately (BLPOP already waited 5s) + continue + } + + // Process the operation + if err := handler(ctx, *op); err != nil { + op.Attempts++ + if op.Attempts >= maxAttempts { + log.Printf("WAQ: dropping op %s (type=%s) after %d failed attempts: %v", + op.ID, op.Type, op.Attempts, err) + continue + } + // Re-enqueue for retry + log.Printf("WAQ: re-enqueuing op %s (type=%s, attempt=%d): %v", + op.ID, op.Type, op.Attempts, err) + if enqErr := q.Enqueue(ctx, *op); enqErr != nil { + log.Printf("WAQ: failed to re-enqueue op %s: %v", op.ID, enqErr) + } + } + } +} + +// NoOpHandler is a placeholder op handler for Phase 1. +// Phase 2 will replace this with a real NetBox retry handler. +// It logs the operation and returns nil (success) so ops drain from the queue. +func NoOpHandler(ctx context.Context, op PendingOp) error { + log.Printf("WAQ [noop]: processing op %s (type=%s, attempts=%d)", op.ID, op.Type, op.Attempts) + return nil +}