felt/internal/server/ws/hub.go
Mikkel Georgsen 16caa12d64 feat(01-01): implement core infrastructure — NATS, LibSQL, WebSocket hub, HTTP server
- Embedded NATS server with JetStream (sync_interval=always per Jepsen 2025)
- AUDIT and STATE JetStream streams for tournament event durability
- NATS publisher with UUID validation to prevent subject injection
- WebSocket hub with JWT auth (query param), tournament-scoped broadcasting
- Origin validation and slow-consumer message dropping
- chi HTTP router with middleware (logger, recoverer, request ID, CORS, body limits)
- Server timeouts: ReadHeader 10s, Read 30s, Write 60s, Idle 120s, MaxHeader 1MB
- MaxBytesReader middleware for request body limits (1MB default)
- JWT auth middleware with HMAC-SHA256 validation
- Role-based access control (admin > floor > viewer)
- Health endpoint reporting all subsystem status (DB, NATS, WebSocket)
- SvelteKit SPA served via go:embed with fallback routing
- Signal-driven graceful shutdown in reverse startup order
- 9 integration tests covering all verification criteria

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 03:42:42 +01:00

248 lines
6.8 KiB
Go

// Package ws implements a WebSocket hub for real-time tournament state
// broadcasting. Connections are authenticated via JWT (query parameter) and
// scoped to specific tournaments.
package ws
import (
"context"
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/coder/websocket"
)
// Message represents a WebSocket message sent to clients.
type Message struct {
Type string `json:"type"`
TournamentID string `json:"tournament_id,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Timestamp int64 `json:"timestamp"`
}
// Client represents a connected WebSocket client.
type Client struct {
conn *websocket.Conn
tournamentID string // Subscription scope (empty = global messages only)
operatorID string // From JWT claims
role string // From JWT claims (admin, floor, viewer)
send chan []byte
hub *Hub
}
// TokenValidator is a function that validates a JWT token string and returns
// the operator ID and role. Injected by the server package.
type TokenValidator func(tokenStr string) (operatorID string, role string, err error)
// TournamentValidator is a function that checks if a tournament exists and
// the operator has access to it. Returns nil if valid.
type TournamentValidator func(tournamentID string, operatorID string) error
// Hub manages all active WebSocket connections and broadcasts messages
// to tournament-scoped client groups.
type Hub struct {
mu sync.RWMutex
clients map[*Client]struct{}
validateToken TokenValidator
validateTournament TournamentValidator
allowedOrigins []string
}
// NewHub creates a new WebSocket hub.
func NewHub(validateToken TokenValidator, validateTournament TournamentValidator, allowedOrigins []string) *Hub {
return &Hub{
clients: make(map[*Client]struct{}),
validateToken: validateToken,
validateTournament: validateTournament,
allowedOrigins: allowedOrigins,
}
}
// Register adds a client to the hub.
func (h *Hub) Register(client *Client) {
h.mu.Lock()
defer h.mu.Unlock()
h.clients[client] = struct{}{}
log.Printf("ws: client registered (operator=%s, tournament=%s, total=%d)",
client.operatorID, client.tournamentID, len(h.clients))
}
// Unregister removes a client from the hub and closes its send channel.
func (h *Hub) Unregister(client *Client) {
h.mu.Lock()
defer h.mu.Unlock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
log.Printf("ws: client unregistered (operator=%s, tournament=%s, total=%d)",
client.operatorID, client.tournamentID, len(h.clients))
}
}
// Broadcast sends a message to all clients subscribed to the given tournament.
// If tournamentID is empty, the message is sent to ALL connected clients.
func (h *Hub) Broadcast(tournamentID string, msgType string, data json.RawMessage) {
msg := Message{
Type: msgType,
TournamentID: tournamentID,
Data: data,
Timestamp: time.Now().UnixMilli(),
}
payload, err := json.Marshal(msg)
if err != nil {
log.Printf("ws: broadcast marshal error: %v", err)
return
}
h.mu.RLock()
defer h.mu.RUnlock()
sent := 0
for client := range h.clients {
if tournamentID == "" || client.tournamentID == tournamentID {
// Non-blocking send: drop messages for slow consumers
select {
case client.send <- payload:
sent++
default:
log.Printf("ws: dropping message for slow client (operator=%s)", client.operatorID)
}
}
}
if sent > 0 {
log.Printf("ws: broadcast %s to %d client(s) (tournament=%s)", msgType, sent, tournamentID)
}
}
// ClientCount returns the total number of connected clients.
func (h *Hub) ClientCount() int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.clients)
}
// HandleConnect handles WebSocket upgrade requests.
// Authentication: JWT must be provided as ?token= query parameter.
// Tournament scope: ?tournament= query parameter (optional).
func (h *Hub) HandleConnect(w http.ResponseWriter, r *http.Request) {
// Extract and validate JWT from query parameter
tokenStr := r.URL.Query().Get("token")
if tokenStr == "" {
http.Error(w, `{"error":"missing token parameter"}`, http.StatusUnauthorized)
return
}
operatorID, role, err := h.validateToken(tokenStr)
if err != nil {
http.Error(w, `{"error":"invalid or expired token"}`, http.StatusUnauthorized)
return
}
// Validate tournament scope if provided
tournamentID := r.URL.Query().Get("tournament")
if tournamentID != "" {
if h.validateTournament != nil {
if err := h.validateTournament(tournamentID, operatorID); err != nil {
http.Error(w, `{"error":"tournament not found or access denied"}`, http.StatusForbidden)
return
}
}
}
// Build accept options with Origin validation
acceptOpts := &websocket.AcceptOptions{
OriginPatterns: h.allowedOrigins,
}
if len(h.allowedOrigins) == 0 {
// In dev mode, allow all origins
acceptOpts.InsecureSkipVerify = true
}
// Accept the WebSocket connection
conn, err := websocket.Accept(w, r, acceptOpts)
if err != nil {
log.Printf("ws: accept error: %v", err)
return
}
client := &Client{
conn: conn,
tournamentID: tournamentID,
operatorID: operatorID,
role: role,
send: make(chan []byte, 256),
hub: h,
}
h.Register(client)
// Send initial connected message
connectMsg := Message{
Type: "connected",
TournamentID: tournamentID,
Data: json.RawMessage(`{"status":"ok"}`),
Timestamp: time.Now().UnixMilli(),
}
payload, _ := json.Marshal(connectMsg)
client.send <- payload
// Start read and write pumps
ctx := r.Context()
go client.writePump(ctx)
client.readPump(ctx) // Blocks until disconnect
// Cleanup
h.Unregister(client)
conn.CloseNow()
}
// readPump reads messages from the WebSocket connection.
// Currently discards incoming messages (one-way broadcast).
func (c *Client) readPump(ctx context.Context) {
for {
_, _, err := c.conn.Read(ctx)
if err != nil {
// Connection closed or error
return
}
// Currently no client-to-server messages are processed.
// Future: handle subscription changes, pings, etc.
}
}
// writePump writes messages from the send channel to the WebSocket connection.
func (c *Client) writePump(ctx context.Context) {
for {
select {
case msg, ok := <-c.send:
if !ok {
// Channel closed, hub unregistered us
c.conn.Close(websocket.StatusNormalClosure, "closing")
return
}
if err := c.conn.Write(ctx, websocket.MessageText, msg); err != nil {
return
}
case <-ctx.Done():
return
}
}
}
// Shutdown closes all client connections gracefully.
func (h *Hub) Shutdown() {
h.mu.Lock()
defer h.mu.Unlock()
for client := range h.clients {
client.conn.Close(websocket.StatusGoingAway, "server shutting down")
delete(h.clients, client)
close(client.send)
}
log.Printf("ws: hub shutdown complete")
}