package queue import ( "context" "log" "time" ) // OpHandler is a function that processes a single dequeued operation. // Returns nil on success, error if the operation should be re-queued. type OpHandler func(ctx context.Context, op PendingOp) error // RunWorker runs a blocking BLPOP loop processing ops from the queue. // It calls handler for each dequeued op. If handler returns an error, // the op is re-enqueued with incremented Attempts count. // Ops that exceed maxAttempts are dropped with a log warning. // // On DragonFlyDB connection loss, RunWorker backs off and retries connection. // Call with a cancellable context to stop the worker cleanly. func (q *WAQ) RunWorker(ctx context.Context, handler OpHandler, maxAttempts int, retryInterval time.Duration) { log.Printf("WAQ worker started (maxAttempts=%d, retryInterval=%s)", maxAttempts, retryInterval) for { select { case <-ctx.Done(): log.Printf("WAQ worker stopping: %v", ctx.Err()) return default: } op, err := q.Dequeue(ctx, 5*time.Second) if err != nil { // Connection error — back off before retrying log.Printf("WAQ dequeue error: %v — backing off %s", err, retryInterval) select { case <-ctx.Done(): return case <-time.After(retryInterval): } continue } if op == nil { // Timeout with no items — loop immediately (BLPOP already waited 5s) continue } // Process the operation if err := handler(ctx, *op); err != nil { op.Attempts++ if op.Attempts >= maxAttempts { log.Printf("WAQ: dropping op %s (type=%s) after %d failed attempts: %v", op.ID, op.Type, op.Attempts, err) continue } // Re-enqueue for retry log.Printf("WAQ: re-enqueuing op %s (type=%s, attempt=%d): %v", op.ID, op.Type, op.Attempts, err) if enqErr := q.Enqueue(ctx, *op); enqErr != nil { log.Printf("WAQ: failed to re-enqueue op %s: %v", op.ID, enqErr) } } } } // NoOpHandler is a placeholder op handler for Phase 1. // Phase 2 will replace this with a real NetBox retry handler. // It logs the operation and returns nil (success) so ops drain from the queue. func NoOpHandler(ctx context.Context, op PendingOp) error { log.Printf("WAQ [noop]: processing op %s (type=%s, attempts=%d)", op.ID, op.Type, op.Attempts) return nil }