- Embedded NATS server with JetStream (sync_interval=always per Jepsen 2025)
- AUDIT and STATE JetStream streams for tournament event durability
- NATS publisher with UUID validation to prevent subject injection
- WebSocket hub with JWT auth (query param), tournament-scoped broadcasting
- Origin validation and slow-consumer message dropping
- chi HTTP router with middleware (logger, recoverer, request ID, CORS, body limits)
- Server timeouts: ReadHeader 10s, Read 30s, Write 60s, Idle 120s, MaxHeader 1MB
- MaxBytesReader middleware for request body limits (1MB default)
- JWT auth middleware with HMAC-SHA256 validation
- Role-based access control (admin > floor > viewer)
- Health endpoint reporting all subsystem status (DB, NATS, WebSocket)
- SvelteKit SPA served via go:embed with fallback routing
- Signal-driven graceful shutdown in reverse startup order
- 9 integration tests covering all verification criteria

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
153 lines
4.3 KiB
Go
153 lines
4.3 KiB
Go
// Package nats provides an embedded NATS server with JetStream for the Felt
// tournament engine. The server runs in-process with no TCP listener, providing
// event durability and real-time state broadcasting.
package nats
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats-server/v2/server"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
)
|
|
|
|
// EmbeddedServer wraps an in-process NATS server with JetStream enabled.
|
|
type EmbeddedServer struct {
|
|
server *server.Server
|
|
conn *nats.Conn
|
|
js jetstream.JetStream
|
|
}
|
|
|
|
// Start creates and starts an embedded NATS server with JetStream.
|
|
//
|
|
// CRITICAL: JetStreamSyncInterval is set to 0 (sync_interval: always) per the
|
|
// December 2025 Jepsen finding for NATS 2.12.1. This ensures every acknowledged
|
|
// write is fsynced immediately. Without this, recently acknowledged writes exist
|
|
// only in memory and are lost on power failure in single-node deployments.
|
|
func Start(ctx context.Context, dataDir string) (*EmbeddedServer, error) {
|
|
storeDir := filepath.Join(dataDir, "jetstream")
|
|
|
|
opts := &server.Options{
|
|
ServerName: "felt-leaf",
|
|
DontListen: true, // In-process only, no TCP listener
|
|
JetStream: true,
|
|
StoreDir: storeDir,
|
|
NoSigs: true, // We handle signals ourselves
|
|
|
|
// Memory and disk limits
|
|
JetStreamMaxMemory: 64 * 1024 * 1024, // 64MB memory limit
|
|
JetStreamMaxStore: 1024 * 1024 * 1024, // 1GB disk limit
|
|
|
|
// MANDATORY: fsync every write for single-node durability (Jepsen 2025)
|
|
SyncInterval: 0, // 0 = "always"
|
|
}
|
|
|
|
ns, err := server.NewServer(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("nats: create server: %w", err)
|
|
}
|
|
|
|
// Start the server in the background
|
|
go ns.Start()
|
|
|
|
// Wait for the server to be ready (5-second timeout)
|
|
if !ns.ReadyForConnections(5 * time.Second) {
|
|
ns.Shutdown()
|
|
return nil, errors.New("nats: server startup timeout (5s)")
|
|
}
|
|
|
|
log.Printf("nats: server ready (store=%s, sync=always)", storeDir)
|
|
|
|
// Connect to the in-process server using the client URL
|
|
nc, err := nats.Connect(ns.ClientURL(),
|
|
nats.InProcessServer(ns),
|
|
nats.Name("felt-leaf-internal"),
|
|
)
|
|
if err != nil {
|
|
ns.Shutdown()
|
|
return nil, fmt.Errorf("nats: connect to embedded server: %w", err)
|
|
}
|
|
|
|
// Create JetStream context
|
|
js, err := jetstream.New(nc)
|
|
if err != nil {
|
|
nc.Close()
|
|
ns.Shutdown()
|
|
return nil, fmt.Errorf("nats: create jetstream context: %w", err)
|
|
}
|
|
|
|
es := &EmbeddedServer{
|
|
server: ns,
|
|
conn: nc,
|
|
js: js,
|
|
}
|
|
|
|
// Create initial streams
|
|
if err := es.createStreams(ctx); err != nil {
|
|
es.Shutdown()
|
|
return nil, fmt.Errorf("nats: create streams: %w", err)
|
|
}
|
|
|
|
return es, nil
|
|
}
|
|
|
|
// createStreams creates the initial JetStream streams for tournament events.
|
|
func (es *EmbeddedServer) createStreams(ctx context.Context) error {
|
|
// AUDIT stream: append-only audit trail for all tournament actions
|
|
_, err := es.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
|
|
Name: "AUDIT",
|
|
Subjects: []string{"tournament.*.audit"},
|
|
Storage: jetstream.FileStorage,
|
|
MaxAge: 0, // No expiry — audit trail is permanent
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("create AUDIT stream: %w", err)
|
|
}
|
|
log.Printf("nats: AUDIT stream ready (tournament.*.audit)")
|
|
|
|
// STATE stream: tournament state changes for real-time broadcast
|
|
_, err = es.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
|
|
Name: "STATE",
|
|
Subjects: []string{"tournament.*.state.>"},
|
|
Storage: jetstream.FileStorage,
|
|
MaxAge: 24 * time.Hour, // State events expire after 24h
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("create STATE stream: %w", err)
|
|
}
|
|
log.Printf("nats: STATE stream ready (tournament.*.state.>)")
|
|
|
|
return nil
|
|
}
|
|
|
|
// Conn returns the NATS client connection.
|
|
func (es *EmbeddedServer) Conn() *nats.Conn {
|
|
return es.conn
|
|
}
|
|
|
|
// JetStream returns the JetStream context for publishing and consuming.
|
|
func (es *EmbeddedServer) JetStream() jetstream.JetStream {
|
|
return es.js
|
|
}
|
|
|
|
// Server returns the underlying NATS server for health checks.
|
|
func (es *EmbeddedServer) Server() *server.Server {
|
|
return es.server
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the NATS server and closes connections.
|
|
func (es *EmbeddedServer) Shutdown() {
|
|
if es.conn != nil {
|
|
es.conn.Close()
|
|
}
|
|
if es.server != nil {
|
|
es.server.Shutdown()
|
|
es.server.WaitForShutdown()
|
|
}
|
|
log.Printf("nats: server shutdown complete")
|
|
}
|