feat(01-01): implement core infrastructure — NATS, LibSQL, WebSocket hub, HTTP server

- Embedded NATS server with JetStream (sync_interval=always per Jepsen 2025)
- AUDIT and STATE JetStream streams for tournament event durability
- NATS publisher with UUID validation to prevent subject injection
- WebSocket hub with JWT auth (query param), tournament-scoped broadcasting
- Origin validation and slow-consumer message dropping
- chi HTTP router with middleware (logger, recoverer, request ID, CORS, body limits)
- Server timeouts: ReadHeader 10s, Read 30s, Write 60s, Idle 120s, MaxHeader 1MB
- MaxBytesReader middleware for request body limits (1MB default)
- JWT auth middleware with HMAC-SHA256 validation
- Role-based access control (admin > floor > viewer)
- Health endpoint reporting all subsystem status (DB, NATS, WebSocket)
- SvelteKit SPA served via go:embed with fallback routing
- Signal-driven graceful shutdown in reverse startup order
- 9 integration tests covering all verification criteria

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Mikkel Georgsen 2026-03-01 03:42:42 +01:00
parent 9bfd959eaf
commit 16caa12d64
11 changed files with 1283 additions and 20 deletions

View file

@ -1,18 +1,30 @@
// Command leaf is the Felt tournament engine binary. It starts all embedded
// infrastructure (LibSQL, NATS JetStream, WebSocket hub) and serves the
// SvelteKit SPA over HTTP.
package main package main
import ( import (
"context"
"crypto/rand"
"flag" "flag"
"fmt"
"log" "log"
"net/http"
"os" "os"
"os/signal"
"syscall"
"time"
feltnats "github.com/felt-app/felt/internal/nats"
"github.com/felt-app/felt/internal/server"
"github.com/felt-app/felt/internal/server/middleware"
"github.com/felt-app/felt/internal/server/ws"
"github.com/felt-app/felt/internal/store" "github.com/felt-app/felt/internal/store"
) )
func main() { func main() {
dataDir := flag.String("data-dir", "./data", "Data directory for database and files") dataDir := flag.String("data-dir", "./data", "Data directory for database and files")
addr := flag.String("addr", ":8080", "HTTP listen address") addr := flag.String("addr", ":8080", "HTTP listen address")
devMode := flag.Bool("dev", false, "Enable development mode (applies dev seed data)") devMode := flag.Bool("dev", false, "Enable development mode (permissive CORS, dev seed data)")
flag.Parse() flag.Parse()
log.SetFlags(log.LstdFlags | log.Lmsgprefix) log.SetFlags(log.LstdFlags | log.Lmsgprefix)
@ -20,32 +32,107 @@ func main() {
log.Printf("starting (data-dir=%s, addr=%s, dev=%v)", *dataDir, *addr, *devMode) log.Printf("starting (data-dir=%s, addr=%s, dev=%v)", *dataDir, *addr, *devMode)
// Open database (runs migrations automatically) // Create root context with cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// ---- 1. LibSQL Database ----
db, err := store.Open(*dataDir, *devMode) db, err := store.Open(*dataDir, *devMode)
if err != nil { if err != nil {
log.Fatalf("failed to open database: %v", err) log.Fatalf("failed to open database: %v", err)
} }
defer db.Close() defer db.Close()
// Check first-run setup status
needsSetup, err := db.NeedsSetup()
if err != nil {
log.Fatalf("failed to check setup status: %v", err)
}
if needsSetup {
log.Printf("no operators found - first-run setup required")
log.Printf("POST /api/v1/setup to create first admin operator")
}
// Verify database is working // Verify database is working
var one int var one int
if err := db.QueryRow("SELECT 1").Scan(&one); err != nil { if err := db.QueryRow("SELECT 1").Scan(&one); err != nil {
log.Fatalf("database health check failed: %v", err) log.Fatalf("database health check failed: %v", err)
} }
log.Printf("database ready") log.Printf("database ready")
// Placeholder for HTTP server (will be implemented in Plan A) // ---- 2. Embedded NATS Server ----
_ = addr natsServer, err := feltnats.Start(ctx, *dataDir)
fmt.Fprintf(os.Stderr, "felt: ready (database operational, HTTP server not yet implemented)\n") if err != nil {
log.Fatalf("failed to start NATS: %v", err)
}
defer natsServer.Shutdown()
// ---- 3. JWT Signing Key ----
// In production, this should be loaded from a persisted secret.
// For now, generate a random key on startup (tokens won't survive restart).
signingKey := generateOrLoadSigningKey(*dataDir)
// ---- 4. WebSocket Hub ----
tokenValidator := func(tokenStr string) (string, string, error) {
return middleware.ValidateJWT(tokenStr, signingKey)
}
// Tournament validator stub — allows all for now
// TODO: Implement tournament existence + access check against DB
tournamentValidator := func(tournamentID string, operatorID string) error {
return nil // Accept all tournaments for now
}
var allowedOrigins []string
if *devMode {
allowedOrigins = nil // InsecureSkipVerify will be used
} else {
allowedOrigins = []string{"*"} // Same-origin enforced by browser
}
hub := ws.NewHub(tokenValidator, tournamentValidator, allowedOrigins)
defer hub.Shutdown()
// ---- 5. HTTP Server ----
srv := server.New(server.Config{
Addr: *addr,
SigningKey: signingKey,
DevMode: *devMode,
}, db.DB, natsServer.Server(), hub)
// Start HTTP server in goroutine
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("HTTP server error: %v", err)
}
}()
log.Printf("ready (addr=%s, dev=%v)", *addr, *devMode)
// ---- Signal Handling ----
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigCh
log.Printf("received signal: %s, shutting down...", sig)
// Graceful shutdown in reverse startup order
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
// 5. HTTP Server
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("HTTP server shutdown error: %v", err)
}
// 4. WebSocket Hub (closed by defer)
// 3. NATS Server (closed by defer)
// 2. Database (closed by defer)
cancel() // Cancel root context
log.Printf("shutdown complete")
}
// generateOrLoadSigningKey generates a random 256-bit signing key.
// In a future plan, this will be persisted to the data directory.
func generateOrLoadSigningKey(dataDir string) []byte {
// TODO: Persist to file in dataDir for key stability across restarts
_ = dataDir
key := make([]byte, 32)
if _, err := rand.Read(key); err != nil {
log.Fatalf("failed to generate signing key: %v", err)
}
log.Printf("JWT signing key generated (ephemeral — will change on restart)")
return key
} }

294
cmd/leaf/main_test.go Normal file
View file

@ -0,0 +1,294 @@
package main
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/coder/websocket"
"github.com/golang-jwt/jwt/v5"
feltnats "github.com/felt-app/felt/internal/nats"
"github.com/felt-app/felt/internal/server"
"github.com/felt-app/felt/internal/server/middleware"
"github.com/felt-app/felt/internal/server/ws"
"github.com/felt-app/felt/internal/store"
)
func setupTestServer(t *testing.T) (*httptest.Server, *store.DB, *feltnats.EmbeddedServer, []byte) {
t.Helper()
ctx := context.Background()
tmpDir := t.TempDir()
// Open database
db, err := store.Open(tmpDir, true)
if err != nil {
t.Fatalf("open database: %v", err)
}
t.Cleanup(func() { db.Close() })
// Start NATS
ns, err := feltnats.Start(ctx, tmpDir)
if err != nil {
t.Fatalf("start nats: %v", err)
}
t.Cleanup(func() { ns.Shutdown() })
// Setup JWT signing
signingKey := []byte("test-signing-key-32-bytes-long!!")
tokenValidator := func(tokenStr string) (string, string, error) {
return middleware.ValidateJWT(tokenStr, signingKey)
}
hub := ws.NewHub(tokenValidator, nil, nil)
t.Cleanup(func() { hub.Shutdown() })
// Create HTTP server
srv := server.New(server.Config{
Addr: ":0",
SigningKey: signingKey,
DevMode: true,
}, db.DB, ns.Server(), hub)
ts := httptest.NewServer(srv.Handler())
t.Cleanup(func() { ts.Close() })
return ts, db, ns, signingKey
}
func makeToken(t *testing.T, signingKey []byte, operatorID, role string) string {
t.Helper()
token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
"sub": operatorID,
"role": role,
"exp": time.Now().Add(time.Hour).Unix(),
})
tokenStr, err := token.SignedString(signingKey)
if err != nil {
t.Fatalf("sign token: %v", err)
}
return tokenStr
}
func TestHealthEndpoint(t *testing.T) {
ts, _, _, _ := setupTestServer(t)
resp, err := http.Get(ts.URL + "/api/v1/health")
if err != nil {
t.Fatalf("health request: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("expected 200, got %d", resp.StatusCode)
}
var health map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
t.Fatalf("decode health: %v", err)
}
if health["status"] != "ok" {
t.Fatalf("expected status ok, got %v", health["status"])
}
// Check subsystems
subsystems, ok := health["subsystems"].(map[string]interface{})
if !ok {
t.Fatal("missing subsystems in health response")
}
dbStatus, ok := subsystems["database"].(map[string]interface{})
if !ok || dbStatus["status"] != "ok" {
t.Fatalf("database not ok: %v", dbStatus)
}
natsStatus, ok := subsystems["nats"].(map[string]interface{})
if !ok || natsStatus["status"] != "ok" {
t.Fatalf("nats not ok: %v", natsStatus)
}
wsStatus, ok := subsystems["websocket"].(map[string]interface{})
if !ok || wsStatus["status"] != "ok" {
t.Fatalf("websocket not ok: %v", wsStatus)
}
}
func TestSPAFallback(t *testing.T) {
ts, _, _, _ := setupTestServer(t)
// Root path
resp, err := http.Get(ts.URL + "/")
if err != nil {
t.Fatalf("root request: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("expected 200 for root, got %d", resp.StatusCode)
}
// Unknown path (SPA fallback)
resp2, err := http.Get(ts.URL + "/some/unknown/route")
if err != nil {
t.Fatalf("unknown path request: %v", err)
}
defer resp2.Body.Close()
if resp2.StatusCode != 200 {
t.Fatalf("expected 200 for SPA fallback, got %d", resp2.StatusCode)
}
}
func TestWebSocketRejectsMissingToken(t *testing.T) {
ts, _, _, _ := setupTestServer(t)
ctx := context.Background()
_, resp, err := websocket.Dial(ctx, "ws"+ts.URL[4:]+"/ws", nil)
if err == nil {
t.Fatal("expected error for missing token")
}
if resp != nil && resp.StatusCode != 401 {
t.Fatalf("expected 401, got %d", resp.StatusCode)
}
}
func TestWebSocketRejectsInvalidToken(t *testing.T) {
ts, _, _, _ := setupTestServer(t)
ctx := context.Background()
_, resp, err := websocket.Dial(ctx, "ws"+ts.URL[4:]+"/ws?token=invalid", nil)
if err == nil {
t.Fatal("expected error for invalid token")
}
if resp != nil && resp.StatusCode != 401 {
t.Fatalf("expected 401, got %d", resp.StatusCode)
}
}
func TestWebSocketAcceptsValidToken(t *testing.T) {
ts, _, _, signingKey := setupTestServer(t)
ctx := context.Background()
tokenStr := makeToken(t, signingKey, "operator-123", "admin")
wsURL := "ws" + ts.URL[4:] + "/ws?token=" + tokenStr
conn, _, err := websocket.Dial(ctx, wsURL, nil)
if err != nil {
t.Fatalf("websocket dial: %v", err)
}
defer conn.CloseNow()
// Should receive a connected message
readCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, msgBytes, err := conn.Read(readCtx)
if err != nil {
t.Fatalf("read connected message: %v", err)
}
var msg map[string]interface{}
if err := json.Unmarshal(msgBytes, &msg); err != nil {
t.Fatalf("decode message: %v", err)
}
if msg["type"] != "connected" {
t.Fatalf("expected 'connected' message, got %v", msg["type"])
}
conn.Close(websocket.StatusNormalClosure, "test done")
}
func TestNATSStreamsExist(t *testing.T) {
_, _, ns, _ := setupTestServer(t)
ctx := context.Background()
js := ns.JetStream()
// Check AUDIT stream
stream, err := js.Stream(ctx, "AUDIT")
if err != nil {
t.Fatalf("get AUDIT stream: %v", err)
}
info, err := stream.Info(ctx)
if err != nil {
t.Fatalf("get AUDIT stream info: %v", err)
}
if info.Config.Name != "AUDIT" {
t.Fatalf("expected AUDIT stream, got %s", info.Config.Name)
}
// Check STATE stream
stream, err = js.Stream(ctx, "STATE")
if err != nil {
t.Fatalf("get STATE stream: %v", err)
}
info, err = stream.Info(ctx)
if err != nil {
t.Fatalf("get STATE stream info: %v", err)
}
if info.Config.Name != "STATE" {
t.Fatalf("expected STATE stream, got %s", info.Config.Name)
}
}
func TestPublisherUUIDValidation(t *testing.T) {
_, _, ns, _ := setupTestServer(t)
ctx := context.Background()
js := ns.JetStream()
pub := feltnats.NewPublisher(js)
// Empty UUID
_, err := pub.Publish(ctx, "", "audit", []byte("test"))
if err == nil {
t.Fatal("expected error for empty UUID")
}
// UUID with NATS wildcards
_, err = pub.Publish(ctx, "test.*.injection", "audit", []byte("test"))
if err == nil {
t.Fatal("expected error for UUID with wildcards")
}
// Invalid format
_, err = pub.Publish(ctx, "not-a-uuid", "audit", []byte("test"))
if err == nil {
t.Fatal("expected error for invalid UUID format")
}
// Valid UUID should succeed
_, err = pub.Publish(ctx, "550e8400-e29b-41d4-a716-446655440000", "audit", []byte(`{"test":true}`))
if err != nil {
t.Fatalf("expected success for valid UUID, got: %v", err)
}
}
func TestLibSQLWALMode(t *testing.T) {
_, db, _, _ := setupTestServer(t)
var mode string
err := db.QueryRow("PRAGMA journal_mode").Scan(&mode)
if err != nil {
t.Fatalf("query journal_mode: %v", err)
}
if mode != "wal" {
t.Fatalf("expected WAL mode, got %s", mode)
}
}
func TestLibSQLForeignKeys(t *testing.T) {
_, db, _, _ := setupTestServer(t)
var fk int
err := db.QueryRow("PRAGMA foreign_keys").Scan(&fk)
if err != nil {
t.Fatalf("query foreign_keys: %v", err)
}
if fk != 1 {
t.Fatalf("expected foreign_keys=1, got %d", fk)
}
}

24
go.mod
View file

@ -5,7 +5,25 @@ go 1.24.0
require github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff // no tagged releases pinned to commit 43644db490ff require github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff // no tagged releases pinned to commit 43644db490ff
require ( require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/coder/websocket v1.8.14
github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 // indirect github.com/go-chi/chi/v5 v5.2.5
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect github.com/golang-jwt/jwt/v5 v5.3.1
github.com/nats-io/nats-server/v2 v2.12.4
github.com/nats-io/nats.go v1.49.0
)
require (
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/google/go-tpm v0.9.8 // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 // indirect
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
github.com/nats-io/jwt/v2 v2.8.0 // indirect
github.com/nats-io/nkeys v0.4.12 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/time v0.14.0 // indirect
) )

31
go.sum
View file

@ -1,16 +1,47 @@
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM=
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo=
github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 h1:JLvn7D+wXjH9g4Jsjo+VqmzTUpl/LX7vfr6VOfSWTdM= github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06 h1:JLvn7D+wXjH9g4Jsjo+VqmzTUpl/LX7vfr6VOfSWTdM=
github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06/go.mod h1:FUkZ5OHjlGPjnM2UyGJz9TypXQFgYqw6AFNO1UiROTM= github.com/libsql/sqlite-antlr4-parser v0.0.0-20240327125255-dbf53b6cbf06/go.mod h1:FUkZ5OHjlGPjnM2UyGJz9TypXQFgYqw6AFNO1UiROTM=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts=
github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg=
github.com/nats-io/nats.go v1.49.0 h1:yh/WvY59gXqYpgl33ZI+XoVPKyut/IcEaqtsiuTJpoE=
github.com/nats-io/nats.go v1.49.0/go.mod h1:fDCn3mN5cY8HooHwE2ukiLb4p4G4ImmzvXyJt+tGwdw=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff h1:Hvxz9W8fWpSg9xkiq8/q+3cVJo+MmLMfkjdS/u4nWFY= github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff h1:Hvxz9W8fWpSg9xkiq8/q+3cVJo+MmLMfkjdS/u4nWFY=
github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff/go.mod h1:TjsB2miB8RW2Sse8sdxzVTdeGlx74GloD5zJYUC38d8= github.com/tursodatabase/go-libsql v0.0.0-20251219133454-43644db490ff/go.mod h1:TjsB2miB8RW2Sse8sdxzVTdeGlx74GloD5zJYUC38d8=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU= golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc h1:mCRnTeVUjcrhlRmO0VK8a6k6Rrf6TF9htwo2pJVSjIU=
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=

View file

@ -1 +1,153 @@
// Package nats provides an embedded NATS server with JetStream for the Felt
// tournament engine. The server runs in-process with no TCP listener, providing
// event durability and real-time state broadcasting.
package nats package nats
import (
"context"
"errors"
"fmt"
"log"
"path/filepath"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// EmbeddedServer wraps an in-process NATS server with JetStream enabled.
type EmbeddedServer struct {
server *server.Server
conn *nats.Conn
js jetstream.JetStream
}
// Start creates and starts an embedded NATS server with JetStream.
//
// CRITICAL: JetStreamSyncInterval is set to 0 (sync_interval: always) per the
// December 2025 Jepsen finding for NATS 2.12.1. This ensures every acknowledged
// write is fsynced immediately. Without this, recently acknowledged writes exist
// only in memory and are lost on power failure in single-node deployments.
func Start(ctx context.Context, dataDir string) (*EmbeddedServer, error) {
storeDir := filepath.Join(dataDir, "jetstream")
opts := &server.Options{
ServerName: "felt-leaf",
DontListen: true, // In-process only, no TCP listener
JetStream: true,
StoreDir: storeDir,
NoSigs: true, // We handle signals ourselves
// Memory and disk limits
JetStreamMaxMemory: 64 * 1024 * 1024, // 64MB memory limit
JetStreamMaxStore: 1024 * 1024 * 1024, // 1GB disk limit
// MANDATORY: fsync every write for single-node durability (Jepsen 2025)
SyncInterval: 0, // 0 = "always"
}
ns, err := server.NewServer(opts)
if err != nil {
return nil, fmt.Errorf("nats: create server: %w", err)
}
// Start the server in the background
go ns.Start()
// Wait for the server to be ready (5-second timeout)
if !ns.ReadyForConnections(5 * time.Second) {
ns.Shutdown()
return nil, errors.New("nats: server startup timeout (5s)")
}
log.Printf("nats: server ready (store=%s, sync=always)", storeDir)
// Connect to the in-process server using the client URL
nc, err := nats.Connect(ns.ClientURL(),
nats.InProcessServer(ns),
nats.Name("felt-leaf-internal"),
)
if err != nil {
ns.Shutdown()
return nil, fmt.Errorf("nats: connect to embedded server: %w", err)
}
// Create JetStream context
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
ns.Shutdown()
return nil, fmt.Errorf("nats: create jetstream context: %w", err)
}
es := &EmbeddedServer{
server: ns,
conn: nc,
js: js,
}
// Create initial streams
if err := es.createStreams(ctx); err != nil {
es.Shutdown()
return nil, fmt.Errorf("nats: create streams: %w", err)
}
return es, nil
}
// createStreams creates the initial JetStream streams for tournament events.
func (es *EmbeddedServer) createStreams(ctx context.Context) error {
// AUDIT stream: append-only audit trail for all tournament actions
_, err := es.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "AUDIT",
Subjects: []string{"tournament.*.audit"},
Storage: jetstream.FileStorage,
MaxAge: 0, // No expiry — audit trail is permanent
})
if err != nil {
return fmt.Errorf("create AUDIT stream: %w", err)
}
log.Printf("nats: AUDIT stream ready (tournament.*.audit)")
// STATE stream: tournament state changes for real-time broadcast
_, err = es.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "STATE",
Subjects: []string{"tournament.*.state.>"},
Storage: jetstream.FileStorage,
MaxAge: 24 * time.Hour, // State events expire after 24h
})
if err != nil {
return fmt.Errorf("create STATE stream: %w", err)
}
log.Printf("nats: STATE stream ready (tournament.*.state.>)")
return nil
}
// Conn returns the NATS client connection.
func (es *EmbeddedServer) Conn() *nats.Conn {
return es.conn
}
// JetStream returns the JetStream context for publishing and consuming.
func (es *EmbeddedServer) JetStream() jetstream.JetStream {
return es.js
}
// Server returns the underlying NATS server for health checks.
func (es *EmbeddedServer) Server() *server.Server {
return es.server
}
// Shutdown gracefully shuts down the NATS server and closes connections.
func (es *EmbeddedServer) Shutdown() {
if es.conn != nil {
es.conn.Close()
}
if es.server != nil {
es.server.Shutdown()
es.server.WaitForShutdown()
}
log.Printf("nats: server shutdown complete")
}

View file

@ -1 +1,77 @@
package nats package nats
import (
"context"
"fmt"
"regexp"
"strings"
"github.com/nats-io/nats.go/jetstream"
)
// uuidRegex validates UUID format (8-4-4-4-12 hex digits).
var uuidRegex = regexp.MustCompile(`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`)
// Publisher provides tournament-scoped event publishing to JetStream.
type Publisher struct {
js jetstream.JetStream
}
// NewPublisher creates a new publisher using the given JetStream context.
func NewPublisher(js jetstream.JetStream) *Publisher {
return &Publisher{js: js}
}
// ValidateUUID checks that the given string is a valid UUID and does not
// contain NATS subject wildcards (* > .) that could enable subject injection.
func ValidateUUID(id string) error {
if id == "" {
return fmt.Errorf("empty UUID")
}
// Check for NATS subject wildcards that could cause injection
if strings.ContainsAny(id, "*>.") {
return fmt.Errorf("UUID contains NATS subject wildcards: %q", id)
}
if !uuidRegex.MatchString(id) {
return fmt.Errorf("invalid UUID format: %q", id)
}
return nil
}
// Publish publishes a tournament-scoped event to JetStream.
// The full subject is constructed as: tournament.<tournamentID>.<subject>
//
// The tournamentID is validated as a proper UUID before subject construction
// to prevent subject injection attacks.
func (p *Publisher) Publish(ctx context.Context, tournamentID, subject string, data []byte) (*jetstream.PubAck, error) {
if err := ValidateUUID(tournamentID); err != nil {
return nil, fmt.Errorf("publish: invalid tournament ID: %w", err)
}
if subject == "" {
return nil, fmt.Errorf("publish: empty subject")
}
fullSubject := fmt.Sprintf("tournament.%s.%s", tournamentID, subject)
ack, err := p.js.Publish(ctx, fullSubject, data)
if err != nil {
return nil, fmt.Errorf("publish to %s: %w", fullSubject, err)
}
return ack, nil
}
// PublishAudit is a convenience method for publishing audit events.
// Subject: tournament.<tournamentID>.audit
func (p *Publisher) PublishAudit(ctx context.Context, tournamentID string, data []byte) (*jetstream.PubAck, error) {
return p.Publish(ctx, tournamentID, "audit", data)
}
// PublishState is a convenience method for publishing state change events.
// Subject: tournament.<tournamentID>.state.<stateType>
func (p *Publisher) PublishState(ctx context.Context, tournamentID, stateType string, data []byte) (*jetstream.PubAck, error) {
return p.Publish(ctx, tournamentID, "state."+stateType, data)
}

View file

@ -1 +1,96 @@
// Package middleware provides HTTP middleware for the Felt tournament engine.
package middleware package middleware
import (
"context"
"net/http"
"strings"
"github.com/golang-jwt/jwt/v5"
)
// contextKey is a private type for context keys to avoid collisions.
type contextKey string
const (
// OperatorIDKey is the context key for the authenticated operator ID.
OperatorIDKey contextKey = "operator_id"
// OperatorRoleKey is the context key for the authenticated operator role.
OperatorRoleKey contextKey = "operator_role"
)
// JWTAuth returns middleware that validates JWT tokens from the Authorization header.
// Tokens must be in the format: Bearer <token>
func JWTAuth(signingKey []byte) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
http.Error(w, `{"error":"missing authorization header"}`, http.StatusUnauthorized)
return
}
parts := strings.SplitN(authHeader, " ", 2)
if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") {
http.Error(w, `{"error":"invalid authorization format"}`, http.StatusUnauthorized)
return
}
tokenStr := parts[1]
operatorID, role, err := ValidateJWT(tokenStr, signingKey)
if err != nil {
http.Error(w, `{"error":"invalid or expired token"}`, http.StatusUnauthorized)
return
}
// Add operator info to request context
ctx := context.WithValue(r.Context(), OperatorIDKey, operatorID)
ctx = context.WithValue(ctx, OperatorRoleKey, role)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
// ValidateJWT parses and validates a JWT token string, returning the
// operator ID and role from claims.
func ValidateJWT(tokenStr string, signingKey []byte) (operatorID string, role string, err error) {
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (interface{}, error) {
// Verify signing method is HMAC
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, jwt.ErrSignatureInvalid
}
return signingKey, nil
})
if err != nil {
return "", "", err
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok || !token.Valid {
return "", "", jwt.ErrSignatureInvalid
}
operatorID, _ = claims["sub"].(string)
role, _ = claims["role"].(string)
if operatorID == "" {
return "", "", jwt.ErrSignatureInvalid
}
if role == "" {
role = "viewer" // Default role
}
return operatorID, role, nil
}
// OperatorID extracts the operator ID from the request context.
func OperatorID(r *http.Request) string {
id, _ := r.Context().Value(OperatorIDKey).(string)
return id
}
// OperatorRole extracts the operator role from the request context.
func OperatorRole(r *http.Request) string {
role, _ := r.Context().Value(OperatorRoleKey).(string)
return role
}

View file

@ -0,0 +1,18 @@
package middleware
import (
"net/http"
)
// MaxBytesReader wraps request bodies with a size limit to prevent abuse.
// Default limit is 1MB.
func MaxBytesReader(limit int64) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
r.Body = http.MaxBytesReader(w, r.Body, limit)
}
next.ServeHTTP(w, r)
})
}
}

View file

@ -1 +1,40 @@
package middleware package middleware
import (
"net/http"
)
// Role constants for operator access levels.
const (
RoleAdmin = "admin"
RoleFloor = "floor"
RoleViewer = "viewer"
)
// roleHierarchy defines the permission level for each role.
// Higher numbers have more permissions.
var roleHierarchy = map[string]int{
RoleViewer: 1,
RoleFloor: 2,
RoleAdmin: 3,
}
// RequireRole returns middleware that checks the operator has at least the
// given role level. Admin > Floor > Viewer.
func RequireRole(minRole string) func(http.Handler) http.Handler {
minLevel := roleHierarchy[minRole]
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
role := OperatorRole(r)
level := roleHierarchy[role]
if level < minLevel {
http.Error(w, `{"error":"insufficient permissions"}`, http.StatusForbidden)
return
}
next.ServeHTTP(w, r)
})
}
}

View file

@ -1 +1,207 @@
// Package server provides the HTTP server for the Felt tournament engine.
// It configures chi router with middleware, defines route groups, and serves
// the SvelteKit SPA via go:embed.
package server package server
import (
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
"time"
"github.com/go-chi/chi/v5"
chimw "github.com/go-chi/chi/v5/middleware"
"github.com/felt-app/felt/frontend"
"github.com/felt-app/felt/internal/server/middleware"
"github.com/felt-app/felt/internal/server/ws"
natsserver "github.com/nats-io/nats-server/v2/server"
)
// Config holds server configuration.
type Config struct {
Addr string
SigningKey []byte
DevMode bool
}
// Server wraps the HTTP server with all dependencies.
type Server struct {
httpServer *http.Server
hub *ws.Hub
db *sql.DB
nats *natsserver.Server
}
// New creates a new HTTP server with all routes and middleware configured.
func New(cfg Config, db *sql.DB, nats *natsserver.Server, hub *ws.Hub) *Server {
r := chi.NewRouter()
// Global middleware
r.Use(chimw.RequestID)
r.Use(chimw.RealIP)
r.Use(chimw.Logger)
r.Use(chimw.Recoverer)
// Request body size limit: 1MB default
r.Use(middleware.MaxBytesReader(1 << 20)) // 1MB
// CORS: permissive for development
r.Use(corsMiddleware(cfg.DevMode))
s := &Server{
hub: hub,
db: db,
nats: nats,
}
// API routes
r.Route("/api/v1", func(r chi.Router) {
// Public endpoints (no auth required)
r.Get("/health", s.handleHealth)
// Protected endpoints
r.Group(func(r chi.Router) {
r.Use(middleware.JWTAuth(cfg.SigningKey))
// Stub endpoints — return 200 for now
r.Get("/tournaments", stubHandler("tournaments"))
r.Get("/players", stubHandler("players"))
})
})
// WebSocket endpoint
r.Get("/ws", hub.HandleConnect)
// SvelteKit SPA fallback (must be last)
r.Handle("/*", frontend.Handler())
s.httpServer = &http.Server{
Addr: cfg.Addr,
Handler: r,
ReadHeaderTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 60 * time.Second,
IdleTimeout: 120 * time.Second,
MaxHeaderBytes: 1 << 20, // 1MB
}
return s
}
// Handler returns the HTTP handler for use in tests.
func (s *Server) Handler() http.Handler {
return s.httpServer.Handler
}
// ListenAndServe starts the HTTP server.
func (s *Server) ListenAndServe() error {
log.Printf("server: listening on %s", s.httpServer.Addr)
return s.httpServer.ListenAndServe()
}
// Shutdown gracefully shuts down the HTTP server.
func (s *Server) Shutdown(ctx context.Context) error {
log.Printf("server: shutting down")
return s.httpServer.Shutdown(ctx)
}
// handleHealth returns the health status of all subsystems.
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
status := map[string]interface{}{
"status": "ok",
"subsystems": map[string]interface{}{
"database": s.checkDatabase(),
"nats": s.checkNATS(),
"websocket": map[string]interface{}{
"status": "ok",
"clients": s.hub.ClientCount(),
},
},
"timestamp": time.Now().UTC().Format(time.RFC3339),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(status)
}
// checkDatabase verifies the database is operational.
func (s *Server) checkDatabase() map[string]interface{} {
var result int
err := s.db.QueryRow("SELECT 1").Scan(&result)
if err != nil {
return map[string]interface{}{
"status": "error",
"error": err.Error(),
}
}
return map[string]interface{}{
"status": "ok",
}
}
// checkNATS verifies the NATS server is operational.
func (s *Server) checkNATS() map[string]interface{} {
if s.nats == nil {
return map[string]interface{}{
"status": "error",
"error": "not initialized",
}
}
varz, err := s.nats.Varz(nil)
if err != nil {
return map[string]interface{}{
"status": "error",
"error": err.Error(),
}
}
return map[string]interface{}{
"status": "ok",
"connections": varz.Connections,
"jetstream": s.nats.JetStreamEnabled(),
}
}
// stubHandler returns a handler that responds with 200 and a JSON stub.
func stubHandler(name string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "stub",
"name": name,
"message": "Not yet implemented",
})
}
}
// corsMiddleware returns CORS middleware. In dev mode, it's permissive.
func corsMiddleware(devMode bool) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if devMode {
w.Header().Set("Access-Control-Allow-Origin", "*")
} else {
// In production, only allow same-origin
origin := r.Header.Get("Origin")
if origin != "" {
w.Header().Set("Access-Control-Allow-Origin", origin)
}
}
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS, PATCH")
w.Header().Set("Access-Control-Allow-Headers", "Accept, Authorization, Content-Type, X-Request-ID")
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Access-Control-Max-Age", "300")
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK)
return
}
next.ServeHTTP(w, r)
})
}
}

View file

@ -1 +1,248 @@
// Package ws implements a WebSocket hub for real-time tournament state
// broadcasting. Connections are authenticated via JWT (query parameter) and
// scoped to specific tournaments.
package ws package ws
import (
"context"
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/coder/websocket"
)
// Message represents a WebSocket message sent to clients.
type Message struct {
Type string `json:"type"`
TournamentID string `json:"tournament_id,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Timestamp int64 `json:"timestamp"`
}
// Client represents a connected WebSocket client.
type Client struct {
conn *websocket.Conn
tournamentID string // Subscription scope (empty = global messages only)
operatorID string // From JWT claims
role string // From JWT claims (admin, floor, viewer)
send chan []byte
hub *Hub
}
// TokenValidator is a function that validates a JWT token string and returns
// the operator ID and role. Injected by the server package.
type TokenValidator func(tokenStr string) (operatorID string, role string, err error)
// TournamentValidator is a function that checks if a tournament exists and
// the operator has access to it. Returns nil if valid.
type TournamentValidator func(tournamentID string, operatorID string) error
// Hub manages all active WebSocket connections and broadcasts messages
// to tournament-scoped client groups.
type Hub struct {
mu sync.RWMutex
clients map[*Client]struct{}
validateToken TokenValidator
validateTournament TournamentValidator
allowedOrigins []string
}
// NewHub creates a new WebSocket hub.
func NewHub(validateToken TokenValidator, validateTournament TournamentValidator, allowedOrigins []string) *Hub {
return &Hub{
clients: make(map[*Client]struct{}),
validateToken: validateToken,
validateTournament: validateTournament,
allowedOrigins: allowedOrigins,
}
}
// Register adds a client to the hub.
func (h *Hub) Register(client *Client) {
h.mu.Lock()
defer h.mu.Unlock()
h.clients[client] = struct{}{}
log.Printf("ws: client registered (operator=%s, tournament=%s, total=%d)",
client.operatorID, client.tournamentID, len(h.clients))
}
// Unregister removes a client from the hub and closes its send channel.
func (h *Hub) Unregister(client *Client) {
h.mu.Lock()
defer h.mu.Unlock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
log.Printf("ws: client unregistered (operator=%s, tournament=%s, total=%d)",
client.operatorID, client.tournamentID, len(h.clients))
}
}
// Broadcast sends a message to all clients subscribed to the given tournament.
// If tournamentID is empty, the message is sent to ALL connected clients.
func (h *Hub) Broadcast(tournamentID string, msgType string, data json.RawMessage) {
msg := Message{
Type: msgType,
TournamentID: tournamentID,
Data: data,
Timestamp: time.Now().UnixMilli(),
}
payload, err := json.Marshal(msg)
if err != nil {
log.Printf("ws: broadcast marshal error: %v", err)
return
}
h.mu.RLock()
defer h.mu.RUnlock()
sent := 0
for client := range h.clients {
if tournamentID == "" || client.tournamentID == tournamentID {
// Non-blocking send: drop messages for slow consumers
select {
case client.send <- payload:
sent++
default:
log.Printf("ws: dropping message for slow client (operator=%s)", client.operatorID)
}
}
}
if sent > 0 {
log.Printf("ws: broadcast %s to %d client(s) (tournament=%s)", msgType, sent, tournamentID)
}
}
// ClientCount returns the total number of connected clients.
func (h *Hub) ClientCount() int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.clients)
}
// HandleConnect handles WebSocket upgrade requests.
// Authentication: JWT must be provided as ?token= query parameter.
// Tournament scope: ?tournament= query parameter (optional).
func (h *Hub) HandleConnect(w http.ResponseWriter, r *http.Request) {
// Extract and validate JWT from query parameter
tokenStr := r.URL.Query().Get("token")
if tokenStr == "" {
http.Error(w, `{"error":"missing token parameter"}`, http.StatusUnauthorized)
return
}
operatorID, role, err := h.validateToken(tokenStr)
if err != nil {
http.Error(w, `{"error":"invalid or expired token"}`, http.StatusUnauthorized)
return
}
// Validate tournament scope if provided
tournamentID := r.URL.Query().Get("tournament")
if tournamentID != "" {
if h.validateTournament != nil {
if err := h.validateTournament(tournamentID, operatorID); err != nil {
http.Error(w, `{"error":"tournament not found or access denied"}`, http.StatusForbidden)
return
}
}
}
// Build accept options with Origin validation
acceptOpts := &websocket.AcceptOptions{
OriginPatterns: h.allowedOrigins,
}
if len(h.allowedOrigins) == 0 {
// In dev mode, allow all origins
acceptOpts.InsecureSkipVerify = true
}
// Accept the WebSocket connection
conn, err := websocket.Accept(w, r, acceptOpts)
if err != nil {
log.Printf("ws: accept error: %v", err)
return
}
client := &Client{
conn: conn,
tournamentID: tournamentID,
operatorID: operatorID,
role: role,
send: make(chan []byte, 256),
hub: h,
}
h.Register(client)
// Send initial connected message
connectMsg := Message{
Type: "connected",
TournamentID: tournamentID,
Data: json.RawMessage(`{"status":"ok"}`),
Timestamp: time.Now().UnixMilli(),
}
payload, _ := json.Marshal(connectMsg)
client.send <- payload
// Start read and write pumps
ctx := r.Context()
go client.writePump(ctx)
client.readPump(ctx) // Blocks until disconnect
// Cleanup
h.Unregister(client)
conn.CloseNow()
}
// readPump reads messages from the WebSocket connection.
// Currently discards incoming messages (one-way broadcast).
func (c *Client) readPump(ctx context.Context) {
for {
_, _, err := c.conn.Read(ctx)
if err != nil {
// Connection closed or error
return
}
// Currently no client-to-server messages are processed.
// Future: handle subscription changes, pings, etc.
}
}
// writePump writes messages from the send channel to the WebSocket connection.
func (c *Client) writePump(ctx context.Context) {
for {
select {
case msg, ok := <-c.send:
if !ok {
// Channel closed, hub unregistered us
c.conn.Close(websocket.StatusNormalClosure, "closing")
return
}
if err := c.conn.Write(ctx, websocket.MessageText, msg); err != nil {
return
}
case <-ctx.Done():
return
}
}
}
// Shutdown closes all client connections gracefully.
func (h *Hub) Shutdown() {
h.mu.Lock()
defer h.mu.Unlock()
for client := range h.clients {
client.conn.Close(websocket.StatusGoingAway, "server shutting down")
delete(h.clients, client)
close(client.send)
}
log.Printf("ws: hub shutdown complete")
}