// Package nats provides an embedded NATS server with JetStream for the Felt // tournament engine. The server runs in-process with no TCP listener, providing // event durability and real-time state broadcasting. package nats import ( "context" "errors" "fmt" "log" "path/filepath" "time" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) // EmbeddedServer wraps an in-process NATS server with JetStream enabled. type EmbeddedServer struct { server *server.Server conn *nats.Conn js jetstream.JetStream } // Start creates and starts an embedded NATS server with JetStream. // // CRITICAL: JetStreamSyncInterval is set to 0 (sync_interval: always) per the // December 2025 Jepsen finding for NATS 2.12.1. This ensures every acknowledged // write is fsynced immediately. Without this, recently acknowledged writes exist // only in memory and are lost on power failure in single-node deployments. func Start(ctx context.Context, dataDir string) (*EmbeddedServer, error) { storeDir := filepath.Join(dataDir, "jetstream") opts := &server.Options{ ServerName: "felt-leaf", DontListen: true, // In-process only, no TCP listener JetStream: true, StoreDir: storeDir, NoSigs: true, // We handle signals ourselves // Memory and disk limits JetStreamMaxMemory: 64 * 1024 * 1024, // 64MB memory limit JetStreamMaxStore: 1024 * 1024 * 1024, // 1GB disk limit // MANDATORY: fsync every write for single-node durability (Jepsen 2025) SyncInterval: 0, // 0 = "always" } ns, err := server.NewServer(opts) if err != nil { return nil, fmt.Errorf("nats: create server: %w", err) } // Start the server in the background go ns.Start() // Wait for the server to be ready (5-second timeout) if !ns.ReadyForConnections(5 * time.Second) { ns.Shutdown() return nil, errors.New("nats: server startup timeout (5s)") } log.Printf("nats: server ready (store=%s, sync=always)", storeDir) // Connect to the in-process server using the client URL nc, err := 
nats.Connect(ns.ClientURL(), nats.InProcessServer(ns), nats.Name("felt-leaf-internal"), ) if err != nil { ns.Shutdown() return nil, fmt.Errorf("nats: connect to embedded server: %w", err) } // Create JetStream context js, err := jetstream.New(nc) if err != nil { nc.Close() ns.Shutdown() return nil, fmt.Errorf("nats: create jetstream context: %w", err) } es := &EmbeddedServer{ server: ns, conn: nc, js: js, } // Create initial streams if err := es.createStreams(ctx); err != nil { es.Shutdown() return nil, fmt.Errorf("nats: create streams: %w", err) } return es, nil } // createStreams creates the initial JetStream streams for tournament events. func (es *EmbeddedServer) createStreams(ctx context.Context) error { // AUDIT stream: append-only audit trail for all tournament actions _, err := es.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ Name: "AUDIT", Subjects: []string{"tournament.*.audit"}, Storage: jetstream.FileStorage, MaxAge: 0, // No expiry — audit trail is permanent }) if err != nil { return fmt.Errorf("create AUDIT stream: %w", err) } log.Printf("nats: AUDIT stream ready (tournament.*.audit)") // STATE stream: tournament state changes for real-time broadcast _, err = es.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ Name: "STATE", Subjects: []string{"tournament.*.state.>"}, Storage: jetstream.FileStorage, MaxAge: 24 * time.Hour, // State events expire after 24h }) if err != nil { return fmt.Errorf("create STATE stream: %w", err) } log.Printf("nats: STATE stream ready (tournament.*.state.>)") return nil } // Conn returns the NATS client connection. func (es *EmbeddedServer) Conn() *nats.Conn { return es.conn } // JetStream returns the JetStream context for publishing and consuming. func (es *EmbeddedServer) JetStream() jetstream.JetStream { return es.js } // Server returns the underlying NATS server for health checks. 
func (es *EmbeddedServer) Server() *server.Server {
	return es.server
}

// Shutdown stops the embedded NATS server and releases its client connection.
// The client connection (if any) is closed first; then the server (if any) is
// shut down and Shutdown blocks until the server reports shutdown is complete.
// A completion message is always logged.
func (es *EmbeddedServer) Shutdown() {
	if nc := es.conn; nc != nil {
		nc.Close()
	}

	if ns := es.server; ns != nil {
		ns.Shutdown()
		// Block until the server's shutdown sequence has finished.
		ns.WaitForShutdown()
	}

	log.Printf("nats: server shutdown complete")
}