Spaces:
Running
Running
File size: 7,956 Bytes
1922dbd 15459e9 1922dbd 3bacbf8 1922dbd 3bacbf8 1922dbd 3bacbf8 1922dbd 3bacbf8 1922dbd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
"""Magentic-based orchestrator for DeepCritical."""
from collections.abc import AsyncGenerator
import structlog
from agent_framework import (
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticFinalResultEvent,
MagenticOrchestratorMessageEvent,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
from src.agents.judge_agent import JudgeAgent
from src.agents.search_agent import SearchAgent
from src.orchestrator import JudgeHandlerProtocol, SearchHandlerProtocol
from src.utils.config import settings
from src.utils.models import AgentEvent, Evidence
logger = structlog.get_logger()
class MagenticOrchestrator:
"""
Magentic-based orchestrator - same API as Orchestrator.
Uses Microsoft Agent Framework's MagenticBuilder for multi-agent coordination.
"""
def __init__(
self,
search_handler: SearchHandlerProtocol,
judge_handler: JudgeHandlerProtocol,
max_rounds: int = 10,
) -> None:
self._search_handler = search_handler
self._judge_handler = judge_handler
self._max_rounds = max_rounds
self._evidence_store: dict[str, list[Evidence]] = {"current": []}
async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
"""
Run the Magentic workflow - same API as simple Orchestrator.
Yields AgentEvent objects for real-time UI updates.
"""
logger.info("Starting Magentic orchestrator", query=query)
yield AgentEvent(
type="started",
message=f"Starting research (Magentic mode): {query}",
iteration=0,
)
# Initialize embedding service (optional)
embedding_service = None
try:
from src.services.embeddings import get_embedding_service
embedding_service = get_embedding_service()
logger.info("Embedding service enabled")
except ImportError:
logger.info("Embedding service not available (dependencies missing)")
except Exception as e:
logger.warning("Failed to initialize embedding service", error=str(e))
# Create agent wrappers
search_agent = SearchAgent(
self._search_handler, self._evidence_store, embedding_service=embedding_service
)
judge_agent = JudgeAgent(self._judge_handler, self._evidence_store)
# Build Magentic workflow
# Note: MagenticBuilder.participants takes named arguments for agent instances
workflow = (
MagenticBuilder()
.participants(
searcher=search_agent,
judge=judge_agent,
)
.with_standard_manager(
chat_client=OpenAIChatClient(
model_id=settings.openai_model, api_key=settings.openai_api_key
),
max_round_count=self._max_rounds,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Task instruction for the manager
semantic_note = ""
if embedding_service:
semantic_note = """
The system has semantic search enabled. When evidence is found:
1. Related concepts will be automatically surfaced
2. Duplicates are removed by meaning, not just URL
3. Use the surfaced related concepts to refine searches
"""
task = f"""Research drug repurposing opportunities for: {query}
{semantic_note}
Instructions:
1. Use SearcherAgent to find evidence. SEND ONLY A SIMPLE KEYWORD QUERY (e.g. "metformin aging")
as the instruction. Complex queries fail.
2. Use JudgeAgent to evaluate if evidence is sufficient.
3. If JudgeAgent says "continue", search with refined queries.
4. If JudgeAgent says "synthesize", provide final synthesis
5. Stop when synthesis is ready or max rounds reached
Focus on finding:
- Mechanism of action evidence
- Clinical/preclinical studies
- Specific drug candidates
"""
iteration = 0
try:
# workflow.run_stream returns an async generator of workflow events
# We use 'await' in the for loop for async generator
async for event in workflow.run_stream(task):
if isinstance(event, MagenticOrchestratorMessageEvent):
# Manager events (planning, instruction, ledger)
# The 'message' attribute might be None if it's just a state change,
# check message presence
message_text = (
event.message.text
if event.message and hasattr(event.message, "text")
else ""
)
# kind might be 'plan', 'instruction', etc.
kind = getattr(event, "kind", "manager")
if message_text:
yield AgentEvent(
type="judging",
message=f"Manager ({kind}): {message_text[:100]}...",
iteration=iteration,
)
elif isinstance(event, MagenticAgentMessageEvent):
# Complete agent response
iteration += 1
agent_name = event.agent_id or "unknown"
msg_text = (
event.message.text
if event.message and hasattr(event.message, "text")
else ""
)
if "search" in agent_name.lower():
# Check if we found evidence (based on SearchAgent logic)
yield AgentEvent(
type="search_complete",
message=f"Search agent: {msg_text[:100]}...",
iteration=iteration,
)
elif "judge" in agent_name.lower():
yield AgentEvent(
type="judge_complete",
message=f"Judge agent: {msg_text[:100]}...",
iteration=iteration,
)
elif isinstance(event, MagenticFinalResultEvent):
# Final workflow result
final_text = (
event.message.text
if event.message and hasattr(event.message, "text")
else "No result"
)
yield AgentEvent(
type="complete",
message=final_text,
data={"iterations": iteration},
iteration=iteration,
)
elif isinstance(event, MagenticAgentDeltaEvent):
# Streaming token chunks from agents (optional "typing" effect)
# Only emit if we have actual text content
if event.text:
yield AgentEvent(
type="streaming",
message=event.text,
data={"agent_id": event.agent_id},
iteration=iteration,
)
elif isinstance(event, WorkflowOutputEvent):
# Alternative final output event
if event.data:
yield AgentEvent(
type="complete",
message=str(event.data),
iteration=iteration,
)
except Exception as e:
logger.error("Magentic workflow failed", error=str(e))
yield AgentEvent(
type="error",
message=f"Workflow error: {e!s}",
iteration=iteration,
)
|