"""AI orchestrator for managing multi-model discussions.

Provides functions for orchestrating parallel and sequential AI queries
across multiple models, building context, and managing discussion flow.
"""

import asyncio
import logging

from moai.core.ai_client import get_ai_client

logger = logging.getLogger(__name__)

# System prompt template for roundtable discussions.
# Formatted per model so that {models} lists only the *other* participants.
SYSTEM_PROMPT = """You are participating in a roundtable discussion with other AI models.
Other participants: {models}
Current topic: {topic}

Guidelines:
- Be concise but substantive
- You can agree, disagree, or build upon others' points
- Reference other models by name when responding to their points
- Focus on practical, actionable insights
- If you reach agreement with others, state it clearly"""


async def query_models_parallel(
    models: list[str],
    question: str,
    project_name: str,
) -> dict[str, str]:
    """Query multiple AI models in parallel.

    Sends the same question to all models simultaneously and collects
    their responses. Each model receives a system prompt identifying
    the *other* participants and the discussion topic.

    Args:
        models: List of model short names (e.g., ["claude", "gpt", "gemini"]).
        question: The question to ask all models.
        project_name: The project name for context.

    Returns:
        Dict mapping model name to response text. If a model fails,
        its value will be an error message string.
    """
    client = get_ai_client()

    async def query_single_model(model: str) -> tuple[str, str]:
        """Query a single model and return (model_name, response)."""
        # Fix: the template says "Other participants", so exclude the model
        # being queried from its own participant list (previously the full
        # list — including the model itself — was sent to every model).
        others = ", ".join(m for m in models if m != model) or "none"
        system_prompt = SYSTEM_PROMPT.format(models=others, topic=project_name)
        try:
            response = await client.complete(
                model=model,
                messages=[{"role": "user", "content": question}],
                system_prompt=system_prompt,
            )
            logger.info("Model %s responded successfully", model)
            return (model, response)
        except Exception as e:
            # One model failing must not abort the whole roundtable; surface
            # the error as that model's "response" string instead.
            logger.error("Model %s failed: %s", model, e)
            return (model, f"[Error: {e}]")

    # Fan out all queries at once. Exceptions are handled per task above,
    # so gather() itself never raises here.
    results = await asyncio.gather(*(query_single_model(m) for m in models))
    return dict(results)