feat(01-05): write-ahead queue core (Enqueue, Dequeue, Len)
- Add PendingOp struct with UUID, type, payload, created_at, attempts - Add WAQ type backed by DragonFlyDB via go-redis v9 - Implement Enqueue (RPUSH), Dequeue (BLPOP), Len (LLEN) - Custom URL parser handles passwords with forward slashes - Unit tests pass; integration test passes against DragonFlyDB at 10.5.0.10:6379
This commit is contained in:
parent
9b4cc9a661
commit
e07ad922eb
4 changed files with 254 additions and 0 deletions
5
go.mod
5
go.mod
|
|
@ -9,16 +9,21 @@ require (
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
||||||
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
|
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/netbox-community/go-netbox/v4 v4.3.0 // indirect
|
github.com/netbox-community/go-netbox/v4 v4.3.0 // indirect
|
||||||
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
|
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
|
||||||
|
github.com/redis/go-redis/v9 v9.18.0 // indirect
|
||||||
github.com/sagikazarmark/locafero v0.11.0 // indirect
|
github.com/sagikazarmark/locafero v0.11.0 // indirect
|
||||||
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
|
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
|
||||||
github.com/spf13/afero v1.15.0 // indirect
|
github.com/spf13/afero v1.15.0 // indirect
|
||||||
github.com/spf13/cast v1.10.0 // indirect
|
github.com/spf13/cast v1.10.0 // indirect
|
||||||
github.com/spf13/pflag v1.0.10 // indirect
|
github.com/spf13/pflag v1.0.10 // indirect
|
||||||
github.com/subosito/gotenv v1.6.0 // indirect
|
github.com/subosito/gotenv v1.6.0 // indirect
|
||||||
|
go.uber.org/atomic v1.11.0 // indirect
|
||||||
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||||
golang.org/x/sys v0.29.0 // indirect
|
golang.org/x/sys v0.29.0 // indirect
|
||||||
golang.org/x/text v0.28.0 // indirect
|
golang.org/x/text v0.28.0 // indirect
|
||||||
|
|
|
||||||
10
go.sum
10
go.sum
|
|
@ -1,5 +1,9 @@
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
||||||
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
|
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
|
||||||
|
|
@ -10,6 +14,8 @@ github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9L
|
||||||
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
|
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
|
||||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||||
|
|
@ -25,6 +31,8 @@ github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0
|
||||||
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
|
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
|
||||||
|
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
|
||||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
||||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||||
github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc=
|
github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc=
|
||||||
|
|
@ -43,6 +51,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
||||||
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
||||||
|
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||||
|
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||||
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||||
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
||||||
|
|
|
||||||
154
internal/queue/waq.go
Normal file
154
internal/queue/waq.go
Normal file
|
|
@ -0,0 +1,154 @@
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
const queueKey = "hwlab:netbox:pending_ops"
|
||||||
|
|
||||||
|
// PendingOp represents a queued NetBox operation.
|
||||||
|
// Serialized as JSON in DragonFlyDB LIST.
|
||||||
|
type PendingOp struct {
|
||||||
|
ID string `json:"id"` // UUID
|
||||||
|
Type string `json:"type"` // "create_device", "patch_custom_fields", "sync_tags", etc.
|
||||||
|
Payload json.RawMessage `json:"payload"` // operation-specific data
|
||||||
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
Attempts int `json:"attempts"` // retry count
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPendingOp creates a new PendingOp with a generated ID and current timestamp.
|
||||||
|
func NewPendingOp(opType string, payload interface{}) (PendingOp, error) {
|
||||||
|
data, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return PendingOp{}, fmt.Errorf("marshal payload: %w", err)
|
||||||
|
}
|
||||||
|
return PendingOp{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
Type: opType,
|
||||||
|
Payload: json.RawMessage(data),
|
||||||
|
CreatedAt: time.Now().UTC(),
|
||||||
|
Attempts: 0,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WAQ is a write-ahead queue backed by DragonFlyDB (Redis-compatible).
|
||||||
|
// Operations are stored as JSON in a Redis LIST using RPUSH/BLPOP for FIFO ordering.
|
||||||
|
type WAQ struct {
|
||||||
|
rdb *redis.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// redisURLRegexp matches redis://[:password@]host[:port][/db]
|
||||||
|
// The password group is non-greedy up to the last '@' before host:port.
|
||||||
|
// This handles passwords containing forward slashes, which url.Parse cannot.
|
||||||
|
var redisURLRegexp = regexp.MustCompile(`^rediss?://:?([^@]*)@([^:/]+)(?::(\d+))?(?:/(\d+))?$`)
|
||||||
|
|
||||||
|
// parseRedisURL parses a redis:// URL into redis.Options, correctly handling
|
||||||
|
// passwords that contain forward slashes (which break standard url.Parse).
|
||||||
|
func parseRedisURL(rawURL string) (*redis.Options, error) {
|
||||||
|
// Try standard ParseURL first (works when password has no special chars).
|
||||||
|
if opt, err := redis.ParseURL(rawURL); err == nil {
|
||||||
|
return opt, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to manual regex parsing for passwords with slashes.
|
||||||
|
m := redisURLRegexp.FindStringSubmatch(rawURL)
|
||||||
|
if m == nil {
|
||||||
|
return nil, fmt.Errorf("invalid redis URL format: %s", rawURL)
|
||||||
|
}
|
||||||
|
password := m[1]
|
||||||
|
host := m[2]
|
||||||
|
portStr := m[3]
|
||||||
|
dbStr := m[4]
|
||||||
|
|
||||||
|
port := 6379
|
||||||
|
if portStr != "" {
|
||||||
|
p, err := strconv.Atoi(portStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid port in redis URL: %s", portStr)
|
||||||
|
}
|
||||||
|
port = p
|
||||||
|
}
|
||||||
|
|
||||||
|
db := 0
|
||||||
|
if dbStr != "" {
|
||||||
|
d, err := strconv.Atoi(dbStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid db in redis URL: %s", dbStr)
|
||||||
|
}
|
||||||
|
db = d
|
||||||
|
}
|
||||||
|
|
||||||
|
return &redis.Options{
|
||||||
|
Addr: fmt.Sprintf("%s:%d", host, port),
|
||||||
|
Password: password,
|
||||||
|
DB: db,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWAQ creates a new WAQ connected to the given Redis/DragonFlyDB URL.
|
||||||
|
// Returns error if the URL is invalid or the server is unreachable.
|
||||||
|
// Passwords containing forward slashes (or other special characters) are supported.
|
||||||
|
func NewWAQ(redisURL string) (*WAQ, error) {
|
||||||
|
opt, err := parseRedisURL(redisURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse dragonfly url: %w", err)
|
||||||
|
}
|
||||||
|
client := redis.NewClient(opt)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := client.Ping(ctx).Err(); err != nil {
|
||||||
|
client.Close()
|
||||||
|
return nil, fmt.Errorf("dragonfly unreachable at %s: %w", redisURL, err)
|
||||||
|
}
|
||||||
|
log.Printf("WAQ connected to DragonFlyDB")
|
||||||
|
return &WAQ{rdb: client}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enqueue pushes a PendingOp onto the right end of the queue (FIFO).
|
||||||
|
func (q *WAQ) Enqueue(ctx context.Context, op PendingOp) error {
|
||||||
|
data, err := json.Marshal(op)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal op: %w", err)
|
||||||
|
}
|
||||||
|
return q.rdb.RPush(ctx, queueKey, data).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dequeue pops the oldest operation (blocking for up to timeout).
|
||||||
|
// Returns nil, nil if timeout elapses with no item.
|
||||||
|
func (q *WAQ) Dequeue(ctx context.Context, timeout time.Duration) (*PendingOp, error) {
|
||||||
|
result, err := q.rdb.BLPop(ctx, timeout, queueKey).Result()
|
||||||
|
if err == redis.Nil {
|
||||||
|
return nil, nil // timeout — no items
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("blpop: %w", err)
|
||||||
|
}
|
||||||
|
var op PendingOp
|
||||||
|
if err := json.Unmarshal([]byte(result[1]), &op); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal op: %w", err)
|
||||||
|
}
|
||||||
|
return &op, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len returns the current number of pending operations in the queue.
|
||||||
|
func (q *WAQ) Len(ctx context.Context) (int64, error) {
|
||||||
|
n, err := q.rdb.LLen(ctx, queueKey).Result()
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("llen: %w", err)
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close releases the DragonFlyDB connection.
|
||||||
|
func (q *WAQ) Close() error {
|
||||||
|
return q.rdb.Close()
|
||||||
|
}
|
||||||
85
internal/queue/waq_test.go
Normal file
85
internal/queue/waq_test.go
Normal file
|
|
@ -0,0 +1,85 @@
|
||||||
|
package queue_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.georgsen.dk/hwlab/internal/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPendingOpJSON(t *testing.T) {
|
||||||
|
payload := map[string]string{"device_id": "42", "hw_id": "HW-00001"}
|
||||||
|
op, err := queue.NewPendingOp("create_device", payload)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewPendingOp: %v", err)
|
||||||
|
}
|
||||||
|
if op.ID == "" {
|
||||||
|
t.Error("ID should be a UUID")
|
||||||
|
}
|
||||||
|
if op.Type != "create_device" {
|
||||||
|
t.Errorf("Type: want create_device, got %s", op.Type)
|
||||||
|
}
|
||||||
|
// Round-trip JSON
|
||||||
|
data, _ := json.Marshal(op)
|
||||||
|
var op2 queue.PendingOp
|
||||||
|
if err := json.Unmarshal(data, &op2); err != nil {
|
||||||
|
t.Fatalf("unmarshal: %v", err)
|
||||||
|
}
|
||||||
|
if op2.ID != op.ID {
|
||||||
|
t.Errorf("ID mismatch after round-trip: %s != %s", op2.ID, op.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewWAQInvalidURL(t *testing.T) {
|
||||||
|
_, err := queue.NewWAQ("not-a-redis-url")
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected error for invalid URL")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// dragonflyURL returns the DragonFlyDB URL from env, or the default from .env.
|
||||||
|
func dragonflyURL(t *testing.T) string {
|
||||||
|
t.Helper()
|
||||||
|
url := os.Getenv("HWLAB_DRAGONFLY_URL")
|
||||||
|
if url == "" {
|
||||||
|
url = "redis://:nUq/IfoIQJf/kouckKHRQOk7vV0NwCuI@10.5.0.10:6379"
|
||||||
|
}
|
||||||
|
return url
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWAQEnqueueDequeue(t *testing.T) {
|
||||||
|
waq, err := queue.NewWAQ(dragonflyURL(t))
|
||||||
|
if err != nil {
|
||||||
|
t.Skipf("DragonFlyDB unavailable: %v", err)
|
||||||
|
}
|
||||||
|
defer waq.Close()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
op, _ := queue.NewPendingOp("test_op", map[string]string{"test": "value"})
|
||||||
|
if err := waq.Enqueue(ctx, op); err != nil {
|
||||||
|
t.Fatalf("Enqueue: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := waq.Len(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Len: %v", err)
|
||||||
|
}
|
||||||
|
if n < 1 {
|
||||||
|
t.Error("expected at least 1 item after enqueue")
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := waq.Dequeue(ctx, 2*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Dequeue: %v", err)
|
||||||
|
}
|
||||||
|
if got == nil {
|
||||||
|
t.Fatal("expected op from dequeue, got nil")
|
||||||
|
}
|
||||||
|
if got.ID != op.ID {
|
||||||
|
t.Errorf("dequeued op ID mismatch: want %s, got %s", op.ID, got.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Reference in a new issue