homelabby/internal/queue/waq.go
Mikkel Georgsen e07ad922eb feat(01-05): write-ahead queue core (Enqueue, Dequeue, Len)
- Add PendingOp struct with UUID, type, payload, created_at, attempts
- Add WAQ type backed by DragonFlyDB via go-redis v9
- Implement Enqueue (RPUSH), Dequeue (BLPOP), Len (LLEN)
- Custom URL parser handles passwords with forward slashes
- Unit tests pass; integration test passes against DragonFlyDB at 10.5.0.10:6379
2026-04-10 05:21:28 +00:00

154 lines
4.5 KiB
Go

package queue
import (
"context"
"encoding/json"
"fmt"
"log"
"regexp"
"strconv"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
const queueKey = "hwlab:netbox:pending_ops"
// PendingOp represents a queued NetBox operation.
// Serialized as JSON in DragonFlyDB LIST.
type PendingOp struct {
ID string `json:"id"` // UUID
Type string `json:"type"` // "create_device", "patch_custom_fields", "sync_tags", etc.
Payload json.RawMessage `json:"payload"` // operation-specific data
CreatedAt time.Time `json:"created_at"`
Attempts int `json:"attempts"` // retry count
}
// NewPendingOp creates a new PendingOp with a generated ID and current timestamp.
func NewPendingOp(opType string, payload interface{}) (PendingOp, error) {
data, err := json.Marshal(payload)
if err != nil {
return PendingOp{}, fmt.Errorf("marshal payload: %w", err)
}
return PendingOp{
ID: uuid.New().String(),
Type: opType,
Payload: json.RawMessage(data),
CreatedAt: time.Now().UTC(),
Attempts: 0,
}, nil
}
// WAQ is a write-ahead queue backed by DragonFlyDB (Redis-compatible).
// Operations are stored as JSON in a Redis LIST using RPUSH/BLPOP for FIFO ordering.
type WAQ struct {
rdb *redis.Client
}
// redisURLRegexp matches redis://[:password@]host[:port][/db]
// The password group is non-greedy up to the last '@' before host:port.
// This handles passwords containing forward slashes, which url.Parse cannot.
var redisURLRegexp = regexp.MustCompile(`^rediss?://:?([^@]*)@([^:/]+)(?::(\d+))?(?:/(\d+))?$`)
// parseRedisURL parses a redis:// URL into redis.Options, correctly handling
// passwords that contain forward slashes (which break standard url.Parse).
func parseRedisURL(rawURL string) (*redis.Options, error) {
// Try standard ParseURL first (works when password has no special chars).
if opt, err := redis.ParseURL(rawURL); err == nil {
return opt, nil
}
// Fall back to manual regex parsing for passwords with slashes.
m := redisURLRegexp.FindStringSubmatch(rawURL)
if m == nil {
return nil, fmt.Errorf("invalid redis URL format: %s", rawURL)
}
password := m[1]
host := m[2]
portStr := m[3]
dbStr := m[4]
port := 6379
if portStr != "" {
p, err := strconv.Atoi(portStr)
if err != nil {
return nil, fmt.Errorf("invalid port in redis URL: %s", portStr)
}
port = p
}
db := 0
if dbStr != "" {
d, err := strconv.Atoi(dbStr)
if err != nil {
return nil, fmt.Errorf("invalid db in redis URL: %s", dbStr)
}
db = d
}
return &redis.Options{
Addr: fmt.Sprintf("%s:%d", host, port),
Password: password,
DB: db,
}, nil
}
// NewWAQ creates a new WAQ connected to the given Redis/DragonFlyDB URL.
// Returns error if the URL is invalid or the server is unreachable.
// Passwords containing forward slashes (or other special characters) are supported.
func NewWAQ(redisURL string) (*WAQ, error) {
opt, err := parseRedisURL(redisURL)
if err != nil {
return nil, fmt.Errorf("parse dragonfly url: %w", err)
}
client := redis.NewClient(opt)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx).Err(); err != nil {
client.Close()
return nil, fmt.Errorf("dragonfly unreachable at %s: %w", redisURL, err)
}
log.Printf("WAQ connected to DragonFlyDB")
return &WAQ{rdb: client}, nil
}
// Enqueue pushes a PendingOp onto the right end of the queue (FIFO).
func (q *WAQ) Enqueue(ctx context.Context, op PendingOp) error {
data, err := json.Marshal(op)
if err != nil {
return fmt.Errorf("marshal op: %w", err)
}
return q.rdb.RPush(ctx, queueKey, data).Err()
}
// Dequeue pops the oldest operation (blocking for up to timeout).
// Returns nil, nil if timeout elapses with no item.
func (q *WAQ) Dequeue(ctx context.Context, timeout time.Duration) (*PendingOp, error) {
result, err := q.rdb.BLPop(ctx, timeout, queueKey).Result()
if err == redis.Nil {
return nil, nil // timeout — no items
}
if err != nil {
return nil, fmt.Errorf("blpop: %w", err)
}
var op PendingOp
if err := json.Unmarshal([]byte(result[1]), &op); err != nil {
return nil, fmt.Errorf("unmarshal op: %w", err)
}
return &op, nil
}
// Len returns the current number of pending operations in the queue.
func (q *WAQ) Len(ctx context.Context) (int64, error) {
n, err := q.rdb.LLen(ctx, queueKey).Result()
if err != nil {
return 0, fmt.Errorf("llen: %w", err)
}
return n, nil
}
// Close releases the DragonFlyDB connection.
func (q *WAQ) Close() error {
return q.rdb.Close()
}