""" Synchronous wrapper for MCP Client Provides sync interface for Gradio event handlers """ import asyncio from typing import Optional, Dict, Any, List from .client import MCPClient class SyncMCPClient: """Synchronous wrapper for MCPClient to use in Gradio event handlers""" def __init__(self, server_url: Optional[str] = None): self.client = MCPClient(server_url) self._loop = None def _get_or_create_event_loop(self): """Get or create an event loop for async operations""" try: loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop def _run_async(self, coro): """Run an async coroutine and return the result""" loop = self._get_or_create_event_loop() return loop.run_until_complete(coro) def initialize(self): """Initialize connection to MCP server (sync)""" return self._run_async(self.client.initialize()) def analyze_leaderboard( self, leaderboard_repo: str = "kshitijthakkar/smoltrace-leaderboard", metric_focus: str = "overall", time_range: str = "last_week", top_n: int = 5, hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """Analyze leaderboard (sync wrapper)""" return self._run_async( self.client.analyze_leaderboard( leaderboard_repo, metric_focus, time_range, top_n, hf_token, gemini_api_key ) ) def debug_trace( self, trace_data: Dict[str, Any], question: str, metrics_data: Optional[Dict[str, Any]] = None, hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """Debug trace (sync wrapper)""" return self._run_async( self.client.debug_trace(trace_data, question, metrics_data, hf_token, gemini_api_key) ) def estimate_cost( self, model: str, agent_type: str = "both", num_tests: int = 100, hardware: Optional[str] = None, hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """Estimate cost (sync wrapper)""" return self._run_async( self.client.estimate_cost(model, agent_type, num_tests, hardware, hf_token, gemini_api_key) ) def compare_runs( self, run_data_list: List[Dict[str, Any]], focus_metrics: Optional[List[str]] = None, hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """Compare runs (sync wrapper)""" return self._run_async( self.client.compare_runs(run_data_list, focus_metrics, hf_token, gemini_api_key) ) def analyze_results( self, results_data: List[Dict[str, Any]], analysis_focus: str = "optimization", hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """Analyze results (sync wrapper)""" return self._run_async( self.client.analyze_results(results_data, analysis_focus, hf_token, gemini_api_key) ) def get_dataset_info( self, dataset_repo: str, hf_token: Optional[str] = None, gemini_api_key: Optional[str] = None ) -> str: """Get dataset info (sync wrapper)""" return self._run_async( self.client.get_dataset_info(dataset_repo, hf_token, gemini_api_key) ) def close(self): """Close the MCP client (sync wrapper)""" return self._run_async(self.client.close()) # Global instance _sync_mcp_client: Optional[SyncMCPClient] = None def get_sync_mcp_client() -> SyncMCPClient: """Get or create the global synchronous MCP client""" global _sync_mcp_client if _sync_mcp_client is None: _sync_mcp_client = SyncMCPClient() return _sync_mcp_client