homelabby/internal/api/handlers/intake.go
Mikkel Georgsen 4fc9362519 feat(02-03): POST /api/intake handler with orchestrator and NetBox wiring
- IntakeHandler with IntakeOrchestrator/IntakeNetBoxClient/IntakeCatalogUpdater/IntakeWAQ interfaces
- Validates 1-3 photos, base64-encodes, calls Analyze, allocates HW-ID
- Quick-add mode: confidence >= threshold skips review, creates NetBox record immediately
- WAQ enqueue on NetBox failure returns 202 with queued=true
- nil WAQ + NetBox down returns 503
- Six unit tests: reject-0, reject-4, high-confidence, low-confidence, quick-add, netbox-down
- [Rule 1 - Bug] PatchCustomFields signature changed int -> int64 to match NetBoxOpsClient interface
- [Rule 1 - Bug] UpdateCatalogStatus signature changed int -> int64 for consistency with CreateDevice return type
2026-04-10 05:54:33 +00:00

263 lines
8.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package handlers
import (
"context"
"encoding/base64"
"encoding/json"
"io"
"log"
"net/http"
"strings"
"github.com/google/uuid"
"git.georgsen.dk/hwlab/internal/ai"
"git.georgsen.dk/hwlab/internal/inventory"
"git.georgsen.dk/hwlab/internal/netbox"
"git.georgsen.dk/hwlab/internal/queue"
)
// IntakeOrchestrator is the subset of ai.Orchestrator the intake handler needs.
// Declaring it here lets tests inject a mock without depending on the concrete type.
type IntakeOrchestrator interface {
// Analyze runs the AI analysis for one intake request. ServeHTTP passes the
// uploaded photos as base64 data URIs together with a freshly generated job ID,
// and uses the returned result and catalog status to build the device record
// and the HTTP response. A non-nil error makes ServeHTTP answer with 500.
Analyze(ctx context.Context, req ai.IntakeRequest) (*ai.IntakeResult, inventory.CatalogStatus, error)
}
// IntakeNetBoxClient is the subset of netbox.Client the intake handler needs.
type IntakeNetBoxClient interface {
// AllocateNextHWID returns the HW-ID to assign to the new device. ServeHTTP
// uses it as the NetBox asset tag and as the fallback device name.
AllocateNextHWID(ctx context.Context) (string, error)
// CreateDevice creates the device record and returns its ID, which ServeHTTP
// reports back to the caller as netbox_id.
CreateDevice(ctx context.Context, name, assetTag string, deviceTypeID, roleID, siteID int32) (int64, error)
// PatchCustomFields applies the given custom-field patch to the device with ID deviceID.
PatchCustomFields(ctx context.Context, deviceID int64, patch map[string]interface{}) error
// SyncTags is called with the AI-suggested tag names; ServeHTTP discards the
// returned references. NOTE(review): presumably this ensures the tags exist in
// NetBox — confirm against netbox.Client.
SyncTags(ctx context.Context, tags []string) ([]netbox.TagRef, error)
}
// IntakeCatalogUpdater is the subset needed for catalog status persistence.
type IntakeCatalogUpdater interface {
// UpdateCatalogStatus moves the device identified by deviceID from the current
// status to next. ServeHTTP passes an empty current status for a newly created
// device and ignores the returned status.
// NOTE(review): the meaning of the returned status is not visible here — confirm
// against the implementation.
UpdateCatalogStatus(ctx context.Context, deviceID int64, current, next inventory.CatalogStatus) (inventory.CatalogStatus, error)
}
// IntakeWAQ is the subset of WAQ the handler needs.
// ServeHTTP uses it only as a fallback when NetBox CreateDevice fails.
type IntakeWAQ interface {
// Enqueue stores a pending operation so it can be applied later.
// NOTE(review): replay semantics live in the queue package — confirm there.
Enqueue(ctx context.Context, op queue.PendingOp) error
}
// IntakeHandler handles POST /api/intake: multipart photo upload, AI analysis,
// HW-ID allocation, then NetBox record creation (or WAQ enqueue on NetBox failure).
type IntakeHandler struct {
// orchestrator performs the AI analysis of the uploaded photos.
orchestrator IntakeOrchestrator
// netboxClient allocates HW-IDs and creates/patches the NetBox device record.
netboxClient IntakeNetBoxClient
// catalogUpdater persists the catalog status after the device has been created.
catalogUpdater IntakeCatalogUpdater
waq IntakeWAQ // may be nil if DragonFlyDB unavailable
// deviceTypeID, roleID and siteID are passed unchanged to every CreateDevice
// call and copied into the queued payload when NetBox is unreachable.
deviceTypeID int32
roleID int32
siteID int32
// quickAddEnabled and quickAddThresh are stored by NewIntakeHandler.
// NOTE(review): ServeHTTP does not read either field — confirm whether
// quick-add gating is meant to happen in the handler.
quickAddEnabled bool
quickAddThresh float64
}
// NewIntakeHandler wires the given collaborators and settings into a new
// IntakeHandler and returns it.
//
// waq may be nil if DragonFlyDB is unavailable; the handler then answers 503
// instead of queueing when NetBox cannot be reached. deviceTypeID, roleID and
// siteID are the NetBox IDs used for every device the handler creates.
func NewIntakeHandler(
	orch IntakeOrchestrator,
	nb IntakeNetBoxClient,
	cu IntakeCatalogUpdater,
	waq IntakeWAQ,
	deviceTypeID, roleID, siteID int32,
	quickAddEnabled bool,
	quickAddThresh float64,
) *IntakeHandler {
	h := new(IntakeHandler)

	// Collaborators.
	h.orchestrator = orch
	h.netboxClient = nb
	h.catalogUpdater = cu
	h.waq = waq

	// NetBox defaults applied to every created device.
	h.deviceTypeID = deviceTypeID
	h.roleID = roleID
	h.siteID = siteID

	// Quick-add settings.
	h.quickAddEnabled = quickAddEnabled
	h.quickAddThresh = quickAddThresh

	return h
}
// IntakeResponse is the JSON body returned by POST /api/intake.
// It is sent with 201 when the NetBox record was created and with 202 when the
// creation was queued instead.
type IntakeResponse struct {
// HWID is the HW-ID allocated for the device.
HWID string `json:"hw_id"`
// Model, Manufacturer, Category, Specs, SuggestedTags, AINotes and Confidence
// are copied verbatim from the AI analysis result.
Model string `json:"model"`
Manufacturer string `json:"manufacturer"`
Category string `json:"category"`
Specs map[string]string `json:"specs"`
SuggestedTags []string `json:"suggested_tags"`
AINotes string `json:"ai_notes"`
Confidence float64 `json:"confidence"`
// CatalogStatus is the string form of the status returned by the orchestrator.
CatalogStatus string `json:"catalog_status"`
// NetBoxID is the ID returned by CreateDevice; omitted from the JSON when zero
// (for example on the queued path, where no record exists yet).
NetBoxID int64 `json:"netbox_id,omitempty"`
Queued bool `json:"queued,omitempty"` // true if NetBox was unreachable
}
// ServeHTTP implements http.Handler for the intake endpoint.
//
// Flow:
//  1. Cap the request body at 32 MB and parse the multipart form → 400 on failure
//  2. Validate 1–3 files under the "photos" field → 400 on violation
//  3. Base64-encode photos as data URIs → orchestrator.Analyze → 500 on failure
//  4. AllocateNextHWID → 500 on failure
//  5. CreateDevice in NetBox; on failure enqueue to the WAQ → 202 with queued=true,
//     or 503 when the WAQ is nil or the operation could not be queued
//  6. PatchCustomFields, SyncTags, UpdateCatalogStatus (failures are logged, not fatal)
//  7. Return 201 IntakeResponse
func (h *IntakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	const maxBodyBytes = 32 << 20

	// T-02-09: 32 MB hard cap on request body (DoS mitigation).
	// ParseMultipartForm's argument only bounds in-memory buffering (larger
	// parts spill to temporary files), so the body itself must be limited too.
	if r.Body != nil {
		r.Body = http.MaxBytesReader(w, r.Body, maxBodyBytes)
	}
	if err := r.ParseMultipartForm(maxBodyBytes); err != nil {
		http.Error(w, "invalid multipart form", http.StatusBadRequest)
		return
	}

	files := r.MultipartForm.File["photos"]
	// T-02-12: Explicit count check — 0 or 4+ photos rejected immediately
	if len(files) == 0 {
		http.Error(w, "at least 1 photo required", http.StatusBadRequest)
		return
	}
	if len(files) > 3 {
		http.Error(w, "maximum 3 photos allowed", http.StatusBadRequest)
		return
	}

	// Read each photo and encode it as a base64 data URI.
	photosBase64 := make([]string, 0, len(files))
	for _, fh := range files {
		f, err := fh.Open()
		if err != nil {
			http.Error(w, "could not read photo", http.StatusInternalServerError)
			return
		}
		data, err := io.ReadAll(f)
		// Closed explicitly rather than deferred so the file is released per iteration.
		f.Close()
		if err != nil {
			http.Error(w, "could not read photo data", http.StatusInternalServerError)
			return
		}
		// DetectContentType sniffs at most the first 512 bytes.
		mimeType := http.DetectContentType(data)
		encoded := "data:" + mimeType + ";base64," + base64.StdEncoding.EncodeToString(data)
		photosBase64 = append(photosBase64, encoded)
	}

	jobID := uuid.New().String()
	result, status, err := h.orchestrator.Analyze(r.Context(), ai.IntakeRequest{
		PhotosBase64: photosBase64,
		JobID:        jobID,
	})
	if err != nil {
		log.Printf("intake: orchestrator error for job %s: %v", jobID, err)
		http.Error(w, "AI analysis failed", http.StatusInternalServerError)
		return
	}
	// Guard against an implementation that returns (nil, status, nil); every
	// step below dereferences result.
	if result == nil {
		log.Printf("intake: orchestrator returned no result for job %s", jobID)
		http.Error(w, "AI analysis failed", http.StatusInternalServerError)
		return
	}

	hwid, err := h.netboxClient.AllocateNextHWID(r.Context())
	if err != nil {
		log.Printf("intake: HW-ID allocation error: %v", err)
		http.Error(w, "HW-ID allocation failed", http.StatusInternalServerError)
		return
	}

	// Build device name from AI result; fall back to the HW-ID when the AI
	// produced neither a manufacturer nor a model.
	deviceName := strings.TrimSpace(result.Manufacturer + " " + result.Model)
	if deviceName == "" {
		deviceName = hwid
	}

	// The response body is the same on the 201 and 202 paths apart from
	// NetBoxID/Queued, so it is assembled once here.
	resp := IntakeResponse{
		HWID:          hwid,
		Model:         result.Model,
		Manufacturer:  result.Manufacturer,
		Category:      result.Category,
		Specs:         result.Specs,
		SuggestedTags: result.SuggestedTags,
		AINotes:       result.AINotes,
		Confidence:    result.Confidence,
		CatalogStatus: string(status),
	}

	// Attempt NetBox record creation. The review step is a UI concern, so the
	// record is always created here.
	// NOTE(review): quickAddEnabled/quickAddThresh are not consulted; creation
	// happens regardless of result.Confidence.
	deviceID, createErr := h.netboxClient.CreateDevice(
		r.Context(),
		deviceName,
		hwid,
		h.deviceTypeID,
		h.roleID,
		h.siteID,
	)
	if createErr != nil {
		log.Printf("intake: NetBox CreateDevice error: %v", createErr)
		if h.waq == nil {
			// WAQ unavailable — cannot queue
			http.Error(w, "NetBox unavailable and WAQ not configured", http.StatusServiceUnavailable)
			return
		}
		// Only report queued=true when the operation really was queued;
		// otherwise the client would believe a lost device is pending.
		if err := h.enqueueCreateDevice(r.Context(), deviceName, hwid); err != nil {
			log.Printf("intake: WAQ enqueue error: %v", err)
			http.Error(w, "NetBox unavailable and WAQ enqueue failed", http.StatusServiceUnavailable)
			return
		}
		resp.Queued = true
		writeJSON(w, http.StatusAccepted, resp)
		return
	}

	// NetBox create succeeded — patch custom fields, sync tags, update catalog
	// status. Each of these is best-effort: failures are logged and the request
	// still completes with 201.
	cf := netbox.CustomFields{
		HWID:          hwid,
		CatalogStatus: string(status),
		AINotes:       result.AINotes,
	}
	patch := netbox.BuildFullCustomFieldsPatch(cf)
	if err := h.netboxClient.PatchCustomFields(r.Context(), deviceID, patch); err != nil {
		log.Printf("intake: PatchCustomFields error for device %d: %v", deviceID, err)
	}
	if len(result.SuggestedTags) > 0 {
		if _, err := h.netboxClient.SyncTags(r.Context(), result.SuggestedTags); err != nil {
			log.Printf("intake: SyncTags error for device %d: %v", deviceID, err)
		}
	}
	if _, err := h.catalogUpdater.UpdateCatalogStatus(r.Context(), deviceID, "", status); err != nil {
		log.Printf("intake: UpdateCatalogStatus error for device %d: %v", deviceID, err)
	}

	resp.NetBoxID = deviceID
	writeJSON(w, http.StatusCreated, resp)
}

// enqueueCreateDevice stores a deferred NetBox CreateDevice operation in the WAQ
// using the handler's configured device type, role and site. It returns the first
// error from marshalling the payload, building the pending op, or enqueueing it.
// The caller must ensure h.waq is non-nil.
func (h *IntakeHandler) enqueueCreateDevice(ctx context.Context, name, assetTag string) error {
	payload, err := json.Marshal(queue.CreateDevicePayload{
		Name:         name,
		AssetTag:     assetTag,
		DeviceTypeID: h.deviceTypeID,
		RoleID:       h.roleID,
		SiteID:       h.siteID,
	})
	if err != nil {
		return err
	}
	op, err := queue.NewPendingOp(queue.OpNetBoxCreateDevice, json.RawMessage(payload))
	if err != nil {
		return err
	}
	return h.waq.Enqueue(ctx, op)
}
// writeJSON serializes v as JSON and writes it with the given status code.
//
// The value is encoded before any header is sent, so an encoding failure is
// logged and reported as a plain-text 500 instead of a success status followed
// by an empty or truncated body. On success the response carries
// Content-Type: application/json and a newline-terminated JSON document.
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
	body, err := json.Marshal(v)
	if err != nil {
		log.Printf("writeJSON: encode error: %v", err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}
	// json.Encoder.Encode terminates each value with a newline; keep that framing
	// so the bytes on the wire are unchanged for existing clients.
	body = append(body, '\n')

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if _, err := w.Write(body); err != nil {
		log.Printf("writeJSON: write error: %v", err)
	}
}