import gradio as gr import os import datetime import re import pandas as pd # from dotenv import load_dotenv # # 加载.env文件中的环境变量 # load_dotenv() # from EasyFinancialAgent.chat import query_company from EasyFinancialAgent.chat_direct import advanced_search_company_detailed, search_and_format, search_company_direct, chatbot_response, format_search_result_for_display, format_search_result from chatbot.chat_main import respond import globals as g from service.mysql_service import get_companys, insert_company, get_company_by_name from service.chat_service import get_analysis_report, get_stock_price_from_bailian, search_company, search_news, get_invest_suggest, chat_bot from service.company import check_company_exists from service.hf_upload import get_hf_files_with_links from MarketandStockMCP.news_quote_mcp import get_company_news, get_quote from EasyReportDataMCP.report_mcp import query_financial_data from service.report_service import get_report_data, query_company_advanced from service.report_tools import build_financial_metrics_three_year_data, calculate_yoy_comparison, extract_financial_table, extract_last_three_with_fallback, get_yearly_data from service.three_year_table_tool import build_table_format from service.three_year_tool import process_financial_data_with_metadata from service.tool_processor import get_stock_price # ✅ 导入缓存管理器 from service.report_cache_manager import ReportCacheManager from service.financial_data_cache_manager import FinancialDataCacheManager get_companys_state = True my_companies = [ {'company_name': 'Alibaba', 'stock_code': 'BABA', "cik": "0001577552"}, {'company_name': 'NVIDIA', 'stock_code': 'NVDA', "cik": "0001045810"}, {'company_name': 'Amazon', 'stock_code': 'AMZN', "cik": "0001018724"}, {'company_name': 'Intel', 'stock_code': 'INTC', "cik": "0000050863"}, {'company_name': 'Meta', 'stock_code': 'META', "cik": "0001326801"}, {'company_name': 'Google', 'stock_code': 'GOOGL', "cik": "0001652044"}, {'company_name': 'Apple', 'stock_code': 'AAPL', "cik": "0000320193"}, {'company_name': 'Tesla', 'stock_code': 'TSLA', "cik": "0001318605"}, {'company_name': 'AMD', 'stock_code': 'AMD', "cik": "0000002488"}, {'company_name': 'Microsoft', 'stock_code': 'MSFT', "cik": "0000789019"} ] # JavaScript代码用于读取和存储数据 js_code = """ function handleStorage(operation, key, value) { if (operation === 'set') { localStorage.setItem(key, value); return `已存储: ${key} = ${value}`; } else if (operation === 'get') { let storedValue = localStorage.getItem(key); if (storedValue === null) { return `未找到键: ${key}`; } return `读取到: ${key} = ${storedValue}`; } else if (operation === 'clear') { localStorage.removeItem(key); return `已清除: ${key}`; } else if (operation === 'clearAll') { localStorage.clear(); return '已清除所有数据'; } } """ custom_css = """ /* 匹配所有以 gradio-container- 开头的类 */ div[class^="gradio-container-"], div[class*=" gradio-container-"] { -webkit-text-size-adjust: 100% !important; line-height: 1.5 !important; font-family: unset !important; -moz-tab-size: 4 !important; tab-size: 4 !important; } .company-list-container { background-color: white; border-radius: 0.5rem; padding: 0.75rem; margin-bottom: 0.75rem; border: 1px solid #e5e7eb; box-shadow: 0 1px 2px 0 rgba(0, 0, 0, 0.05); width: 100%; } /* 隐藏单选框 */ .company-list-container input[type="radio"] { display: none; } /* 自定义选项样式 - 小而精致 */ .company-list-container label { display: block; padding: 0.5rem 0.75rem; margin: 0.2rem 0; border-radius: 0.25rem; cursor: pointer; transition: all 0.2s ease; background-color: #f9fafb; border: 1px solid #e5e7eb; font-size: 0.875rem; text-align: left; width: 100%; box-sizing: border-box; position: relative; padding-right: 2rem; } /* ✅ 为每个公司选项添加删除按钮 */ .company-list-container label::after { content: '×'; position: absolute; right: 8px; top: 50%; transform: translateY(-50%); width: 18px; height: 18px; border-radius: 50%; background: #d1d5db; color: white; font-size: 14px; font-weight: bold; display: flex; align-items: center; justify-content: center; opacity: 0; transition: opacity 0.2s; cursor: pointer; line-height: 1; } .company-list-container label:hover::after { opacity: 1; } .company-list-container label::after:hover { background: #9ca3af; } /* 悬停效果 */ .company-list-container label:hover { background-color: #f3f4f6; border-color: #d1d5db; } /* 选中效果 - 使用蓝色 */ .company-list-container input[type="radio"]:checked + span { background: #3b82f6 !important; color: white !important; font-weight: 600 !important; display: block; width: 100%; height: 100%; padding: 0.75rem 1rem; margin: -0.75rem -1rem; border-radius: 0.375rem; } .company-list-container span { display: block; padding: 0; border-radius: 0.375rem; width: 100%; } label.selected { background: #3b82f6 !important; color: white !important; } /* 确保每行只有一个选项 */ .company-list-container .wrap { display: block !important; } .company-list-container .wrap li { display: block !important; width: 100% !important; } /* ✅ 搜索框样式 - 带内置图标 */ .company-input-search { position: relative; } .company-input-search input, .company-input-search textarea { padding-left: 36px !important; background-image: url('data:image/svg+xml,%3Csvg xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="%23999" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"%3E%3Ccircle cx="11" cy="11" r="8"%3E%3C/circle%3E%3Cpath d="m21 21-4.35-4.35"%3E%3C/path%3E%3C/svg%3E') !important; background-repeat: no-repeat !important; background-position: 10px center !important; background-size: 18px 18px !important; border: 1px solid #e5e7eb !important; border-radius: 8px !important; font-size: 14px !important; transition: all 0.2s !important; } .company-input-search input:focus, .company-input-search textarea:focus { border-color: #667eea !important; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important; outline: none !important; } /* ✅ 对齐Tabs和ChatInterface的横线 */ .tabs { border-bottom: 1px solid #e5e7eb !important; } .chatbot { border-top: 1px solid #e5e7eb !important; } /* 确俟tab内容和chatbot的padding一致 */ .tab-item { padding: 16px !important; } /* ✅ 为三个板块添加间距 */ #select-company-section { margin-bottom: 20px !important; } .report-news-box { margin-top: 20px !important; } .report-list-box, .news-list-box { margin-bottom: 16px !important; } /* ✅ 对齐ChatInterface标题和Tabs标签栏 */ .chatbot .wrap { margin-top: 0 !important; } /* ChatInterface标题样式 */ .chatbot > .wrap > .head { height: 40px !important; display: flex !important; align-items: center !important; padding: 8px 16px !important; border-bottom: 1px solid #e5e7eb !important; background: #f9fafb !important; } /* ✅ 缩小Easy Financial AI Assistant标题 - 更精致 */ .chatbot > .wrap > .head h1, .chatbot > .wrap > .head h2, .chatbot > .wrap > .head h3 { font-size: 13px !important; margin: 0 !important; font-weight: 500 !important; color: #6b7280 !important; letter-spacing: 0.3px !important; } /* Tabs标签栏样式 */ .tab-container { height: 48px !important; display: flex !important; align-items: center !important; border-bottom: 1px solid #e5e7eb !important; background: #f9fafb !important; } /* ✅ 移除Tabs底部横线 */ .tabs { border-bottom: none !important; } """ # 全局变量用于存储公司映射关系 companies_map = {} # ✅ 全局变量用于缓存搜索结果(包含CIK等完整信息) search_result_cache = {} # 根据公司名称获取股票代码的函数 def get_stock_code_by_company_name(company_name): """根据公司名称获取股票代码""" if company_name in companies_map and "CODE" in companies_map[company_name]: return companies_map[company_name]["CODE"] return "" # 默认返回 # 创建一个简单的函数来获取公司列表 def get_company_list_choices(): choices = [] print(f"Getting init add company list choices...{get_companys_state}") if not get_companys_state: return gr.update(choices=choices) try: # companies_data = get_companys() companies_data = my_companies print(f"Getting init add company list choices...companies_data: {companies_data}") if isinstance(companies_data, pd.DataFrame) and not companies_data.empty: choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: choices = [] except: choices = [] return gr.update(choices=choices) # Sidebar service functions # 处理公司点击事件的函数 def handle_company_click(company_name): """处理公司点击事件,先判断是否已经入库,如果没有则进行入库操作,然后刷新公司列表""" print(f"Handling click for company: {company_name}") # 1. 判断是否已经入库 if not check_company_exists(my_companies, company_name): # 2. 如果没有入库,则进行入库操作 # 获取股票代码(如果有的话) stock_code = companies_map.get(company_name, {}).get("CODE", "Unknown") print(f"Inserting company {company_name} with code {stock_code}") # 插入公司到数据库 # success = insert_company(company_name, stock_code) my_companies.append({"company_name": company_name, "stock_code": stock_code}) print(f"Successfully inserted company: {company_name}") # 直接更新companies_map,而不是重新加载整个映射 # 直接更新companies_map,而不是重新加载整个映射 companies_map[company_name] = {"NAME": company_name, "CODE": stock_code} # 使用Gradio的成功提示 gr.Info(f"Successfully added company: {company_name}") # 返回True表示添加成功,需要刷新列表 return True else: print(f"Company {company_name} already exists in database") # 使用Gradio的警告提示 gr.Warning(f"Company '{company_name}' already exists") # 3. 返回成功响应 return None def get_company_list_html(selected_company=""): try: # 从数据库获取所有公司 # companies_data = get_companys() companies_data = my_companies # 检查是否为错误信息 if isinstance(companies_data, str): if "查询执行失败" in companies_data: return "
获取公司列表失败
" else: # 如果是字符串但不是错误信息,可能需要特殊处理 return "" # 检查是否为DataFrame且为空 if not isinstance(companies_data, pd.DataFrame) or companies_data.empty: return "" # 生成HTML列表 html_items = [] for _, row in companies_data.iterrows(): company_name = row.get('company_name', 'Unknown') # 根据是否选中添加不同的样式类 css_class = "company-item" if company_name == selected_company: css_class += " selected-company" # 使用button元素来确保可点击性 html_items.append(f'') return "\n".join(html_items) except Exception as e: return f"
生成公司列表失败: {str(e)}
" def initialize_company_list(selected_company=""): return get_company_list_html(selected_company) def refresh_company_list(selected_company=""): """刷新公司列表,返回最新的HTML内容,带loading效果""" # 先返回loading状态 loading_html = '''
''' yield loading_html # 然后返回实际的数据 yield get_company_list_html(selected_company) # 新增函数:处理公司选择事件 def select_company(company_name): """处理公司选择事件,更新全局状态并返回更新后的公司列表""" # 更新全局变量 g.SELECT_COMPANY = company_name if company_name else "" # 对于Radio组件,我们只需要返回更新后的选项列表 try: # companies_data = get_companys() companies_data = my_companies if isinstance(companies_data, list) and len(companies_data) > 0: # my_companies 是对象列表 [{company_name: '', stock_code: ''}, ...] choices = [str(item.get('company_name', 'Unknown')) for item in companies_data] elif isinstance(companies_data, pd.DataFrame) and not companies_data.empty: choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: choices = [] except: choices = [] return gr.update(choices=choices, value=company_name) def initialize_companies_map(): """初始化 companies_map 字典""" global companies_map companies_map = {} # 清空之前的映射 print("Initializing companies map...") try: # 获取预定义的公司列表 predefined_companies = [ { "NAME": "Alibaba", "CODE": "BABA" }, { "NAME": "NVIDIA", "CODE": "NVDA" }, { "NAME": "Amazon", "CODE": "AMZN" }, { "NAME": "Intel", "CODE": "INTC" }, { "NAME": "Meta", "CODE": "META" }, { "NAME": "Google", "CODE": "GOOGL" }, { "NAME": "Apple", "CODE": "AAPL" }, { "NAME": "Tesla", "CODE": "TSLA" }, { "NAME": "AMD", "CODE": "AMD" }, { "NAME": "Microsoft", "CODE": "MSFT" }, ] # 将预定义公司添加到映射中 for company in predefined_companies: companies_map[company["NAME"]] = {"NAME": company["NAME"], "CODE": company["CODE"]} # print(f"Predefined companies added: {len(predefined_companies)}") # 从数据库获取公司数据 # companies_data = get_companys() companies_data = my_companies # companies_data = window.cachedCompanies or [] print(f"Companies data from DB: {companies_data}") # 如果数据库中有公司数据,则添加到映射中(去重) if isinstance(companies_data, pd.DataFrame) and not companies_data.empty: print(f"Adding {len(companies_data)} companies from database") for _, row in companies_data.iterrows(): company_name = row.get('company_name', 'Unknown') stock_code = row.get('stock_code', '') # 确保company_name和stock_code都是字符串类型 company_name = str(company_name) if company_name is not None else 'Unknown' stock_code = str(stock_code) if stock_code is not None else '' # 检查是否已存在于映射中(通过股票代码判断) is_duplicate = False for existing_company in companies_map.values(): if existing_company["CODE"] == stock_code: is_duplicate = True break # 如果不重复,则添加到映射中 if not is_duplicate: companies_map[company_name] = {"NAME": company_name, "CODE": stock_code} # print(f"Added company: {company_name}") else: print("No companies found in database") print(f"Final companies map: {companies_map}") except Exception as e: # 错误处理 print(f"Error initializing companies map: {str(e)}") pass # Sidebar company selector functions def update_company_choices(user_input: str): """更新公司选择列表""" # 第一次 yield:立即显示 modal + loading 提示 yield gr.update( choices=["Searching..."], visible=True ), gr.update(visible=False, value="") # 第二次:执行耗时操作(调用 LLM) search_result = advanced_search_company_detailed(user_input) # ✅ 获取格式化的完整数据(包含CIK) formatted_data = format_search_result(search_result) # ✅ 使用新的显示格式:"公司名 (Ticker)" choices = format_search_result_for_display(search_result) # ✅ 将完整数据存储到全局变量,供add_company使用 global search_result_cache search_result_cache = {choice: data for choice, data in zip(choices, formatted_data)} # 检查choices是否为错误信息 if len(choices) == 0: # 如果是错误信息或非正常格式,显示提示消息 error_message = "未找到匹配的公司" # 使用Ant Design风格的错误提示 error_html = f'''
{error_message}
''' yield gr.update(choices=["No results found"], visible=True), gr.update(visible=True, value=error_html) else: # 第三次:更新为真实结果 yield gr.update( choices=choices, visible=len(choices) > 0 ), gr.update(visible=False, value="") def add_company(selected, current_list): """添加选中的公司""" if selected == "No results found": return gr.update(visible=False), current_list, gr.update(visible=False, value="") if selected: # ✅ 从缓存中获取完整数据(包含CIK) global search_result_cache selected_data = search_result_cache.get(selected, {}) # 从选择的文本中提取公司名称和股票代码 selected_clean = selected.strip() match = re.match(r"^(.+?)\s*\(([^)]+)\)$", selected_clean) if match: company_name = match.group(1) stock_code = match.group(2) elif companies_map.get(selected_clean): company_name = selected_clean stock_code = companies_map[selected_clean]["CODE"] else: company_name = selected_clean stock_code = "Unknown" # ✅ 获取CIK用于重复判断 cik = selected_data.get('cik', '') # ✅ 通过CIK判断是否重复 existing_company = None if cik: for company in my_companies: # 如果已有公司的CIK与新选择的CIK相同,则为重复 if company.get('cik') == cik: existing_company = company break if existing_company: # ✅ 公司已存在,直接选中已有的公司 existing_name = existing_company.get('company_name', company_name) gr.Info(f"公司 '{company_name}' 已存在(名称: {existing_name}),已自动选中") return gr.update(visible=False), gr.update(value=existing_name), gr.update(visible=False, value="") # ✅ 新公司,添加到列表(保存CIK) my_companies.append({ "company_name": company_name, "stock_code": stock_code, "cik": cik # ✅ 保存CIK用于后续重复判断 }) # ✅ 同时更新 companies_map,确保其他板块能获取到股票代码 companies_map[company_name] = {"NAME": company_name, "CODE": stock_code} # 从数据库获取更新后的公司列表 try: companies_data = my_companies if isinstance(companies_data, list) and len(companies_data) > 0: updated_list = [str(item.get('company_name', 'Unknown')) for item in companies_data] elif isinstance(companies_data, pd.DataFrame) and not companies_data.empty: updated_list = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: updated_list = [] except: updated_list = [] # 添加默认公司选项 if not updated_list: updated_list = ['Alibaba', '腾讯控股', 'Tencent', '阿里巴巴-W', 'Apple'] # 成功插入后清除状态消息,并更新Radio组件的选项,同时默认选中刚添加的公司 return gr.update(visible=False), gr.update(choices=updated_list, value=company_name), gr.update(visible=False, value="") return gr.update(visible=False), current_list, gr.update(visible=False, value="") # Sidebar report section functions # 创建一个全局变量来存储公司按钮组件 company_buttons = {} def create_company_buttons(): """创建公司按钮组件""" # 确保companies_map已被初始化 if not companies_map: initialize_companies_map() # 显示companies_map中的公司列表 companies = list(companies_map.keys()) # 添加调试信息 print(f"Companies in map: {companies}") # 清空之前的按钮 company_buttons.clear() if not companies: # 如果没有公司,返回一个空的列 with gr.Column(): gr.Markdown("暂无公司数据") else: # 使用Gradio按钮组件创建公司列表 with gr.Column(elem_classes=["home-company-list"]): # 按每行两个公司进行分组 for i in range(0, len(companies), 2): # 检查是否是最后一行且只有一个元素 if i + 1 < len(companies): # 有两个元素 with gr.Row(elem_classes=["home-company-item-box"]): btn1 = gr.Button(companies[i], elem_classes=["home-company-item", "gradio-button"]) btn2 = gr.Button(companies[i + 1], elem_classes=["home-company-item", "gradio-button"]) # 保存按钮引用 company_buttons[companies[i]] = btn1 company_buttons[companies[i + 1]] = btn2 else: # 只有一个元素 with gr.Row(elem_classes=["home-company-item-box", "single-item"]): btn = gr.Button(companies[i], elem_classes=["home-company-item", "gradio-button"]) # 保存按钮引用 company_buttons[companies[i]] = btn # 返回按钮字典 return company_buttons def update_report_section(selected_company, report_data, stock_code): """根据选中的公司更新报告部分""" print(f"Updating report (报告部分): {selected_company}") # 添加调试信息 if selected_company == "" or selected_company is None or selected_company == "Unknown": # 没有选中的公司,显示公司列表 # html_content = get_initial_company_list_content() # 暂时返回空内容,稍后会用Gradio组件替换 html_content = "" return gr.update(value=html_content, visible=True) else: # 有选中的公司,显示相关报告 try: # prmpt = f""" # """ stock_code = get_stock_code_by_company_name(selected_company) # result = get_report_data(stock_code) # print(f"get_report_data=====================: {result}") report_data = query_financial_data(stock_code, "5-Year") # report_data = process_financial_data_with_metadata(financial_metrics_pre) # 检查 report_data 是否是列表且第一个元素是字典 if not isinstance(report_data, list) or len(report_data) == 0: return gr.update(value="", visible=True) # 检查第一个元素是否是字典 if not isinstance(report_data[0], dict): return gr.update(value="
数据格式不正常
", visible=True) # ✅ 可折叠的Financial Reports,默认显示5个 total_reports = len(report_data) show_limit = 5 html_content = '
' # ✅ 美化Financial Reports标题 html_content += '''

Financial Reports

''' # 添加CSS样式 html_content += '''''' # 显示前5个 for i, report in enumerate(report_data[:show_limit]): source_url = report.get('source_url', '#') period = report.get('period', 'N/A') source_form = report.get('source_form', 'N/A') html_content += f'''
{period}-{stock_code}-{source_form}
''' # 剩余的放在可折叠区域 if total_reports > show_limit: html_content += '
' for i, report in enumerate(report_data[show_limit:]): source_url = report.get('source_url', '#') period = report.get('period', 'N/A') source_form = report.get('source_form', 'N/A') html_content += f'''
{period}-{stock_code}-{source_form}
''' html_content += '
' # 添加展开/收起按钮 html_content += f'''
↓ Show All ({total_reports} reports)
''' html_content += '
' return gr.update(value=html_content, visible=True) except Exception as e: print(f"Error in update_report_section: {str(e)}") return gr.update(value=f"
报告载入失败: {str(e)}
", visible=True) def update_news_section(selected_company): """根据选中的公司更新报告部分""" html_content = "" if selected_company == "" or selected_company is None: # 没有选中的公司,显示公司列表 # html_content = get_initial_company_list_content() # 暂时返回空内容,稍后会用Gradio组件替换 return gr.update(value=html_content, visible=True) else: try: stock_code = get_stock_code_by_company_name(selected_company) report_data = get_company_news(stock_code, None, None) # print(f"新闻列表: {report_data['articles']}") # report_data = search_news(selected_company) if (report_data['articles']): report_data = report_data['articles'] # ✅ 可折叠的News,默认显示5个 total_news = len(report_data) show_limit = 5 news_html = "
" # ✅ 美化News标题 news_html += '''

News

''' # 添加CSS样式 news_html += '''''' from datetime import datetime # 显示前5个 for news in report_data[:show_limit]: published_at = news['published'] dt = datetime.fromisoformat(published_at.replace("Z", "+00:00")) formatted_date = dt.strftime("%Y.%m.%d") news_html += f'''
[{formatted_date}] {news['headline']}
''' # 剩余的放在可折叠区域 if total_news > show_limit: news_html += '
' for news in report_data[show_limit:]: published_at = news['published'] dt = datetime.fromisoformat(published_at.replace("Z", "+00:00")) formatted_date = dt.strftime("%Y.%m.%d") news_html += f'''
[{formatted_date}] {news['headline']}
''' news_html += '
' # 添加展开/收起按钮 news_html += f'''
↓ Show All ({total_news} news)
''' news_html += '
' html_content += news_html except Exception as e: print(f"Error updating report section: {str(e)}") return gr.update(value=html_content, visible=True) # Component creation functions def create_header(): """创建头部组件""" # 获取当前时间 # current_time = datetime.datetime.now().strftime("%B %d, %Y - Market Data Updated Today") current_time = "" # ✅ 不再显示日期 with gr.Row(elem_classes=["header"]): # 左侧:图标和标题 with gr.Column(scale=8): # 使用圆柱体SVG图标表示数据库 gr.HTML('''
Easy Financial Report Dashboard
''', elem_classes=["text-2xl"]) # 右侧:时间信息 with gr.Column(scale=2): gr.Markdown(current_time, elem_classes=["text-sm-top-time"]) def create_company_list(get_companys_state): """创建公司列表组件""" try: # 获取公司列表数据 # companies_data = get_companys() companies_data = my_companies print(f"创建公司列表组件 - Companies data: {companies_data}") if isinstance(companies_data, list) and len(companies_data) > 0: # my_companies 是对象列表 [{company_name: '', stock_code: ''}, ...] choices = [str(item.get('company_name', 'Unknown')) for item in companies_data] elif isinstance(companies_data, pd.DataFrame) and not companies_data.empty: choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: choices = [] except Exception as e: print(f"Error creating company list: {str(e)}") choices = [] # 添加默认公司选项 if not choices: choices = [] # 使用Radio组件显示公司列表,不显示标签 company_list = gr.Radio( choices=choices, label="", interactive=True, elem_classes=["company-list-container"], container=False, # 不显示外部容器边框 visible=True ) return company_list def create_company_selector(): """创建公司选择器组件""" # ✅ 使用HTML和CSS创建带内置图标的搜索框 company_input = gr.Textbox( show_label=False, placeholder=" Name, ticker, or CIK", # 留出空间给图标 elem_classes=["company-input-search"], container=False ) # 状态消息显示区域 status_message = gr.HTML( "", elem_classes=["status-message"], visible=False ) # 弹窗选择列表 company_modal = gr.Radio( show_label=False, choices=[], visible=False, elem_classes=["company-modal"] ) return company_input, status_message, company_modal def create_report_section(): """创建报告部分组件""" # 创建一个用于显示报告列表的组件,初始显示公司列表 # 先加载默认公司的报告数据 initial_content = "" try: if my_companies and len(my_companies) > 0: default_company = my_companies[0]['company_name'] initial_content_result = update_report_section(default_company, None, None) # update_report_section 返回 gr.update() 字典,提取 value 字段 initial_content = initial_content_result.get('value', '') if isinstance(initial_content_result, dict) else "" except: initial_content = "" report_display = gr.HTML(initial_content) return report_display def create_news_section(): """创建新闻部分组件""" initial_content = "" news_display = gr.HTML(initial_content) return news_display def format_financial_metrics(data: dict, prev_data: dict = None) -> list: # pyright: ignore[reportArgumentType] """ 将原始财务数据转换为 financial_metrics 格式。 Args: data (dict): 当前财年数据(必须包含 total_revenue, net_income 等字段) prev_data (dict, optional): 上一财年数据,用于计算 change。若未提供,change 设为 "--" Returns: list[dict]: 符合 financial_metrics 格式的列表 """ def format_currency(value: float) -> str: """将数字格式化为 $XB / $XM / $XK""" if value >= 1e9: return f"${value / 1e9:.2f}B" elif value >= 1e6: return f"${value / 1e6:.2f}M" elif value >= 1e3: return f"${value / 1e3:.2f}K" else: return f"${value:.2f}" def calculate_change(current: float, previous: float) -> tuple: """计算变化百分比和颜色""" if previous == 0: return "--", "gray" change_pct = (current - previous) / abs(previous) * 100 sign = "+" if change_pct >= 0 else "" color = "green" if change_pct >= 0 else "red" return f"{sign}{change_pct:.1f}%", color # 定义指标映射 metrics_config = [ { "key": "total_revenue", "label": "Total Revenue", "is_currency": True, "eps_like": False }, { "key": "net_income", "label": "Net Income", "is_currency": True, "eps_like": False }, { "key": "earnings_per_share", "label": "Earnings Per Share", "is_currency": False, # EPS 不用 B/M 单位 "eps_like": True }, { "key": "operating_expenses", "label": "Operating Expenses", "is_currency": True, "eps_like": False }, { "key": "operating_cash_flow", "label": "Cash Flow", "is_currency": True, "eps_like": False } ] result = [] for item in metrics_config: key = item["key"] current_val = data.get(key) if current_val is None: continue # 格式化 value if item["eps_like"]: value_str = f"${current_val:.2f}" elif item["is_currency"]: value_str = format_currency(current_val) else: value_str = str(current_val) # 计算 change(如果有上期数据) if prev_data and key in prev_data: prev_val = prev_data[key] change_str, color = calculate_change(current_val, prev_val) else: change_str = "--" color = "gray" result.append({ "label": item["label"], "value": value_str, "change": change_str, "color": color }) return result def create_sidebar(): """创建侧边栏组件""" # 初始化 companies_map initialize_companies_map() with gr.Column(elem_classes=["sidebar"]): # 公司选择 with gr.Group(elem_classes=["card"], elem_id="select-company-section"): # ✅ 美化标题:居中对齐,添加背景色 gr.HTML("""

Select Company

""") with gr.Column(): company_list = create_company_list(get_companys_state) # 创建公司列表 # if not get_companys_state: # getCompanyFromStorage = gr.Button("读取") # getCompanyFromStorage.click( # fn=create_company_list(True), # inputs=[], # outputs=[company_list, status_message] # ) # 创建公司选择器 company_input, status_message, company_modal = create_company_selector() # 绑定事件 - 只需要submit事件 company_input.submit( fn=update_company_choices, inputs=[company_input], outputs=[company_modal, status_message] ) company_modal.change( fn=add_company, inputs=[company_modal, company_list], outputs=[company_modal, company_list, status_message] ) # 创建公司按钮组件 # # company_buttons = create_company_buttons() # # 为每个公司按钮绑定点击事件 # def make_click_handler(company_name): # def handler(): # result = handle_company_click(company_name) # # 如果添加成功,刷新Select Company列表并默认选中刚添加的公司 # if result is True: # # 正确地刷新通过create_company_list()创建的Radio组件 # try: # # companies_data = get_companys() # companies_data = my_companies # if isinstance(companies_data, list) and len(companies_data) > 0: # # my_companies 是对象列表 [{company_name: '', stock_code: ''}, ...] # updated_choices = [str(item.get('company_name', 'Unknown')) for item in companies_data] # elif isinstance(companies_data, pd.DataFrame) and not companies_data.empty: # updated_choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] # else: # updated_choices = [] # except: # updated_choices = [] # # 使用gr.update来正确更新Radio组件,并默认选中刚添加的公司 # # 同时触发change事件来加载数据 # return gr.update(choices=updated_choices, value=company_name) # return None # return handler # for company_name, button in company_buttons.items(): # button.click( # fn=make_click_handler(company_name), # inputs=[], # outputs=[company_list] # ) # 创建一个容器来容纳报告部分,初始时隐藏 with gr.Group(elem_classes=["report-news-box"]) as report_section_group: # gr.Markdown("### Financial Reports", elem_classes=["card-title", "left-card-title"]) report_display = create_report_section() news_display = create_news_section() # 处理公司选择事件 def select_company_handler(company_name): """处理公司选择事件的处理器""" # 更新全局变量 g.SELECT_COMPANY = company_name if company_name else "" # 更新报告部分的内容 updated_report_display = update_report_section(company_name, None, None) updated_news_display = update_news_section(company_name) # 根据是否选择了公司来决定显示/隐藏报告部分 if company_name: # 有选中的公司,显示报告部分 return gr.update(visible=True), updated_report_display, updated_news_display else: # 没有选中的公司,隐藏报告部分 return gr.update(visible=False), updated_report_display, updated_news_display company_list.change( fn=select_company_handler, inputs=[company_list], outputs=[report_section_group, report_display, news_display] ) # 返回公司列表组件和报告部分组件 return company_list, report_section_group, report_display, news_display def build_income_table(table_data): # 兼容两种数据结构: # 1. 新结构:包含 list_data 和 yoy_rates 的字典 # 2. 旧结构:直接是二维数组 if isinstance(table_data, dict) and "list_data" in table_data: # 新结构 income_statement = table_data["list_data"] yoy_rates = table_data["yoy_rates"] or [] else: # 旧结构,直接使用传入的数据 income_statement = table_data yoy_rates = [] # 创建一个映射,将年份列索引映射到增长率 yoy_map = {} if len(yoy_rates) > 1 and len(yoy_rates[0]) > 1: # 获取增长率表头(跳过第一列"Category") yoy_headers = yoy_rates[0][1:] # 为每个指标行创建增长率映射 for i, yoy_row in enumerate(yoy_rates[1:], 1): # 跳过标题行 category = yoy_row[0] yoy_map[category] = {} for j, rate in enumerate(yoy_row[1:]): if j < len(yoy_headers): yoy_map[category][yoy_headers[j]] = rate table_rows = "" header_row = income_statement[0] for i, row in enumerate(income_statement): if i == 0: row_style = "background-color: #f5f5f5; font-weight: 500;" else: row_style = "background-color: #f9f9f9;" cells = "" for j, cell in enumerate(row): if j == 0: # Category列样式 - 恢复为居中 cells += f"{cell}" else: # 添加增长率箭头(如果有的话) growth = None category = row[0] if i > 0 and category in yoy_map and j > 0 and j < len(header_row): year_header = header_row[j] if year_header in yoy_map[category]: growth = yoy_map[category][year_header] if growth and growth != "N/A" and growth != "--": arrow = "▲" if growth.startswith("+") else "▼" color = "green" if growth.startswith("+") else "red" # 年份数据列,包含增幅指示 - 恢复为不分离的版本 cells += f"""
{cell}
{arrow}{growth}
""" else: # 无增幅的单元格 cells += f"{cell}" table_rows += f"{cells}" html = f"""
Latest 3 Years Financial Metrics
{table_rows}
""" return html def create_metrics_dashboard(): """创建指标仪表板组件""" with gr.Row(elem_classes=["metrics-dashboard"]): card_custom_style = ''' background-color: white; border-radius: 0.5rem; box-shadow: rgba(0, 0, 0, 0.1) 0px 1px 3px 0px, rgba(0, 0, 0, 0.1) 0px 1px 2px -1px; padding: 1.25rem; min-height: 250px !important; text-align: center; ''' # 构建左侧卡片 def build_stock_card(): # 尝试加载默认公司的数据 default_company = my_companies[0]['company_name'] if my_companies else "N/A" try: stock_code = get_stock_code_by_company_name(default_company) company_info = get_quote(stock_code.strip()) company_info['company'] = default_company except: company_info = {} try: if not company_info or not isinstance(company_info, dict): company_name = "N/A" symbol = "N/A" price = "N/A" change_html = 'N/A' open_val = high_val = low_val = prev_close_val = volume_display = "N/A" else: company_name = company_info.get("company", "N/A") symbol = company_info.get("symbol", "N/A") price = company_info.get("current_price", "N/A") # 解析 change change_str = company_info.get("change", "0") try: change = float(change_str) except (ValueError, TypeError): change = 0.0 # 解析 change_percent change_percent = company_info.get("percent_change", "0%") change_color = "green" if change >= 0 else "red" sign = "+" if change >= 0 else "" change_html = f'{sign}{change:.2f} ({change_percent:+.2f}%)' # 其他价格字段 open_val = company_info.get("open", "N/A") high_val = company_info.get("high", "N/A") low_val = company_info.get("low", "N/A") prev_close_val = company_info.get("previous_close", "N/A") html = f"""
{company_name}
NYSE:{symbol}
{price}
{change_html}
Open
{open_val}
High
{high_val}
Low
{low_val}
Prev Close
{prev_close_val}
""" return html except Exception as e: print(f"Error building stock card: {e}") return '
Error loading stock data
' # 模拟数据 - 第一次打开页面时的默认值 financial_metrics = [ {"label": "Total Revenue", "value": "N/A", "change": "N/A", "color": "grey"}, {"label": "Net Income", "value": "N/A", "change": "N/A", "color": "grey"}, {"label": "Earnings Per Share", "value": "N/A", "change": "N/A", "color": "grey"}, {"label": "Operating Expenses", "value": "N/A", "change": "N/A", "color": "grey"}, {"label": "Cash Flow", "value": "N/A", "change": "N/A", "color": "grey"} ] income_statement = { "list_data": [ ["Category", "N/A/FY", "N/A/FY", "N/A/FY"], ["Total", "N/A", "N/A", "N/A"], ["Net Income", "N/A", "N/A", "N/A.4M"], ["Earnings Per Share", "N/A", "N/A", "N/A"], ["Operating Expenses", "N/A", "N/A", "N/A"], ["Cash Flow", "N/A", "N/A", "N/A"] ], "yoy_rates": [] } yearly_data = 'N/A' # 增长变化的 HTML 字符(箭头+百分比) def render_change(change: str, color: str): if change.startswith("+"): return f'▲{change}' else: return f'▼{change}' # 构建中间卡片 def build_financial_metrics(): # 尝试加载默认公司的财务指标数据 default_company = my_companies[0]['company_name'] if my_companies else "N/A" try: stock_code = get_stock_code_by_company_name(default_company) financial_metrics_pre = query_financial_data(stock_code, "5-Year") result = process_financial_data_with_metadata(financial_metrics_pre) default_financial_metrics = result["financial_metrics"] default_yearly_data = result["year_data"] except: default_financial_metrics = financial_metrics default_yearly_data = yearly_data metrics_html = "" for item in default_financial_metrics: change_html = render_change(item["change"], item["color"]) metrics_html += f"""
{item['label']}
{item['value']} {change_html}
""" html = f"""
{default_yearly_data} Financial Metrics
YTD data
{metrics_html}
""" return html # 主函数:返回所有 HTML 片段 def get_dashboard(): # 尝试加载默认公司的收入表数据 default_company = my_companies[0]['company_name'] if my_companies else "N/A" try: stock_code = get_stock_code_by_company_name(default_company) financial_metrics_pre = query_financial_data(stock_code, "5-Year") result = process_financial_data_with_metadata(financial_metrics_pre) default_three_year_data = result["three_year_data"] default_table_data = build_table_format(default_three_year_data) except: default_table_data = income_statement with gr.Row(): # ✅ 调整宽度比例为 1:1:2 with gr.Column(scale=1, min_width=250, elem_classes=["metric-card-col-left"]): stock_card_html = gr.HTML(build_stock_card(), elem_classes=["metric-card-left"]) with gr.Column(scale=1, min_width=300, elem_classes=["metric-card-col-middle"]): financial_metrics_html = gr.HTML(build_financial_metrics(), elem_classes=["metric-card-middle"]) with gr.Column(scale=2, min_width=450, elem_classes=["metric-card-col-right"]): # 传递default_table_data参数 income_table_html = gr.HTML(build_income_table(default_table_data), elem_classes=["metric-card-right"]) return stock_card_html, financial_metrics_html, income_table_html # 创建指标仪表板并保存引用 stock_card_component, financial_metrics_component, income_table_component = get_dashboard() # 将组件引用保存到全局变量,以便在其他地方使用 global metrics_dashboard_components metrics_dashboard_components = (stock_card_component, financial_metrics_component, income_table_component) # 更新指标仪表板的函数 def update_metrics_dashboard(company_name): """根据选择的公司更新指标仪表板""" company_info = {} # 尝试获取股票价格数据,但不中断程序执行 stock_code = "" try: # 根据选择的公司获取股票代码 stock_code = get_stock_code_by_company_name(company_name) # ✅ 检查股票代码是否有效 if not stock_code or stock_code.strip() == "": print(f"⚠️ Warning: No stock code found for company '{company_name}'") print(f"Current companies_map keys: {list(companies_map.keys())}") # 返回友好的错误提示 error_html = f'''
⚠️ Stock code not found
Company: {company_name}
''' error_metrics = f'''
N/A Financial Metrics
Unable to load data for {company_name}
''' error_table = f'''
Latest 3 Years Financial Metrics
Please check if the company has been added correctly
''' return error_html, error_metrics, error_table company_info = get_quote(stock_code.strip()) company_info['company'] = company_name print(f"股票价格数据 {company_info}") except Exception as e: print(f"获取股票价格数据失败: {e}") financial_metrics_pre = query_financial_data(stock_code, "5-Year") financial_metrics = [] year_data = None three_year_data = None try: # financial_metrics = process_financial_data_with_metadata(financial_metrics_pre) result = process_financial_data_with_metadata(financial_metrics_pre) # 按需提取字段 financial_metrics = result["financial_metrics"] year_data = result["year_data"] three_year_data = result["three_year_data"] print(f"格式化后的财务数据: {financial_metrics}") except Exception as e: print(f"Error process_financial_data: {e}") yearly_data = year_data table_data = build_table_format(three_year_data) # 增长变化的 HTML 字符(箭头+百分比) def render_change(change: str, color: str): if change.startswith("+"): return f'▲{change}' else: return f'▼{change}' # 构建左侧卡片 def build_stock_card(company_info): try: if not company_info or not isinstance(company_info, dict): company_name = "N/A" symbol = "N/A" price = "N/A" change_html = 'N/A' open_val = high_val = low_val = prev_close_val = volume_display = "N/A" else: company_name = company_info.get("company", "N/A") symbol = company_info.get("symbol", "N/A") price = company_info.get("current_price", "N/A") # 解析 change change_str = company_info.get("change", "0") try: change = float(change_str) except (ValueError, TypeError): change = 0.0 # 解析 change_percent change_percent = company_info.get("percent_change", "0%") # try: # change_percent = float(change_percent_str.rstrip('%')) # except (ValueError, TypeError): # change_percent = 0.0 change_color = "green" if change >= 0 else "red" sign = "+" if change >= 0 else "" change_html = f'{sign}{change:.2f} ({change_percent:+.2f}%)' # 其他价格字段(可选:也可格式化为 2 位小数) open_val = company_info.get("open", "N/A") high_val = company_info.get("high", "N/A") low_val = company_info.get("low", "N/A") prev_close_val = company_info.get("previous_close", "N/A") # raw_volume = company_info.get("volume", "N/A") # volume_display = format_volume(raw_volume) html = f"""
{company_name}
NYSE:{symbol}
{price}
{change_html}
Open
{open_val}
High
{high_val}
Low
{low_val}
Prev Close
{prev_close_val}
""" #
Vol
{volume_display}
return html except Exception as e: print(f"Error building stock card: {e}") return '
Error loading stock data
' # 构建中间卡片 def build_financial_metrics(yearly_data): metrics_html = "" for item in financial_metrics: change_html = render_change(item["change"], item["color"]) metrics_html += f"""
{item['label']}
{item['value']} {change_html}
""" html = f"""
{yearly_data} Financial Metrics
YTD data
{metrics_html}
""" return html # 返回三个HTML组件的内容 return build_stock_card(company_info), build_financial_metrics(yearly_data), build_income_table(table_data) def create_tab_content(tab_name, company_name): """创建Tab内容组件""" if tab_name == "summary": print(f"company_name: {company_name}") # content = get_invest_suggest(company_name) gr.Markdown("# 11111", elem_classes=["invest-suggest-md-box"]) elif tab_name == "detailed": with gr.Column(elem_classes=["tab-content"]): gr.Markdown("Financial Statements", elem_classes=["text-xl", "font-semibold", "text-gray-900", "mb-6"]) with gr.Row(elem_classes=["gap-6"]): # 收入报表 (3/5宽度) with gr.Column(elem_classes=["w-3/5", "bg-gray-50", "rounded-xl", "p-4"]): gr.Markdown("Income Statement", elem_classes=["font-medium", "mb-3"]) # 这里将显示收入报表表格 # 资产负债表和现金流量表 (2/5宽度) with gr.Column(elem_classes=["w-2/5", "flex", "flex-col", "gap-6"]): # 资产负债表 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]): gr.Markdown("Balance Sheet Summary", elem_classes=["font-medium", "mb-3"]) # 这里将显示资产负债表图表 # 现金流量表 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]): with gr.Row(elem_classes=["justify-between", "items-start"]): gr.Markdown("Cash Flow Statement", elem_classes=["font-medium"]) gr.Markdown("View Detailed", elem_classes=["text-xs", "text-blue-600", "font-medium"]) with gr.Column(elem_classes=["mt-4", "space-y-3"]): # 经营现金流 with gr.Column(): with gr.Row(elem_classes=["justify-between"]): gr.Markdown("Operating Cash Flow") gr.Markdown("$982M", elem_classes=["font-medium"]) with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]): with gr.Column(elem_classes=["bg-green-500", "h-1.5", "rounded-full"], scale=85): gr.Markdown("") # 投资现金流 with gr.Column(): with gr.Row(elem_classes=["justify-between"]): gr.Markdown("Investing Cash Flow") gr.Markdown("-$415M", elem_classes=["font-medium"]) with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]): with gr.Column(elem_classes=["bg-blue-500", "h-1.5", "rounded-full"], scale=42): gr.Markdown("") # 融资现金流 with gr.Column(): with gr.Row(elem_classes=["justify-between"]): gr.Markdown("Financing Cash Flow") gr.Markdown("-$212M", elem_classes=["font-medium"]) with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]): with gr.Column(elem_classes=["bg-red-500", "h-1.5", "rounded-full"], scale=25): gr.Markdown("") elif tab_name == "comparative": with gr.Column(elem_classes=["tab-content"]): gr.Markdown("Industry Benchmarking", elem_classes=["text-xl", "font-semibold", "text-gray-900", "mb-6"]) # 收入增长对比 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4", "mb-6"]): gr.Markdown("Revenue Growth - Peer Comparison", elem_classes=["font-medium", "mb-3"]) # 这里将显示对比图表 # 利润率和报告预览网格 with gr.Row(elem_classes=["grid-cols-2", "gap-6"]): # 利润率表格 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]): gr.Markdown("Profitability Ratios", elem_classes=["font-medium", "mb-3"]) # 这里将显示利润率表格 # 报告预览 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]): gr.Markdown("Report Preview", elem_classes=["font-medium", "mb-3"]) # 这里将显示报告预览 def create_chat_panel(): """创建聊天面板组件""" # with gr.Column(elem_classes=["chat-panel"]): # 聊天头部 # with gr.Row(elem_classes=["p-4", "border-b", "border-gray-200", "items-center", "gap-2"]): # gr.Markdown("🤖", elem_classes=["text-xl", "text-blue-600"]) # gr.Markdown("Financial Assistant", elem_classes=["font-medium"]) # 聊天区域 # 一行代码嵌入! # chat_component = create_financial_chatbot() # chat_component.render() # create_financial_chatbot() # gr.LoginButton() # chatbot = gr.Chatbot( # value=[ # {"role": "assistant", "content": "I'm your financial assistant, how can I help you today?"}, # # {"role": "assistant", "content": "Hello! I can help you analyze financial data. Ask questions like \"Show revenue trends\" or \"Compare profitability ratios\""}, # # {"role": "user", "content": "Show revenue trends for last 4 quarters"}, # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"}, # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"}, # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"}, # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"} # ], # type="messages", # # elem_classes=["min-h-0", "overflow-y-auto", "space-y-4", "chat-content-box"], # show_label=False, # autoscroll=True, # show_copy_button=True, # height=400, # container=False, # ) # # 输入区域 # with gr.Row(elem_classes=["border-t", "border-gray-200", "gap-2"]): # msg = gr.Textbox( # placeholder="Ask a financial question...", # elem_classes=["flex-1", "border", "border-gray-300", "rounded-lg", "px-4", "py-2", "focus:border-blue-500"], # show_label=False, # lines=1, # submit_btn=True, # container=False, # ) # msg.submit( # chat_bot, # [msg, chatbot], # [msg, chatbot], # queue=True, # ) # def load_css_files(css_dir, filenames): # css_content = "" # for filename in filenames: # path = os.path.join(css_dir, filename) # if os.path.exists(path): # with open(path, "r", encoding="utf-8") as f: # css_content += f.read() + "\n" # else: # print(f"⚠️ CSS file not found: {path}") # return css_content def main(): # 获取当前目录 current_dir = os.path.dirname(os.path.abspath(__file__)) css_dir = os.path.join(current_dir, "css") # ✅ 初始化缓存管理器 report_cache = ReportCacheManager(cache_ttl_seconds=3600, max_cache_size=50) data_cache = FinancialDataCacheManager(cache_ttl_seconds=1800, max_cache_size=100) # def load_css_files(css_dir, filenames): # """读取多个 CSS 文件并合并为一个字符串""" # css_content = "" # for filename in filenames: # path = os.path.join(css_dir, filename) # if os.path.exists(path): # with open(path, "r", encoding="utf-8") as f: # css_content += f.read() + "\n" # else: # print(f"Warning: CSS file not found: {path}") # return css_content # 设置CSS路径 css_paths = [ os.path.join(css_dir, "main.css"), os.path.join(css_dir, "components.css"), os.path.join(css_dir, "layout.css") ] # css_dir = "path/to/your/css/folder" # 替换为你的实际路径 # 自动定位 css 文件夹(与 app.py 同级) # BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # CSS_DIR = os.path.join(BASE_DIR, "css") # css_files = ["main.css", "components.css", "layout.css"] # combined_css = load_css_files(CSS_DIR, css_files) # print(combined_css) # 获取默认选中的公司(第一个) default_company = my_companies[0]['company_name'] if my_companies else "" with gr.Blocks( title="Financial Analysis Dashboard", css_paths=css_paths, css=custom_css, # css=combined_css ) as demo: # 添加处理公司点击事件的路由 # 创建一个状态组件来跟踪选中的公司 selected_company_state = gr.State(default_company) with gr.Column(elem_classes=["container", "container-h"]): # 头部 create_header() # 创建主布局 with gr.Row(elem_classes=["main-content-box"]): # 左侧边栏 with gr.Column(scale=1, min_width=350): # 获取company_list组件的引用 company_list_component, report_section_component, report_display_component, news_display_component = create_sidebar() # 主内容区域 with gr.Column(scale=9): # ✅ 指标仪表板置顶显示 create_metrics_dashboard() with gr.Row(elem_classes=["main-content-box"]): with gr.Column(scale=8): # Tab内容 with gr.Tabs(): with gr.TabItem("Investment Suggestion", elem_classes=["tab-item"]): # 创建一个用于显示公司名称的组件 # company_display = gr.Markdown("# Please select a company") # 创建一个占位符用于显示tab内容 tab_content = gr.Markdown(elem_classes=["invest-suggest-md-box"]) # 当选中的公司改变时,更新显示 # selected_company_state.change( # fn=lambda company: f"# Investment Suggestions for {company}" if company else "# Please select a company", # inputs=[selected_company_state], # outputs=[company_display] # ) # 当选中的公司改变时,重新加载tab内容 def update_tab_content(company): if company: # ✅ 使用缓存管理器生成投资建议 def generate_suggestion(): stock_code = get_stock_code_by_company_name(company) return query_company_advanced(stock_code, "suggestion") # 使用生成器yield缓存管理器的输出 for result in report_cache.get_or_create_report(company, "suggestion", generate_suggestion): yield result else: yield "
Please select a company
" selected_company_state.change( fn=update_tab_content, inputs=[selected_company_state], outputs=[tab_content], ) with gr.TabItem("Analysis Report", elem_classes=["tab-item"]): # 创建一个用于显示公司名称的组件 # analysis_company_display = gr.Markdown("# Please select a company") # 创建一个占位符用于显示tab内容 analysis_tab_content = gr.Markdown(elem_classes=["analysis-report-md-box"]) # 当选中的公司改变时,更新显示 # selected_company_state.change( # fn=lambda company: f"# Analysis Report for {company}" if company else "# Please select a company", # inputs=[selected_company_state], # outputs=[analysis_company_display] # ) # 当选中的公司改变时,重新加载tab内容 def update_analysis_tab_content(company): if company: # ✅ 使用缓存管理器生成分析报告 def generate_report(): stock_code = get_stock_code_by_company_name(company) return query_company_advanced(stock_code, "report") # 使用生成器yield缓存管理器的输出 for result in report_cache.get_or_create_report(company, "report", generate_report): yield result else: yield "
Please select a company
" selected_company_state.change( fn=update_analysis_tab_content, inputs=[selected_company_state], outputs=[analysis_tab_content] ) # with gr.TabItem("Comparison", elem_classes=["tab-item"]): # create_tab_content("comparison") with gr.Column(scale=2, min_width=400): # 聊天面板 # ✅ 使用 chatbot.chat_main.respond (MCP_Financial_Report智能体) # Investment Suggestion和Analysis Report继续使用EasyFinancialAgent gr.ChatInterface( respond, title="Easy Financial AI Assistant", additional_inputs=[ gr.State(value=""), # CRITICAL: Store session URL across turns (hidden from UI) gr.State(value={}) # CRITICAL: Store agent context across turns (hidden from UI) ], additional_inputs_accordion=gr.Accordion(label="Settings", open=False, visible=False), # Hide the accordion completely ) # 在页面加载时设置默认选中的公司并加载数据 def load_default_company(): # 获取公司列表选项 try: companies_data = my_companies if isinstance(companies_data, list) and len(companies_data) > 0: choices = [str(item.get('company_name', 'Unknown')) for item in companies_data] elif isinstance(companies_data, pd.DataFrame) and not companies_data.empty: choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: choices = [] except: choices = [] if default_company: # ✅ 真正调用缓存管理器生成数据,而不是只返回loading HTML # 为Investment Suggestion生成数据 def generate_suggestion(): stock_code = get_stock_code_by_company_name(default_company) return query_company_advanced(stock_code, "suggestion") # 为Analysis Report生成数据 def generate_report(): stock_code = get_stock_code_by_company_name(default_company) return query_company_advanced(stock_code, "report") # 使用生成器逐步yield结果 suggestion_gen = report_cache.get_or_create_report(default_company, "suggestion", generate_suggestion) report_gen = report_cache.get_or_create_report(default_company, "report", generate_report) # 获取第一个结果(可能是loading或者缓存) suggestion_result = next(suggestion_gen, "
Loading...
") report_result = next(report_gen, "
Loading...
") # ✅ Financial Metrics不在这里加载,而是通过selected_company_state.change事件触发 # 这样避免重复加载(demo.load设置状态 → .change事件触发) return ( default_company, gr.update(choices=choices, value=default_company), suggestion_result, report_result ) return ( "", gr.update(choices=choices), "
Please select a company
", "
Please select a company
" ) demo.load( fn=load_default_company, inputs=[], outputs=[ selected_company_state, company_list_component, tab_content, analysis_tab_content # ✅ Financial Metrics组件不在这里输出,由selected_company_state.change事件触发更新 ], concurrency_limit=None, ) # 绑定公司选择事件到状态更新 # 注意:这里需要确保create_sidebar中没有重复绑定相同的事件 company_list_component.change( fn=lambda x: x, # 直接返回选中的公司名称 inputs=[company_list_component], outputs=[selected_company_state], concurrency_limit=None ) # 绑定公司选择事件到指标仪表板更新 def update_metrics_dashboard_wrapper(company_name): if company_name: # ✅ 使用缓存管理器加载财务数据 def load_financial_data(): return update_metrics_dashboard(company_name) # 使用缓存管理器获取数据 try: result = data_cache.get_or_load_data(company_name, "metrics", load_financial_data) stock_card_html, financial_metrics_html, income_table_html = result yield stock_card_html, financial_metrics_html, income_table_html except Exception as e: error_html = f'''

Error loading financial data: {str(e)}

Please try again later.

''' yield error_html, error_html, error_html else: # 如果没有选择公司,返回空内容 empty_html = "
Please select a company
" yield empty_html, empty_html, empty_html selected_company_state.change( fn=update_metrics_dashboard_wrapper, inputs=[selected_company_state], outputs=list(metrics_dashboard_components), concurrency_limit=None ) return demo if __name__ == "__main__": demo = main() demo.launch(share=True)