# fa_agents/tools.py
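"""Tool belt for the GAIA agent: RAG-based strategy retrieval, web search and
page reading, file readers (PDF, Excel, CSV, DOCX, PPTX, ZIP), image and audio
analysis, and final-answer submission."""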
import os
from typing import List
from langchain_chroma import Chroma
from langchain_core.documents.base import Document
from langchain_core.tools import tool
from langchain_huggingface import HuggingFaceEmbeddings
from pydantic import SecretStr
# Initialize RAG vector store for strategy retrieval
CHROMA_PATH = "./chroma_gaia_db"
_embeddings = None
_vector_store = None
def _get_vector_store():
"""Lazy load vector store."""
global _embeddings, _vector_store
if _vector_store is None:
_embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-mpnet-base-v2"
)
_vector_store = Chroma(
persist_directory=CHROMA_PATH, embedding_function=_embeddings
)
return _vector_store
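
# Sketch of how the strategy DB above is expected to be populated (an
# assumption inferred from the parsing in get_solving_strategy below:
# page_content carries "Steps to solve:" / "Tools needed:" blocks, and the
# metadata carries a "tools" field):
#
#   store = _get_vector_store()
#   store.add_texts(
#       ["Question: ...\nSteps to solve: 1. Search X. 2. ...\nTools needed: web_search"],
#       metadatas=[{"tools": "web_search"}],
#   )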
@tool
def get_solving_strategy(question: str) -> str:
"""Search for similar solved questions and get the solving strategy.
Use this FIRST to understand how to approach a problem before using other tools.
Args:
question: The question you need to solve."""
print(f"\n[GET_SOLVING_STRATEGY] Searching for: {question[:80]}...")
try:
vector_store = _get_vector_store()
similar_docs = vector_store.similarity_search(question, k=1)
print(f"[GET_SOLVING_STRATEGY] Found {len(similar_docs)} similar questions")
if similar_docs:
doc = similar_docs[0]
steps = (
doc.page_content.split("Steps to solve:")[-1]
.split("Tools needed:")[0]
.strip()
)
tools_raw = doc.metadata.get("tools", "")
            # Format the recorded tool list as bullet points (one tool per line)
            tools = tools_raw.replace("\n", "\n- ").strip()
            if tools and not tools.startswith("-"):
                tools = "- " + tools
            # Include the tools that worked previously, if any were recorded
            tools_section = f"## Tools needed:\n{tools}\n" if tools else ""
            set_current_strategy(steps)
            return f"""Similar question found!
## Strategy to solve (follow this strategy):
{steps}
{tools_section}## Rules (must be followed strictly):
1. Use EXACT wording from sources. Do not paraphrase or shorten.
2. For lists: sort items alphabetically, separate with comma and space.
3. Use tools to find information. Do not guess.
4. When you find the answer, call `submit_answer` immediately. Do not keep searching.
"""
else:
return "No similar questions found. Use your best judgment."
except Exception as e:
return f"Error searching for strategy: {e}"
def _get_llm():
"""Get LLM for post-processing."""
from langchain_openai import ChatOpenAI
if os.getenv("ZAI_API_KEY"):
api_base = "https://api.z.ai/api/paas/v4/"
if os.getenv("ZAI_USE_CODING_PLAN", "f") == "t":
api_base = "https://api.z.ai/api/coding/paas/v4/"
return ChatOpenAI(
model="GLM-4.5-Air",
temperature=0,
base_url=api_base,
api_key=SecretStr(os.getenv("ZAI_API_KEY", "")),
)
else:
return ChatOpenAI(model="gpt-4o-mini", temperature=0)
def _fetch_url_with_tables(url: str) -> str:
"""Fetch URL content including tables using Jina reader."""
import requests
try:
# Use Jina to get full page content including tables
api_key = os.getenv("JINA_API_KEY", "")
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
response = requests.get(f"https://r.jina.ai/{url}", headers=headers, timeout=30)
return response.text
except Exception:
return ""
@tool
def wiki_search(query: str) -> str:
"""Search Wikipedia for a query and return relevant content including tables.
Args:
query: The search query."""
import wikipedia
try:
# Search for pages
search_results = wikipedia.search(query, results=3)
if not search_results:
return "No Wikipedia results found."
formatted_parts = []
for title in search_results[:2]:
try:
page = wikipedia.page(title, auto_suggest=False)
url = page.url
# Fetch the page via Jina to get full content including tables
content = _fetch_url_with_tables(url)
if not content:
# Fallback to wikipedia API content
content = page.content
# Use smart section extraction
extracted = _extract_relevant_content(content, query)
formatted_parts.append(
f'<Document source="{url}" title="{title}">\n{extracted}\n</Document>'
)
except (wikipedia.DisambiguationError, wikipedia.PageError):
continue
except Exception:
continue
return (
"\n\n---\n\n".join(formatted_parts)
if formatted_parts
else "No results found."
)
except Exception as e:
return f"Wikipedia search error: {e}"
_zai_mcp_tools = None
async def _get_zai_mcp_tools():
"""Lazy load Z.AI MCP tools."""
global _zai_mcp_tools
if _zai_mcp_tools is None:
from langchain_mcp_adapters.client import MultiServerMCPClient
api_key = os.getenv("ZAI_API_KEY", "")
client = MultiServerMCPClient(
{
"web-search": {
"transport": "streamable_http",
"url": "https://api.z.ai/api/mcp/web_search_prime/mcp",
"headers": {"Authorization": f"Bearer {api_key}"},
},
"web-reader": {
"transport": "streamable_http",
"url": "https://api.z.ai/api/mcp/web_reader/mcp",
"headers": {"Authorization": f"Bearer {api_key}"},
},
"zai-mcp": {
"transport": "stdio",
"command": "npx",
"args": ["-y", "@z_ai/mcp-server"],
"env": {
"Z_AI_API_KEY": api_key,
"Z_AI_MODE": "ZAI",
},
},
}
)
_zai_mcp_tools = await client.get_tools()
return _zai_mcp_tools
@tool
def jina_search(query: str) -> str:
"""Search the web using Jina AI and return clean results.
Args:
query: The search query."""
import requests
api_key = os.getenv("JINA_API_KEY", "")
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
response = requests.get(f"https://s.jina.ai/{query}", headers=headers, timeout=30)
return response.text
def _extract_section_by_marker(
content: str, section_marker: str, context_lines: int = 50
) -> str:
"""Extract a section starting from a marker found in strategy steps.
This is the SMART extraction - uses strategy steps like "scrolled down to Studio albums"
to find the exact section we need.
"""
import re
lines = content.split("\n")
marker_lower = section_marker.lower().strip()
print(f"[EXTRACT_SECTION] Looking for section marker: '{section_marker}'")
# Find the line containing the section marker
start_idx = None
for i, line in enumerate(lines):
if marker_lower in line.lower():
start_idx = i
print(f"[EXTRACT_SECTION] Found marker at line {i}: {line[:80]}")
break
if start_idx is None:
# Try partial matching (e.g., "Studio albums" might be "Studio Albums" or "Discography")
for i, line in enumerate(lines):
            # Check whether most words from the marker appear in the line
            marker_words = [
                w for w in re.findall(r"\b\w+\b", marker_lower) if len(w) > 2
            ]
            line_lower = line.lower()
            matches = sum(1 for w in marker_words if w in line_lower)
            # 60% match threshold; skip if the marker had no usable words
            if marker_words and matches >= len(marker_words) * 0.6:
start_idx = i
print(f"[EXTRACT_SECTION] Found partial match at line {i}: {line[:80]}")
break
if start_idx is None:
print(f"[EXTRACT_SECTION] Section marker not found")
return ""
# Extract from marker line + context_lines after it
end_idx = min(start_idx + context_lines, len(lines))
section = "\n".join(lines[start_idx:end_idx])
print(f"[EXTRACT_SECTION] Extracted {end_idx - start_idx} lines from section")
return section
def _parse_section_markers_from_strategy(strategy: str) -> list:
"""Parse strategy steps to extract section markers.
Looks for phrases like:
- "scrolled down to Studio albums" -> "Studio albums"
- "found the Discography section" -> "Discography"
- "went to Studio albums" -> "Studio albums"
"""
import re
markers = []
# Patterns that indicate a section name
patterns = [
        r'scroll(?:ed)?\s+(?:down\s+)?to\s+["\']?([^"\',.]+)["\']?',  # scrolled down to X
r'went\s+to\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section', # went to X section
r'found\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section', # found X section
r'clicked\s+on\s+["\']?([^"\',.]+)["\']?', # clicked on X
r'looked\s+(?:at|under)\s+["\']?([^"\',.]+)["\']?', # looked at/under X
r'(?:in|under)\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section', # in/under X section
]
for pattern in patterns:
matches = re.findall(pattern, strategy.lower())
for match in matches:
cleaned = match.strip()
if cleaned and len(cleaned) > 2 and len(cleaned) < 50:
markers.append(cleaned)
# Also look for quoted section names
quoted = re.findall(r'"([^"]+)"', strategy)
for q in quoted:
if len(q) > 2 and len(q) < 50 and q.lower() not in ["wikipedia", "google"]:
markers.append(q)
print(f"[PARSE_STRATEGY] Extracted section markers: {markers}")
return markers
# Global variable to store current strategy for smart extraction
_current_strategy = None
def set_current_strategy(strategy: str):
"""Store the current strategy for use by content extraction."""
global _current_strategy
_current_strategy = strategy
print(f"[STRATEGY] Updated current strategy")
@tool
def jina_read(url: str, question: str = "") -> str:
"""Read a webpage and extract content relevant to the question.
Args:
url: The URL to read.
question: The question to extract relevant info for."""
import requests
api_key = os.getenv("JINA_API_KEY", "")
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
response = requests.get(f"https://r.jina.ai/{url}", headers=headers, timeout=30)
content = response.text
    # Use smart extraction with strategy section markers when a question is given;
    # always return a string (the original fell through to None without one)
    if question:
        return _extract_relevant_content(content, question)
    return content[:10000]
@tool
def web_search(query: str) -> str:
"""Search the web and return summarized results with URLs."""
if os.getenv("TAVILY_API_KEY"):
from langchain_tavily import TavilySearch
web_search_tool = TavilySearch(
max_results=5,
include_answer=False,
)
else:
from langchain_community.tools import DuckDuckGoSearchResults
web_search_tool = DuckDuckGoSearchResults()
search_docs = web_search_tool.invoke(query)
if isinstance(search_docs, str):
return search_docs
elif isinstance(search_docs, dict) and "results" in search_docs:
results = search_docs["results"]
elif isinstance(search_docs, list):
results = search_docs
else:
return str(search_docs)
formatted_search_docs = "\n\n---\n\n".join(
[
            f'<Document source="{doc.get("url", "")}">\n{doc.get("content", "")}\n</Document>'
for doc in results
]
)
return formatted_search_docs
@tool
def arxiv_search(query: str) -> str:
"""Search arXiv for a query and return maximum 2 results.
Args:
query: The search query."""
from langchain_community.document_loaders import ArxivLoader
search_docs = ArxivLoader(query=query, load_max_docs=2).load()
parts = []
for doc in search_docs:
source = doc.metadata.get("source", "")
parts.append(f"Source: {source}\n{doc.page_content}")
return "\n\n---\n\n".join(parts)
@tool
def analyze_text(text: str, question: str) -> str:
"""Analyze text and extract the answer to a specific question. Use after fetching a webpage or PDF."""
llm = _get_llm()
response = llm.invoke(
f"Given this text:\n\n{text[:8000]}\n\n"
f"Answer this question: {question}\n\n"
f"Be specific and list any relevant data points (numbers, dates, names). "
f"If counting items, list each one explicitly before giving the count."
)
return response.content
@tool
def read_excel(file_path: str) -> str:
"""Read and extract data from an Excel file (.xlsx, .xls).
Args:
file_path: Path to the Excel file."""
import pandas as pd
try:
# Read all sheets
xlsx = pd.ExcelFile(file_path)
results = []
for sheet_name in xlsx.sheet_names:
df = pd.read_excel(xlsx, sheet_name=sheet_name)
results.append(f"=== Sheet: {sheet_name} ===\n{df.to_string()}")
return "\n\n".join(results)[:15000]
except Exception as e:
return f"Error reading Excel: {e}"
@tool
def read_csv(file_path: str) -> str:
"""Read and extract data from a CSV file.
Args:
file_path: Path to the CSV file."""
import pandas as pd
try:
df = pd.read_csv(file_path)
return df.to_string()[:15000]
except Exception as e:
return f"Error reading CSV: {e}"
@tool
def read_docx(file_path: str) -> str:
"""Read and extract text from a Word document (.docx).
Args:
file_path: Path to the Word document."""
try:
from docx import Document
doc = Document(file_path)
text = "\n".join([para.text for para in doc.paragraphs])
return text[:15000]
except Exception as e:
return f"Error reading Word doc: {e}"
@tool
def read_pptx(file_path: str) -> str:
"""Read and extract text from a PowerPoint presentation (.pptx).
Args:
file_path: Path to the PowerPoint file."""
try:
from pptx import Presentation
prs = Presentation(file_path)
text_parts = []
for slide_num, slide in enumerate(prs.slides, 1):
slide_text = [f"=== Slide {slide_num} ==="]
for shape in slide.shapes:
if hasattr(shape, "text"):
slide_text.append(shape.text)
text_parts.append("\n".join(slide_text))
return "\n\n".join(text_parts)[:15000]
except Exception as e:
return f"Error reading PowerPoint: {e}"
@tool
def extract_zip(file_path: str) -> str:
"""Extract a zip file and list its contents.
Args:
file_path: Path to the zip file."""
import zipfile
from pathlib import Path
try:
extract_dir = Path(file_path).parent / Path(file_path).stem
extract_dir.mkdir(exist_ok=True)
with zipfile.ZipFile(file_path, "r") as zip_ref:
zip_ref.extractall(extract_dir)
file_list = zip_ref.namelist()
return f"Extracted to: {extract_dir}\nContents:\n" + "\n".join(file_list)
except Exception as e:
return f"Error extracting zip: {e}"
@tool
def analyze_image(file_path: str, question: str) -> str:
"""Analyze an image and answer a question about it using vision model.
Args:
file_path: Path to the image file (png, jpg, etc.)
question: Question to answer about the image."""
import base64
from langchain_openai import ChatOpenAI
try:
with open(file_path, "rb") as f:
image_data = base64.b64encode(f.read()).decode("utf-8")
# Determine mime type
ext = file_path.lower().split(".")[-1]
mime_type = {"png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg"}.get(
ext, "image/png"
)
# Use GPT-4o for vision
llm = ChatOpenAI(model="gpt-4o", temperature=0)
response = llm.invoke(
[
{
"role": "user",
"content": [
{"type": "text", "text": question},
{
"type": "image_url",
"image_url": {
"url": f"data:{mime_type};base64,{image_data}"
},
},
],
}
]
)
return response.content
except Exception as e:
return f"Error analyzing image: {e}"
@tool
def submit_answer(answer: str) -> str:
"""Submit your final answer. Use this when you have found the answer.
Args:
answer: The final answer to submit."""
print(f"[SUBMIT_ANSWER] {answer}")
return f"FINAL ANSWER: {answer}"
async def get_tools() -> list:
"""Retrieve the list of available tools for the agent."""
base_tools = [
get_solving_strategy, # Use FIRST to get approach
submit_answer,
# wiki_search,
download_file,
read_pdf,
read_excel,
read_csv,
read_docx,
read_pptx,
extract_zip,
analyze_image,
py_calc_tool,
youtube_transcript_tool,
transcribe_audio,
arxiv_search,
]
# Add Z.AI MCP tools (webSearchPrime, webReader)
zai_tools = await _get_zai_mcp_tools()
return base_tools + zai_tools
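
# Example usage (sketch): get_tools() is async because the MCP clients are.
# A caller might wire the tools into a LangGraph ReAct agent like this
# (create_react_agent and the model choice are assumptions, not part of
# this module):
#
#   import asyncio
#   from langgraph.prebuilt import create_react_agent
#
#   tools = asyncio.run(get_tools())
#   agent = create_react_agent(_get_llm(), tools)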
@tool
def py_calc_tool(expression: str) -> str:
"""Evaluate a Python expression safely."""
try:
allowed_builtins = {"__builtins__": {}}
result = eval(expression, allowed_builtins, {})
return str(result)
except Exception as e:
return f"Error evaluating expression: {e}"
@tool
def download_file(url: str) -> str:
"""Download a file (PDF, etc.) from URL and save locally. Returns the local file path."""
import hashlib
from pathlib import Path
import requests
try:
# Create downloads directory
downloads_dir = Path("downloads")
downloads_dir.mkdir(exist_ok=True)
# Generate filename from URL hash + extension
ext = Path(url).suffix or ".bin"
filename = hashlib.md5(url.encode()).hexdigest()[:12] + ext
filepath = downloads_dir / filename
# Download if not already cached
if not filepath.exists():
response = requests.get(url, timeout=60)
response.raise_for_status()
filepath.write_bytes(response.content)
return f"Downloaded to: {filepath}"
except Exception as e:
return f"Error downloading: {e}"
@tool
def read_pdf(file_path: str) -> str:
"""Read and extract text from a local PDF file."""
try:
from pypdf import PdfReader
reader = PdfReader(file_path)
text = "\n".join(page.extract_text() or "" for page in reader.pages)
return text[:15000] # Limit to 15k chars
except Exception as e:
return f"Error reading PDF: {e}"
@tool
def fetch_webpage(url: str) -> str:
"""Fetch and read content from a webpage URL. For PDFs, use download_file then read_pdf instead."""
import requests
# Reject PDF URLs
if url.lower().endswith(".pdf"):
return "Error: This is a PDF file. Use download_file(url) first, then read_pdf(filepath) to read it."
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
response = requests.get(url, timeout=30, headers=headers)
response.raise_for_status()
if "application/pdf" in response.headers.get("content-type", ""):
return "Error: This is a PDF file. Use download_file(url) first, then read_pdf(filepath) to read it."
import html2text
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = True
h.ignore_emphasis = False
h.body_width = 0 # No wrapping
markdown = h.handle(response.text)
return markdown[:10000] if markdown else "No content found"
except Exception as e:
return f"Error fetching URL: {e}"
@tool
def transcribe_audio(file_path: str) -> str:
"""Transcribe an audio file to text using OpenAI Whisper.
Args:
file_path: Path to the audio file (mp3, wav)."""
from openai import OpenAI
client = OpenAI()
with open(file_path, "rb") as audio_file:
transcription = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
)
print(f"[TRANSCRIPTION]: {transcription.text}")
return transcription.text
@tool
def youtube_transcript_tool(video_url: str) -> List[Document]:
"""Fetch the transcript of a YouTube video given its URL."""
from langchain_community.document_loaders import YoutubeLoader
loader = YoutubeLoader.from_youtube_url(video_url, add_video_info=False)
return loader.load()
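
# Minimal local smoke test (sketch; not part of the agent runtime). It only
# exercises the pure-Python helpers, so no network access or API keys needed.
if __name__ == "__main__":
    print(py_calc_tool.invoke({"expression": "2 ** 10 + 1"}))  # 1025
    print(
        _parse_section_markers_from_strategy(
            'Opened the Wikipedia page and scrolled down to "Studio albums".'
        )
    )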