- Add RunWorker: BLPOP loop with context cancellation and retryInterval backoff
- Add NoOpHandler: Phase 1 placeholder that drains ops with a log line
- Drop ops after maxAttempts with a warning log (T-05-03 mitigation)
- Update main.go: non-fatal WAQ init, graceful HTTP shutdown on SIGINT/SIGTERM
70 lines
2.2 KiB
Go
70 lines
2.2 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
// OpHandler is a function that processes a single dequeued operation.
|
|
// Returns nil on success, error if the operation should be re-queued.
|
|
type OpHandler func(ctx context.Context, op PendingOp) error
|
|
|
|
// RunWorker runs a blocking BLPOP loop processing ops from the queue.
|
|
// It calls handler for each dequeued op. If handler returns an error,
|
|
// the op is re-enqueued with incremented Attempts count.
|
|
// Ops that exceed maxAttempts are dropped with a log warning.
|
|
//
|
|
// On DragonFlyDB connection loss, RunWorker backs off and retries connection.
|
|
// Call with a cancellable context to stop the worker cleanly.
|
|
func (q *WAQ) RunWorker(ctx context.Context, handler OpHandler, maxAttempts int, retryInterval time.Duration) {
|
|
log.Printf("WAQ worker started (maxAttempts=%d, retryInterval=%s)", maxAttempts, retryInterval)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Printf("WAQ worker stopping: %v", ctx.Err())
|
|
return
|
|
default:
|
|
}
|
|
|
|
op, err := q.Dequeue(ctx, 5*time.Second)
|
|
if err != nil {
|
|
// Connection error — back off before retrying
|
|
log.Printf("WAQ dequeue error: %v — backing off %s", err, retryInterval)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(retryInterval):
|
|
}
|
|
continue
|
|
}
|
|
if op == nil {
|
|
// Timeout with no items — loop immediately (BLPOP already waited 5s)
|
|
continue
|
|
}
|
|
|
|
// Process the operation
|
|
if err := handler(ctx, *op); err != nil {
|
|
op.Attempts++
|
|
if op.Attempts >= maxAttempts {
|
|
log.Printf("WAQ: dropping op %s (type=%s) after %d failed attempts: %v",
|
|
op.ID, op.Type, op.Attempts, err)
|
|
continue
|
|
}
|
|
// Re-enqueue for retry
|
|
log.Printf("WAQ: re-enqueuing op %s (type=%s, attempt=%d): %v",
|
|
op.ID, op.Type, op.Attempts, err)
|
|
if enqErr := q.Enqueue(ctx, *op); enqErr != nil {
|
|
log.Printf("WAQ: failed to re-enqueue op %s: %v", op.ID, enqErr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// NoOpHandler is a placeholder op handler for Phase 1.
|
|
// Phase 2 will replace this with a real NetBox retry handler.
|
|
// It logs the operation and returns nil (success) so ops drain from the queue.
|
|
func NoOpHandler(ctx context.Context, op PendingOp) error {
|
|
log.Printf("WAQ [noop]: processing op %s (type=%s, attempts=%d)", op.ID, op.Type, op.Attempts)
|
|
return nil
|
|
}
|