felt/internal/nats/embedded.go
Mikkel Georgsen 16caa12d64 feat(01-01): implement core infrastructure — NATS, LibSQL, WebSocket hub, HTTP server
- Embedded NATS server with JetStream (sync_interval=always per Jepsen 2025)
- AUDIT and STATE JetStream streams for tournament event durability
- NATS publisher with UUID validation to prevent subject injection
- WebSocket hub with JWT auth (query param), tournament-scoped broadcasting
- Origin validation and slow-consumer message dropping
- chi HTTP router with middleware (logger, recoverer, request ID, CORS, body limits)
- Server timeouts: ReadHeader 10s, Read 30s, Write 60s, Idle 120s, MaxHeader 1MB
- MaxBytesReader middleware for request body limits (1MB default)
- JWT auth middleware with HMAC-SHA256 validation
- Role-based access control (admin > floor > viewer)
- Health endpoint reporting all subsystem status (DB, NATS, WebSocket)
- SvelteKit SPA served via go:embed with fallback routing
- Signal-driven graceful shutdown in reverse startup order
- 9 integration tests covering all verification criteria

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 03:42:42 +01:00

153 lines
4.3 KiB
Go

// Package nats provides an embedded NATS server with JetStream for the Felt
// tournament engine. The server runs in-process with no TCP listener, providing
// event durability and real-time state broadcasting.
package nats
import (
"context"
"errors"
"fmt"
"log"
"path/filepath"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// EmbeddedServer bundles an in-process NATS server (JetStream enabled) with
// the internal client connection and JetStream handle opened against it.
// Values are produced by Start and torn down with Shutdown.
type EmbeddedServer struct {
	// server is the in-process NATS server created in Start.
	server *server.Server

	// conn is the internal client connection, attached via nats.InProcessServer.
	conn *nats.Conn

	// js is the JetStream context built on top of conn.
	js jetstream.JetStream
}
// Start creates and starts an embedded NATS server with JetStream. It returns
// the server together with an internal client connection, a JetStream context,
// and the AUDIT and STATE streams already created (ctx bounds stream creation).
//
// The server has no TCP listener (DontListen); it is reachable only through
// nats.InProcessServer. JetStream data is stored under <dataDir>/jetstream.
//
// CRITICAL: SyncAlways is enabled — the programmatic equivalent of the config
// setting "sync_interval: always" — per the December 2025 Jepsen finding for
// NATS 2.12.1, so every acknowledged write is fsynced immediately. Note that
// SyncInterval: 0 does NOT mean "always": a zero interval makes the file store
// fall back to its default periodic sync, so recently acknowledged writes would
// exist only in memory and be lost on power failure in single-node deployments.
//
// On any failure after the server is created, everything started so far is
// shut down before the error is returned.
func Start(ctx context.Context, dataDir string) (*EmbeddedServer, error) {
	storeDir := filepath.Join(dataDir, "jetstream")
	opts := &server.Options{
		ServerName: "felt-leaf",
		DontListen: true, // In-process only, no TCP listener
		JetStream:  true,
		StoreDir:   storeDir,
		NoSigs:     true, // We handle signals ourselves

		// Memory and disk limits
		JetStreamMaxMemory: 64 * 1024 * 1024,   // 64MB memory limit
		JetStreamMaxStore:  1024 * 1024 * 1024, // 1GB disk limit

		// MANDATORY: fsync every write for single-node durability (Jepsen 2025).
		// SyncAlways is what "sync_interval: always" sets; leaving SyncInterval
		// at 0 would only select the default periodic sync, not per-write fsync.
		SyncAlways: true,
	}
	ns, err := server.NewServer(opts)
	if err != nil {
		return nil, fmt.Errorf("nats: create server: %w", err)
	}

	// Start the server in the background
	go ns.Start()

	// Wait for the server to be ready (5-second timeout)
	if !ns.ReadyForConnections(5 * time.Second) {
		ns.Shutdown()
		return nil, errors.New("nats: server startup timeout (5s)")
	}
	log.Printf("nats: server ready (store=%s, sync=always)", storeDir)

	// Connect to the in-process server; InProcessServer bypasses the network,
	// so the URL is only used for identification.
	nc, err := nats.Connect(ns.ClientURL(),
		nats.InProcessServer(ns),
		nats.Name("felt-leaf-internal"),
	)
	if err != nil {
		ns.Shutdown()
		return nil, fmt.Errorf("nats: connect to embedded server: %w", err)
	}

	// Create JetStream context
	js, err := jetstream.New(nc)
	if err != nil {
		nc.Close()
		ns.Shutdown()
		return nil, fmt.Errorf("nats: create jetstream context: %w", err)
	}

	es := &EmbeddedServer{
		server: ns,
		conn:   nc,
		js:     js,
	}

	// Create initial streams
	if err := es.createStreams(ctx); err != nil {
		es.Shutdown()
		return nil, fmt.Errorf("nats: create streams: %w", err)
	}
	return es, nil
}
// createStreams declares the JetStream streams the tournament engine uses,
// creating each one or updating an existing definition in place. Streams are
// processed in declaration order and the first failure aborts the rest.
//
//   - AUDIT (tournament.*.audit): file-backed with MaxAge 0, i.e. the audit
//     trail is never expired.
//   - STATE (tournament.*.state.>): file-backed, events expire after 24h.
func (es *EmbeddedServer) createStreams(ctx context.Context) error {
	specs := []struct {
		cfg   jetstream.StreamConfig
		ready string // log line emitted once the stream exists
	}{
		{
			cfg: jetstream.StreamConfig{
				Name:     "AUDIT",
				Subjects: []string{"tournament.*.audit"},
				Storage:  jetstream.FileStorage,
				MaxAge:   0, // No expiry — audit trail is permanent
			},
			ready: "nats: AUDIT stream ready (tournament.*.audit)",
		},
		{
			cfg: jetstream.StreamConfig{
				Name:     "STATE",
				Subjects: []string{"tournament.*.state.>"},
				Storage:  jetstream.FileStorage,
				MaxAge:   24 * time.Hour, // State events expire after 24h
			},
			ready: "nats: STATE stream ready (tournament.*.state.>)",
		},
	}

	for _, spec := range specs {
		if _, err := es.js.CreateOrUpdateStream(ctx, spec.cfg); err != nil {
			return fmt.Errorf("create %s stream: %w", spec.cfg.Name, err)
		}
		log.Print(spec.ready)
	}
	return nil
}
// Conn exposes the internal client connection to the embedded server.
func (es *EmbeddedServer) Conn() *nats.Conn { return es.conn }

// JetStream exposes the JetStream handle used for publishing and consuming.
func (es *EmbeddedServer) JetStream() jetstream.JetStream { return es.js }

// Server exposes the underlying NATS server, e.g. for health checks.
func (es *EmbeddedServer) Server() *server.Server { return es.server }
// Shutdown tears the embedded server down: it first closes the internal client
// connection, then stops the server and blocks until the server reports that
// shutdown has finished. Nil fields are skipped, so it is safe to call on a
// partially constructed value (as Start does on its error path).
func (es *EmbeddedServer) Shutdown() {
	// Client side first, so nothing is still using the connection while the
	// server goes away.
	if nc := es.conn; nc != nil {
		nc.Close()
	}
	if ns := es.server; ns != nil {
		ns.Shutdown()
		ns.WaitForShutdown()
	}
	log.Printf("nats: server shutdown complete")
}