// Package ws implements a WebSocket hub for real-time tournament state // broadcasting. Connections are authenticated via JWT (query parameter) and // scoped to specific tournaments. package ws import ( "context" "encoding/json" "log" "net/http" "sync" "time" "github.com/coder/websocket" ) // Message represents a WebSocket message sent to clients. type Message struct { Type string `json:"type"` TournamentID string `json:"tournament_id,omitempty"` Data json.RawMessage `json:"data,omitempty"` Timestamp int64 `json:"timestamp"` } // Client represents a connected WebSocket client. type Client struct { conn *websocket.Conn tournamentID string // Subscription scope (empty = global messages only) operatorID string // From JWT claims role string // From JWT claims (admin, floor, viewer) send chan []byte hub *Hub } // TokenValidator is a function that validates a JWT token string and returns // the operator ID and role. Injected by the server package. type TokenValidator func(tokenStr string) (operatorID string, role string, err error) // TournamentValidator is a function that checks if a tournament exists and // the operator has access to it. Returns nil if valid. type TournamentValidator func(tournamentID string, operatorID string) error // Hub manages all active WebSocket connections and broadcasts messages // to tournament-scoped client groups. type Hub struct { mu sync.RWMutex clients map[*Client]struct{} validateToken TokenValidator validateTournament TournamentValidator allowedOrigins []string } // NewHub creates a new WebSocket hub. func NewHub(validateToken TokenValidator, validateTournament TournamentValidator, allowedOrigins []string) *Hub { return &Hub{ clients: make(map[*Client]struct{}), validateToken: validateToken, validateTournament: validateTournament, allowedOrigins: allowedOrigins, } } // Register adds a client to the hub. func (h *Hub) Register(client *Client) { h.mu.Lock() defer h.mu.Unlock() h.clients[client] = struct{}{} log.Printf("ws: client registered (operator=%s, tournament=%s, total=%d)", client.operatorID, client.tournamentID, len(h.clients)) } // Unregister removes a client from the hub and closes its send channel. func (h *Hub) Unregister(client *Client) { h.mu.Lock() defer h.mu.Unlock() if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.send) log.Printf("ws: client unregistered (operator=%s, tournament=%s, total=%d)", client.operatorID, client.tournamentID, len(h.clients)) } } // Broadcast sends a message to all clients subscribed to the given tournament. // If tournamentID is empty, the message is sent to ALL connected clients. func (h *Hub) Broadcast(tournamentID string, msgType string, data json.RawMessage) { msg := Message{ Type: msgType, TournamentID: tournamentID, Data: data, Timestamp: time.Now().UnixMilli(), } payload, err := json.Marshal(msg) if err != nil { log.Printf("ws: broadcast marshal error: %v", err) return } h.mu.RLock() defer h.mu.RUnlock() sent := 0 for client := range h.clients { if tournamentID == "" || client.tournamentID == tournamentID { // Non-blocking send: drop messages for slow consumers select { case client.send <- payload: sent++ default: log.Printf("ws: dropping message for slow client (operator=%s)", client.operatorID) } } } if sent > 0 { log.Printf("ws: broadcast %s to %d client(s) (tournament=%s)", msgType, sent, tournamentID) } } // ClientCount returns the total number of connected clients. func (h *Hub) ClientCount() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) } // HandleConnect handles WebSocket upgrade requests. // Authentication: JWT must be provided as ?token= query parameter. // Tournament scope: ?tournament= query parameter (optional). func (h *Hub) HandleConnect(w http.ResponseWriter, r *http.Request) { // Extract and validate JWT from query parameter tokenStr := r.URL.Query().Get("token") if tokenStr == "" { http.Error(w, `{"error":"missing token parameter"}`, http.StatusUnauthorized) return } operatorID, role, err := h.validateToken(tokenStr) if err != nil { http.Error(w, `{"error":"invalid or expired token"}`, http.StatusUnauthorized) return } // Validate tournament scope if provided tournamentID := r.URL.Query().Get("tournament") if tournamentID != "" { if h.validateTournament != nil { if err := h.validateTournament(tournamentID, operatorID); err != nil { http.Error(w, `{"error":"tournament not found or access denied"}`, http.StatusForbidden) return } } } // Build accept options with Origin validation acceptOpts := &websocket.AcceptOptions{ OriginPatterns: h.allowedOrigins, } if len(h.allowedOrigins) == 0 { // In dev mode, allow all origins acceptOpts.InsecureSkipVerify = true } // Accept the WebSocket connection conn, err := websocket.Accept(w, r, acceptOpts) if err != nil { log.Printf("ws: accept error: %v", err) return } client := &Client{ conn: conn, tournamentID: tournamentID, operatorID: operatorID, role: role, send: make(chan []byte, 256), hub: h, } h.Register(client) // Send initial connected message connectMsg := Message{ Type: "connected", TournamentID: tournamentID, Data: json.RawMessage(`{"status":"ok"}`), Timestamp: time.Now().UnixMilli(), } payload, _ := json.Marshal(connectMsg) client.send <- payload // Start read and write pumps ctx := r.Context() go client.writePump(ctx) client.readPump(ctx) // Blocks until disconnect // Cleanup h.Unregister(client) conn.CloseNow() } // readPump reads messages from the WebSocket connection. // Currently discards incoming messages (one-way broadcast). func (c *Client) readPump(ctx context.Context) { for { _, _, err := c.conn.Read(ctx) if err != nil { // Connection closed or error return } // Currently no client-to-server messages are processed. // Future: handle subscription changes, pings, etc. } } // writePump writes messages from the send channel to the WebSocket connection. func (c *Client) writePump(ctx context.Context) { for { select { case msg, ok := <-c.send: if !ok { // Channel closed, hub unregistered us c.conn.Close(websocket.StatusNormalClosure, "closing") return } if err := c.conn.Write(ctx, websocket.MessageText, msg); err != nil { return } case <-ctx.Done(): return } } } // Shutdown closes all client connections gracefully. func (h *Hub) Shutdown() { h.mu.Lock() defer h.mu.Unlock() for client := range h.clients { client.conn.Close(websocket.StatusGoingAway, "server shutting down") delete(h.clients, client) close(client.send) } log.Printf("ws: hub shutdown complete") }