import json import time from .mcp_client import MCP_Client BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1" MODEL = "qwen3-max" TOKEN = "sk-ef26097310ec45c184e8d84b31ea9356" # Alpha Vantage MCP配置 MCP_CONFIG = { "mcpServers": { "alpha-vantage": { "command": "npx", "args": ["alpha-ventage-mcp"], "env": { "ALPHA_VANTAGE_API_KEY": "97Q9TT7I6J9ZOLDS" } } } } def extract_stock_symbol(user_input: str) -> str: """ 从用户输入中提取股票代码 支持多种格式: - "查询阿里巴巴股票" -> BABA - "查询AAPL股票" -> AAPL - "我想了解MSFT的股价" -> MSFT - "BABA股价如何" -> BABA """ # 预定义一些常见股票代码的映射 stock_mapping = { "阿里巴巴": "BABA", "阿里": "BABA", "腾讯": "TCEHY", "微软": "MSFT", "苹果": "AAPL", "谷歌": "GOOGL", "亚马逊": "AMZN", "特斯拉": "TSLA", "英伟达": "NVDA", "百度": "BIDU", "京东": "JD", "拼多多": "PDD" } # 检查预定义映射 for chinese_name, symbol in stock_mapping.items(): if chinese_name in user_input: return symbol # 尝试直接匹配常见的股票代码格式 (如 AAPL, MSFT 等) import re # 匹配常见的股票代码格式 patterns = [ r'\b([A-Z]{1,5})\b', # 大写字母组成的股票代码 r'股票代码[是为]?([A-Z]{1,5})', r'([A-Z]{1,5})股票' ] for pattern in patterns: match = re.search(pattern, user_input) if match: symbol = match.group(1) # 验证是否为有效的股票代码格式 if 1 <= len(symbol) <= 5 and symbol.isalpha(): return symbol # 默认返回阿里巴巴 return "BABA" def get_stock_price(symbol: str): """获取股票价格""" client = MCP_Client(MCP_CONFIG) if not client.initialize(): raise Exception("MCP初始化失败") try: result = client.call_tool("get_stock_price", {"symbol": symbol}) client.close() return result except Exception as e: client.close() raise e def get_company_overview(symbol: str): """获取公司概况""" client = MCP_Client(MCP_CONFIG) if not client.initialize(): raise Exception("MCP初始化失败") try: result = client.call_tool("get_company_overview", {"symbol": symbol}) client.close() return result except Exception as e: client.close() raise e def get_daily_time_series(symbol: str): """获取每日时间序列数据""" client = MCP_Client(MCP_CONFIG) if not client.initialize(): raise Exception("MCP初始化失败") try: result = client.call_tool("get_daily_time_series", {"symbol": symbol}) client.close() return result except Exception as e: client.close() raise e def get_forex_rate(from_currency: str = "USD", to_currency: str = "CNY"): """获取外汇汇率""" client = MCP_Client(MCP_CONFIG) if not client.initialize(): raise Exception("MCP初始化失败") try: result = client.call_tool("get_forex_rate", { "from_currency": from_currency, "to_currency": to_currency }) client.close() return result except Exception as e: client.close() raise e def get_tools_for_stock(symbol: str): """根据股票代码生成工具列表""" return [ { "name": "get_stock_price", "description": "获取股票价格", "params": {"symbol": symbol}, }, { "name": "get_company_overview", "description": "获取公司概况", "params": {"symbol": symbol}, }, { "name": "get_daily_time_series", "description": "获取每日时间序列数据", "params": {"symbol": symbol}, }, { "name": "get_forex_rate", "description": "获取外汇汇率", "params": {"from_currency": "USD", "to_currency": "CNY"}, } ] def call_llm(prompt: str): """模拟调用大模型的函数(你可以替换为真实请求)""" import requests headers = { "Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json" } data = { "model": MODEL, "messages": [{"role": "user", "content": prompt}], "stream": True # 启用流式响应 } response = requests.post(f"{BASE_URL}/chat/completions", headers=headers, json=data, stream=True) for line in response.iter_lines(): if line: yield line.decode('utf-8') def get_stock_data_example(symbol: str): """示例函数:获取股票数据的简化调用方式""" try: # 获取股票价格 price_data = get_stock_price(symbol) print(f"股票价格数据: {price_data}") # 获取公司概况 overview_data = get_company_overview(symbol) print(f"公司概况数据: {overview_data}") # 获取每日时间序列 time_series_data = get_daily_time_series(symbol) print(f"时间序列数据: {time_series_data}") # 获取外汇汇率 forex_data = get_forex_rate() print(f"外汇汇率数据: {forex_data}") return { "price": price_data, "overview": overview_data, "time_series": time_series_data, "forex": forex_data } except Exception as e: print(f"获取股票数据时出错: {str(e)}") raise e def process_tool_analysis(user_input: str, history: list): """ 处理工具分析的主要逻辑 - user_input: str - history: list of {"role": ..., "content": ...} Returns generator for streaming. """ if not user_input.strip(): yield "", history return # 添加用户消息到历史 history.append({"role": "user", "content": user_input}) try: # 从用户输入中提取股票代码 stock_symbol = extract_stock_symbol(user_input) # 根据股票代码生成工具列表 tools_to_test = get_tools_for_stock(stock_symbol) # 创建MCP客户端 client = MCP_Client(MCP_CONFIG) # 初始化MCP会话 if not client.initialize(): error_msg = "❌ MCP初始化失败" history.append({"role": "assistant", "content": error_msg}) yield "", history return # 收集所有工具的查询结果 all_results = [] # 依次调用每个工具 for i, tool in enumerate(tools_to_test, 1): # 添加工具标题(使用HTML实现可折叠效果,默认折叠) tool_content = f'''
[MCP] 查询中 🔧 工具 {i}/{len(tools_to_test)}: {tool['name']} ({tool['description']})
''' history.append({"role": "assistant", "content": tool_content}) yield "", history try: result = client.call_tool(tool["name"], tool["params"]) # 解析结果 if isinstance(result, dict) and "content" in result: content = result["content"] if isinstance(content, list) and len(content) > 0: text_content = content[0].get("text", "") if isinstance(content[0], dict) else str(content[0]) # 立即调用模型总结这个信息内容 summary_prompt = f"给我总结这个信息内容:{text_content}" # 更新状态为分析中 # 找到完整的summary标签并更新状态 start_idx = history[-1]["content"].find('') + len('') if start_idx != -1 and end_idx != -1: summary_content = history[-1]["content"][start_idx:end_idx] # 更新工具标题的类名和状态文本(添加过渡效果) updated_summary = summary_content.replace('tool-header querying', 'tool-header analyzing').replace( 'status-tag querying', 'status-tag analyzing').replace('查询中', '分析中') history[-1]["content"] = history[-1]["content"].replace(summary_content, updated_summary) yield "", history # 流式显示模型处理结果(在可折叠面板内部) model_summary = "" # 移除所有冗余提示文本,直接显示分析结果区域 history[-1]["content"] += "
" yield "", history for chunk in call_llm(summary_prompt): if not chunk or chunk == "[DONE]": continue if chunk.startswith("data: "): chunk = chunk[6:] try: response_data = json.loads(chunk) delta = response_data.get("choices", [{}])[0].get("delta", {}) content = delta.get("content", "") if content: model_summary += content # 更新分析结果(在可折叠面板内部) history[-1]["content"] = history[-1]["content"][0:history[-1]["content"].find( "
") + 26] + model_summary yield "", history except json.JSONDecodeError: continue # 忽略无效 JSON history[-1]["content"] += "
" result_msg = f"✅ {tool['name']}: {model_summary}" all_results.append(f"{tool['description']}: {model_summary}") # 添加最终结果到历史(在可折叠面板内部),移除所有冗余提示文本 history[-1]["content"] += f"

分析结果:

{result_msg}

" # 更新状态为已完成(添加过渡效果) # 找到完整的summary标签并更新状态 start_idx = history[-1]["content"].find('') + len('') if start_idx != -1 and end_idx != -1: summary_start = history[-1]["content"][start_idx:end_idx] updated_summary = summary_start.replace('tool-header analyzing', 'tool-header completed').replace( 'status-tag analyzing', 'status-tag completed').replace('分析中', '已完成') history[-1]["content"] = history[-1]["content"].replace(summary_start, updated_summary) # 关闭可折叠面板 history[-1]["content"] += "
\n
" except Exception as e: error_msg = f"❌ {tool['name']} 查询失败: {str(e)}" history.append({"role": "assistant", "content": error_msg}) yield "", history # 关闭可折叠面板(即使是错误情况) history[-1]["content"] += "\n" all_results.append(f"{tool['name']}: 查询失败 - {str(e)}") # 添加短暂延迟以改善用户体验 time.sleep(0.5) # 关闭MCP连接 client.close() # 将所有结果汇总后传递给AI模型进行总结 final_summary_header = '''

📈 最终总结

''' history.append({"role": "assistant", "content": final_summary_header}) yield "", history summary_msg = "📊 正在分析所有数据并生成总结..." history.append({"role": "assistant", "content": summary_msg}) yield "", history # 构造给AI模型的提示 all_results_text = "\n".join(all_results) summary_prompt = f"""用户问题: {user_input} 收集到的金融数据: {all_results_text} 请根据以上数据回答用户问题,提供简洁明了的总结。""" # 调用AI模型进行总结 bot_response = "" for chunk in call_llm(summary_prompt): if not chunk or chunk == "[DONE]": continue if chunk.startswith("data: "): chunk = chunk[6:] try: response_data = json.loads(chunk) delta = response_data.get("choices", [{}])[0].get("delta", {}) content = delta.get("content", "") if content: bot_response += content # 更新累积历史记录中的最后一条消息,实现流式显示 history[-1]["content"] = "📊 正在分析所有数据并生成总结...\n" + bot_response yield "", history except json.JSONDecodeError: continue # 忽略无效 JSON # 最终完整历史 final_footer = "
" history.append({"role": "assistant", "content": bot_response + final_footer}) yield "", history except Exception as e: error_msg = f"❌ 错误: {str(e)}" history.append({"role": "assistant", "content": error_msg}) yield "", history