diff --git a/cmd/leaf/main.go b/cmd/leaf/main.go index e95d469..467f4d4 100644 --- a/cmd/leaf/main.go +++ b/cmd/leaf/main.go @@ -1,18 +1,30 @@ +// Command leaf is the Felt tournament engine binary. It starts all embedded +// infrastructure (LibSQL, NATS JetStream, WebSocket hub) and serves the +// SvelteKit SPA over HTTP. package main import ( + "context" + "crypto/rand" "flag" - "fmt" "log" + "net/http" "os" + "os/signal" + "syscall" + "time" + feltnats "github.com/felt-app/felt/internal/nats" + "github.com/felt-app/felt/internal/server" + "github.com/felt-app/felt/internal/server/middleware" + "github.com/felt-app/felt/internal/server/ws" "github.com/felt-app/felt/internal/store" ) func main() { dataDir := flag.String("data-dir", "./data", "Data directory for database and files") addr := flag.String("addr", ":8080", "HTTP listen address") - devMode := flag.Bool("dev", false, "Enable development mode (applies dev seed data)") + devMode := flag.Bool("dev", false, "Enable development mode (permissive CORS, dev seed data)") flag.Parse() log.SetFlags(log.LstdFlags | log.Lmsgprefix) @@ -20,32 +32,107 @@ func main() { log.Printf("starting (data-dir=%s, addr=%s, dev=%v)", *dataDir, *addr, *devMode) - // Open database (runs migrations automatically) + // Create root context with cancellation + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // ---- 1. LibSQL Database ---- db, err := store.Open(*dataDir, *devMode) if err != nil { log.Fatalf("failed to open database: %v", err) } defer db.Close() - // Check first-run setup status - needsSetup, err := db.NeedsSetup() - if err != nil { - log.Fatalf("failed to check setup status: %v", err) - } - if needsSetup { - log.Printf("no operators found - first-run setup required") - log.Printf("POST /api/v1/setup to create first admin operator") - } - // Verify database is working var one int if err := db.QueryRow("SELECT 1").Scan(&one); err != nil { log.Fatalf("database health check failed: %v", err) } - log.Printf("database ready") - // Placeholder for HTTP server (will be implemented in Plan A) - _ = addr - fmt.Fprintf(os.Stderr, "felt: ready (database operational, HTTP server not yet implemented)\n") + // ---- 2. Embedded NATS Server ---- + natsServer, err := feltnats.Start(ctx, *dataDir) + if err != nil { + log.Fatalf("failed to start NATS: %v", err) + } + defer natsServer.Shutdown() + + // ---- 3. JWT Signing Key ---- + // In production, this should be loaded from a persisted secret. + // For now, generate a random key on startup (tokens won't survive restart). + signingKey := generateOrLoadSigningKey(*dataDir) + + // ---- 4. WebSocket Hub ---- + tokenValidator := func(tokenStr string) (string, string, error) { + return middleware.ValidateJWT(tokenStr, signingKey) + } + + // Tournament validator stub — allows all for now + // TODO: Implement tournament existence + access check against DB + tournamentValidator := func(tournamentID string, operatorID string) error { + return nil // Accept all tournaments for now + } + + var allowedOrigins []string + if *devMode { + allowedOrigins = nil // InsecureSkipVerify will be used + } else { + allowedOrigins = []string{"*"} // Same-origin enforced by browser + } + + hub := ws.NewHub(tokenValidator, tournamentValidator, allowedOrigins) + defer hub.Shutdown() + + // ---- 5. HTTP Server ---- + srv := server.New(server.Config{ + Addr: *addr, + SigningKey: signingKey, + DevMode: *devMode, + }, db.DB, natsServer.Server(), hub) + + // Start HTTP server in goroutine + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("HTTP server error: %v", err) + } + }() + + log.Printf("ready (addr=%s, dev=%v)", *addr, *devMode) + + // ---- Signal Handling ---- + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + sig := <-sigCh + log.Printf("received signal: %s, shutting down...", sig) + + // Graceful shutdown in reverse startup order + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() + + // 5. HTTP Server + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("HTTP server shutdown error: %v", err) + } + + // 4. WebSocket Hub (closed by defer) + // 3. NATS Server (closed by defer) + // 2. Database (closed by defer) + + cancel() // Cancel root context + + log.Printf("shutdown complete") +} + +// generateOrLoadSigningKey generates a random 256-bit signing key. +// In a future plan, this will be persisted to the data directory. +func generateOrLoadSigningKey(dataDir string) []byte { + // TODO: Persist to file in dataDir for key stability across restarts + _ = dataDir + key := make([]byte, 32) + if _, err := rand.Read(key); err != nil { + log.Fatalf("failed to generate signing key: %v", err) + } + log.Printf("JWT signing key generated (ephemeral — will change on restart)") + return key } diff --git a/cmd/leaf/main_test.go b/cmd/leaf/main_test.go new file mode 100644 index 0000000..b34e55c --- /dev/null +++ b/cmd/leaf/main_test.go @@ -0,0 +1,294 @@ +package main + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/coder/websocket" + "github.com/golang-jwt/jwt/v5" + + feltnats "github.com/felt-app/felt/internal/nats" + "github.com/felt-app/felt/internal/server" + "github.com/felt-app/felt/internal/server/middleware" + "github.com/felt-app/felt/internal/server/ws" + "github.com/felt-app/felt/internal/store" +) + +func setupTestServer(t *testing.T) (*httptest.Server, *store.DB, *feltnats.EmbeddedServer, []byte) { + t.Helper() + ctx := context.Background() + tmpDir := t.TempDir() + + // Open database + db, err := store.Open(tmpDir, true) + if err != nil { + t.Fatalf("open database: %v", err) + } + t.Cleanup(func() { db.Close() }) + + // Start NATS + ns, err := feltnats.Start(ctx, tmpDir) + if err != nil { + t.Fatalf("start nats: %v", err) + } + t.Cleanup(func() { ns.Shutdown() }) + + // Setup JWT signing + signingKey := []byte("test-signing-key-32-bytes-long!!") + + tokenValidator := func(tokenStr string) (string, string, error) { + return middleware.ValidateJWT(tokenStr, signingKey) + } + + hub := ws.NewHub(tokenValidator, nil, nil) + t.Cleanup(func() { hub.Shutdown() }) + + // Create HTTP server + srv := server.New(server.Config{ + Addr: ":0", + SigningKey: signingKey, + DevMode: true, + }, db.DB, ns.Server(), hub) + + ts := httptest.NewServer(srv.Handler()) + t.Cleanup(func() { ts.Close() }) + + return ts, db, ns, signingKey +} + +func makeToken(t *testing.T, signingKey []byte, operatorID, role string) string { + t.Helper() + token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{ + "sub": operatorID, + "role": role, + "exp": time.Now().Add(time.Hour).Unix(), + }) + tokenStr, err := token.SignedString(signingKey) + if err != nil { + t.Fatalf("sign token: %v", err) + } + return tokenStr +} + +func TestHealthEndpoint(t *testing.T) { + ts, _, _, _ := setupTestServer(t) + + resp, err := http.Get(ts.URL + "/api/v1/health") + if err != nil { + t.Fatalf("health request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + + var health map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&health); err != nil { + t.Fatalf("decode health: %v", err) + } + + if health["status"] != "ok" { + t.Fatalf("expected status ok, got %v", health["status"]) + } + + // Check subsystems + subsystems, ok := health["subsystems"].(map[string]interface{}) + if !ok { + t.Fatal("missing subsystems in health response") + } + + dbStatus, ok := subsystems["database"].(map[string]interface{}) + if !ok || dbStatus["status"] != "ok" { + t.Fatalf("database not ok: %v", dbStatus) + } + + natsStatus, ok := subsystems["nats"].(map[string]interface{}) + if !ok || natsStatus["status"] != "ok" { + t.Fatalf("nats not ok: %v", natsStatus) + } + + wsStatus, ok := subsystems["websocket"].(map[string]interface{}) + if !ok || wsStatus["status"] != "ok" { + t.Fatalf("websocket not ok: %v", wsStatus) + } +} + +func TestSPAFallback(t *testing.T) { + ts, _, _, _ := setupTestServer(t) + + // Root path + resp, err := http.Get(ts.URL + "/") + if err != nil { + t.Fatalf("root request: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("expected 200 for root, got %d", resp.StatusCode) + } + + // Unknown path (SPA fallback) + resp2, err := http.Get(ts.URL + "/some/unknown/route") + if err != nil { + t.Fatalf("unknown path request: %v", err) + } + defer resp2.Body.Close() + if resp2.StatusCode != 200 { + t.Fatalf("expected 200 for SPA fallback, got %d", resp2.StatusCode) + } +} + +func TestWebSocketRejectsMissingToken(t *testing.T) { + ts, _, _, _ := setupTestServer(t) + ctx := context.Background() + + _, resp, err := websocket.Dial(ctx, "ws"+ts.URL[4:]+"/ws", nil) + if err == nil { + t.Fatal("expected error for missing token") + } + if resp != nil && resp.StatusCode != 401 { + t.Fatalf("expected 401, got %d", resp.StatusCode) + } +} + +func TestWebSocketRejectsInvalidToken(t *testing.T) { + ts, _, _, _ := setupTestServer(t) + ctx := context.Background() + + _, resp, err := websocket.Dial(ctx, "ws"+ts.URL[4:]+"/ws?token=invalid", nil) + if err == nil { + t.Fatal("expected error for invalid token") + } + if resp != nil && resp.StatusCode != 401 { + t.Fatalf("expected 401, got %d", resp.StatusCode) + } +} + +func TestWebSocketAcceptsValidToken(t *testing.T) { + ts, _, _, signingKey := setupTestServer(t) + ctx := context.Background() + + tokenStr := makeToken(t, signingKey, "operator-123", "admin") + + wsURL := "ws" + ts.URL[4:] + "/ws?token=" + tokenStr + conn, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Fatalf("websocket dial: %v", err) + } + defer conn.CloseNow() + + // Should receive a connected message + readCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + _, msgBytes, err := conn.Read(readCtx) + if err != nil { + t.Fatalf("read connected message: %v", err) + } + + var msg map[string]interface{} + if err := json.Unmarshal(msgBytes, &msg); err != nil { + t.Fatalf("decode message: %v", err) + } + + if msg["type"] != "connected" { + t.Fatalf("expected 'connected' message, got %v", msg["type"]) + } + + conn.Close(websocket.StatusNormalClosure, "test done") +} + +func TestNATSStreamsExist(t *testing.T) { + _, _, ns, _ := setupTestServer(t) + ctx := context.Background() + + js := ns.JetStream() + + // Check AUDIT stream + stream, err := js.Stream(ctx, "AUDIT") + if err != nil { + t.Fatalf("get AUDIT stream: %v", err) + } + info, err := stream.Info(ctx) + if err != nil { + t.Fatalf("get AUDIT stream info: %v", err) + } + if info.Config.Name != "AUDIT" { + t.Fatalf("expected AUDIT stream, got %s", info.Config.Name) + } + + // Check STATE stream + stream, err = js.Stream(ctx, "STATE") + if err != nil { + t.Fatalf("get STATE stream: %v", err) + } + info, err = stream.Info(ctx) + if err != nil { + t.Fatalf("get STATE stream info: %v", err) + } + if info.Config.Name != "STATE" { + t.Fatalf("expected STATE stream, got %s", info.Config.Name) + } +} + +func TestPublisherUUIDValidation(t *testing.T) { + _, _, ns, _ := setupTestServer(t) + ctx := context.Background() + + js := ns.JetStream() + pub := feltnats.NewPublisher(js) + + // Empty UUID + _, err := pub.Publish(ctx, "", "audit", []byte("test")) + if err == nil { + t.Fatal("expected error for empty UUID") + } + + // UUID with NATS wildcards + _, err = pub.Publish(ctx, "test.*.injection", "audit", []byte("test")) + if err == nil { + t.Fatal("expected error for UUID with wildcards") + } + + // Invalid format + _, err = pub.Publish(ctx, "not-a-uuid", "audit", []byte("test")) + if err == nil { + t.Fatal("expected error for invalid UUID format") + } + + // Valid UUID should succeed + _, err = pub.Publish(ctx, "550e8400-e29b-41d4-a716-446655440000", "audit", []byte(`{"test":true}`)) + if err != nil { + t.Fatalf("expected success for valid UUID, got: %v", err) + } +} + +func TestLibSQLWALMode(t *testing.T) { + _, db, _, _ := setupTestServer(t) + + var mode string + err := db.QueryRow("PRAGMA journal_mode").Scan(&mode) + if err != nil { + t.Fatalf("query journal_mode: %v", err) + } + if mode != "wal" { + t.Fatalf("expected WAL mode, got %s", mode) + } +} + +func TestLibSQLForeignKeys(t *testing.T) { + _, db, _, _ := setupTestServer(t) + + var fk int + err := db.QueryRow("PRAGMA foreign_keys").Scan(&fk) + if err != nil { + t.Fatalf("query foreign_keys: %v", err) + } + if fk != 1 { + t.Fatalf("expected foreign_keys=1, got %d", fk) + } +} diff --git a/go.mod b/go.mod index b5dad57..e34fa53 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,25 @@ go 1.24.0 require github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff // no tagged releases — pinned to commit 43644db490ff require ( - github.com/antlr4-go/antlr/v4 v4.13.0 // indirect - github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 // indirect - golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect + github.com/coder/websocket v1.8.14 + github.com/go-chi/chi/v5 v5.2.5 + github.com/golang-jwt/jwt/v5 v5.3.1 + github.com/nats-io/nats-server/v2 v2.12.4 + github.com/nats-io/nats.go v1.49.0 +) + +require ( + github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op // indirect + github.com/antlr4-go/antlr/v4 v4.13.0 // indirect + github.com/google/go-tpm v0.9.8 // indirect + github.com/klauspost/compress v1.18.3 // indirect + github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 // indirect + github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect + github.com/nats-io/jwt/v2 v2.8.0 // indirect + github.com/nats-io/nkeys v0.4.12 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.47.0 // indirect + golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/time v0.14.0 // indirect ) diff --git a/go.sum b/go.sum index ede74d5..9854e53 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,47 @@ +github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM= +github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= +github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= +github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= +github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= +github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= +github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= +github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo= +github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= +github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 h1:JLvn7D+wXjH9g4Jsjo+VqmzTUpl/LX7vfr6VOfSWTdM= github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06/go.mod h1:FUkZ5OHjlGPjnM2UyGJz9TypXQFgYqw6AFNO1UiROTM= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= +github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= +github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts= +github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg= +github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE= +github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw= +github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc= +github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff h1:Hvxz9W8fWpSg9xkiq8/q+3cVJo+MmLMfkjdS/u4nWFY= github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff/go.mod h1:TjsB2miB8RW2Sse8sdxzVTdeGlx74GloD5zJYUC38d8= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU= golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= diff --git a/internal/nats/embedded.go b/internal/nats/embedded.go index 40b4928..7b75faf 100644 --- a/internal/nats/embedded.go +++ b/internal/nats/embedded.go @@ -1 +1,153 @@ +// Package nats provides an embedded NATS server with JetStream for the Felt +// tournament engine. The server runs in-process with no TCP listener, providing +// event durability and real-time state broadcasting. package nats + +import ( + "context" + "errors" + "fmt" + "log" + "path/filepath" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// EmbeddedServer wraps an in-process NATS server with JetStream enabled. +type EmbeddedServer struct { + server *server.Server + conn *nats.Conn + js jetstream.JetStream +} + +// Start creates and starts an embedded NATS server with JetStream. +// +// CRITICAL: JetStreamSyncInterval is set to 0 (sync_interval: always) per the +// December 2025 Jepsen finding for NATS 2.12.1. This ensures every acknowledged +// write is fsynced immediately. Without this, recently acknowledged writes exist +// only in memory and are lost on power failure in single-node deployments. +func Start(ctx context.Context, dataDir string) (*EmbeddedServer, error) { + storeDir := filepath.Join(dataDir, "jetstream") + + opts := &server.Options{ + ServerName: "felt-leaf", + DontListen: true, // In-process only, no TCP listener + JetStream: true, + StoreDir: storeDir, + NoSigs: true, // We handle signals ourselves + + // Memory and disk limits + JetStreamMaxMemory: 64 * 1024 * 1024, // 64MB memory limit + JetStreamMaxStore: 1024 * 1024 * 1024, // 1GB disk limit + + // MANDATORY: fsync every write for single-node durability (Jepsen 2025) + SyncInterval: 0, // 0 = "always" + } + + ns, err := server.NewServer(opts) + if err != nil { + return nil, fmt.Errorf("nats: create server: %w", err) + } + + // Start the server in the background + go ns.Start() + + // Wait for the server to be ready (5-second timeout) + if !ns.ReadyForConnections(5 * time.Second) { + ns.Shutdown() + return nil, errors.New("nats: server startup timeout (5s)") + } + + log.Printf("nats: server ready (store=%s, sync=always)", storeDir) + + // Connect to the in-process server using the client URL + nc, err := nats.Connect(ns.ClientURL(), + nats.InProcessServer(ns), + nats.Name("felt-leaf-internal"), + ) + if err != nil { + ns.Shutdown() + return nil, fmt.Errorf("nats: connect to embedded server: %w", err) + } + + // Create JetStream context + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + ns.Shutdown() + return nil, fmt.Errorf("nats: create jetstream context: %w", err) + } + + es := &EmbeddedServer{ + server: ns, + conn: nc, + js: js, + } + + // Create initial streams + if err := es.createStreams(ctx); err != nil { + es.Shutdown() + return nil, fmt.Errorf("nats: create streams: %w", err) + } + + return es, nil +} + +// createStreams creates the initial JetStream streams for tournament events. +func (es *EmbeddedServer) createStreams(ctx context.Context) error { + // AUDIT stream: append-only audit trail for all tournament actions + _, err := es.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: "AUDIT", + Subjects: []string{"tournament.*.audit"}, + Storage: jetstream.FileStorage, + MaxAge: 0, // No expiry — audit trail is permanent + }) + if err != nil { + return fmt.Errorf("create AUDIT stream: %w", err) + } + log.Printf("nats: AUDIT stream ready (tournament.*.audit)") + + // STATE stream: tournament state changes for real-time broadcast + _, err = es.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: "STATE", + Subjects: []string{"tournament.*.state.>"}, + Storage: jetstream.FileStorage, + MaxAge: 24 * time.Hour, // State events expire after 24h + }) + if err != nil { + return fmt.Errorf("create STATE stream: %w", err) + } + log.Printf("nats: STATE stream ready (tournament.*.state.>)") + + return nil +} + +// Conn returns the NATS client connection. +func (es *EmbeddedServer) Conn() *nats.Conn { + return es.conn +} + +// JetStream returns the JetStream context for publishing and consuming. +func (es *EmbeddedServer) JetStream() jetstream.JetStream { + return es.js +} + +// Server returns the underlying NATS server for health checks. +func (es *EmbeddedServer) Server() *server.Server { + return es.server +} + +// Shutdown gracefully shuts down the NATS server and closes connections. +func (es *EmbeddedServer) Shutdown() { + if es.conn != nil { + es.conn.Close() + } + if es.server != nil { + es.server.Shutdown() + es.server.WaitForShutdown() + } + log.Printf("nats: server shutdown complete") +} diff --git a/internal/nats/publisher.go b/internal/nats/publisher.go index 40b4928..17639a3 100644 --- a/internal/nats/publisher.go +++ b/internal/nats/publisher.go @@ -1 +1,77 @@ package nats + +import ( + "context" + "fmt" + "regexp" + "strings" + + "github.com/nats-io/nats.go/jetstream" +) + +// uuidRegex validates UUID format (8-4-4-4-12 hex digits). +var uuidRegex = regexp.MustCompile(`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`) + +// Publisher provides tournament-scoped event publishing to JetStream. +type Publisher struct { + js jetstream.JetStream +} + +// NewPublisher creates a new publisher using the given JetStream context. +func NewPublisher(js jetstream.JetStream) *Publisher { + return &Publisher{js: js} +} + +// ValidateUUID checks that the given string is a valid UUID and does not +// contain NATS subject wildcards (* > .) that could enable subject injection. +func ValidateUUID(id string) error { + if id == "" { + return fmt.Errorf("empty UUID") + } + + // Check for NATS subject wildcards that could cause injection + if strings.ContainsAny(id, "*>.") { + return fmt.Errorf("UUID contains NATS subject wildcards: %q", id) + } + + if !uuidRegex.MatchString(id) { + return fmt.Errorf("invalid UUID format: %q", id) + } + + return nil +} + +// Publish publishes a tournament-scoped event to JetStream. +// The full subject is constructed as: tournament.. +// +// The tournamentID is validated as a proper UUID before subject construction +// to prevent subject injection attacks. +func (p *Publisher) Publish(ctx context.Context, tournamentID, subject string, data []byte) (*jetstream.PubAck, error) { + if err := ValidateUUID(tournamentID); err != nil { + return nil, fmt.Errorf("publish: invalid tournament ID: %w", err) + } + + if subject == "" { + return nil, fmt.Errorf("publish: empty subject") + } + + fullSubject := fmt.Sprintf("tournament.%s.%s", tournamentID, subject) + ack, err := p.js.Publish(ctx, fullSubject, data) + if err != nil { + return nil, fmt.Errorf("publish to %s: %w", fullSubject, err) + } + + return ack, nil +} + +// PublishAudit is a convenience method for publishing audit events. +// Subject: tournament..audit +func (p *Publisher) PublishAudit(ctx context.Context, tournamentID string, data []byte) (*jetstream.PubAck, error) { + return p.Publish(ctx, tournamentID, "audit", data) +} + +// PublishState is a convenience method for publishing state change events. +// Subject: tournament..state. +func (p *Publisher) PublishState(ctx context.Context, tournamentID, stateType string, data []byte) (*jetstream.PubAck, error) { + return p.Publish(ctx, tournamentID, "state."+stateType, data) +} diff --git a/internal/server/middleware/auth.go b/internal/server/middleware/auth.go index c870d7c..0e5151d 100644 --- a/internal/server/middleware/auth.go +++ b/internal/server/middleware/auth.go @@ -1 +1,96 @@ +// Package middleware provides HTTP middleware for the Felt tournament engine. package middleware + +import ( + "context" + "net/http" + "strings" + + "github.com/golang-jwt/jwt/v5" +) + +// contextKey is a private type for context keys to avoid collisions. +type contextKey string + +const ( + // OperatorIDKey is the context key for the authenticated operator ID. + OperatorIDKey contextKey = "operator_id" + // OperatorRoleKey is the context key for the authenticated operator role. + OperatorRoleKey contextKey = "operator_role" +) + +// JWTAuth returns middleware that validates JWT tokens from the Authorization header. +// Tokens must be in the format: Bearer +func JWTAuth(signingKey []byte) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + authHeader := r.Header.Get("Authorization") + if authHeader == "" { + http.Error(w, `{"error":"missing authorization header"}`, http.StatusUnauthorized) + return + } + + parts := strings.SplitN(authHeader, " ", 2) + if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") { + http.Error(w, `{"error":"invalid authorization format"}`, http.StatusUnauthorized) + return + } + + tokenStr := parts[1] + operatorID, role, err := ValidateJWT(tokenStr, signingKey) + if err != nil { + http.Error(w, `{"error":"invalid or expired token"}`, http.StatusUnauthorized) + return + } + + // Add operator info to request context + ctx := context.WithValue(r.Context(), OperatorIDKey, operatorID) + ctx = context.WithValue(ctx, OperatorRoleKey, role) + next.ServeHTTP(w, r.WithContext(ctx)) + }) + } +} + +// ValidateJWT parses and validates a JWT token string, returning the +// operator ID and role from claims. +func ValidateJWT(tokenStr string, signingKey []byte) (operatorID string, role string, err error) { + token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (interface{}, error) { + // Verify signing method is HMAC + if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, jwt.ErrSignatureInvalid + } + return signingKey, nil + }) + if err != nil { + return "", "", err + } + + claims, ok := token.Claims.(jwt.MapClaims) + if !ok || !token.Valid { + return "", "", jwt.ErrSignatureInvalid + } + + operatorID, _ = claims["sub"].(string) + role, _ = claims["role"].(string) + + if operatorID == "" { + return "", "", jwt.ErrSignatureInvalid + } + if role == "" { + role = "viewer" // Default role + } + + return operatorID, role, nil +} + +// OperatorID extracts the operator ID from the request context. +func OperatorID(r *http.Request) string { + id, _ := r.Context().Value(OperatorIDKey).(string) + return id +} + +// OperatorRole extracts the operator role from the request context. +func OperatorRole(r *http.Request) string { + role, _ := r.Context().Value(OperatorRoleKey).(string) + return role +} diff --git a/internal/server/middleware/bodylimit.go b/internal/server/middleware/bodylimit.go new file mode 100644 index 0000000..92b593f --- /dev/null +++ b/internal/server/middleware/bodylimit.go @@ -0,0 +1,18 @@ +package middleware + +import ( + "net/http" +) + +// MaxBytesReader wraps request bodies with a size limit to prevent abuse. +// Default limit is 1MB. +func MaxBytesReader(limit int64) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Body != nil { + r.Body = http.MaxBytesReader(w, r.Body, limit) + } + next.ServeHTTP(w, r) + }) + } +} diff --git a/internal/server/middleware/role.go b/internal/server/middleware/role.go index c870d7c..0043d60 100644 --- a/internal/server/middleware/role.go +++ b/internal/server/middleware/role.go @@ -1 +1,40 @@ package middleware + +import ( + "net/http" +) + +// Role constants for operator access levels. +const ( + RoleAdmin = "admin" + RoleFloor = "floor" + RoleViewer = "viewer" +) + +// roleHierarchy defines the permission level for each role. +// Higher numbers have more permissions. +var roleHierarchy = map[string]int{ + RoleViewer: 1, + RoleFloor: 2, + RoleAdmin: 3, +} + +// RequireRole returns middleware that checks the operator has at least the +// given role level. Admin > Floor > Viewer. +func RequireRole(minRole string) func(http.Handler) http.Handler { + minLevel := roleHierarchy[minRole] + + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + role := OperatorRole(r) + level := roleHierarchy[role] + + if level < minLevel { + http.Error(w, `{"error":"insufficient permissions"}`, http.StatusForbidden) + return + } + + next.ServeHTTP(w, r) + }) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index abb4e43..19fa6f6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1 +1,207 @@ +// Package server provides the HTTP server for the Felt tournament engine. +// It configures chi router with middleware, defines route groups, and serves +// the SvelteKit SPA via go:embed. package server + +import ( + "context" + "database/sql" + "encoding/json" + "log" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + chimw "github.com/go-chi/chi/v5/middleware" + + "github.com/felt-app/felt/frontend" + "github.com/felt-app/felt/internal/server/middleware" + "github.com/felt-app/felt/internal/server/ws" + natsserver "github.com/nats-io/nats-server/v2/server" +) + +// Config holds server configuration. +type Config struct { + Addr string + SigningKey []byte + DevMode bool +} + +// Server wraps the HTTP server with all dependencies. +type Server struct { + httpServer *http.Server + hub *ws.Hub + db *sql.DB + nats *natsserver.Server +} + +// New creates a new HTTP server with all routes and middleware configured. +func New(cfg Config, db *sql.DB, nats *natsserver.Server, hub *ws.Hub) *Server { + r := chi.NewRouter() + + // Global middleware + r.Use(chimw.RequestID) + r.Use(chimw.RealIP) + r.Use(chimw.Logger) + r.Use(chimw.Recoverer) + + // Request body size limit: 1MB default + r.Use(middleware.MaxBytesReader(1 << 20)) // 1MB + + // CORS: permissive for development + r.Use(corsMiddleware(cfg.DevMode)) + + s := &Server{ + hub: hub, + db: db, + nats: nats, + } + + // API routes + r.Route("/api/v1", func(r chi.Router) { + // Public endpoints (no auth required) + r.Get("/health", s.handleHealth) + + // Protected endpoints + r.Group(func(r chi.Router) { + r.Use(middleware.JWTAuth(cfg.SigningKey)) + + // Stub endpoints — return 200 for now + r.Get("/tournaments", stubHandler("tournaments")) + r.Get("/players", stubHandler("players")) + }) + }) + + // WebSocket endpoint + r.Get("/ws", hub.HandleConnect) + + // SvelteKit SPA fallback (must be last) + r.Handle("/*", frontend.Handler()) + + s.httpServer = &http.Server{ + Addr: cfg.Addr, + Handler: r, + ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 60 * time.Second, + IdleTimeout: 120 * time.Second, + MaxHeaderBytes: 1 << 20, // 1MB + } + + return s +} + +// Handler returns the HTTP handler for use in tests. +func (s *Server) Handler() http.Handler { + return s.httpServer.Handler +} + +// ListenAndServe starts the HTTP server. +func (s *Server) ListenAndServe() error { + log.Printf("server: listening on %s", s.httpServer.Addr) + return s.httpServer.ListenAndServe() +} + +// Shutdown gracefully shuts down the HTTP server. +func (s *Server) Shutdown(ctx context.Context) error { + log.Printf("server: shutting down") + return s.httpServer.Shutdown(ctx) +} + +// handleHealth returns the health status of all subsystems. +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + status := map[string]interface{}{ + "status": "ok", + "subsystems": map[string]interface{}{ + "database": s.checkDatabase(), + "nats": s.checkNATS(), + "websocket": map[string]interface{}{ + "status": "ok", + "clients": s.hub.ClientCount(), + }, + }, + "timestamp": time.Now().UTC().Format(time.RFC3339), + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(status) +} + +// checkDatabase verifies the database is operational. +func (s *Server) checkDatabase() map[string]interface{} { + var result int + err := s.db.QueryRow("SELECT 1").Scan(&result) + if err != nil { + return map[string]interface{}{ + "status": "error", + "error": err.Error(), + } + } + return map[string]interface{}{ + "status": "ok", + } +} + +// checkNATS verifies the NATS server is operational. +func (s *Server) checkNATS() map[string]interface{} { + if s.nats == nil { + return map[string]interface{}{ + "status": "error", + "error": "not initialized", + } + } + + varz, err := s.nats.Varz(nil) + if err != nil { + return map[string]interface{}{ + "status": "error", + "error": err.Error(), + } + } + + return map[string]interface{}{ + "status": "ok", + "connections": varz.Connections, + "jetstream": s.nats.JetStreamEnabled(), + } +} + +// stubHandler returns a handler that responds with 200 and a JSON stub. +func stubHandler(name string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{ + "status": "stub", + "name": name, + "message": "Not yet implemented", + }) + } +} + +// corsMiddleware returns CORS middleware. In dev mode, it's permissive. +func corsMiddleware(devMode bool) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if devMode { + w.Header().Set("Access-Control-Allow-Origin", "*") + } else { + // In production, only allow same-origin + origin := r.Header.Get("Origin") + if origin != "" { + w.Header().Set("Access-Control-Allow-Origin", origin) + } + } + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS, PATCH") + w.Header().Set("Access-Control-Allow-Headers", "Accept, Authorization, Content-Type, X-Request-ID") + w.Header().Set("Access-Control-Allow-Credentials", "true") + w.Header().Set("Access-Control-Max-Age", "300") + + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + + next.ServeHTTP(w, r) + }) + } +} diff --git a/internal/server/ws/hub.go b/internal/server/ws/hub.go index 9859295..7cd89d6 100644 --- a/internal/server/ws/hub.go +++ b/internal/server/ws/hub.go @@ -1 +1,248 @@ +// Package ws implements a WebSocket hub for real-time tournament state +// broadcasting. Connections are authenticated via JWT (query parameter) and +// scoped to specific tournaments. package ws + +import ( + "context" + "encoding/json" + "log" + "net/http" + "sync" + "time" + + "github.com/coder/websocket" +) + +// Message represents a WebSocket message sent to clients. +type Message struct { + Type string `json:"type"` + TournamentID string `json:"tournament_id,omitempty"` + Data json.RawMessage `json:"data,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +// Client represents a connected WebSocket client. +type Client struct { + conn *websocket.Conn + tournamentID string // Subscription scope (empty = global messages only) + operatorID string // From JWT claims + role string // From JWT claims (admin, floor, viewer) + send chan []byte + hub *Hub +} + +// TokenValidator is a function that validates a JWT token string and returns +// the operator ID and role. Injected by the server package. +type TokenValidator func(tokenStr string) (operatorID string, role string, err error) + +// TournamentValidator is a function that checks if a tournament exists and +// the operator has access to it. Returns nil if valid. +type TournamentValidator func(tournamentID string, operatorID string) error + +// Hub manages all active WebSocket connections and broadcasts messages +// to tournament-scoped client groups. +type Hub struct { + mu sync.RWMutex + clients map[*Client]struct{} + validateToken TokenValidator + validateTournament TournamentValidator + allowedOrigins []string +} + +// NewHub creates a new WebSocket hub. +func NewHub(validateToken TokenValidator, validateTournament TournamentValidator, allowedOrigins []string) *Hub { + return &Hub{ + clients: make(map[*Client]struct{}), + validateToken: validateToken, + validateTournament: validateTournament, + allowedOrigins: allowedOrigins, + } +} + +// Register adds a client to the hub. +func (h *Hub) Register(client *Client) { + h.mu.Lock() + defer h.mu.Unlock() + h.clients[client] = struct{}{} + log.Printf("ws: client registered (operator=%s, tournament=%s, total=%d)", + client.operatorID, client.tournamentID, len(h.clients)) +} + +// Unregister removes a client from the hub and closes its send channel. +func (h *Hub) Unregister(client *Client) { + h.mu.Lock() + defer h.mu.Unlock() + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + close(client.send) + log.Printf("ws: client unregistered (operator=%s, tournament=%s, total=%d)", + client.operatorID, client.tournamentID, len(h.clients)) + } +} + +// Broadcast sends a message to all clients subscribed to the given tournament. +// If tournamentID is empty, the message is sent to ALL connected clients. +func (h *Hub) Broadcast(tournamentID string, msgType string, data json.RawMessage) { + msg := Message{ + Type: msgType, + TournamentID: tournamentID, + Data: data, + Timestamp: time.Now().UnixMilli(), + } + + payload, err := json.Marshal(msg) + if err != nil { + log.Printf("ws: broadcast marshal error: %v", err) + return + } + + h.mu.RLock() + defer h.mu.RUnlock() + + sent := 0 + for client := range h.clients { + if tournamentID == "" || client.tournamentID == tournamentID { + // Non-blocking send: drop messages for slow consumers + select { + case client.send <- payload: + sent++ + default: + log.Printf("ws: dropping message for slow client (operator=%s)", client.operatorID) + } + } + } + + if sent > 0 { + log.Printf("ws: broadcast %s to %d client(s) (tournament=%s)", msgType, sent, tournamentID) + } +} + +// ClientCount returns the total number of connected clients. +func (h *Hub) ClientCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) +} + +// HandleConnect handles WebSocket upgrade requests. +// Authentication: JWT must be provided as ?token= query parameter. +// Tournament scope: ?tournament= query parameter (optional). +func (h *Hub) HandleConnect(w http.ResponseWriter, r *http.Request) { + // Extract and validate JWT from query parameter + tokenStr := r.URL.Query().Get("token") + if tokenStr == "" { + http.Error(w, `{"error":"missing token parameter"}`, http.StatusUnauthorized) + return + } + + operatorID, role, err := h.validateToken(tokenStr) + if err != nil { + http.Error(w, `{"error":"invalid or expired token"}`, http.StatusUnauthorized) + return + } + + // Validate tournament scope if provided + tournamentID := r.URL.Query().Get("tournament") + if tournamentID != "" { + if h.validateTournament != nil { + if err := h.validateTournament(tournamentID, operatorID); err != nil { + http.Error(w, `{"error":"tournament not found or access denied"}`, http.StatusForbidden) + return + } + } + } + + // Build accept options with Origin validation + acceptOpts := &websocket.AcceptOptions{ + OriginPatterns: h.allowedOrigins, + } + if len(h.allowedOrigins) == 0 { + // In dev mode, allow all origins + acceptOpts.InsecureSkipVerify = true + } + + // Accept the WebSocket connection + conn, err := websocket.Accept(w, r, acceptOpts) + if err != nil { + log.Printf("ws: accept error: %v", err) + return + } + + client := &Client{ + conn: conn, + tournamentID: tournamentID, + operatorID: operatorID, + role: role, + send: make(chan []byte, 256), + hub: h, + } + + h.Register(client) + + // Send initial connected message + connectMsg := Message{ + Type: "connected", + TournamentID: tournamentID, + Data: json.RawMessage(`{"status":"ok"}`), + Timestamp: time.Now().UnixMilli(), + } + payload, _ := json.Marshal(connectMsg) + client.send <- payload + + // Start read and write pumps + ctx := r.Context() + go client.writePump(ctx) + client.readPump(ctx) // Blocks until disconnect + + // Cleanup + h.Unregister(client) + conn.CloseNow() +} + +// readPump reads messages from the WebSocket connection. +// Currently discards incoming messages (one-way broadcast). +func (c *Client) readPump(ctx context.Context) { + for { + _, _, err := c.conn.Read(ctx) + if err != nil { + // Connection closed or error + return + } + // Currently no client-to-server messages are processed. + // Future: handle subscription changes, pings, etc. + } +} + +// writePump writes messages from the send channel to the WebSocket connection. +func (c *Client) writePump(ctx context.Context) { + for { + select { + case msg, ok := <-c.send: + if !ok { + // Channel closed, hub unregistered us + c.conn.Close(websocket.StatusNormalClosure, "closing") + return + } + if err := c.conn.Write(ctx, websocket.MessageText, msg); err != nil { + return + } + case <-ctx.Done(): + return + } + } +} + +// Shutdown closes all client connections gracefully. +func (h *Hub) Shutdown() { + h.mu.Lock() + defer h.mu.Unlock() + + for client := range h.clients { + client.conn.Close(websocket.StatusGoingAway, "server shutting down") + delete(h.clients, client) + close(client.send) + } + + log.Printf("ws: hub shutdown complete") +}