feat(01-05): WAQ retry worker and graceful shutdown wiring
- Add RunWorker: BLPOP loop with context cancellation and retryInterval backoff - Add NoOpHandler: Phase 1 placeholder that drains ops with a log line - Drop ops after maxAttempts with warning log (T-05-03 mitigation) - Update main.go: non-fatal WAQ init, graceful HTTP shutdown on SIGINT/SIGTERM
This commit is contained in:
parent
49a729a1a6
commit
d1192c3380
2 changed files with 106 additions and 2 deletions
|
|
@ -1,14 +1,19 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"log"
|
||||
"net/http"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
hwlab "git.georgsen.dk/hwlab"
|
||||
"git.georgsen.dk/hwlab/internal/api"
|
||||
"git.georgsen.dk/hwlab/internal/config"
|
||||
"git.georgsen.dk/hwlab/internal/queue"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -22,10 +27,39 @@ func main() {
|
|||
log.Fatalf("embed: %v", err)
|
||||
}
|
||||
|
||||
// Context for graceful shutdown
|
||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
// Start write-ahead queue worker (non-fatal if DragonFlyDB unavailable)
|
||||
waq, err := queue.NewWAQ(cfg.DragonflyURL)
|
||||
if err != nil {
|
||||
log.Printf("WARNING: WAQ unavailable (%v) — NetBox operations will not be queued during downtime", err)
|
||||
} else {
|
||||
retryInterval := time.Duration(cfg.WAQRetryIntervalSeconds) * time.Second
|
||||
go waq.RunWorker(ctx, queue.NoOpHandler, cfg.WAQMaxAttempts, retryInterval)
|
||||
defer waq.Close()
|
||||
log.Printf("WAQ worker started")
|
||||
}
|
||||
|
||||
router := api.NewRouter(staticFS)
|
||||
addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
|
||||
log.Printf("HWLab starting on %s", addr)
|
||||
if err := http.ListenAndServe(addr, router); err != nil {
|
||||
log.Fatalf("server: %v", err)
|
||||
|
||||
srv := &http.Server{Addr: addr, Handler: router}
|
||||
go func() {
|
||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
log.Fatalf("server: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for shutdown signal
|
||||
<-ctx.Done()
|
||||
log.Println("Shutting down...")
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := srv.Shutdown(shutdownCtx); err != nil {
|
||||
log.Printf("server shutdown: %v", err)
|
||||
}
|
||||
log.Println("Shutdown complete")
|
||||
}
|
||||
|
|
|
|||
70
internal/queue/worker.go
Normal file
70
internal/queue/worker.go
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
// OpHandler is a function that processes a single dequeued operation.
|
||||
// Returns nil on success, error if the operation should be re-queued.
|
||||
type OpHandler func(ctx context.Context, op PendingOp) error
|
||||
|
||||
// RunWorker runs a blocking BLPOP loop processing ops from the queue.
|
||||
// It calls handler for each dequeued op. If handler returns an error,
|
||||
// the op is re-enqueued with incremented Attempts count.
|
||||
// Ops that exceed maxAttempts are dropped with a log warning.
|
||||
//
|
||||
// On DragonFlyDB connection loss, RunWorker backs off and retries connection.
|
||||
// Call with a cancellable context to stop the worker cleanly.
|
||||
func (q *WAQ) RunWorker(ctx context.Context, handler OpHandler, maxAttempts int, retryInterval time.Duration) {
|
||||
log.Printf("WAQ worker started (maxAttempts=%d, retryInterval=%s)", maxAttempts, retryInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("WAQ worker stopping: %v", ctx.Err())
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
op, err := q.Dequeue(ctx, 5*time.Second)
|
||||
if err != nil {
|
||||
// Connection error — back off before retrying
|
||||
log.Printf("WAQ dequeue error: %v — backing off %s", err, retryInterval)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(retryInterval):
|
||||
}
|
||||
continue
|
||||
}
|
||||
if op == nil {
|
||||
// Timeout with no items — loop immediately (BLPOP already waited 5s)
|
||||
continue
|
||||
}
|
||||
|
||||
// Process the operation
|
||||
if err := handler(ctx, *op); err != nil {
|
||||
op.Attempts++
|
||||
if op.Attempts >= maxAttempts {
|
||||
log.Printf("WAQ: dropping op %s (type=%s) after %d failed attempts: %v",
|
||||
op.ID, op.Type, op.Attempts, err)
|
||||
continue
|
||||
}
|
||||
// Re-enqueue for retry
|
||||
log.Printf("WAQ: re-enqueuing op %s (type=%s, attempt=%d): %v",
|
||||
op.ID, op.Type, op.Attempts, err)
|
||||
if enqErr := q.Enqueue(ctx, *op); enqErr != nil {
|
||||
log.Printf("WAQ: failed to re-enqueue op %s: %v", op.ID, enqErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NoOpHandler is a placeholder op handler for Phase 1.
|
||||
// Phase 2 will replace this with a real NetBox retry handler.
|
||||
// It logs the operation and returns nil (success) so ops drain from the queue.
|
||||
func NoOpHandler(ctx context.Context, op PendingOp) error {
|
||||
log.Printf("WAQ [noop]: processing op %s (type=%s, attempts=%d)", op.ID, op.Type, op.Attempts)
|
||||
return nil
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue