feat(07-01): ResearchAgent worker, trigger endpoint, main.go wiring
- internal/research/agent.go: Agent with RunOnce+Start, SanitizeQuery, interface adapters
- internal/research/agent_test.go: stub-based unit tests (sanitize, enrich, skip, empty)
- internal/ai/client.go: TierClient.TextComplete for text-only LLM calls
- internal/api/handlers/research.go: POST /api/research/trigger handler (202 Accepted)
- internal/api/router.go: researchHandler param + /api/research/trigger route
- cmd/hwlab/main.go: research agent goroutine started with 10min interval
This commit is contained in:
parent
30cd279f49
commit
0072aa41bd
6 changed files with 437 additions and 1 deletions
|
|
@ -21,6 +21,7 @@ import (
|
|||
"git.georgsen.dk/hwlab/internal/netbox"
|
||||
"git.georgsen.dk/hwlab/internal/printer"
|
||||
"git.georgsen.dk/hwlab/internal/queue"
|
||||
"git.georgsen.dk/hwlab/internal/research"
|
||||
"git.georgsen.dk/hwlab/internal/store"
|
||||
"git.georgsen.dk/hwlab/internal/usb"
|
||||
)
|
||||
|
|
@ -121,6 +122,12 @@ func main() {
|
|||
log.Printf("HWLAB_DATABASE_URL not set — advisor endpoints disabled")
|
||||
}
|
||||
|
||||
// Research agent — enriches needs_research items via SearXNG + Tier 2 LLM.
|
||||
searxngClient := research.NewSearXNGClient(cfg.SearXNGURL)
|
||||
researchAgent := research.NewAgent(nbClient, searxngClient, tier2, catalogUpdater)
|
||||
go researchAgent.Start(ctx, 10*time.Minute)
|
||||
researchHandler := handlers.NewResearchHandler(researchAgent)
|
||||
|
||||
// Wire USB Manager events to cable tester driver when a RoleCableTester device connects.
|
||||
// Currently a no-op stub — wires the plumbing for Phase 5 hardware integration.
|
||||
go func() {
|
||||
|
|
@ -134,7 +141,7 @@ func main() {
|
|||
}
|
||||
}()
|
||||
|
||||
router := api.NewRouter(staticFS, intakeHandler, inventoryHandler, labelHandler, usbEventsHandler, testHandler, advisorHandler)
|
||||
router := api.NewRouter(staticFS, intakeHandler, inventoryHandler, labelHandler, usbEventsHandler, testHandler, advisorHandler, researchHandler)
|
||||
addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
|
||||
log.Printf("HWLab starting on %s", addr)
|
||||
|
||||
|
|
|
|||
|
|
@ -92,6 +92,28 @@ func (c *TierClient) AnalyzePhotos(ctx context.Context, req IntakeRequest) (*Int
|
|||
return &result, nil
|
||||
}
|
||||
|
||||
// TextComplete sends a text-only (non-vision) chat completion to the configured model.
|
||||
// Used by the research agent for hardware enrichment prompts that require no images.
|
||||
// Returns the raw string content of the first response choice.
|
||||
func (c *TierClient) TextComplete(ctx context.Context, prompt string) (string, error) {
|
||||
tctx, cancel := context.WithTimeout(ctx, c.timeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := c.client.CreateChatCompletion(tctx, openai.ChatCompletionRequest{
|
||||
Model: c.model,
|
||||
Messages: []openai.ChatCompletionMessage{
|
||||
{Role: openai.ChatMessageRoleUser, Content: prompt},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("text complete: %w", err)
|
||||
}
|
||||
if len(resp.Choices) == 0 {
|
||||
return "", fmt.Errorf("text complete: no choices in response")
|
||||
}
|
||||
return resp.Choices[0].Message.Content, nil
|
||||
}
|
||||
|
||||
// buildIntakePromptWithCount is a package-internal shim to the prompts package.
|
||||
func buildIntakePromptWithCount(n int) string {
|
||||
return prompts.BuildIntakePrompt(n)
|
||||
|
|
|
|||
30
internal/api/handlers/research.go
Normal file
30
internal/api/handlers/research.go
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
package handlers
|
||||
|
||||
import (
	"context"
	"encoding/json"
	"log"
	"net/http"

	"git.georgsen.dk/hwlab/internal/research"
)
|
||||
|
||||
// ResearchHandler handles research-related API endpoints.
|
||||
type ResearchHandler struct {
|
||||
agent *research.Agent
|
||||
}
|
||||
|
||||
// NewResearchHandler creates a ResearchHandler backed by the given Agent.
|
||||
func NewResearchHandler(agent *research.Agent) *ResearchHandler {
|
||||
return &ResearchHandler{agent: agent}
|
||||
}
|
||||
|
||||
// TriggerResearch handles POST /api/research/trigger.
|
||||
// It fires a RunOnce cycle in a background goroutine and responds 202 Accepted immediately.
|
||||
func (h *ResearchHandler) TriggerResearch(w http.ResponseWriter, r *http.Request) {
|
||||
go func() {
|
||||
_, _ = h.agent.RunOnce(context.Background())
|
||||
}()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "accepted"})
|
||||
}
|
||||
|
|
@ -39,6 +39,7 @@ func (h spaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
// usbEventsHandler handles GET /api/usb/events (SSE stream).
|
||||
// testHandler handles POST /api/test/cable, GET /api/test/events, GET /api/test/recent.
|
||||
// advisorHandler handles POST /api/advisor/chat, GET /api/advisor/conversations, GET /api/advisor/conversations/{id}.
|
||||
// researchHandler handles POST /api/research/trigger.
|
||||
func NewRouter(
|
||||
staticFiles fs.FS,
|
||||
intakeHandler http.Handler,
|
||||
|
|
@ -47,6 +48,7 @@ func NewRouter(
|
|||
usbEventsHandler *handlers.USBEventsHandler,
|
||||
testHandler *handlers.TestHandler,
|
||||
advisorHandler *advisor.AdvisorHandler,
|
||||
researchHandler *handlers.ResearchHandler,
|
||||
) http.Handler {
|
||||
r := chi.NewRouter()
|
||||
r.Use(middleware.Logger)
|
||||
|
|
@ -78,6 +80,16 @@ func NewRouter(
|
|||
r.Get("/conversations/{id}", unavailable)
|
||||
}
|
||||
})
|
||||
|
||||
r.Route("/research", func(r chi.Router) {
|
||||
if researchHandler != nil {
|
||||
r.Post("/trigger", researchHandler.TriggerResearch)
|
||||
} else {
|
||||
r.Post("/trigger", func(w http.ResponseWriter, _ *http.Request) {
|
||||
http.Error(w, "research unavailable", http.StatusServiceUnavailable)
|
||||
})
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// SPA fallback — serve static files; unknown paths fall back to index.html.
|
||||
|
|
|
|||
185
internal/research/agent.go
Normal file
185
internal/research/agent.go
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
package research
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.georgsen.dk/hwlab/internal/ai"
|
||||
"git.georgsen.dk/hwlab/internal/inventory"
|
||||
"git.georgsen.dk/hwlab/internal/netbox"
|
||||
)
|
||||
|
||||
// nonSafeChars matches runs of characters that are not safe to send to SearXNG.
// Allowed: ASCII alphanumerics, space, dot, dash, underscore.
var nonSafeChars = regexp.MustCompile(`[^a-zA-Z0-9 .\-_]+`)

// SanitizeQuery strips unsafe characters from a search query string.
//
// Each run of disallowed characters is replaced by a space; the result is then
// trimmed and every run of spaces is collapsed to a single space, so inputs
// such as "a & b" yield "a b" rather than "a   b". An input made up solely of
// disallowed characters yields the empty string.
//
// Exported so it can be tested from the _test package.
func SanitizeQuery(s string) string {
	replaced := nonSafeChars.ReplaceAllString(s, " ")
	// Fields splits on whitespace runs and drops leading/trailing blanks,
	// which both trims the query and collapses repeated spaces.
	return strings.Join(strings.Fields(replaced), " ")
}
|
||||
|
||||
// NetBoxer is the subset of netbox.Client used by the Agent.
|
||||
// Using an interface allows stub injection in tests.
|
||||
type NetBoxer interface {
|
||||
ListDevicesWithStatus(ctx context.Context, status string) ([]netbox.Device, error)
|
||||
PatchCustomFields(ctx context.Context, deviceID int64, patch map[string]interface{}) error
|
||||
}
|
||||
|
||||
// TextCompleter describes the text-only LLM call the Agent depends on.
// It is the part of ai.TierClient the Agent uses.
type TextCompleter interface {
	// TextComplete sends prompt to the model and returns its text reply.
	TextComplete(ctx context.Context, prompt string) (string, error)
}
|
||||
|
||||
// CatalogTransitioner is the subset of inventory.CatalogUpdater used by the Agent.
|
||||
type CatalogTransitioner interface {
|
||||
UpdateCatalogStatus(ctx context.Context, deviceID int64, current, next inventory.CatalogStatus) (inventory.CatalogStatus, error)
|
||||
}
|
||||
|
||||
// Agent is the background worker that enriches needs_research hardware items.
|
||||
// It polls NetBox, searches SearXNG, calls a Tier 2 LLM, and transitions items to researched.
|
||||
type Agent struct {
|
||||
nbClient NetBoxer
|
||||
researchClient ai.ResearchClient
|
||||
llm TextCompleter
|
||||
updater CatalogTransitioner
|
||||
}
|
||||
|
||||
// NewAgent creates an Agent. All arguments must be non-nil.
|
||||
func NewAgent(nb NetBoxer, rc ai.ResearchClient, llm TextCompleter, updater CatalogTransitioner) *Agent {
|
||||
return &Agent{
|
||||
nbClient: nb,
|
||||
researchClient: rc,
|
||||
llm: llm,
|
||||
updater: updater,
|
||||
}
|
||||
}
|
||||
|
||||
// enrichmentResponse is the expected JSON structure from the Tier 2 LLM.
|
||||
type enrichmentResponse struct {
|
||||
AINotes string `json:"ai_notes"`
|
||||
ProductURL string `json:"product_url"`
|
||||
}
|
||||
|
||||
// RunOnce performs a single research cycle: finds all needs_research devices,
|
||||
// enriches each via SearXNG + LLM, patches NetBox custom fields, and transitions
|
||||
// the catalog status to researched. Returns the number of items enriched.
|
||||
func (a *Agent) RunOnce(ctx context.Context) (int, error) {
|
||||
devices, err := a.nbClient.ListDevicesWithStatus(ctx, string(inventory.StatusNeedsResearch))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("research agent: list needs_research devices: %w", err)
|
||||
}
|
||||
|
||||
enriched := 0
|
||||
for _, dev := range devices {
|
||||
query := SanitizeQuery(dev.Name)
|
||||
if query == "" {
|
||||
log.Printf("research agent: device %d has empty name after sanitization, skipping", dev.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
results, err := a.researchClient.Search(ctx, query)
|
||||
if err != nil {
|
||||
log.Printf("research agent: search error for device %d (%q): %v", dev.ID, dev.Name, err)
|
||||
continue
|
||||
}
|
||||
if len(results) == 0 {
|
||||
log.Printf("research agent: no SearXNG results for device %d (%q), skipping", dev.ID, dev.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
// Build enrichment prompt using top 3 results
|
||||
top := results
|
||||
if len(top) > 3 {
|
||||
top = top[:3]
|
||||
}
|
||||
var sb strings.Builder
|
||||
for i, r := range top {
|
||||
sb.WriteString(fmt.Sprintf("%d. %s\n %s\n URL: %s\n", i+1, r.Title, r.Snippet, r.URL))
|
||||
}
|
||||
prompt := fmt.Sprintf(
|
||||
"You are enriching a hardware inventory record.\nItem: %s\nSearch results:\n%s\nReturn JSON: {\"ai_notes\": \"...\", \"product_url\": \"...\"}",
|
||||
dev.Name, sb.String(),
|
||||
)
|
||||
|
||||
rawResponse, err := a.llm.TextComplete(ctx, prompt)
|
||||
if err != nil {
|
||||
log.Printf("research agent: LLM error for device %d (%q): %v", dev.ID, dev.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse LLM JSON response — extract ai_notes and product_url
|
||||
var enrichResp enrichmentResponse
|
||||
if parseErr := json.Unmarshal([]byte(rawResponse), &enrichResp); parseErr != nil {
|
||||
log.Printf("research agent: LLM non-JSON for device %d: %v (raw: %.100s)", dev.ID, parseErr, rawResponse)
|
||||
// Use raw response as ai_notes fallback
|
||||
enrichResp.AINotes = rawResponse
|
||||
}
|
||||
|
||||
// Patch NetBox custom fields with enrichment data
|
||||
patch := map[string]interface{}{}
|
||||
if enrichResp.AINotes != "" {
|
||||
patch["ai_notes"] = enrichResp.AINotes
|
||||
}
|
||||
if enrichResp.ProductURL == "" && len(results) > 0 {
|
||||
// Fall back to first SearXNG result URL if LLM didn't provide one
|
||||
enrichResp.ProductURL = results[0].URL
|
||||
}
|
||||
if enrichResp.ProductURL != "" {
|
||||
patch["product_url"] = enrichResp.ProductURL
|
||||
}
|
||||
if len(patch) > 0 {
|
||||
if patchErr := a.nbClient.PatchCustomFields(ctx, int64(dev.ID), patch); patchErr != nil {
|
||||
log.Printf("research agent: patch error for device %d: %v", dev.ID, patchErr)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Transition catalog status: needs_research -> researched
|
||||
if _, transErr := a.updater.UpdateCatalogStatus(ctx, int64(dev.ID),
|
||||
inventory.StatusNeedsResearch, inventory.StatusResearched); transErr != nil {
|
||||
log.Printf("research agent: status transition error for device %d: %v", dev.ID, transErr)
|
||||
continue
|
||||
}
|
||||
|
||||
enriched++
|
||||
}
|
||||
|
||||
return enriched, nil
|
||||
}
|
||||
|
||||
// Start runs the research agent on the given interval until ctx is cancelled.
|
||||
// RunOnce is called immediately on start, then on each tick.
|
||||
func (a *Agent) Start(ctx context.Context, interval time.Duration) {
|
||||
log.Printf("research agent: starting, interval=%v", interval)
|
||||
|
||||
// Run immediately on startup
|
||||
if n, err := a.RunOnce(ctx); err != nil {
|
||||
log.Printf("research agent: initial cycle error: %v", err)
|
||||
} else {
|
||||
log.Printf("research agent: cycle complete, enriched %d items", n)
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("research agent: shutting down")
|
||||
return
|
||||
case <-ticker.C:
|
||||
if n, err := a.RunOnce(ctx); err != nil {
|
||||
log.Printf("research agent: cycle error: %v", err)
|
||||
} else {
|
||||
log.Printf("research agent: cycle complete, enriched %d items", n)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
180
internal/research/agent_test.go
Normal file
180
internal/research/agent_test.go
Normal file
|
|
@ -0,0 +1,180 @@
|
|||
package research_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.georgsen.dk/hwlab/internal/ai"
|
||||
"git.georgsen.dk/hwlab/internal/inventory"
|
||||
"git.georgsen.dk/hwlab/internal/netbox"
|
||||
"git.georgsen.dk/hwlab/internal/research"
|
||||
)
|
||||
|
||||
// --- Stubs ---
|
||||
|
||||
// stubResearchClient returns canned SearchResults.
|
||||
type stubResearchClient struct {
|
||||
results []ai.SearchResult
|
||||
err error
|
||||
calls []string
|
||||
}
|
||||
|
||||
func (s *stubResearchClient) Search(_ context.Context, query string) ([]ai.SearchResult, error) {
|
||||
s.calls = append(s.calls, query)
|
||||
return s.results, s.err
|
||||
}
|
||||
|
||||
// stubTextCompleter is an LLM test double that replies with a preconfigured
// text and remembers every prompt it received.
type stubTextCompleter struct {
	// response and err are returned verbatim from every TextComplete call.
	response string
	err      error
	// calls holds the prompt of each TextComplete call, in order.
	calls []string
}
|
||||
|
||||
func (s *stubTextCompleter) TextComplete(_ context.Context, prompt string) (string, error) {
|
||||
s.calls = append(s.calls, prompt)
|
||||
return s.response, s.err
|
||||
}
|
||||
|
||||
// stubNetBoxClient satisfies the research.NetBoxer interface used by Agent.
|
||||
type stubNetBoxClient struct {
|
||||
devices []netbox.Device
|
||||
patches map[int64]map[string]interface{}
|
||||
}
|
||||
|
||||
func (s *stubNetBoxClient) ListDevicesWithStatus(_ context.Context, status string) ([]netbox.Device, error) {
|
||||
return s.devices, nil
|
||||
}
|
||||
|
||||
func (s *stubNetBoxClient) PatchCustomFields(_ context.Context, deviceID int64, patch map[string]interface{}) error {
|
||||
if s.patches == nil {
|
||||
s.patches = make(map[int64]map[string]interface{})
|
||||
}
|
||||
s.patches[deviceID] = patch
|
||||
return nil
|
||||
}
|
||||
|
||||
// stubCatalogUpdater records transitions.
|
||||
type stubCatalogUpdater struct {
|
||||
transitions []struct {
|
||||
id int64
|
||||
current inventory.CatalogStatus
|
||||
next inventory.CatalogStatus
|
||||
}
|
||||
}
|
||||
|
||||
func (s *stubCatalogUpdater) UpdateCatalogStatus(_ context.Context, deviceID int64, current, next inventory.CatalogStatus) (inventory.CatalogStatus, error) {
|
||||
s.transitions = append(s.transitions, struct {
|
||||
id int64
|
||||
current inventory.CatalogStatus
|
||||
next inventory.CatalogStatus
|
||||
}{deviceID, current, next})
|
||||
return next, nil
|
||||
}
|
||||
|
||||
// --- Tests ---
|
||||
|
||||
func TestSanitizeQuery(t *testing.T) {
|
||||
cases := []struct {
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{"Intel NIC i350", "Intel NIC i350"},
|
||||
{"Dell<script>alert(1)</script>", "Dell script alert 1 script"},
|
||||
{"HP ProLiant DL380 Gen9", "HP ProLiant DL380 Gen9"},
|
||||
{" trim ", "trim"},
|
||||
{"special!@#$%chars", "special chars"},
|
||||
{"dots.and-dashes_ok", "dots.and-dashes_ok"},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
got := research.SanitizeQuery(tc.input)
|
||||
if got != tc.expected {
|
||||
t.Errorf("SanitizeQuery(%q) = %q, want %q", tc.input, got, tc.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunOnce_EnrichesDevice(t *testing.T) {
|
||||
nb := &stubNetBoxClient{
|
||||
devices: []netbox.Device{
|
||||
{
|
||||
ID: 42,
|
||||
Name: "Intel i350 NIC",
|
||||
CustomFields: netbox.CustomFields{
|
||||
CatalogStatus: "needs_research",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
rc := &stubResearchClient{
|
||||
results: []ai.SearchResult{
|
||||
{Title: "Intel i350", URL: "https://ark.intel.com", Snippet: "Quad-port GbE"},
|
||||
{Title: "Datasheet", URL: "https://intel.com/ds", Snippet: "Technical specs"},
|
||||
},
|
||||
}
|
||||
llm := &stubTextCompleter{
|
||||
response: `{"ai_notes": "Intel i350 quad-port GbE adapter", "product_url": "https://ark.intel.com"}`,
|
||||
}
|
||||
updater := &stubCatalogUpdater{}
|
||||
|
||||
agent := research.NewAgent(nb, rc, llm, updater)
|
||||
enriched, err := agent.RunOnce(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("RunOnce error: %v", err)
|
||||
}
|
||||
if enriched != 1 {
|
||||
t.Errorf("expected enriched=1, got %d", enriched)
|
||||
}
|
||||
if len(updater.transitions) != 1 {
|
||||
t.Fatalf("expected 1 status transition, got %d", len(updater.transitions))
|
||||
}
|
||||
tr := updater.transitions[0]
|
||||
if tr.id != 42 {
|
||||
t.Errorf("expected device id=42, got %d", tr.id)
|
||||
}
|
||||
if tr.current != inventory.StatusNeedsResearch {
|
||||
t.Errorf("unexpected current status: %s", tr.current)
|
||||
}
|
||||
if tr.next != inventory.StatusResearched {
|
||||
t.Errorf("unexpected next status: %s", tr.next)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunOnce_SkipsDeviceWithNoResults(t *testing.T) {
|
||||
nb := &stubNetBoxClient{
|
||||
devices: []netbox.Device{
|
||||
{ID: 10, Name: "Mystery Device", CustomFields: netbox.CustomFields{CatalogStatus: "needs_research"}},
|
||||
},
|
||||
}
|
||||
rc := &stubResearchClient{results: []ai.SearchResult{}} // empty
|
||||
llm := &stubTextCompleter{}
|
||||
updater := &stubCatalogUpdater{}
|
||||
|
||||
agent := research.NewAgent(nb, rc, llm, updater)
|
||||
enriched, err := agent.RunOnce(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("RunOnce error: %v", err)
|
||||
}
|
||||
if enriched != 0 {
|
||||
t.Errorf("expected enriched=0 (skipped), got %d", enriched)
|
||||
}
|
||||
if len(updater.transitions) != 0 {
|
||||
t.Errorf("expected 0 transitions (device skipped), got %d", len(updater.transitions))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunOnce_NoDevices(t *testing.T) {
|
||||
nb := &stubNetBoxClient{devices: []netbox.Device{}}
|
||||
rc := &stubResearchClient{}
|
||||
llm := &stubTextCompleter{}
|
||||
updater := &stubCatalogUpdater{}
|
||||
|
||||
agent := research.NewAgent(nb, rc, llm, updater)
|
||||
enriched, err := agent.RunOnce(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if enriched != 0 {
|
||||
t.Errorf("expected 0 enriched, got %d", enriched)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue