"""LangChain tool definitions for a GAIA-style agent: strategy retrieval from a
local Chroma store, web and Wikipedia search, document readers, and helpers."""

import os
from typing import List

from langchain_chroma import Chroma
from langchain_core.documents.base import Document
from langchain_core.tools import tool
from langchain_huggingface import HuggingFaceEmbeddings
from pydantic import SecretStr

CHROMA_PATH = "./chroma_gaia_db"

# Lazily initialized singletons (see _get_vector_store).
_embeddings = None
_vector_store = None

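# ---------------------------------------------------------------------------
# Environment variables read by this module (a summary of the os.getenv calls
# below; nothing is required at import time):
#   ZAI_API_KEY          - enables the GLM-4.5-Air LLM and the Z.AI MCP tools
#   ZAI_USE_CODING_PLAN  - "t" switches _get_llm to the coding-plan endpoint
#   JINA_API_KEY         - optional Bearer token for r.jina.ai / s.jina.ai calls
#   TAVILY_API_KEY       - makes web_search use Tavily instead of DuckDuckGo
# OpenAI credentials are picked up by langchain_openai / openai in the usual
# way for the gpt-4o / gpt-4o-mini and Whisper fallbacks.
# ---------------------------------------------------------------------------
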
def _get_vector_store():
    """Lazy load vector store."""
    global _embeddings, _vector_store
    if _vector_store is None:
        _embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-mpnet-base-v2"
        )
        _vector_store = Chroma(
            persist_directory=CHROMA_PATH, embedding_function=_embeddings
        )
    return _vector_store

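# ---------------------------------------------------------------------------
# Expected shape of the persisted documents, inferred from the parsing in
# get_solving_strategy below (the ingestion script is not part of this module,
# so treat this as an assumption):
#
#     Document(
#         page_content="... Steps to solve: <steps> Tools needed: <tools>",
#         metadata={"tools": "<newline-separated tool names>"},
#     )
# ---------------------------------------------------------------------------
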
@tool
def get_solving_strategy(question: str) -> str:
    """Search for similar solved questions and get the solving strategy.

    Use this FIRST to understand how to approach a problem before using other tools.

    Args:
        question: The question you need to solve."""
    print(f"\n[GET_SOLVING_STRATEGY] Searching for: {question[:80]}...")
    try:
        vector_store = _get_vector_store()
        similar_docs = vector_store.similarity_search(question, k=1)
        print(f"[GET_SOLVING_STRATEGY] Found {len(similar_docs)} similar questions")

        if similar_docs:
            doc = similar_docs[0]
            steps = (
                doc.page_content.split("Steps to solve:")[-1]
                .split("Tools needed:")[0]
                .strip()
            )
            tools_raw = doc.metadata.get("tools", "")

            # Render the newline-separated tool names as a bulleted list.
            tools = tools_raw.replace("\n", "\n- ").strip()
            if tools and not tools.startswith("-"):
                tools = "- " + tools

            set_current_strategy(steps)

            return f"""Similar question found!

## Strategy to solve (follow these steps):
{steps}

## Tools needed:
{tools}

## Rules (must be followed strictly):
1. Use EXACT wording from sources. Do not paraphrase or shorten.
2. For lists: sort items alphabetically, separate with comma and space.
3. Use tools to find information. Do not guess.
4. When you find the answer, call `submit_answer` immediately. Do not keep searching.
"""
        else:
            return "No similar questions found. Use your best judgment."
    except Exception as e:
        return f"Error searching for strategy: {e}"

def _get_llm():
    """Get LLM for post-processing."""
    from langchain_openai import ChatOpenAI

    if os.getenv("ZAI_API_KEY"):
        api_base = "https://api.z.ai/api/paas/v4/"
        if os.getenv("ZAI_USE_CODING_PLAN", "f") == "t":
            api_base = "https://api.z.ai/api/coding/paas/v4/"
        return ChatOpenAI(
            model="GLM-4.5-Air",
            temperature=0,
            base_url=api_base,
            api_key=SecretStr(os.getenv("ZAI_API_KEY", "")),
        )
    else:
        return ChatOpenAI(model="gpt-4o-mini", temperature=0)


def _fetch_url_with_tables(url: str) -> str:
    """Fetch URL content including tables using Jina reader."""
    import requests

    try:
        api_key = os.getenv("JINA_API_KEY", "")
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

        response = requests.get(f"https://r.jina.ai/{url}", headers=headers, timeout=30)
        return response.text
    except Exception:
        return ""

@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return relevant content including tables.

    Args:
        query: The search query."""
    import wikipedia

    try:
        search_results = wikipedia.search(query, results=3)
        if not search_results:
            return "No Wikipedia results found."

        formatted_parts = []
        for title in search_results[:2]:
            try:
                page = wikipedia.page(title, auto_suggest=False)
                url = page.url

                # Prefer the Jina reader output, which keeps tables; fall back
                # to the plain-text page content if the fetch fails.
                content = _fetch_url_with_tables(url)
                if not content:
                    content = page.content

                extracted = _extract_relevant_content(content, query)
                formatted_parts.append(
                    f'<Document source="{url}" title="{title}">\n{extracted}\n</Document>'
                )
            except (wikipedia.DisambiguationError, wikipedia.PageError):
                continue
            except Exception:
                continue

        return (
            "\n\n---\n\n".join(formatted_parts)
            if formatted_parts
            else "No results found."
        )
    except Exception as e:
        return f"Wikipedia search error: {e}"

_zai_mcp_tools = None


async def _get_zai_mcp_tools():
    """Lazy load Z.AI MCP tools."""
    global _zai_mcp_tools
    if _zai_mcp_tools is None:
        from langchain_mcp_adapters.client import MultiServerMCPClient

        api_key = os.getenv("ZAI_API_KEY", "")
        client = MultiServerMCPClient(
            {
                "web-search": {
                    "transport": "streamable_http",
                    "url": "https://api.z.ai/api/mcp/web_search_prime/mcp",
                    "headers": {"Authorization": f"Bearer {api_key}"},
                },
                "web-reader": {
                    "transport": "streamable_http",
                    "url": "https://api.z.ai/api/mcp/web_reader/mcp",
                    "headers": {"Authorization": f"Bearer {api_key}"},
                },
                "zai-mcp": {
                    "transport": "stdio",
                    "command": "npx",
                    "args": ["-y", "@z_ai/mcp-server"],
                    "env": {
                        "Z_AI_API_KEY": api_key,
                        "Z_AI_MODE": "ZAI",
                    },
                },
            }
        )
        _zai_mcp_tools = await client.get_tools()
    return _zai_mcp_tools

@tool
def jina_search(query: str) -> str:
    """Search the web using Jina AI and return clean results.

    Args:
        query: The search query."""
    import requests

    api_key = os.getenv("JINA_API_KEY", "")
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

    response = requests.get(f"https://s.jina.ai/{query}", headers=headers, timeout=30)
    return response.text

def _extract_section_by_marker(
    content: str, section_marker: str, context_lines: int = 50
) -> str:
    """Extract a section starting from a marker found in strategy steps.

    This is the SMART extraction - uses strategy steps like "scrolled down to Studio albums"
    to find the exact section we need.
    """
    import re

    lines = content.split("\n")
    marker_lower = section_marker.lower().strip()

    print(f"[EXTRACT_SECTION] Looking for section marker: '{section_marker}'")

    # Pass 1: exact (case-insensitive) substring match.
    start_idx = None
    for i, line in enumerate(lines):
        if marker_lower in line.lower():
            start_idx = i
            print(f"[EXTRACT_SECTION] Found marker at line {i}: {line[:80]}")
            break

    # Pass 2: fuzzy fallback on the marker's significant words.
    if start_idx is None:
        marker_words = [
            w for w in re.findall(r"\b\w+\b", marker_lower) if len(w) > 2
        ]
        if marker_words:
            for i, line in enumerate(lines):
                line_lower = line.lower()
                matches = sum(1 for w in marker_words if w in line_lower)
                if matches >= len(marker_words) * 0.6:
                    start_idx = i
                    print(f"[EXTRACT_SECTION] Found partial match at line {i}: {line[:80]}")
                    break

    if start_idx is None:
        print("[EXTRACT_SECTION] Section marker not found")
        return ""

    # Return a fixed-size window starting at the marker line.
    end_idx = min(start_idx + context_lines, len(lines))
    section = "\n".join(lines[start_idx:end_idx])

    print(f"[EXTRACT_SECTION] Extracted {end_idx - start_idx} lines from section")
    return section

def _parse_section_markers_from_strategy(strategy: str) -> list:
    """Parse strategy steps to extract section markers.

    Looks for phrases like:
    - "scrolled down to Studio albums" -> "Studio albums"
    - "found the Discography section" -> "Discography"
    - "went to Studio albums" -> "Studio albums"
    """
    import re

    markers = []

    # Navigation phrases that usually name a page section.
    patterns = [
        r'scrolled?\s+(?:down\s+)?to\s+["\']?([^"\',.]+)["\']?',
        r'went\s+to\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section',
        r'found\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section',
        r'clicked\s+on\s+["\']?([^"\',.]+)["\']?',
        r'looked\s+(?:at|under)\s+["\']?([^"\',.]+)["\']?',
        r'(?:in|under)\s+(?:the\s+)?["\']?([^"\',.]+)["\']?\s+section',
    ]

    for pattern in patterns:
        matches = re.findall(pattern, strategy.lower())
        for match in matches:
            cleaned = match.strip()
            if cleaned and len(cleaned) > 2 and len(cleaned) < 50:
                markers.append(cleaned)

    # Quoted phrases in the strategy are also likely section names.
    quoted = re.findall(r'"([^"]+)"', strategy)
    for q in quoted:
        if len(q) > 2 and len(q) < 50 and q.lower() not in ["wikipedia", "google"]:
            markers.append(q)

    print(f"[PARSE_STRATEGY] Extracted section markers: {markers}")
    return markers

_current_strategy = None


def set_current_strategy(strategy: str):
    """Store the current strategy for use by content extraction."""
    global _current_strategy
    _current_strategy = strategy
    print("[STRATEGY] Updated current strategy")

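# ---------------------------------------------------------------------------
# How the strategy-driven extraction helpers are meant to compose (illustrative
# sketch only; nothing in this module wires them together automatically):
#
#     markers = _parse_section_markers_from_strategy(_current_strategy or "")
#     for marker in markers:
#         section = _extract_section_by_marker(page_content, marker)
#         if section:
#             break
#
# get_solving_strategy() stores the retrieved steps via set_current_strategy(),
# and the markers parsed from those steps narrow a long page down to the
# section the original solver actually used.
# ---------------------------------------------------------------------------
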
@tool
def jina_read(url: str, question: str = "") -> str:
    """Read a webpage and extract content relevant to the question.

    Args:
        url: The URL to read.
        question: The question to extract relevant info for."""
    import requests

    api_key = os.getenv("JINA_API_KEY", "")
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

    response = requests.get(f"https://r.jina.ai/{url}", headers=headers, timeout=30)
    content = response.text

    # Cap the returned text so the tool output stays a manageable size.
    return content[:10000]

@tool
def web_search(query: str) -> str:
    """Search the web and return summarized results with URLs."""
    if os.getenv("TAVILY_API_KEY"):
        from langchain_tavily import TavilySearch

        web_search_tool = TavilySearch(
            max_results=5,
            include_answer=False,
        )
    else:
        from langchain_community.tools import DuckDuckGoSearchResults

        web_search_tool = DuckDuckGoSearchResults()

    search_docs = web_search_tool.invoke(query)

    # Normalize the different return shapes of the two backends.
    if isinstance(search_docs, str):
        return search_docs
    elif isinstance(search_docs, dict) and "results" in search_docs:
        results = search_docs["results"]
    elif isinstance(search_docs, list):
        results = search_docs
    else:
        return str(search_docs)

    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.get("url", "")}"/>\n{doc.get("content", "")}\n</Document>'
            for doc in results
        ]
    )
    return formatted_search_docs

@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for a query and return maximum 2 results.

    Args:
        query: The search query."""
    from langchain_community.document_loaders import ArxivLoader

    search_docs = ArxivLoader(query=query, load_max_docs=2).load()
    parts = []
    for doc in search_docs:
        source = doc.metadata.get("source", "")
        parts.append(f"Source: {source}\n{doc.page_content}")
    return "\n\n---\n\n".join(parts)


@tool
def analyze_text(text: str, question: str) -> str:
    """Analyze text and extract the answer to a specific question. Use after fetching a webpage or PDF."""
    llm = _get_llm()
    response = llm.invoke(
        f"Given this text:\n\n{text[:8000]}\n\n"
        f"Answer this question: {question}\n\n"
        f"Be specific and list any relevant data points (numbers, dates, names). "
        f"If counting items, list each one explicitly before giving the count."
    )
    return response.content

@tool
def read_excel(file_path: str) -> str:
    """Read and extract data from an Excel file (.xlsx, .xls).

    Args:
        file_path: Path to the Excel file."""
    import pandas as pd

    try:
        xlsx = pd.ExcelFile(file_path)
        results = []
        for sheet_name in xlsx.sheet_names:
            df = pd.read_excel(xlsx, sheet_name=sheet_name)
            results.append(f"=== Sheet: {sheet_name} ===\n{df.to_string()}")
        return "\n\n".join(results)[:15000]
    except Exception as e:
        return f"Error reading Excel: {e}"


@tool
def read_csv(file_path: str) -> str:
    """Read and extract data from a CSV file.

    Args:
        file_path: Path to the CSV file."""
    import pandas as pd

    try:
        df = pd.read_csv(file_path)
        return df.to_string()[:15000]
    except Exception as e:
        return f"Error reading CSV: {e}"

@tool
def read_docx(file_path: str) -> str:
    """Read and extract text from a Word document (.docx).

    Args:
        file_path: Path to the Word document."""
    try:
        from docx import Document

        doc = Document(file_path)
        text = "\n".join([para.text for para in doc.paragraphs])
        return text[:15000]
    except Exception as e:
        return f"Error reading Word doc: {e}"


@tool
def read_pptx(file_path: str) -> str:
    """Read and extract text from a PowerPoint presentation (.pptx).

    Args:
        file_path: Path to the PowerPoint file."""
    try:
        from pptx import Presentation

        prs = Presentation(file_path)
        text_parts = []
        for slide_num, slide in enumerate(prs.slides, 1):
            slide_text = [f"=== Slide {slide_num} ==="]
            for shape in slide.shapes:
                if hasattr(shape, "text"):
                    slide_text.append(shape.text)
            text_parts.append("\n".join(slide_text))
        return "\n\n".join(text_parts)[:15000]
    except Exception as e:
        return f"Error reading PowerPoint: {e}"

@tool
def extract_zip(file_path: str) -> str:
    """Extract a zip file and list its contents.

    Args:
        file_path: Path to the zip file."""
    import zipfile
    from pathlib import Path

    try:
        extract_dir = Path(file_path).parent / Path(file_path).stem
        extract_dir.mkdir(exist_ok=True)

        with zipfile.ZipFile(file_path, "r") as zip_ref:
            zip_ref.extractall(extract_dir)
            file_list = zip_ref.namelist()

        return f"Extracted to: {extract_dir}\nContents:\n" + "\n".join(file_list)
    except Exception as e:
        return f"Error extracting zip: {e}"

@tool
def analyze_image(file_path: str, question: str) -> str:
    """Analyze an image and answer a question about it using vision model.

    Args:
        file_path: Path to the image file (png, jpg, etc.)
        question: Question to answer about the image."""
    import base64

    from langchain_openai import ChatOpenAI

    try:
        with open(file_path, "rb") as f:
            image_data = base64.b64encode(f.read()).decode("utf-8")

        # Infer the MIME type from the file extension.
        ext = file_path.lower().split(".")[-1]
        mime_type = {"png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg"}.get(
            ext, "image/png"
        )

        llm = ChatOpenAI(model="gpt-4o", temperature=0)
        response = llm.invoke(
            [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": question},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{image_data}"
                            },
                        },
                    ],
                }
            ]
        )
        return response.content
    except Exception as e:
        return f"Error analyzing image: {e}"

@tool
def submit_answer(answer: str) -> str:
    """Submit your final answer. Use this when you have found the answer.

    Args:
        answer: The final answer to submit."""
    print(f"[SUBMIT_ANSWER] {answer}")
    return f"FINAL ANSWER: {answer}"

async def get_tools() -> list:
    """Retrieve the list of available tools for the agent."""
    base_tools = [
        get_solving_strategy,
        submit_answer,
        # File handling and analysis tools.
        download_file,
        read_pdf,
        read_excel,
        read_csv,
        read_docx,
        read_pptx,
        extract_zip,
        analyze_image,
        py_calc_tool,
        youtube_transcript_tool,
        transcribe_audio,
        arxiv_search,
    ]

    # Web search and page reading are provided by the Z.AI MCP servers.
    zai_tools = await _get_zai_mcp_tools()
    return base_tools + zai_tools

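# ---------------------------------------------------------------------------
# Typical wiring (sketch; the agent loop lives outside this module, so the
# exact setup shown here is an assumption):
#
#     from langgraph.prebuilt import create_react_agent
#
#     tools = await get_tools()
#     agent = create_react_agent(_get_llm(), tools)
#     result = await agent.ainvoke({"messages": [("user", question)]})
# ---------------------------------------------------------------------------
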
@tool
def py_calc_tool(expression: str) -> str:
    """Evaluate a Python expression safely."""
    try:
        # Evaluate with an empty builtins namespace to limit what the
        # expression can reach.
        allowed_builtins = {"__builtins__": {}}
        result = eval(expression, allowed_builtins, {})
        return str(result)
    except Exception as e:
        return f"Error evaluating expression: {e}"

@tool
def download_file(url: str) -> str:
    """Download a file (PDF, etc.) from URL and save locally. Returns the local file path."""
    import hashlib
    from pathlib import Path

    import requests

    try:
        downloads_dir = Path("downloads")
        downloads_dir.mkdir(exist_ok=True)

        # Cache key: hash of the URL plus its original extension.
        ext = Path(url).suffix or ".bin"
        filename = hashlib.md5(url.encode()).hexdigest()[:12] + ext
        filepath = downloads_dir / filename

        # Skip the download if this URL was already fetched.
        if not filepath.exists():
            response = requests.get(url, timeout=60)
            response.raise_for_status()
            filepath.write_bytes(response.content)

        return f"Downloaded to: {filepath}"
    except Exception as e:
        return f"Error downloading: {e}"

@tool
def read_pdf(file_path: str) -> str:
    """Read and extract text from a local PDF file."""
    try:
        from pypdf import PdfReader

        reader = PdfReader(file_path)
        text = "\n".join(page.extract_text() or "" for page in reader.pages)
        return text[:15000]
    except Exception as e:
        return f"Error reading PDF: {e}"

@tool
def fetch_webpage(url: str) -> str:
    """Fetch and read content from a webpage URL. For PDFs, use download_file then read_pdf instead."""
    import requests

    if url.lower().endswith(".pdf"):
        return "Error: This is a PDF file. Use download_file(url) first, then read_pdf(filepath) to read it."

    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }
        response = requests.get(url, timeout=30, headers=headers)
        response.raise_for_status()

        if "application/pdf" in response.headers.get("content-type", ""):
            return "Error: This is a PDF file. Use download_file(url) first, then read_pdf(filepath) to read it."

        import html2text

        h = html2text.HTML2Text()
        h.ignore_links = False
        h.ignore_images = True
        h.ignore_emphasis = False
        h.body_width = 0

        markdown = h.handle(response.text)

        return markdown[:10000] if markdown else "No content found"
    except Exception as e:
        return f"Error fetching URL: {e}"

@tool
def transcribe_audio(file_path: str) -> str:
    """Transcribe an audio file to text using OpenAI Whisper.

    Args:
        file_path: Path to the audio file (mp3, wav)."""
    from openai import OpenAI

    client = OpenAI()

    with open(file_path, "rb") as audio_file:
        transcription = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
        )

    print(f"[TRANSCRIPTION]: {transcription.text}")
    return transcription.text

@tool
def youtube_transcript_tool(video_url: str) -> List[Document]:
    """Fetch the transcript of a YouTube video given its URL."""
    from langchain_community.document_loaders import YoutubeLoader

    loader = YoutubeLoader.from_youtube_url(video_url, add_video_info=False)
    return loader.load()

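# ---------------------------------------------------------------------------
# Minimal local sketch (assumption: run manually, not part of the agent loop).
# It exercises a couple of the @tool functions through LangChain's
# Tool.invoke interface; network-backed tools additionally need their API keys
# in the environment.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Pure-local tool: no API key or network access required.
    print(py_calc_tool.invoke({"expression": "(2 ** 10) / 4"}))

    # Strategy lookup only works if the Chroma DB has been built at CHROMA_PATH.
    if os.path.isdir(CHROMA_PATH):
        print(get_solving_strategy.invoke({"question": "Example question"}))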