feat(06-lab-advisor-02): AdvisorHandler SSE streaming + router wiring

- internal/advisor/handler.go: StreamChat (SSE, token-by-token),
  GetConversations, GetConversation; body limited to 64KB, message
  truncated to 8000 chars (T-06-02-03); API key never echoed (T-06-02-02)
- internal/api/router.go: /api/advisor/{chat,conversations,conversations/{id}}
  with nil-guard returning 503 when DB not configured
- internal/config/config.go: Tier3 defaults + HWLAB_AI_TIER3_* env bindings
- cmd/hwlab/main.go: store init from HWLAB_DATABASE_URL, RunMigrations,
  InventoryContextBuilder, AdvisorHandler wired into NewRouter
This commit is contained in:
Mikkel Georgsen 2026-04-10 07:36:16 +00:00
parent 7b02e67365
commit 0190e8583c
4 changed files with 274 additions and 1 deletions

View file

@ -6,11 +6,13 @@ import (
"io/fs"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
hwlab "git.georgsen.dk/hwlab"
"git.georgsen.dk/hwlab/internal/advisor"
"git.georgsen.dk/hwlab/internal/ai"
"git.georgsen.dk/hwlab/internal/api"
"git.georgsen.dk/hwlab/internal/api/handlers"
@ -19,6 +21,7 @@ import (
"git.georgsen.dk/hwlab/internal/netbox"
"git.georgsen.dk/hwlab/internal/printer"
"git.georgsen.dk/hwlab/internal/queue"
"git.georgsen.dk/hwlab/internal/store"
"git.georgsen.dk/hwlab/internal/usb"
)
@ -98,6 +101,26 @@ func main() {
usbEventsHandler := handlers.NewUSBEventsHandler(usbManager)
testHandler := handlers.NewTestHandler(nbClient, mockDriver)
// Store + advisor — non-fatal if DB unavailable (advisor degrades gracefully).
var advisorHandler *advisor.AdvisorHandler
dbDSN := os.Getenv("HWLAB_DATABASE_URL")
if dbDSN != "" {
s, storeErr := store.NewStore(ctx, dbDSN)
if storeErr != nil {
log.Printf("WARNING: store unavailable (%v) — advisor endpoints will be disabled", storeErr)
} else {
defer s.Close()
if migErr := store.RunMigrations(ctx, s.Pool()); migErr != nil {
log.Printf("WARNING: store migrations failed (%v) — advisor endpoints may misbehave", migErr)
}
ctxBuilder := advisor.NewInventoryContextBuilder(nbClient)
advisorHandler = advisor.NewAdvisorHandler(s, ctxBuilder, cfg.AI)
log.Printf("Advisor handler ready")
}
} else {
log.Printf("HWLAB_DATABASE_URL not set — advisor endpoints disabled")
}
// Wire USB Manager events to cable tester driver when a RoleCableTester device connects.
// Currently a no-op stub — wires the plumbing for Phase 5 hardware integration.
go func() {
@ -111,7 +134,7 @@ func main() {
}
}()
router := api.NewRouter(staticFS, intakeHandler, inventoryHandler, labelHandler, usbEventsHandler, testHandler)
router := api.NewRouter(staticFS, intakeHandler, inventoryHandler, labelHandler, usbEventsHandler, testHandler, advisorHandler)
addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
log.Printf("HWLab starting on %s", addr)

225
internal/advisor/handler.go Normal file
View file

@ -0,0 +1,225 @@
package advisor
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
openai "github.com/sashabaranov/go-openai"
"git.georgsen.dk/hwlab/internal/ai"
"git.georgsen.dk/hwlab/internal/store"
)
const (
defaultModel = "anthropic/claude-opus-4"
maxMessageBytes = 64 * 1024 // 64 KB body limit (T-06-02-03)
maxMessageChars = 8000 // truncate message content to 8000 chars (T-06-02-03)
)
// ChatRequest is the JSON body for POST /api/advisor/chat.
type ChatRequest struct {
ConversationID string `json:"conversation_id"` // empty = new conversation
Message string `json:"message"`
Model string `json:"model"` // empty = defaultModel
}
// AdvisorHandler implements the three advisor endpoints.
type AdvisorHandler struct {
store *store.Store
ctx *InventoryContextBuilder
aiCfg ai.AIConfig
}
// NewAdvisorHandler creates an AdvisorHandler.
func NewAdvisorHandler(s *store.Store, ctxBuilder *InventoryContextBuilder, aiCfg ai.AIConfig) *AdvisorHandler {
return &AdvisorHandler{
store: s,
ctx: ctxBuilder,
aiCfg: aiCfg,
}
}
// StreamChat handles POST /api/advisor/chat.
// It streams tokens from the configured AI model back to the client via SSE.
// Each token is sent as: data: {"conversation_id":"...","token":"..."}\n\n
// The final event is: data: [DONE]\n\n
//
// Security:
// - Body is limited to 64 KB (T-06-02-03)
// - Message content is truncated to 8000 chars (T-06-02-03)
// - API key is never written to the SSE stream (T-06-02-02)
func (h *AdvisorHandler) StreamChat(w http.ResponseWriter, r *http.Request) {
// Enforce body size limit (T-06-02-03).
r.Body = http.MaxBytesReader(w, r.Body, maxMessageBytes)
var req ChatRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}
if req.Message == "" {
http.Error(w, "message is required", http.StatusBadRequest)
return
}
// Truncate message to guard against overly long inputs (T-06-02-03).
if len(req.Message) > maxMessageChars {
req.Message = req.Message[:maxMessageChars]
}
model := req.Model
if model == "" {
model = defaultModel
}
ctx := r.Context()
// Create or reuse conversation.
convID := req.ConversationID
if convID == "" {
id, err := h.store.CreateConversation(ctx, model)
if err != nil {
log.Printf("advisor: create conversation: %v", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
convID = id
}
// Persist user message.
if _, err := h.store.AddMessage(ctx, convID, "user", req.Message); err != nil {
log.Printf("advisor: add user message: %v", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
// Build inventory context for system prompt.
invCtx, err := h.ctx.BuildContext(ctx)
if err != nil {
log.Printf("advisor: build inventory context: %v (proceeding without inventory)", err)
invCtx = "Inventory: unavailable\n"
}
systemPrompt := "You are a homelab advisor. Here is the current inventory:\n\n" + invCtx
messages := []openai.ChatCompletionMessage{
{Role: openai.ChatMessageRoleSystem, Content: systemPrompt},
{Role: openai.ChatMessageRoleUser, Content: req.Message},
}
// Build an OpenAI-compatible client pointed at Tier3 (OpenRouter).
// API key is read from config only — never from the request (T-06-02-02).
oCfg := openai.DefaultConfig(h.aiCfg.Tier3.APIKey)
oCfg.BaseURL = h.aiCfg.Tier3.BaseURL
client := openai.NewClientWithConfig(oCfg)
stream, err := client.CreateChatCompletionStream(ctx, openai.ChatCompletionRequest{
Model: model,
Messages: messages,
Stream: true,
})
if err != nil {
log.Printf("advisor: create stream: %v", err)
http.Error(w, "upstream AI error", http.StatusBadGateway)
return
}
defer stream.Close()
// Set SSE headers.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
type tokenEvent struct {
ConversationID string `json:"conversation_id"`
Token string `json:"token,omitempty"`
}
type errorEvent struct {
Error string `json:"error"`
}
var fullContent strings.Builder
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
log.Printf("advisor: stream recv: %v", err)
errPayload, _ := json.Marshal(errorEvent{Error: "stream interrupted"})
fmt.Fprintf(w, "data: %s\n\n", errPayload)
flusher.Flush()
return
}
if len(resp.Choices) == 0 {
continue
}
token := resp.Choices[0].Delta.Content
if token == "" {
continue
}
fullContent.WriteString(token)
payload, _ := json.Marshal(tokenEvent{ConversationID: convID, Token: token})
fmt.Fprintf(w, "data: %s\n\n", payload)
flusher.Flush()
}
// Send DONE sentinel.
fmt.Fprintf(w, "data: [DONE]\n\n")
flusher.Flush()
// Persist assistant response.
if _, err := h.store.AddMessage(ctx, convID, "assistant", fullContent.String()); err != nil {
log.Printf("advisor: add assistant message: %v", err)
}
}
// GetConversations handles GET /api/advisor/conversations.
func (h *AdvisorHandler) GetConversations(w http.ResponseWriter, r *http.Request) {
list, err := h.store.ListConversations(r.Context())
if err != nil {
log.Printf("advisor: list conversations: %v", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(list); err != nil {
log.Printf("advisor: encode conversations: %v", err)
}
}
// GetConversation handles GET /api/advisor/conversations/{id}.
func (h *AdvisorHandler) GetConversation(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
conv, err := h.store.GetConversation(r.Context(), id)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
http.Error(w, "conversation not found", http.StatusNotFound)
return
}
log.Printf("advisor: get conversation %s: %v", id, err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(conv); err != nil {
log.Printf("advisor: encode conversation: %v", err)
}
}

View file

@ -7,6 +7,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"git.georgsen.dk/hwlab/internal/advisor"
"git.georgsen.dk/hwlab/internal/api/handlers"
)
@ -37,6 +38,7 @@ func (h spaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// labelHandler handles POST /api/labels/:deviceID/print.
// usbEventsHandler handles GET /api/usb/events (SSE stream).
// testHandler handles POST /api/test/cable, GET /api/test/events, GET /api/test/recent.
// advisorHandler handles POST /api/advisor/chat, GET /api/advisor/conversations, GET /api/advisor/conversations/{id}.
func NewRouter(
staticFiles fs.FS,
intakeHandler http.Handler,
@ -44,6 +46,7 @@ func NewRouter(
labelHandler *handlers.LabelHandler,
usbEventsHandler *handlers.USBEventsHandler,
testHandler *handlers.TestHandler,
advisorHandler *advisor.AdvisorHandler,
) http.Handler {
r := chi.NewRouter()
r.Use(middleware.Logger)
@ -60,6 +63,21 @@ func NewRouter(
r.Post("/test/cable", testHandler.SubmitCableTest)
r.Get("/test/events", testHandler.StreamEvents)
r.Get("/test/recent", testHandler.RecentTests)
r.Route("/advisor", func(r chi.Router) {
if advisorHandler != nil {
r.Post("/chat", advisorHandler.StreamChat)
r.Get("/conversations", advisorHandler.GetConversations)
r.Get("/conversations/{id}", advisorHandler.GetConversation)
} else {
unavailable := func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "advisor unavailable: database not configured", http.StatusServiceUnavailable)
}
r.Post("/chat", unavailable)
r.Get("/conversations", unavailable)
r.Get("/conversations/{id}", unavailable)
}
})
})
// SPA fallback — serve static files; unknown paths fall back to index.html.

View file

@ -58,6 +58,10 @@ func Load() (*Config, error) {
v.SetDefault("ai.tier2.api_key", "")
v.SetDefault("ai.tier2.model", "google/gemma-3-27b-it")
v.SetDefault("ai.tier2.timeout_seconds", 60)
v.SetDefault("ai.tier3.base_url", "https://openrouter.ai/api/v1")
v.SetDefault("ai.tier3.api_key", "")
v.SetDefault("ai.tier3.model", "anthropic/claude-opus-4")
v.SetDefault("ai.tier3.timeout_seconds", 120)
v.SetDefault("ai.confidence_threshold", 0.75)
v.SetDefault("ai.quick_add_enabled", false)
v.SetDefault("ai.quick_add_threshold", 0.90)
@ -94,6 +98,9 @@ func Load() (*Config, error) {
_ = v.BindEnv("ai.tier2.base_url", "HWLAB_AI_TIER2_BASE_URL")
_ = v.BindEnv("ai.tier2.api_key", "HWLAB_AI_TIER2_API_KEY")
_ = v.BindEnv("ai.tier2.model", "HWLAB_AI_TIER2_MODEL")
_ = v.BindEnv("ai.tier3.base_url", "HWLAB_AI_TIER3_BASE_URL")
_ = v.BindEnv("ai.tier3.api_key", "HWLAB_AI_TIER3_API_KEY")
_ = v.BindEnv("ai.tier3.model", "HWLAB_AI_TIER3_MODEL")
_ = v.BindEnv("ai.confidence_threshold", "HWLAB_AI_CONFIDENCE_THRESHOLD")
_ = v.BindEnv("ai.quick_add_enabled", "HWLAB_AI_QUICK_ADD_ENABLED")