package queue import ( "context" "encoding/json" "fmt" "log" "regexp" "strconv" "time" "github.com/google/uuid" "github.com/redis/go-redis/v9" ) const queueKey = "hwlab:netbox:pending_ops" // PendingOp represents a queued NetBox operation. // Serialized as JSON in DragonFlyDB LIST. type PendingOp struct { ID string `json:"id"` // UUID Type string `json:"type"` // "create_device", "patch_custom_fields", "sync_tags", etc. Payload json.RawMessage `json:"payload"` // operation-specific data CreatedAt time.Time `json:"created_at"` Attempts int `json:"attempts"` // retry count } // NewPendingOp creates a new PendingOp with a generated ID and current timestamp. func NewPendingOp(opType string, payload interface{}) (PendingOp, error) { data, err := json.Marshal(payload) if err != nil { return PendingOp{}, fmt.Errorf("marshal payload: %w", err) } return PendingOp{ ID: uuid.New().String(), Type: opType, Payload: json.RawMessage(data), CreatedAt: time.Now().UTC(), Attempts: 0, }, nil } // WAQ is a write-ahead queue backed by DragonFlyDB (Redis-compatible). // Operations are stored as JSON in a Redis LIST using RPUSH/BLPOP for FIFO ordering. type WAQ struct { rdb *redis.Client } // redisURLRegexp matches redis://[:password@]host[:port][/db] // The password group is non-greedy up to the last '@' before host:port. // This handles passwords containing forward slashes, which url.Parse cannot. var redisURLRegexp = regexp.MustCompile(`^rediss?://:?([^@]*)@([^:/]+)(?::(\d+))?(?:/(\d+))?$`) // parseRedisURL parses a redis:// URL into redis.Options, correctly handling // passwords that contain forward slashes (which break standard url.Parse). func parseRedisURL(rawURL string) (*redis.Options, error) { // Try standard ParseURL first (works when password has no special chars). if opt, err := redis.ParseURL(rawURL); err == nil { return opt, nil } // Fall back to manual regex parsing for passwords with slashes. m := redisURLRegexp.FindStringSubmatch(rawURL) if m == nil { return nil, fmt.Errorf("invalid redis URL format: %s", rawURL) } password := m[1] host := m[2] portStr := m[3] dbStr := m[4] port := 6379 if portStr != "" { p, err := strconv.Atoi(portStr) if err != nil { return nil, fmt.Errorf("invalid port in redis URL: %s", portStr) } port = p } db := 0 if dbStr != "" { d, err := strconv.Atoi(dbStr) if err != nil { return nil, fmt.Errorf("invalid db in redis URL: %s", dbStr) } db = d } return &redis.Options{ Addr: fmt.Sprintf("%s:%d", host, port), Password: password, DB: db, }, nil } // NewWAQ creates a new WAQ connected to the given Redis/DragonFlyDB URL. // Returns error if the URL is invalid or the server is unreachable. // Passwords containing forward slashes (or other special characters) are supported. func NewWAQ(redisURL string) (*WAQ, error) { opt, err := parseRedisURL(redisURL) if err != nil { return nil, fmt.Errorf("parse dragonfly url: %w", err) } client := redis.NewClient(opt) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := client.Ping(ctx).Err(); err != nil { client.Close() return nil, fmt.Errorf("dragonfly unreachable at %s: %w", redisURL, err) } log.Printf("WAQ connected to DragonFlyDB") return &WAQ{rdb: client}, nil } // Enqueue pushes a PendingOp onto the right end of the queue (FIFO). func (q *WAQ) Enqueue(ctx context.Context, op PendingOp) error { data, err := json.Marshal(op) if err != nil { return fmt.Errorf("marshal op: %w", err) } return q.rdb.RPush(ctx, queueKey, data).Err() } // Dequeue pops the oldest operation (blocking for up to timeout). // Returns nil, nil if timeout elapses with no item. func (q *WAQ) Dequeue(ctx context.Context, timeout time.Duration) (*PendingOp, error) { result, err := q.rdb.BLPop(ctx, timeout, queueKey).Result() if err == redis.Nil { return nil, nil // timeout — no items } if err != nil { return nil, fmt.Errorf("blpop: %w", err) } var op PendingOp if err := json.Unmarshal([]byte(result[1]), &op); err != nil { return nil, fmt.Errorf("unmarshal op: %w", err) } return &op, nil } // Len returns the current number of pending operations in the queue. func (q *WAQ) Len(ctx context.Context) (int64, error) { n, err := q.rdb.LLen(ctx, queueKey).Result() if err != nil { return 0, fmt.Errorf("llen: %w", err) } return n, nil } // Close releases the DragonFlyDB connection. func (q *WAQ) Close() error { return q.rdb.Close() }