diff --git a/cmd/hwlab/main.go b/cmd/hwlab/main.go index f3020fd..c04541e 100644 --- a/cmd/hwlab/main.go +++ b/cmd/hwlab/main.go @@ -6,11 +6,13 @@ import ( "io/fs" "log" "net/http" + "os" "os/signal" "syscall" "time" hwlab "git.georgsen.dk/hwlab" + "git.georgsen.dk/hwlab/internal/advisor" "git.georgsen.dk/hwlab/internal/ai" "git.georgsen.dk/hwlab/internal/api" "git.georgsen.dk/hwlab/internal/api/handlers" @@ -19,6 +21,7 @@ import ( "git.georgsen.dk/hwlab/internal/netbox" "git.georgsen.dk/hwlab/internal/printer" "git.georgsen.dk/hwlab/internal/queue" + "git.georgsen.dk/hwlab/internal/store" "git.georgsen.dk/hwlab/internal/usb" ) @@ -98,6 +101,26 @@ func main() { usbEventsHandler := handlers.NewUSBEventsHandler(usbManager) testHandler := handlers.NewTestHandler(nbClient, mockDriver) + // Store + advisor — non-fatal if DB unavailable (advisor degrades gracefully). + var advisorHandler *advisor.AdvisorHandler + dbDSN := os.Getenv("HWLAB_DATABASE_URL") + if dbDSN != "" { + s, storeErr := store.NewStore(ctx, dbDSN) + if storeErr != nil { + log.Printf("WARNING: store unavailable (%v) — advisor endpoints will be disabled", storeErr) + } else { + defer s.Close() + if migErr := store.RunMigrations(ctx, s.Pool()); migErr != nil { + log.Printf("WARNING: store migrations failed (%v) — advisor endpoints may misbehave", migErr) + } + ctxBuilder := advisor.NewInventoryContextBuilder(nbClient) + advisorHandler = advisor.NewAdvisorHandler(s, ctxBuilder, cfg.AI) + log.Printf("Advisor handler ready") + } + } else { + log.Printf("HWLAB_DATABASE_URL not set — advisor endpoints disabled") + } + // Wire USB Manager events to cable tester driver when a RoleCableTester device connects. // Currently a no-op stub — wires the plumbing for Phase 5 hardware integration. go func() { @@ -111,7 +134,7 @@ func main() { } }() - router := api.NewRouter(staticFS, intakeHandler, inventoryHandler, labelHandler, usbEventsHandler, testHandler) + router := api.NewRouter(staticFS, intakeHandler, inventoryHandler, labelHandler, usbEventsHandler, testHandler, advisorHandler) addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) log.Printf("HWLab starting on %s", addr) diff --git a/internal/advisor/handler.go b/internal/advisor/handler.go new file mode 100644 index 0000000..5d2e531 --- /dev/null +++ b/internal/advisor/handler.go @@ -0,0 +1,225 @@ +package advisor + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "strings" + + "github.com/go-chi/chi/v5" + openai "github.com/sashabaranov/go-openai" + + "git.georgsen.dk/hwlab/internal/ai" + "git.georgsen.dk/hwlab/internal/store" +) + +const ( + defaultModel = "anthropic/claude-opus-4" + maxMessageBytes = 64 * 1024 // 64 KB body limit (T-06-02-03) + maxMessageChars = 8000 // truncate message content to 8000 chars (T-06-02-03) +) + +// ChatRequest is the JSON body for POST /api/advisor/chat. +type ChatRequest struct { + ConversationID string `json:"conversation_id"` // empty = new conversation + Message string `json:"message"` + Model string `json:"model"` // empty = defaultModel +} + +// AdvisorHandler implements the three advisor endpoints. +type AdvisorHandler struct { + store *store.Store + ctx *InventoryContextBuilder + aiCfg ai.AIConfig +} + +// NewAdvisorHandler creates an AdvisorHandler. +func NewAdvisorHandler(s *store.Store, ctxBuilder *InventoryContextBuilder, aiCfg ai.AIConfig) *AdvisorHandler { + return &AdvisorHandler{ + store: s, + ctx: ctxBuilder, + aiCfg: aiCfg, + } +} + +// StreamChat handles POST /api/advisor/chat. +// It streams tokens from the configured AI model back to the client via SSE. +// Each token is sent as: data: {"conversation_id":"...","token":"..."}\n\n +// The final event is: data: [DONE]\n\n +// +// Security: +// - Body is limited to 64 KB (T-06-02-03) +// - Message content is truncated to 8000 chars (T-06-02-03) +// - API key is never written to the SSE stream (T-06-02-02) +func (h *AdvisorHandler) StreamChat(w http.ResponseWriter, r *http.Request) { + // Enforce body size limit (T-06-02-03). + r.Body = http.MaxBytesReader(w, r.Body, maxMessageBytes) + + var req ChatRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid request body", http.StatusBadRequest) + return + } + + if req.Message == "" { + http.Error(w, "message is required", http.StatusBadRequest) + return + } + + // Truncate message to guard against overly long inputs (T-06-02-03). + if len(req.Message) > maxMessageChars { + req.Message = req.Message[:maxMessageChars] + } + + model := req.Model + if model == "" { + model = defaultModel + } + + ctx := r.Context() + + // Create or reuse conversation. + convID := req.ConversationID + if convID == "" { + id, err := h.store.CreateConversation(ctx, model) + if err != nil { + log.Printf("advisor: create conversation: %v", err) + http.Error(w, "internal error", http.StatusInternalServerError) + return + } + convID = id + } + + // Persist user message. + if _, err := h.store.AddMessage(ctx, convID, "user", req.Message); err != nil { + log.Printf("advisor: add user message: %v", err) + http.Error(w, "internal error", http.StatusInternalServerError) + return + } + + // Build inventory context for system prompt. + invCtx, err := h.ctx.BuildContext(ctx) + if err != nil { + log.Printf("advisor: build inventory context: %v (proceeding without inventory)", err) + invCtx = "Inventory: unavailable\n" + } + + systemPrompt := "You are a homelab advisor. Here is the current inventory:\n\n" + invCtx + + messages := []openai.ChatCompletionMessage{ + {Role: openai.ChatMessageRoleSystem, Content: systemPrompt}, + {Role: openai.ChatMessageRoleUser, Content: req.Message}, + } + + // Build an OpenAI-compatible client pointed at Tier3 (OpenRouter). + // API key is read from config only — never from the request (T-06-02-02). + oCfg := openai.DefaultConfig(h.aiCfg.Tier3.APIKey) + oCfg.BaseURL = h.aiCfg.Tier3.BaseURL + client := openai.NewClientWithConfig(oCfg) + + stream, err := client.CreateChatCompletionStream(ctx, openai.ChatCompletionRequest{ + Model: model, + Messages: messages, + Stream: true, + }) + if err != nil { + log.Printf("advisor: create stream: %v", err) + http.Error(w, "upstream AI error", http.StatusBadGateway) + return + } + defer stream.Close() + + // Set SSE headers. + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + type tokenEvent struct { + ConversationID string `json:"conversation_id"` + Token string `json:"token,omitempty"` + } + type errorEvent struct { + Error string `json:"error"` + } + + var fullContent strings.Builder + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + log.Printf("advisor: stream recv: %v", err) + errPayload, _ := json.Marshal(errorEvent{Error: "stream interrupted"}) + fmt.Fprintf(w, "data: %s\n\n", errPayload) + flusher.Flush() + return + } + + if len(resp.Choices) == 0 { + continue + } + + token := resp.Choices[0].Delta.Content + if token == "" { + continue + } + fullContent.WriteString(token) + + payload, _ := json.Marshal(tokenEvent{ConversationID: convID, Token: token}) + fmt.Fprintf(w, "data: %s\n\n", payload) + flusher.Flush() + } + + // Send DONE sentinel. + fmt.Fprintf(w, "data: [DONE]\n\n") + flusher.Flush() + + // Persist assistant response. + if _, err := h.store.AddMessage(ctx, convID, "assistant", fullContent.String()); err != nil { + log.Printf("advisor: add assistant message: %v", err) + } +} + +// GetConversations handles GET /api/advisor/conversations. +func (h *AdvisorHandler) GetConversations(w http.ResponseWriter, r *http.Request) { + list, err := h.store.ListConversations(r.Context()) + if err != nil { + log.Printf("advisor: list conversations: %v", err) + http.Error(w, "internal error", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(list); err != nil { + log.Printf("advisor: encode conversations: %v", err) + } +} + +// GetConversation handles GET /api/advisor/conversations/{id}. +func (h *AdvisorHandler) GetConversation(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + conv, err := h.store.GetConversation(r.Context(), id) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + http.Error(w, "conversation not found", http.StatusNotFound) + return + } + log.Printf("advisor: get conversation %s: %v", id, err) + http.Error(w, "internal error", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(conv); err != nil { + log.Printf("advisor: encode conversation: %v", err) + } +} diff --git a/internal/api/router.go b/internal/api/router.go index 33c5f68..e549fe3 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -7,6 +7,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" + "git.georgsen.dk/hwlab/internal/advisor" "git.georgsen.dk/hwlab/internal/api/handlers" ) @@ -37,6 +38,7 @@ func (h spaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // labelHandler handles POST /api/labels/:deviceID/print. // usbEventsHandler handles GET /api/usb/events (SSE stream). // testHandler handles POST /api/test/cable, GET /api/test/events, GET /api/test/recent. +// advisorHandler handles POST /api/advisor/chat, GET /api/advisor/conversations, GET /api/advisor/conversations/{id}. func NewRouter( staticFiles fs.FS, intakeHandler http.Handler, @@ -44,6 +46,7 @@ func NewRouter( labelHandler *handlers.LabelHandler, usbEventsHandler *handlers.USBEventsHandler, testHandler *handlers.TestHandler, + advisorHandler *advisor.AdvisorHandler, ) http.Handler { r := chi.NewRouter() r.Use(middleware.Logger) @@ -60,6 +63,21 @@ func NewRouter( r.Post("/test/cable", testHandler.SubmitCableTest) r.Get("/test/events", testHandler.StreamEvents) r.Get("/test/recent", testHandler.RecentTests) + + r.Route("/advisor", func(r chi.Router) { + if advisorHandler != nil { + r.Post("/chat", advisorHandler.StreamChat) + r.Get("/conversations", advisorHandler.GetConversations) + r.Get("/conversations/{id}", advisorHandler.GetConversation) + } else { + unavailable := func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "advisor unavailable: database not configured", http.StatusServiceUnavailable) + } + r.Post("/chat", unavailable) + r.Get("/conversations", unavailable) + r.Get("/conversations/{id}", unavailable) + } + }) }) // SPA fallback — serve static files; unknown paths fall back to index.html. diff --git a/internal/config/config.go b/internal/config/config.go index 14bad7a..45a848a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -58,6 +58,10 @@ func Load() (*Config, error) { v.SetDefault("ai.tier2.api_key", "") v.SetDefault("ai.tier2.model", "google/gemma-3-27b-it") v.SetDefault("ai.tier2.timeout_seconds", 60) + v.SetDefault("ai.tier3.base_url", "https://openrouter.ai/api/v1") + v.SetDefault("ai.tier3.api_key", "") + v.SetDefault("ai.tier3.model", "anthropic/claude-opus-4") + v.SetDefault("ai.tier3.timeout_seconds", 120) v.SetDefault("ai.confidence_threshold", 0.75) v.SetDefault("ai.quick_add_enabled", false) v.SetDefault("ai.quick_add_threshold", 0.90) @@ -94,6 +98,9 @@ func Load() (*Config, error) { _ = v.BindEnv("ai.tier2.base_url", "HWLAB_AI_TIER2_BASE_URL") _ = v.BindEnv("ai.tier2.api_key", "HWLAB_AI_TIER2_API_KEY") _ = v.BindEnv("ai.tier2.model", "HWLAB_AI_TIER2_MODEL") + _ = v.BindEnv("ai.tier3.base_url", "HWLAB_AI_TIER3_BASE_URL") + _ = v.BindEnv("ai.tier3.api_key", "HWLAB_AI_TIER3_API_KEY") + _ = v.BindEnv("ai.tier3.model", "HWLAB_AI_TIER3_MODEL") _ = v.BindEnv("ai.confidence_threshold", "HWLAB_AI_CONFIDENCE_THRESHOLD") _ = v.BindEnv("ai.quick_add_enabled", "HWLAB_AI_QUICK_ADD_ENABLED")