- Embedded NATS server with JetStream (sync_interval=always per Jepsen 2025) - AUDIT and STATE JetStream streams for tournament event durability - NATS publisher with UUID validation to prevent subject injection - WebSocket hub with JWT auth (query param), tournament-scoped broadcasting - Origin validation and slow-consumer message dropping - chi HTTP router with middleware (logger, recoverer, request ID, CORS, body limits) - Server timeouts: ReadHeader 10s, Read 30s, Write 60s, Idle 120s, MaxHeader 1MB - MaxBytesReader middleware for request body limits (1MB default) - JWT auth middleware with HMAC-SHA256 validation - Role-based access control (admin > floor > viewer) - Health endpoint reporting all subsystem status (DB, NATS, WebSocket) - SvelteKit SPA served via go:embed with fallback routing - Signal-driven graceful shutdown in reverse startup order - 9 integration tests covering all verification criteria Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
248 lines
6.8 KiB
Go
248 lines
6.8 KiB
Go
// Package ws implements a WebSocket hub for real-time tournament state
|
|
// broadcasting. Connections are authenticated via JWT (query parameter) and
|
|
// scoped to specific tournaments.
|
|
package ws
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/coder/websocket"
|
|
)
|
|
|
|
// Message is the JSON envelope written to WebSocket clients.
//
// TournamentID and Data are omitted from the encoded form when empty;
// Type and Timestamp are always present.
type Message struct {
	// Type names the kind of event carried by the message (for example
	// "connected").
	Type string `json:"type"`

	// TournamentID is the tournament the message is scoped to, if any.
	TournamentID string `json:"tournament_id,omitempty"`

	// Data is the already-encoded JSON payload, embedded verbatim.
	Data json.RawMessage `json:"data,omitempty"`

	// Timestamp is the creation time in Unix milliseconds.
	Timestamp int64 `json:"timestamp"`
}
|
|
|
|
// Client represents a connected WebSocket client.
|
|
type Client struct {
|
|
conn *websocket.Conn
|
|
tournamentID string // Subscription scope (empty = global messages only)
|
|
operatorID string // From JWT claims
|
|
role string // From JWT claims (admin, floor, viewer)
|
|
send chan []byte
|
|
hub *Hub
|
|
}
|
|
|
|
// TokenValidator checks a JWT string and, on success, reports the operator ID
// and role it carries. A non-nil error means the token must be rejected.
// The implementation is injected by the server package.
type TokenValidator func(tokenStr string) (operatorID, role string, err error)
|
|
|
|
// TournamentValidator reports whether the given tournament exists and may be
// accessed by the given operator. It returns nil when access is allowed and
// an error otherwise.
type TournamentValidator func(tournamentID, operatorID string) error
|
|
|
|
// Hub manages all active WebSocket connections and broadcasts messages
|
|
// to tournament-scoped client groups.
|
|
type Hub struct {
|
|
mu sync.RWMutex
|
|
clients map[*Client]struct{}
|
|
validateToken TokenValidator
|
|
validateTournament TournamentValidator
|
|
allowedOrigins []string
|
|
}
|
|
|
|
// NewHub creates a new WebSocket hub.
|
|
func NewHub(validateToken TokenValidator, validateTournament TournamentValidator, allowedOrigins []string) *Hub {
|
|
return &Hub{
|
|
clients: make(map[*Client]struct{}),
|
|
validateToken: validateToken,
|
|
validateTournament: validateTournament,
|
|
allowedOrigins: allowedOrigins,
|
|
}
|
|
}
|
|
|
|
// Register adds a client to the hub.
|
|
func (h *Hub) Register(client *Client) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
h.clients[client] = struct{}{}
|
|
log.Printf("ws: client registered (operator=%s, tournament=%s, total=%d)",
|
|
client.operatorID, client.tournamentID, len(h.clients))
|
|
}
|
|
|
|
// Unregister removes a client from the hub and closes its send channel.
|
|
func (h *Hub) Unregister(client *Client) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if _, ok := h.clients[client]; ok {
|
|
delete(h.clients, client)
|
|
close(client.send)
|
|
log.Printf("ws: client unregistered (operator=%s, tournament=%s, total=%d)",
|
|
client.operatorID, client.tournamentID, len(h.clients))
|
|
}
|
|
}
|
|
|
|
// Broadcast sends a message to all clients subscribed to the given tournament.
|
|
// If tournamentID is empty, the message is sent to ALL connected clients.
|
|
func (h *Hub) Broadcast(tournamentID string, msgType string, data json.RawMessage) {
|
|
msg := Message{
|
|
Type: msgType,
|
|
TournamentID: tournamentID,
|
|
Data: data,
|
|
Timestamp: time.Now().UnixMilli(),
|
|
}
|
|
|
|
payload, err := json.Marshal(msg)
|
|
if err != nil {
|
|
log.Printf("ws: broadcast marshal error: %v", err)
|
|
return
|
|
}
|
|
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
sent := 0
|
|
for client := range h.clients {
|
|
if tournamentID == "" || client.tournamentID == tournamentID {
|
|
// Non-blocking send: drop messages for slow consumers
|
|
select {
|
|
case client.send <- payload:
|
|
sent++
|
|
default:
|
|
log.Printf("ws: dropping message for slow client (operator=%s)", client.operatorID)
|
|
}
|
|
}
|
|
}
|
|
|
|
if sent > 0 {
|
|
log.Printf("ws: broadcast %s to %d client(s) (tournament=%s)", msgType, sent, tournamentID)
|
|
}
|
|
}
|
|
|
|
// ClientCount returns the total number of connected clients.
|
|
func (h *Hub) ClientCount() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.clients)
|
|
}
|
|
|
|
// HandleConnect handles WebSocket upgrade requests.
|
|
// Authentication: JWT must be provided as ?token= query parameter.
|
|
// Tournament scope: ?tournament= query parameter (optional).
|
|
func (h *Hub) HandleConnect(w http.ResponseWriter, r *http.Request) {
|
|
// Extract and validate JWT from query parameter
|
|
tokenStr := r.URL.Query().Get("token")
|
|
if tokenStr == "" {
|
|
http.Error(w, `{"error":"missing token parameter"}`, http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
operatorID, role, err := h.validateToken(tokenStr)
|
|
if err != nil {
|
|
http.Error(w, `{"error":"invalid or expired token"}`, http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
// Validate tournament scope if provided
|
|
tournamentID := r.URL.Query().Get("tournament")
|
|
if tournamentID != "" {
|
|
if h.validateTournament != nil {
|
|
if err := h.validateTournament(tournamentID, operatorID); err != nil {
|
|
http.Error(w, `{"error":"tournament not found or access denied"}`, http.StatusForbidden)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build accept options with Origin validation
|
|
acceptOpts := &websocket.AcceptOptions{
|
|
OriginPatterns: h.allowedOrigins,
|
|
}
|
|
if len(h.allowedOrigins) == 0 {
|
|
// In dev mode, allow all origins
|
|
acceptOpts.InsecureSkipVerify = true
|
|
}
|
|
|
|
// Accept the WebSocket connection
|
|
conn, err := websocket.Accept(w, r, acceptOpts)
|
|
if err != nil {
|
|
log.Printf("ws: accept error: %v", err)
|
|
return
|
|
}
|
|
|
|
client := &Client{
|
|
conn: conn,
|
|
tournamentID: tournamentID,
|
|
operatorID: operatorID,
|
|
role: role,
|
|
send: make(chan []byte, 256),
|
|
hub: h,
|
|
}
|
|
|
|
h.Register(client)
|
|
|
|
// Send initial connected message
|
|
connectMsg := Message{
|
|
Type: "connected",
|
|
TournamentID: tournamentID,
|
|
Data: json.RawMessage(`{"status":"ok"}`),
|
|
Timestamp: time.Now().UnixMilli(),
|
|
}
|
|
payload, _ := json.Marshal(connectMsg)
|
|
client.send <- payload
|
|
|
|
// Start read and write pumps
|
|
ctx := r.Context()
|
|
go client.writePump(ctx)
|
|
client.readPump(ctx) // Blocks until disconnect
|
|
|
|
// Cleanup
|
|
h.Unregister(client)
|
|
conn.CloseNow()
|
|
}
|
|
|
|
// readPump reads messages from the WebSocket connection.
|
|
// Currently discards incoming messages (one-way broadcast).
|
|
func (c *Client) readPump(ctx context.Context) {
|
|
for {
|
|
_, _, err := c.conn.Read(ctx)
|
|
if err != nil {
|
|
// Connection closed or error
|
|
return
|
|
}
|
|
// Currently no client-to-server messages are processed.
|
|
// Future: handle subscription changes, pings, etc.
|
|
}
|
|
}
|
|
|
|
// writePump writes messages from the send channel to the WebSocket connection.
|
|
func (c *Client) writePump(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case msg, ok := <-c.send:
|
|
if !ok {
|
|
// Channel closed, hub unregistered us
|
|
c.conn.Close(websocket.StatusNormalClosure, "closing")
|
|
return
|
|
}
|
|
if err := c.conn.Write(ctx, websocket.MessageText, msg); err != nil {
|
|
return
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shutdown closes all client connections gracefully.
|
|
func (h *Hub) Shutdown() {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
for client := range h.clients {
|
|
client.conn.Close(websocket.StatusGoingAway, "server shutting down")
|
|
delete(h.clients, client)
|
|
close(client.send)
|
|
}
|
|
|
|
log.Printf("ws: hub shutdown complete")
|
|
}
|