import os from typing import List from langchain_chroma import Chroma from langchain_core.documents.base import Document from langchain_core.tools import tool from langchain_core.tools.base import ArgsSchema from langchain_huggingface import HuggingFaceEmbeddings from pydantic import SecretStr from sqlalchemy.sql.selectable import ForUpdateParameter # Initialize RAG vector store for strategy retrieval CHROMA_PATH = "./chroma_gaia_db" _embeddings = None _vector_store = None def _get_vector_store(): """Lazy load vector store.""" global _embeddings, _vector_store if _vector_store is None: _embeddings = HuggingFaceEmbeddings( model_name="sentence-transformers/all-mpnet-base-v2" ) _vector_store = Chroma( persist_directory=CHROMA_PATH, embedding_function=_embeddings ) return _vector_store @tool def get_solving_strategy(question: str) -> str: """Search for similar solved questions and get the solving strategy. Use this FIRST to understand how to approach a problem before using other tools. Args: question: The question you need to solve.""" print(f"\n[GET_SOLVING_STRATEGY] Searching for: {question[:80]}...") try: vector_store = _get_vector_store() similar_docs = vector_store.similarity_search(question, k=1) print(f"[GET_SOLVING_STRATEGY] Found {len(similar_docs)} similar questions") if similar_docs: doc = similar_docs[0] steps = ( doc.page_content.split("Steps to solve:")[-1] .split("Tools needed:")[0] .strip() ) tools_raw = doc.metadata.get("tools", "") # Clean up tools format - replace inline numbers with newlines tools = tools_raw.replace("\n", "\n- ").strip() if tools and not tools.startswith("-"): tools = "- " + tools set_current_strategy(steps) return f"""Similar question found! ## Strategy to solve (按此策略执行): {steps} ## Rules (必须严格遵守): 1. Use EXACT wording from sources. Do not paraphrase or shorten. 2. For lists: sort items alphabetically, separate with comma and space. 3. Use tools to find information. Do not guess. 4. When you find the answer, call `submit_answer` immediately. 不要继续搜索。 """ else: return "No similar questions found. Use your best judgment." except Exception as e: return f"Error searching for strategy: {e}" def _get_llm(): """Get LLM for post-processing.""" from langchain_openai import ChatOpenAI if os.getenv("ZAI_API_KEY"): api_base = "https://api.z.ai/api/paas/v4/" if os.getenv("ZAI_USE_CODING_PLAN", "f") == "t": api_base = "https://api.z.ai/api/coding/paas/v4/" return ChatOpenAI( model="GLM-4.5-Air", temperature=0, base_url=api_base, api_key=SecretStr(os.getenv("ZAI_API_KEY", "")), ) else: return ChatOpenAI(model="gpt-4o-mini", temperature=0) def _fetch_url_with_tables(url: str) -> str: """Fetch URL content including tables using Jina reader.""" import requests try: # Use Jina to get full page content including tables api_key = os.getenv("JINA_API_KEY", "") headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} response = requests.get(f"https://r.jina.ai/{url}", headers=headers, timeout=30) return response.text except Exception: return "" @tool def wiki_search(query: str) -> str: """Search Wikipedia for a query and return relevant content including tables. Args: query: The search query.""" import wikipedia try: # Search for pages search_results = wikipedia.search(query, results=3) if not search_results: return "No Wikipedia results found." formatted_parts = [] for title in search_results[:2]: try: page = wikipedia.page(title, auto_suggest=False) url = page.url # Fetch the page via Jina to get full content including tables content = _fetch_url_with_tables(url) if not content: # Fallback to wikipedia API content content = page.content # Use smart section extraction extracted = _extract_relevant_content(content, query) formatted_parts.append( f'\n{extracted}\n' ) except (wikipedia.DisambiguationError, wikipedia.PageError): continue except Exception: continue return ( "\n\n---\n\n".join(formatted_parts) if formatted_parts else "No results found." ) except Exception as e: return f"Wikipedia search error: {e}" _zai_mcp_tools = None async def _get_zai_mcp_tools(): """Lazy load Z.AI MCP tools.""" global _zai_mcp_tools if _zai_mcp_tools is None: from langchain_mcp_adapters.client import MultiServerMCPClient api_key = os.getenv("ZAI_API_KEY", "") client = MultiServerMCPClient( { "web-search": { "transport": "streamable_http", "url": "https://api.z.ai/api/mcp/web_search_prime/mcp", "headers": {"Authorization": f"Bearer {api_key}"}, }, "web-reader": { "transport": "streamable_http", "url": "https://api.z.ai/api/mcp/web_reader/mcp", "headers": {"Authorization": f"Bearer {api_key}"}, }, "zai-mcp": { "transport": "stdio", "command": "npx", "args": ["-y", "@z_ai/mcp-server"], "env": { "Z_AI_API_KEY": api_key, "Z_AI_MODE": "ZAI", }, }, } ) _zai_mcp_tools = await client.get_tools() return _zai_mcp_tools @tool def jina_search(query: str) -> str: """Search the web using Jina AI and return clean results. Args: query: The search query.""" import requests api_key = os.getenv("JINA_API_KEY", "") headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} response = requests.get(f"https://s.jina.ai/{query}", headers=headers, timeout=30) return response.text def _extract_section_by_marker( content: str, section_marker: str, context_lines: int = 50 ) -> str: """Extract a section starting from a marker found in strategy steps. This is the SMART extraction - uses strategy steps like "scrolled down to Studio albums" to find the exact section we need. """ import re lines = content.split("\n") marker_lower = section_marker.lower().strip() print(f"[EXTRACT_SECTION] Looking for section marker: '{section_marker}'") # Find the line containing the section marker start_idx = None for i, line in enumerate(lines): if marker_lower in line.lower(): start_idx = i print(f"[EXTRACT_SECTION] Found marker at line {i}: {line[:80]}") break if start_idx is None: # Try partial matching (e.g., "Studio albums" might be "Studio Albums" or "Discography") for i, line in enumerate(lines): # Check if most words from marker are in line marker_words = [ w for w in re.findall(r"\b\w+\b", marker_lower) if len(w) > 2 ] line_lower = line.lower() matches = sum(1 for w in marker_words if w in line_lower) if matches >= len(marker_words) * 0.6: # 60% match threshold start_idx = i print(f"[EXTRACT_SECTION] Found partial match at line {i}: {line[:80]}") break if start_idx is None: print(f"[EXTRACT_SECTION] Section marker not found") return "" # Extract from marker line + context_lines after it end_idx = min(start_idx + context_lines, len(lines)) section = "\n".join(lines[start_idx:end_idx]) print(f"[EXTRACT_SECTION] Extracted {end_idx - start_idx} lines from section") return section def _parse_section_markers_from_strategy(strategy: str) -> list: """Parse strategy steps to extract section markers. Looks for phrases like: - "scrolled down to Studio albums" -> "Studio albums" - "found the Discography section" -> "Discography" - "went to Studio albums" -> "Studio albums" """ import re markers = [] # Patterns that indicate a section name patterns = [ r'scrolled?\s+(?:down\s+)?to\s+["\']?([^"\',.]+)["\']?', # scrolled down to X r'went\s+to\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section', # went to X section r'found\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section', # found X section r'clicked\s+on\s+["\']?([^"\',.]+)["\']?', # clicked on X r'looked\s+(?:at|under)\s+["\']?([^"\',.]+)["\']?', # looked at/under X r'(?:in|under)\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section', # in/under X section ] for pattern in patterns: matches = re.findall(pattern, strategy.lower()) for match in matches: cleaned = match.strip() if cleaned and len(cleaned) > 2 and len(cleaned) < 50: markers.append(cleaned) # Also look for quoted section names quoted = re.findall(r'"([^"]+)"', strategy) for q in quoted: if len(q) > 2 and len(q) < 50 and q.lower() not in ["wikipedia", "google"]: markers.append(q) print(f"[PARSE_STRATEGY] Extracted section markers: {markers}") return markers # Global variable to store current strategy for smart extraction _current_strategy = None def set_current_strategy(strategy: str): """Store the current strategy for use by content extraction.""" global _current_strategy _current_strategy = strategy print(f"[STRATEGY] Updated current strategy") @tool def jina_read(url: str, question: str = "") -> str: """Read a webpage and extract content relevant to the question. Args: url: The URL to read. question: The question to extract relevant info for.""" import requests api_key = os.getenv("JINA_API_KEY", "") headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} response = requests.get(f"https://r.jina.ai/{url}", headers=headers, timeout=30) content = response.text # Use smart extraction with strategy section markers if question: return content[:10000] @tool def web_search(query: str) -> str: """Search the web and return summarized results with URLs.""" if os.getenv("TAVILY_API_KEY"): from langchain_tavily import TavilySearch web_search_tool = TavilySearch( max_results=5, include_answer=False, ) else: from langchain_community.tools import DuckDuckGoSearchResults web_search_tool = DuckDuckGoSearchResults() search_docs = web_search_tool.invoke(query) if isinstance(search_docs, str): return search_docs elif isinstance(search_docs, dict) and "results" in search_docs: results = search_docs["results"] elif isinstance(search_docs, list): results = search_docs else: return str(search_docs) formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.get("content", "")}\n' for doc in results ] ) return formatted_search_docs @tool def arxiv_search(query: str) -> str: """Search arXiv for a query and return maximum 2 results. Args: query: The search query.""" from langchain_community.document_loaders import ArxivLoader search_docs = ArxivLoader(query=query, load_max_docs=2).load() parts = [] for doc in search_docs: source = doc.metadata.get("source", "") parts.append(f"Source: {source}\n{doc.page_content}") return "\n\n---\n\n".join(parts) @tool def analyze_text(text: str, question: str) -> str: """Analyze text and extract the answer to a specific question. Use after fetching a webpage or PDF.""" llm = _get_llm() response = llm.invoke( f"Given this text:\n\n{text[:8000]}\n\n" f"Answer this question: {question}\n\n" f"Be specific and list any relevant data points (numbers, dates, names). " f"If counting items, list each one explicitly before giving the count." ) return response.content @tool def read_excel(file_path: str) -> str: """Read and extract data from an Excel file (.xlsx, .xls). Args: file_path: Path to the Excel file.""" import pandas as pd try: # Read all sheets xlsx = pd.ExcelFile(file_path) results = [] for sheet_name in xlsx.sheet_names: df = pd.read_excel(xlsx, sheet_name=sheet_name) results.append(f"=== Sheet: {sheet_name} ===\n{df.to_string()}") return "\n\n".join(results)[:15000] except Exception as e: return f"Error reading Excel: {e}" @tool def read_csv(file_path: str) -> str: """Read and extract data from a CSV file. Args: file_path: Path to the CSV file.""" import pandas as pd try: df = pd.read_csv(file_path) return df.to_string()[:15000] except Exception as e: return f"Error reading CSV: {e}" @tool def read_docx(file_path: str) -> str: """Read and extract text from a Word document (.docx). Args: file_path: Path to the Word document.""" try: from docx import Document doc = Document(file_path) text = "\n".join([para.text for para in doc.paragraphs]) return text[:15000] except Exception as e: return f"Error reading Word doc: {e}" @tool def read_pptx(file_path: str) -> str: """Read and extract text from a PowerPoint presentation (.pptx). Args: file_path: Path to the PowerPoint file.""" try: from pptx import Presentation prs = Presentation(file_path) text_parts = [] for slide_num, slide in enumerate(prs.slides, 1): slide_text = [f"=== Slide {slide_num} ==="] for shape in slide.shapes: if hasattr(shape, "text"): slide_text.append(shape.text) text_parts.append("\n".join(slide_text)) return "\n\n".join(text_parts)[:15000] except Exception as e: return f"Error reading PowerPoint: {e}" @tool def extract_zip(file_path: str) -> str: """Extract a zip file and list its contents. Args: file_path: Path to the zip file.""" import zipfile from pathlib import Path try: extract_dir = Path(file_path).parent / Path(file_path).stem extract_dir.mkdir(exist_ok=True) with zipfile.ZipFile(file_path, "r") as zip_ref: zip_ref.extractall(extract_dir) file_list = zip_ref.namelist() return f"Extracted to: {extract_dir}\nContents:\n" + "\n".join(file_list) except Exception as e: return f"Error extracting zip: {e}" @tool def analyze_image(file_path: str, question: str) -> str: """Analyze an image and answer a question about it using vision model. Args: file_path: Path to the image file (png, jpg, etc.) question: Question to answer about the image.""" import base64 from langchain_openai import ChatOpenAI try: with open(file_path, "rb") as f: image_data = base64.b64encode(f.read()).decode("utf-8") # Determine mime type ext = file_path.lower().split(".")[-1] mime_type = {"png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg"}.get( ext, "image/png" ) # Use GPT-4o for vision llm = ChatOpenAI(model="gpt-4o", temperature=0) response = llm.invoke( [ { "role": "user", "content": [ {"type": "text", "text": question}, { "type": "image_url", "image_url": { "url": f"data:{mime_type};base64,{image_data}" }, }, ], } ] ) return response.content except Exception as e: return f"Error analyzing image: {e}" @tool def submit_answer(answer: str) -> str: """Submit your final answer. Use this when you have found the answer. Args: answer: The final answer to submit.""" print(f"[SUBMIT_ANSWER] {answer}") return f"FINAL ANSWER: {answer}" async def get_tools() -> list: """Retrieve the list of available tools for the agent.""" base_tools = [ get_solving_strategy, # Use FIRST to get approach submit_answer, # wiki_search, download_file, read_pdf, read_excel, read_csv, read_docx, read_pptx, extract_zip, analyze_image, py_calc_tool, youtube_transcript_tool, transcribe_audio, arxiv_search, ] # Add Z.AI MCP tools (webSearchPrime, webReader) zai_tools = await _get_zai_mcp_tools() return base_tools + zai_tools @tool def py_calc_tool(expression: str) -> str: """Evaluate a Python expression safely.""" try: allowed_builtins = {"__builtins__": {}} result = eval(expression, allowed_builtins, {}) return str(result) except Exception as e: return f"Error evaluating expression: {e}" @tool def download_file(url: str) -> str: """Download a file (PDF, etc.) from URL and save locally. Returns the local file path.""" import hashlib from pathlib import Path import requests try: # Create downloads directory downloads_dir = Path("downloads") downloads_dir.mkdir(exist_ok=True) # Generate filename from URL hash + extension ext = Path(url).suffix or ".bin" filename = hashlib.md5(url.encode()).hexdigest()[:12] + ext filepath = downloads_dir / filename # Download if not already cached if not filepath.exists(): response = requests.get(url, timeout=60) response.raise_for_status() filepath.write_bytes(response.content) return f"Downloaded to: {filepath}" except Exception as e: return f"Error downloading: {e}" @tool def read_pdf(file_path: str) -> str: """Read and extract text from a local PDF file.""" try: from pypdf import PdfReader reader = PdfReader(file_path) text = "\n".join(page.extract_text() or "" for page in reader.pages) return text[:15000] # Limit to 15k chars except Exception as e: return f"Error reading PDF: {e}" @tool def fetch_webpage(url: str) -> str: """Fetch and read content from a webpage URL. For PDFs, use download_file then read_pdf instead.""" import requests # Reject PDF URLs if url.lower().endswith(".pdf"): return "Error: This is a PDF file. Use download_file(url) first, then read_pdf(filepath) to read it." try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } response = requests.get(url, timeout=30, headers=headers) response.raise_for_status() if "application/pdf" in response.headers.get("content-type", ""): return "Error: This is a PDF file. Use download_file(url) first, then read_pdf(filepath) to read it." import html2text h = html2text.HTML2Text() h.ignore_links = False h.ignore_images = True h.ignore_emphasis = False h.body_width = 0 # No wrapping markdown = h.handle(response.text) return markdown[:10000] if markdown else "No content found" except Exception as e: return f"Error fetching URL: {e}" @tool def transcribe_audio(file_path: str) -> str: """Transcribe an audio file to text using OpenAI Whisper. Args: file_path: Path to the audio file (mp3, wav).""" from openai import OpenAI client = OpenAI() with open(file_path, "rb") as audio_file: transcription = client.audio.transcriptions.create( model="whisper-1", file=audio_file, ) print(f"[TRANSCRIPTION]: {transcription.text}") return transcription.text @tool def youtube_transcript_tool(video_url: str) -> List[Document]: """Fetch the transcript of a YouTube video given its URL.""" from langchain_community.document_loaders import YoutubeLoader loader = YoutubeLoader.from_youtube_url(video_url, add_video_info=False) return loader.load()