feat(05-02): create orchestrator with parallel model queries
Add orchestrator module with SYSTEM_PROMPT constant and query_models_parallel function that uses asyncio.gather() to query all models simultaneously. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
b2610cd90a
commit
81b5bfff35
1 changed files with 71 additions and 0 deletions
71
src/moai/core/orchestrator.py
Normal file
71
src/moai/core/orchestrator.py
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
"""AI orchestrator for managing multi-model discussions.
|
||||
|
||||
Provides functions for orchestrating parallel and sequential AI queries
|
||||
across multiple models, building context, and managing discussion flow.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from moai.core.ai_client import get_ai_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# System prompt for roundtable discussions
|
||||
SYSTEM_PROMPT = """You are participating in a roundtable discussion with other AI models.
|
||||
Other participants: {models}
|
||||
Current topic: {topic}
|
||||
|
||||
Guidelines:
|
||||
- Be concise but substantive
|
||||
- You can agree, disagree, or build upon others' points
|
||||
- Reference other models by name when responding to their points
|
||||
- Focus on practical, actionable insights
|
||||
- If you reach agreement with others, state it clearly"""
|
||||
|
||||
|
||||
async def query_models_parallel(
|
||||
models: list[str],
|
||||
question: str,
|
||||
project_name: str,
|
||||
) -> dict[str, str]:
|
||||
"""Query multiple AI models in parallel.
|
||||
|
||||
Sends the same question to all models simultaneously and collects
|
||||
their responses. Each model receives a system prompt identifying
|
||||
the other participants and the discussion topic.
|
||||
|
||||
Args:
|
||||
models: List of model short names (e.g., ["claude", "gpt", "gemini"]).
|
||||
question: The question to ask all models.
|
||||
project_name: The project name for context.
|
||||
|
||||
Returns:
|
||||
Dict mapping model name to response text. If a model fails,
|
||||
its value will be an error message string.
|
||||
"""
|
||||
client = get_ai_client()
|
||||
|
||||
# Build system prompt with participant info
|
||||
models_str = ", ".join(models)
|
||||
system_prompt = SYSTEM_PROMPT.format(models=models_str, topic=project_name)
|
||||
|
||||
async def query_single_model(model: str) -> tuple[str, str]:
|
||||
"""Query a single model and return (model_name, response)."""
|
||||
try:
|
||||
response = await client.complete(
|
||||
model=model,
|
||||
messages=[{"role": "user", "content": question}],
|
||||
system_prompt=system_prompt,
|
||||
)
|
||||
logger.info("Model %s responded successfully", model)
|
||||
return (model, response)
|
||||
except Exception as e:
|
||||
logger.error("Model %s failed: %s", model, e)
|
||||
return (model, f"[Error: {e}]")
|
||||
|
||||
# Run all queries in parallel
|
||||
tasks = [query_single_model(model) for model in models]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
return dict(results)
|
||||
Loading…
Add table
Reference in a new issue