- Add PendingOp struct with UUID, type, payload, created_at, attempts
- Add WAQ type backed by DragonFlyDB via go-redis v9
- Implement Enqueue (RPUSH), Dequeue (BLPOP), Len (LLEN)
- Custom URL parser handles passwords with forward slashes
- Unit tests pass; integration test passes against DragonFlyDB at 10.5.0.10:6379
154 lines
4.5 KiB
Go
154 lines
4.5 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"regexp"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// queueKey is the key of the LIST that holds pending operations. Enqueue,
// Dequeue and Len all operate on this single key.
const queueKey = "hwlab:netbox:pending_ops"
|
|
|
|
// PendingOp represents a queued NetBox operation.
//
// Values are serialized as JSON (field names come from the struct tags) and
// stored as elements of the DragonFlyDB LIST named by queueKey.
type PendingOp struct {
	// ID identifies the operation; NewPendingOp fills it with a UUID string.
	ID string `json:"id"`
	// Type names the operation, e.g. "create_device", "patch_custom_fields",
	// "sync_tags", etc.
	Type string `json:"type"`
	// Payload is the operation-specific data, kept as raw JSON so it can be
	// decoded later by whoever handles Type.
	Payload json.RawMessage `json:"payload"`
	// CreatedAt is the creation time; NewPendingOp sets it in UTC.
	CreatedAt time.Time `json:"created_at"`
	// Attempts is the retry count. NewPendingOp starts it at 0.
	Attempts int `json:"attempts"`
}
|
|
|
|
// NewPendingOp creates a new PendingOp with a generated ID and current timestamp.
|
|
func NewPendingOp(opType string, payload interface{}) (PendingOp, error) {
|
|
data, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return PendingOp{}, fmt.Errorf("marshal payload: %w", err)
|
|
}
|
|
return PendingOp{
|
|
ID: uuid.New().String(),
|
|
Type: opType,
|
|
Payload: json.RawMessage(data),
|
|
CreatedAt: time.Now().UTC(),
|
|
Attempts: 0,
|
|
}, nil
|
|
}
|
|
|
|
// WAQ is a write-ahead queue backed by DragonFlyDB (Redis-compatible).
//
// Operations are stored as JSON in a Redis LIST: Enqueue appends with RPUSH
// and Dequeue removes with BLPOP, which together give FIFO ordering.
// Construct one with NewWAQ and release it with Close.
type WAQ struct {
	// rdb is the go-redis client used for every queue command.
	rdb *redis.Client
}
|
|
|
|
// redisURLRegexp matches redis://[:password@]host[:port][/db]
|
|
// The password group is non-greedy up to the last '@' before host:port.
|
|
// This handles passwords containing forward slashes, which url.Parse cannot.
|
|
var redisURLRegexp = regexp.MustCompile(`^rediss?://:?([^@]*)@([^:/]+)(?::(\d+))?(?:/(\d+))?$`)
|
|
|
|
// parseRedisURL parses a redis:// URL into redis.Options, correctly handling
|
|
// passwords that contain forward slashes (which break standard url.Parse).
|
|
func parseRedisURL(rawURL string) (*redis.Options, error) {
|
|
// Try standard ParseURL first (works when password has no special chars).
|
|
if opt, err := redis.ParseURL(rawURL); err == nil {
|
|
return opt, nil
|
|
}
|
|
|
|
// Fall back to manual regex parsing for passwords with slashes.
|
|
m := redisURLRegexp.FindStringSubmatch(rawURL)
|
|
if m == nil {
|
|
return nil, fmt.Errorf("invalid redis URL format: %s", rawURL)
|
|
}
|
|
password := m[1]
|
|
host := m[2]
|
|
portStr := m[3]
|
|
dbStr := m[4]
|
|
|
|
port := 6379
|
|
if portStr != "" {
|
|
p, err := strconv.Atoi(portStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid port in redis URL: %s", portStr)
|
|
}
|
|
port = p
|
|
}
|
|
|
|
db := 0
|
|
if dbStr != "" {
|
|
d, err := strconv.Atoi(dbStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid db in redis URL: %s", dbStr)
|
|
}
|
|
db = d
|
|
}
|
|
|
|
return &redis.Options{
|
|
Addr: fmt.Sprintf("%s:%d", host, port),
|
|
Password: password,
|
|
DB: db,
|
|
}, nil
|
|
}
|
|
|
|
// NewWAQ creates a new WAQ connected to the given Redis/DragonFlyDB URL.
|
|
// Returns error if the URL is invalid or the server is unreachable.
|
|
// Passwords containing forward slashes (or other special characters) are supported.
|
|
func NewWAQ(redisURL string) (*WAQ, error) {
|
|
opt, err := parseRedisURL(redisURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse dragonfly url: %w", err)
|
|
}
|
|
client := redis.NewClient(opt)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := client.Ping(ctx).Err(); err != nil {
|
|
client.Close()
|
|
return nil, fmt.Errorf("dragonfly unreachable at %s: %w", redisURL, err)
|
|
}
|
|
log.Printf("WAQ connected to DragonFlyDB")
|
|
return &WAQ{rdb: client}, nil
|
|
}
|
|
|
|
// Enqueue pushes a PendingOp onto the right end of the queue (FIFO).
|
|
func (q *WAQ) Enqueue(ctx context.Context, op PendingOp) error {
|
|
data, err := json.Marshal(op)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal op: %w", err)
|
|
}
|
|
return q.rdb.RPush(ctx, queueKey, data).Err()
|
|
}
|
|
|
|
// Dequeue pops the oldest operation (blocking for up to timeout).
|
|
// Returns nil, nil if timeout elapses with no item.
|
|
func (q *WAQ) Dequeue(ctx context.Context, timeout time.Duration) (*PendingOp, error) {
|
|
result, err := q.rdb.BLPop(ctx, timeout, queueKey).Result()
|
|
if err == redis.Nil {
|
|
return nil, nil // timeout — no items
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("blpop: %w", err)
|
|
}
|
|
var op PendingOp
|
|
if err := json.Unmarshal([]byte(result[1]), &op); err != nil {
|
|
return nil, fmt.Errorf("unmarshal op: %w", err)
|
|
}
|
|
return &op, nil
|
|
}
|
|
|
|
// Len returns the current number of pending operations in the queue.
|
|
func (q *WAQ) Len(ctx context.Context) (int64, error) {
|
|
n, err := q.rdb.LLen(ctx, queueKey).Result()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("llen: %w", err)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// Close releases the DragonFlyDB connection.
|
|
func (q *WAQ) Close() error {
|
|
return q.rdb.Close()
|
|
}
|