import gradio as gr import os import datetime import re import pandas as pd from sqlalchemy import true from chatbot.chat_main import respond import globals as g from service.mysql_service import get_companys, insert_company, get_company_by_name from service.chat_service import get_analysis_report, get_stock_price_from_bailian, search_company, search_news, get_invest_suggest, chat_bot from service.company import check_company_exists from service.hf_upload import get_hf_files_with_links from MarketandStockMCP.news_quote_mcp import get_company_news, get_quote from EasyReportDataMCP.report_mcp import query_financial_data from service.three_year_table_tool import build_table_format from service.three_year_tool import process_financial_data_with_metadata from service.tool_processor import get_stock_price get_companys_state = True # JavaScript代码用于读取和存储数据 js_code = """ function handleStorage(operation, key, value) { if (operation === 'set') { localStorage.setItem(key, value); return `已存储: ${key} = ${value}`; } else if (operation === 'get') { let storedValue = localStorage.getItem(key); if (storedValue === null) { return `未找到键: ${key}`; } return `读取到: ${key} = ${storedValue}`; } else if (operation === 'clear') { localStorage.removeItem(key); return `已清除: ${key}`; } else if (operation === 'clearAll') { localStorage.clear(); return '已清除所有数据'; } } """ custom_css = """ /* 匹配所有以 gradio-container- 开头的类 */ div[class^="gradio-container-"], div[class*=" gradio-container-"] { -webkit-text-size-adjust: 100% !important; line-height: 1.5 !important; font-family: unset !important; -moz-tab-size: 4 !important; tab-size: 4 !important; } .company-list-container { background-color: white; border-radius: 0.5rem; padding: 0.75rem; margin-bottom: 0.75rem; border: 1px solid #e5e7eb; box-shadow: 0 1px 2px 0 rgba(0, 0, 0, 0.05); width: 100%; } /* 隐藏单选框 */ .company-list-container input[type="radio"] { display: none; } /* 自定义选项样式 */ .company-list-container label { display: block; padding: 0.75rem 1rem; margin: 0.25rem 0; border-radius: 0.375rem; cursor: pointer; transition: all 0.2s ease; background-color: #f9fafb; border: 1px solid #e5e7eb; font-size: 1rem; text-align: left; width: 100%; box-sizing: border-box; } /* 悬停效果 */ .company-list-container label:hover { background-color: #f3f4f6; border-color: #d1d5db; } /* 选中效果 - 确保背景色充满整个选项 */ .company-list-container input[type="radio"]:checked + span { # background: #3b82f6 !important; color: white !important; font-weight: 600 !important; display: block; width: 100%; height: 100%; padding: 0.75rem 1rem; margin: -0.75rem -1rem; border-radius: 0.375rem; } .company-list-container span { display: block; padding: 0; border-radius: 0.375rem; width: 100%; } /* 确保每行只有一个选项 */ .company-list-container .wrap { display: block !important; } .company-list-container .wrap li { display: block !important; width: 100% !important; } label.selected { background: #3b82f6 !important; color: white !important; } """ # 全局变量用于存储公司映射关系 companies_map = {} # 根据公司名称获取股票代码的函数 def get_stock_code_by_company_name(company_name): """根据公司名称获取股票代码""" if company_name in companies_map and "CODE" in companies_map[company_name]: return companies_map[company_name]["CODE"] return "" # 默认返回 # 创建一个简单的函数来获取公司列表 def get_company_list_choices(): choices = [] print(f"Getting init add company list choices...{get_companys_state}") if not get_companys_state: return gr.update(choices=choices) try: companies_data = get_companys() print(f"Getting init add company list choices...companies_data: {companies_data}") if isinstance(companies_data, pd.DataFrame) and not companies_data.empty: choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: choices = [] except: choices = [] return gr.update(choices=choices) # Sidebar service functions # 处理公司点击事件的函数 def handle_company_click(company_name): """处理公司点击事件,先判断是否已经入库,如果没有则进行入库操作,然后刷新公司列表""" print(f"Handling click for company: {company_name}") # 1. 判断是否已经入库 if not check_company_exists(company_name): # 2. 如果没有入库,则进行入库操作 # 获取股票代码(如果有的话) stock_code = companies_map.get(company_name, {}).get("CODE", "Unknown") print(f"Inserting company {company_name} with code {stock_code}") # 插入公司到数据库 success = insert_company(company_name, stock_code) if success: print(f"Successfully inserted company: {company_name}") # 直接更新companies_map,而不是重新加载整个映射 companies_map[company_name] = {"NAME": company_name, "CODE": stock_code} # 使用Gradio的成功提示 gr.Info(f"Successfully added company: {company_name}") # 返回True表示添加成功,需要刷新列表 return True else: print(f"Failed to insert company: {company_name}") # 使用Gradio的错误提示 gr.Error(f"Failed to insert company: {company_name}") return False else: print(f"Company {company_name} already exists in database") # 使用Gradio的警告提示 gr.Warning(f"Company '{company_name}' already exists") # 3. 返回成功响应 return None def get_company_list_html(selected_company=""): try: # 从数据库获取所有公司 companies_data = get_companys() # 检查是否为错误信息 if isinstance(companies_data, str): if "查询执行失败" in companies_data: return "
获取公司列表失败
" else: # 如果是字符串但不是错误信息,可能需要特殊处理 return "" # 检查是否为DataFrame且为空 if not isinstance(companies_data, pd.DataFrame) or companies_data.empty: return "" # 生成HTML列表 html_items = [] for _, row in companies_data.iterrows(): company_name = row.get('company_name', 'Unknown') # 根据是否选中添加不同的样式类 css_class = "company-item" if company_name == selected_company: css_class += " selected-company" # 使用button元素来确保可点击性 html_items.append(f'') return "\n".join(html_items) except Exception as e: return f"
生成公司列表失败: {str(e)}
" def initialize_company_list(selected_company=""): return get_company_list_html(selected_company) def refresh_company_list(selected_company=""): """刷新公司列表,返回最新的HTML内容,带loading效果""" # 先返回loading状态 loading_html = '''
''' yield loading_html # 然后返回实际的数据 yield get_company_list_html(selected_company) # 新增函数:处理公司选择事件 def select_company(company_name): """处理公司选择事件,更新全局状态并返回更新后的公司列表""" # 更新全局变量 g.SELECT_COMPANY = company_name if company_name else "" # 对于Radio组件,我们只需要返回更新后的选项列表 try: companies_data = get_companys() if isinstance(companies_data, pd.DataFrame) and not companies_data.empty: choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: choices = [] except: choices = [] return gr.update(choices=choices, value=company_name) def initialize_companies_map(): """初始化 companies_map 字典""" global companies_map companies_map = {} # 清空之前的映射 print("Initializing companies map...") try: # 获取预定义的公司列表 predefined_companies = [ { "NAME": "Alibaba", "CODE": "BABA" }, { "NAME": "阿里巴巴-W", "CODE": "09988" }, { "NAME": "NVIDIA", "CODE": "NVDA" }, { "NAME": "Amazon", "CODE": "AMZN" }, { "NAME": "Intel", "CODE": "INTC" }, { "NAME": "Meta", "CODE": "META" }, { "NAME": "Google", "CODE": "GOOGL" }, { "NAME": "Apple", "CODE": "AAPL" }, { "NAME": "Tesla", "CODE": "TSLA" }, { "NAME": "AMD", "CODE": "AMD" }, { "NAME": "Microsoft", "CODE": "MSFT" }, { "NAME": "ASML", "CODE": "ASML" } ] # 将预定义公司添加到映射中 for company in predefined_companies: companies_map[company["NAME"]] = {"NAME": company["NAME"], "CODE": company["CODE"]} print(f"Predefined companies added: {len(predefined_companies)}") # 从数据库获取公司数据 companies_data = get_companys() # companies_data = window.cachedCompanies or [] print(f"Companies data from DB: {companies_data}") # 如果数据库中有公司数据,则添加到映射中(去重) if isinstance(companies_data, pd.DataFrame) and not companies_data.empty: print(f"Adding {len(companies_data)} companies from database") for _, row in companies_data.iterrows(): company_name = row.get('company_name', 'Unknown') stock_code = row.get('stock_code', '') # 确保company_name和stock_code都是字符串类型 company_name = str(company_name) if company_name is not None else 'Unknown' stock_code = str(stock_code) if stock_code is not None else '' # 检查是否已存在于映射中(通过股票代码判断) is_duplicate = False for existing_company in companies_map.values(): if existing_company["CODE"] == stock_code: is_duplicate = True break # 如果不重复,则添加到映射中 if not is_duplicate: companies_map[company_name] = {"NAME": company_name, "CODE": stock_code} # print(f"Added company: {company_name}") else: print("No companies found in database") print(f"Final companies map: {companies_map}") except Exception as e: # 错误处理 print(f"Error initializing companies map: {str(e)}") pass # Sidebar company selector functions def update_company_choices(user_input: str): """更新公司选择列表""" # 第一次 yield:立即显示 modal + loading 提示 yield gr.update( choices=["Searching..."], visible=True ), gr.update(visible=False, value="") # 添加第二个返回值 # 第二次:执行耗时操作(调用 LLM) choices = search_company(user_input) # 这是你原来的同步函数 # 检查choices是否为错误信息 if len(choices) > 0 and isinstance(choices[0], str) and not choices[0].startswith("Searching"): # 如果是错误信息或非正常格式,显示提示消息 error_message = choices[0] if len(choices) > 0 else "未知错误" # 使用Ant Design风格的错误提示 error_html = f'''
{error_message}
''' yield gr.update(choices=["No results found"], visible=True), gr.update(visible=True, value=error_html) else: # 第三次:更新为真实结果 yield gr.update( choices=choices, visible=len(choices) > 0 ), gr.update(visible=False, value="") def add_company(selected, current_list): """添加选中的公司""" if selected == "No results found": return gr.update(visible=False), current_list, gr.update(visible=False, value="") if selected: # print(f"Selected company====: {selected}") # 从选择的文本中提取公司名称和股票代码 # 假设格式为 "公司名称 (股票代码)" selected_clean = selected.strip() match = re.match(r"^(.+?)\s*\(([^)]+)\)$", selected_clean) if match: company_name = match.group(1) stock_code = match.group(2) elif companies_map.get(selected_clean): company_name = selected_clean stock_code = companies_map[selected_clean]["CODE"] else: company_name = selected_clean stock_code = "Unknown" # print(f"Company name: {company_name}, Stock code: {stock_code}") # print(f"Company exists: {check_company_exists(company_name)}") if not check_company_exists(company_name): # 入库 success = insert_company(company_name, stock_code) if success: # 从数据库获取更新后的公司列表 try: companies_data = get_companys() if isinstance(companies_data, pd.DataFrame) and not companies_data.empty: updated_list = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: updated_list = [] except: updated_list = [] # 添加默认公司选项 if not updated_list: updated_list = ['Alibaba', '腾讯控股', 'Tencent', '阿里巴巴-W', 'Apple'] # 成功插入后清除状态消息,并更新Radio组件的选项,同时默认选中刚添加的公司 # 通过设置value参数,会自动触发change事件来加载数据 return gr.update(visible=False), gr.update(choices=updated_list, value=company_name), gr.update(visible=False, value="") else: # 插入失败显示错误消息,使用Gradio内置的错误提示 gr.Error("插入公司失败") return gr.update(visible=False), current_list, gr.update(visible=False, value="") else: # 公司已存在,使用Gradio内置的警告消息 gr.Warning(f"公司 '{company_name}' 已存在") return gr.update(visible=False), current_list, gr.update(visible=False, value="") return gr.update(visible=False), current_list, gr.update(visible=False, value="") # Sidebar report section functions # 创建一个全局变量来存储公司按钮组件 company_buttons = {} def create_company_buttons(): """创建公司按钮组件""" # 确保companies_map已被初始化 if not companies_map: initialize_companies_map() # 显示companies_map中的公司列表 companies = list(companies_map.keys()) # 添加调试信息 print(f"Companies in map: {companies}") # 清空之前的按钮 company_buttons.clear() if not companies: # 如果没有公司,返回一个空的列 with gr.Column(): gr.Markdown("暂无公司数据") else: # 使用Gradio按钮组件创建公司列表 with gr.Column(elem_classes=["home-company-list"]): # 按每行两个公司进行分组 for i in range(0, len(companies), 2): # 检查是否是最后一行且只有一个元素 if i + 1 < len(companies): # 有两个元素 with gr.Row(elem_classes=["home-company-item-box"]): btn1 = gr.Button(companies[i], elem_classes=["home-company-item", "gradio-button"]) btn2 = gr.Button(companies[i + 1], elem_classes=["home-company-item", "gradio-button"]) # 保存按钮引用 company_buttons[companies[i]] = btn1 company_buttons[companies[i + 1]] = btn2 else: # 只有一个元素 with gr.Row(elem_classes=["home-company-item-box", "single-item"]): btn = gr.Button(companies[i], elem_classes=["home-company-item", "gradio-button"]) # 保存按钮引用 company_buttons[companies[i]] = btn # 返回按钮字典 return company_buttons def update_report_section(selected_company, report_data, stock_code): """根据选中的公司更新报告部分""" print(f"Updating report (报告部分): {selected_company}") # 添加调试信息 if selected_company == "" or selected_company is None or selected_company == "Unknown": # 没有选中的公司,显示公司列表 # html_content = get_initial_company_list_content() # 暂时返回空内容,稍后会用Gradio组件替换 html_content = "" return gr.update(value=html_content, visible=True) else: # 有选中的公司,显示相关报告 # try: # # 尝试从Hugging Face获取文件列表 # report_data = get_hf_files_with_links("JC321/files-world") # except Exception as e: # # 如果获取失败,使用模拟数据并显示错误消息 # print(f"获取Hugging Face文件列表失败: {str(e)}") # report_data = [] stock_code = get_stock_code_by_company_name(selected_company) report_data = query_financial_data(stock_code, "5-Year") # report_data = process_financial_data_with_metadata(financial_metrics_pre) html_content = '
' html_content += '

Financial Reports

' for report in report_data: html_content += f'''
{report['period']}-{stock_code}-{report['source_form']}
''' html_content += f'' html_content += '
' return gr.update(value=html_content, visible=True) def update_news_section(selected_company): """根据选中的公司更新报告部分""" html_content = "" if selected_company == "" or selected_company is None: # 没有选中的公司,显示公司列表 # html_content = get_initial_company_list_content() # 暂时返回空内容,稍后会用Gradio组件替换 return gr.update(value=html_content, visible=True) else: try: stock_code = get_stock_code_by_company_name(selected_company) report_data = get_company_news(stock_code, None, None) # print(f"新闻列表: {report_data['articles']}") # report_data = search_news(selected_company) if (report_data['articles']): report_data = report_data['articles'] news_html = "
" news_html += '

News

' from datetime import datetime for news in report_data: published_at = news['published'] # 解析 ISO 8601 时间字符串(注意:strptime 不直接支持 'Z',需替换或使用 fromisoformat) dt = datetime.fromisoformat(published_at.replace("Z", "+00:00")) # 格式化为 YYYY.MM.DD formatted_date = dt.strftime("%Y.%m.%d") news_html += f'''
[{formatted_date}] {news['headline']}
''' news_html += f'' news_html += '
' html_content += news_html except Exception as e: print(f"Error updating report section: {str(e)}") return gr.update(value=html_content, visible=True) # Component creation functions def create_header(): """创建头部组件""" # 获取当前时间 current_time = datetime.datetime.now().strftime("%B %d, %Y - Market Data Updated Today") with gr.Row(elem_classes=["header"]): # 左侧:图标和标题 with gr.Column(scale=8): # 使用圆柱体SVG图标表示数据库 gr.HTML('''
Easy Financial Report Dashboard
''', elem_classes=["text-2xl"]) # 右侧:时间信息 with gr.Column(scale=2): gr.Markdown(current_time, elem_classes=["text-sm-top-time"]) def create_company_list(get_companys_state): company_list = gr.Radio( choices=[], label="", interactive=True, elem_classes=["company-list-container"], container=False, # 不显示外部容器边框 visible=True ) if (get_companys_state == False): return company_list else: """创建公司列表组件""" # 获取公司列表数据 try: companies_data = get_companys() print(f"创建公司列表组件 - Companies data: {companies_data}") if isinstance(companies_data, pd.DataFrame) and not companies_data.empty: choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: choices = [] except: choices = [] # 添加默认公司选项 if not choices: choices = [] # 使用Radio组件显示公司列表,不显示标签 company_list = gr.Radio( choices=choices, label="", interactive=True, elem_classes=["company-list-container"], container=False, # 不显示外部容器边框 visible=True ) return company_list def create_company_selector(): """创建公司选择器组件""" company_input = gr.Textbox( show_label=False, placeholder="Add Company", elem_classes=["company-input-box"], lines=1, max_lines=1, # container=False ) # 状态消息显示区域 status_message = gr.HTML( "", elem_classes=["status-message"], visible=False ) # 弹窗选择列表 company_modal = gr.Radio( show_label=False, choices=[], visible=False, elem_classes=["company-modal"] ) return company_input, status_message, company_modal def create_report_section(): """创建报告部分组件""" # 创建一个用于显示报告列表的组件,初始显示公司列表 # initial_content = get_initial_company_list_content() # 暂时返回空内容,稍后会用Gradio组件替换 initial_content = "" # print(f"Initial content: {initial_content}") # 添加调试信息 report_display = gr.HTML(initial_content) return report_display def create_news_section(): """创建新闻部分组件""" initial_content = "" news_display = gr.HTML(initial_content) return news_display def format_financial_metrics(data: dict, prev_data: dict = None) -> list: # pyright: ignore[reportArgumentType] """ 将原始财务数据转换为 financial_metrics 格式。 Args: data (dict): 当前财年数据(必须包含 total_revenue, net_income 等字段) prev_data (dict, optional): 上一财年数据,用于计算 change。若未提供,change 设为 "--" Returns: list[dict]: 符合 financial_metrics 格式的列表 """ def format_currency(value: float) -> str: """将数字格式化为 $XB / $XM / $XK""" if value >= 1e9: return f"${value / 1e9:.2f}B" elif value >= 1e6: return f"${value / 1e6:.2f}M" elif value >= 1e3: return f"${value / 1e3:.2f}K" else: return f"${value:.2f}" def calculate_change(current: float, previous: float) -> tuple: """计算变化百分比和颜色""" if previous == 0: return "--", "gray" change_pct = (current - previous) / abs(previous) * 100 sign = "+" if change_pct >= 0 else "" color = "green" if change_pct >= 0 else "red" return f"{sign}{change_pct:.1f}%", color # 定义指标映射 metrics_config = [ { "key": "total_revenue", "label": "Total Revenue", "is_currency": True, "eps_like": False }, { "key": "net_income", "label": "Net Income", "is_currency": True, "eps_like": False }, { "key": "earnings_per_share", "label": "Earnings Per Share", "is_currency": False, # EPS 不用 B/M 单位 "eps_like": True }, { "key": "operating_expenses", "label": "Operating Expenses", "is_currency": True, "eps_like": False }, { "key": "operating_cash_flow", "label": "Cash Flow", "is_currency": True, "eps_like": False } ] result = [] for item in metrics_config: key = item["key"] current_val = data.get(key) if current_val is None: continue # 格式化 value if item["eps_like"]: value_str = f"${current_val:.2f}" elif item["is_currency"]: value_str = format_currency(current_val) else: value_str = str(current_val) # 计算 change(如果有上期数据) if prev_data and key in prev_data: prev_val = prev_data[key] change_str, color = calculate_change(current_val, prev_val) else: change_str = "--" color = "gray" result.append({ "label": item["label"], "value": value_str, "change": change_str, "color": color }) return result def create_sidebar(): """创建侧边栏组件""" # 初始化 companies_map initialize_companies_map() with gr.Column(elem_classes=["sidebar"]): # 公司选择 with gr.Group(elem_classes=["card"]): gr.Markdown("### Select Company", elem_classes=["card-title", "left-card-title"]) with gr.Column(): company_list = create_company_list(get_companys_state) # 创建公司列表 # if not get_companys_state: # getCompanyFromStorage = gr.Button("读取") # getCompanyFromStorage.click( # fn=create_company_list(True), # inputs=[], # outputs=[company_list, status_message] # ) # 创建公司选择器 company_input, status_message, company_modal = create_company_selector() # 绑定事件 company_input.submit( fn=update_company_choices, inputs=[company_input], outputs=[company_modal, status_message] ) company_modal.change( fn=add_company, inputs=[company_modal, company_list], outputs=[company_modal, company_list, status_message] ) # 创建公司按钮组件 company_buttons = create_company_buttons() # 为每个公司按钮绑定点击事件 def make_click_handler(company_name): def handler(): result = handle_company_click(company_name) # 如果添加成功,刷新Select Company列表并默认选中刚添加的公司 if result is True: # 正确地刷新通过create_company_list()创建的Radio组件 try: companies_data = get_companys() if isinstance(companies_data, pd.DataFrame) and not companies_data.empty: updated_choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()] else: updated_choices = [] except: updated_choices = [] # 使用gr.update来正确更新Radio组件,并默认选中刚添加的公司 # 同时触发change事件来加载数据 return gr.update(choices=updated_choices, value=company_name) return None return handler for company_name, button in company_buttons.items(): button.click( fn=make_click_handler(company_name), inputs=[], outputs=[company_list] ) # 创建一个容器来容纳报告部分,初始时隐藏 with gr.Group(elem_classes=["report-news-box"]) as report_section_group: # gr.Markdown("### Financial Reports", elem_classes=["card-title", "left-card-title"]) report_display = create_report_section() news_display = create_news_section() # 处理公司选择事件 def select_company_handler(company_name): """处理公司选择事件的处理器""" # 更新全局变量 g.SELECT_COMPANY = company_name if company_name else "" # 更新报告部分的内容 updated_report_display = update_report_section(company_name, None, None) updated_news_display = update_news_section(company_name) # 根据是否选择了公司来决定显示/隐藏报告部分 if company_name: # 有选中的公司,显示报告部分 return gr.update(visible=True), updated_report_display, updated_news_display else: # 没有选中的公司,隐藏报告部分 return gr.update(visible=False), updated_report_display, updated_news_display company_list.change( fn=select_company_handler, inputs=[company_list], outputs=[report_section_group, report_display, news_display] ) # 返回公司列表组件和报告部分组件 return company_list, report_section_group, report_display, news_display def build_income_table(table_data): # 兼容两种数据结构: # 1. 新结构:包含 list_data 和 yoy_rates 的字典 # 2. 旧结构:直接是二维数组 if isinstance(table_data, dict) and "list_data" in table_data: # 新结构 income_statement = table_data["list_data"] yoy_rates = table_data["yoy_rates"] or [] else: # 旧结构,直接使用传入的数据 income_statement = table_data yoy_rates = [] # 创建一个映射,将年份列索引映射到增长率 yoy_map = {} if len(yoy_rates) > 1 and len(yoy_rates[0]) > 1: # 获取增长率表头(跳过第一列"Category") yoy_headers = yoy_rates[0][1:] # 为每个指标行创建增长率映射 for i, yoy_row in enumerate(yoy_rates[1:], 1): # 跳过标题行 category = yoy_row[0] yoy_map[category] = {} for j, rate in enumerate(yoy_row[1:]): if j < len(yoy_headers): yoy_map[category][yoy_headers[j]] = rate table_rows = "" header_row = income_statement[0] for i, row in enumerate(income_statement): if i == 0: row_style = "background-color: #f5f5f5; font-weight: 500;" else: row_style = "background-color: #f9f9f9;" cells = "" for j, cell in enumerate(row): if j == 0: cells += f"{cell}" else: # 添加增长率箭头(如果有的话) growth = None category = row[0] # j是当前单元格索引,0是类别列,1,2,3...是数据列 # yoy_map的键是年份,例如"2024/FY" if i > 0 and category in yoy_map and j > 0 and j < len(header_row): year_header = header_row[j] if year_header in yoy_map[category]: growth = yoy_map[category][year_header] if growth and growth != "N/A": arrow = "▲" if growth.startswith("+") else "▼" color = "green" if growth.startswith("+") else "red" cells += f"""
{cell}
{arrow}{growth}
""" else: cells += f"{cell}" table_rows += f"{cells}" html = f"""
Income Statement and Cash Flow
{table_rows}
""" return html def create_metrics_dashboard(): """创建指标仪表板组件""" with gr.Row(elem_classes=["metrics-dashboard"]): card_custom_style = ''' background-color: white; border-radius: 0.5rem; box-shadow: rgba(0, 0, 0, 0.1) 0px 1px 3px 0px, rgba(0, 0, 0, 0.1) 0px 1px 2px -1px; padding: 1.25rem; min-height: 250px !important; text-align: center; ''' # 模拟数据 company_info = { "name": "N/A", "symbol": "NYSE:N/A", "price": 0, "change": 0, "change_percent": 0.41, "open": 165.20, "high": 166.37, "low": 156.15, "prev_close": 157.01, "volume": "27.10M" } # financial_metrics = query_financial_data("NVDA", "最新财务数据") # print(f"最新财务数据: {financial_metrics}") financial_metrics = [ {"label": "Total Revenue", "value": "N/A", "change": "N/A", "color": "grey"}, {"label": "Net Income", "value": "N/A", "change": "N/A", "color": "grey"}, {"label": "Earnings Per Share", "value": "N/A", "change": "N/A", "color": "grey"}, {"label": "Operating Expenses", "value": "N/A", "change": "N/A", "color": "grey"}, {"label": "Cash Flow", "value": "N/A", "change": "N/A", "color": "grey"} ] # income_statement = [ # ["Category", "2024/FY", "2023/FY", "2022/FY"], # ["Total", "130350M", "126491M", "134567M"], # ["Net Income", "11081", "10598M", "9818.4M"], # ["Earnings Per Share", "4.38", "4.03", "3.62"], # ["Operating Expenses", "31990.9M", "31439.6M", "34516.2M"], # ["Cash Flow", "25289.9M", "29086M", "22517.2M"] # ] income_statement = { "list_data": [ ["Category", "N/A/FY", "N/A/FY", "N/A/FY"], ["Total", "N/A", "N/A", "N/A"], ["Net Income", "N/A", "N/A", "N/A.4M"], ["Earnings Per Share", "N/A", "N/A", "N/A"], ["Operating Expenses", "N/A", "N/A", "N/A"], ["Cash Flow", "N/A", "N/A", "N/A"] ], "yoy_rates": [] # "yoy_rates": [ # ["Category", "N/A/FY", "N/A/FY"], # ["Total", "N/A", "N/A"], # ["Net Income", "+3.05%", "-6.00%"], # ["Earnings Per Share", "+3.05%", "-6.00%"], # ["Operating Expenses", "+29.17%", "-6.00%"], # ["Cash Flow", "-13.05%", "-6.00%"] # ] } yearly_data = 'N/A' # 增长变化的 HTML 字符(箭头+百分比) def render_change(change: str, color: str): if change.startswith("+"): return f'▲{change}' else: return f'▼{change}' # 构建左侧卡片 def build_stock_card(): html = f"""
N/A
N/A
N/A
N/A
Open
N/A
High
N/A
Low
N/A
Prev Close
N/A
""" return html #
Vol
N/A
# 构建中间卡片 def build_financial_metrics(): metrics_html = "" for item in financial_metrics: change_html = render_change(item["change"], item["color"]) metrics_html += f"""
{item['label']}
{item['value']} {change_html}
""" html = f"""
{yearly_data} Financial Metrics
YTD data
{metrics_html}
""" return html # 主函数:返回所有 HTML 片段 def get_dashboard(): with gr.Row(): with gr.Column(scale=1, min_width=250, elem_classes=["metric-card-col-left"]): stock_card_html = gr.HTML(build_stock_card(), elem_classes=["metric-card-left"]) with gr.Column(scale=1, min_width=300, elem_classes=["metric-card-col-middle"]): financial_metrics_html = gr.HTML(build_financial_metrics(), elem_classes=["metric-card-middle"]) with gr.Column(scale=1, min_width=450, elem_classes=["metric-card-col-right"]): # 传递income_statement参数 income_table_html = gr.HTML(build_income_table(income_statement), elem_classes=["metric-card-right"]) return stock_card_html, financial_metrics_html, income_table_html # 创建指标仪表板并保存引用 stock_card_component, financial_metrics_component, income_table_component = get_dashboard() # 将组件引用保存到全局变量,以便在其他地方使用 global metrics_dashboard_components metrics_dashboard_components = (stock_card_component, financial_metrics_component, income_table_component) # 更新指标仪表板的函数 def update_metrics_dashboard(company_name): """根据选择的公司更新指标仪表板""" # 模拟数据 # company_info = { # "name": company_name, # "symbol": "NYSE:BABA", # "price": 157.65, # "change": 0.64, # "change_percent": 0.41, # "open": 165.20, # "high": 166.37, # "low": 156.15, # "prev_close": 157.01, # "volume": "27.10M" # } company_info = {} # 尝试获取股票价格数据,但不中断程序执行 stock_code = "" try: # 根据选择的公司获取股票代码 stock_code = get_stock_code_by_company_name(company_name) # result = get_quote(company_name.strip()) # company_info2 = get_stock_price(stock_code) # company_info2 = get_stock_price_from_bailian(stock_code) # print(f"股票价格数据: {company_info2}") company_info = get_quote(stock_code.strip()) company_info['company'] = company_name print(f"股票价格数据====: {company_info}") # 查询结果:{ # "company": "阿里巴巴", # "symbol": "BABA", # "open": "159.09", # "high": "161.46", # "low": "150.00", # "price": "157.60", # "volume": "21453064", # "latest trading day": "2025-11-27", # "previous close": "157.01", # "change": "+0.59", # "change_percent": "+0.38%" # }BABA # 如果成功获取数据,则用实际数据替换模拟数据 # if company_info2 and "content" in company_info2 and len(company_info2["content"]) > 0: # import json # # 解析返回的JSON数据 # data_text = company_info2["content"][0]["text"] # stock_data = json.loads(data_text) # # 提取数据 # quote = stock_data["Global Quote"] # # 转换交易量单位 # volume = int(quote['06. volume']) # if volume >= 1000000: # volume_str = f"{volume / 1000000:.2f}M" # elif volume >= 1000: # volume_str = f"{volume / 1000:.2f}K" # else: # volume_str = str(volume) # company_info = { # "name": company_name, # "symbol": f"NYSE:{quote['01. symbol']}", # "price": float(quote['05. price']), # "change": float(quote['09. change']), # "change_percent": float(quote['10. change percent'].rstrip('%')), # "open": float(quote['02. open']), # "high": float(quote['03. high']), # "low": float(quote['04. low']), # "prev_close": float(quote['08. previous close']), # "volume": volume_str # } except Exception as e: print(f"获取股票价格数据失败: {e}") company_info2 = None # financial_metrics = [ # {"label": "Total Revenue", "value": "$2.84B", "change": "+12.4%", "color": "green"}, # {"label": "Net Income", "value": "$685M", "change": "-3.2%", "color": "red"}, # {"label": "Earnings Per Share", "value": "$2.15", "change": "-3.2%", "color": "red"}, # {"label": "Operating Expenses", "value": "$1.2B", "change": "+5.1%", "color": "green"}, # {"label": "Cash Flow", "value": "$982M", "change": "+8.7%", "color": "green"} # ] financial_metrics_pre = query_financial_data(stock_code, "5-Year") # financial_metrics_pre = query_financial_data(company_name, "5年趋势") # print(f"最新财务数据: {financial_metrics_pre}") # financial_metrics = format_financial_metrics(financial_metrics_pre) # financial_metrics_pre_2 = extract_last_three_with_fallback(financial_metrics_pre) # print(f"提取的3年数据: {financial_metrics_pre_2}") # financial_metrics_pre = { # "metrics": financial_metrics_pre_2 # } financial_metrics = [] # try: # # financial_metrics = calculate_yoy_comparison(financial_metrics_pre) # financial_metrics = build_financial_metrics_three_year_data(financial_metrics_pre) # print(f"格式化后的财务数据: {financial_metrics}") # except Exception as e: # print(f"Error calculating YOY comparison: {e}") year_data = None three_year_data = None try: # financial_metrics = process_financial_data_with_metadata(financial_metrics_pre) result = process_financial_data_with_metadata(financial_metrics_pre) # 按需提取字段 financial_metrics = result["financial_metrics"] year_data = result["year_data"] three_year_data = result["three_year_data"] print(f"格式化后的财务数据: {financial_metrics}") # 拿report数据 # try: # # 从 result 中获取报告数据 # if 'report_data' in result: # 假设 result 中包含 report_data 键 # report_data = result['report_data'] # else: # # 如果 result 中没有直接包含 report_data,则从其他键中获取 # # 这需要根据实际的 result 数据结构来调整 # report_data = result.get('reports', []) # 示例:假设数据在 'reports' 键下 # 更新报告部分的内容 # 这里需要调用 update_report_section 函数并传入 report_data # 注意:update_report_section 可能需要修改以接受 report_data 参数 # updated_report_content = update_report_section(company_name, report_data, stock_code) # 然后将 updated_report_content 返回,以便在 UI 中更新 # 这需要修改函数的返回值以包含报告内容 # except Exception as e: # print(f"Error updating report section with result data: {e}") # updated_report_content = "
Failed to load report data
" except Exception as e: print(f"Error process_financial_data: {e}") # income_statement = [ # ["Category", "2024/FY", "2023/FY", "2022/FY"], # ["Total", "130350M", "126491M", "134567M"], # ["Net Income", "11081", "10598M", "9818.4M"], # ["Earnings Per Share", "4.38", "4.03", "3.62"], # ["Operating Expenses", "31990.9M", "31439.6M", "34516.2M"], # ["Cash Flow", "25289.9M", "29086M", "22517.2M"] # ] # table_data = None # try: # table_data = extract_financial_table(financial_metrics_pre) # print(table_data) # except Exception as e: # print(f"Error extract_financial_table: {e}") # yearly_data = None # try: # yearly_data = get_yearly_data(financial_metrics_pre) # except Exception as e: # print(f"Error get_yearly_data: {e}") # ====== # table_data = [ # ["Category", "2024/FY", "2023/FY", "2022/FY"], # ["Total", "130350M", "126491M", "134567M"], # ["Net Income", "11081", "10598M", "9818.4M"], # ["Earnings Per Share", "4.38", "4.03", "3.62"], # ["Operating Expenses", "31990.9M", "31439.6M", "34516.2M"], # ["Cash Flow", "25289.9M", "29086M", "22517.2M"] # ] yearly_data = year_data table_data = build_table_format(three_year_data) # print(f"table_data: {table_data}") # yearly_data = None # try: # yearly_data = get_yearly_data(financial_metrics_pre) # except Exception as e: # print(f"Error get_yearly_data: {e}") #======= # exp = { # "list_data": [ # ["Category", "2024/FY", "2023/FY", "2022/FY"], # ["Total", "130350M", "126491M", "134567M"], # ["Net Income", "11081", "10598M", "9818.4M"], # ["Earnings Per Share", "4.38", "4.03", "3.62"], # ["Operating Expenses", "31990.9M", "31439.6M", "34516.2M"], # ["Cash Flow", "25289.9M", "29086M", "22517.2M"] # ], # "yoy_rates": [ # ["Category", "2024/FY", "2023/FY"], # ["Total", "+3.05%", "-6.00%"], # ["Net Income", "+3.05%", "-6.00%"], # ["Earnings Per Share", "+3.05%", "-6.00%"], # ["Operating Expenses", "+29.17%", "-6.00%"], # ["Cash Flow", "-13.05%", "-6.00%"] # ] # } # 增长变化的 HTML 字符(箭头+百分比) def render_change(change: str, color: str): if change.startswith("+"): return f'▲{change}' else: return f'▼{change}' # 构建左侧卡片 def build_stock_card(company_info): try: if not company_info or not isinstance(company_info, dict): company_name = "N/A" symbol = "N/A" price = "N/A" change_html = 'N/A' open_val = high_val = low_val = prev_close_val = volume_display = "N/A" else: company_name = company_info.get("company", "N/A") symbol = company_info.get("symbol", "N/A") price = company_info.get("current_price", "N/A") # 解析 change change_str = company_info.get("change", "0") try: change = float(change_str) except (ValueError, TypeError): change = 0.0 # 解析 change_percent change_percent = company_info.get("percent_change", "0%") # try: # change_percent = float(change_percent_str.rstrip('%')) # except (ValueError, TypeError): # change_percent = 0.0 change_color = "green" if change >= 0 else "red" sign = "+" if change >= 0 else "" change_html = f'{sign}{change:.2f} ({change_percent:+.2f}%)' # 其他价格字段(可选:也可格式化为 2 位小数) open_val = company_info.get("open", "N/A") high_val = company_info.get("high", "N/A") low_val = company_info.get("low", "N/A") prev_close_val = company_info.get("previous_close", "N/A") # raw_volume = company_info.get("volume", "N/A") # volume_display = format_volume(raw_volume) html = f"""
{company_name}
NYSE:{symbol}
{price}
{change_html}
Open
{open_val}
High
{high_val}
Low
{low_val}
Prev Close
{prev_close_val}
""" #
Vol
{volume_display}
return html except Exception as e: print(f"Error building stock card: {e}") return '
Error loading stock data
' # 构建中间卡片 def build_financial_metrics(yearly_data): metrics_html = "" for item in financial_metrics: change_html = render_change(item["change"], item["color"]) metrics_html += f"""
{item['label']}
{item['value']} {change_html}
""" html = f"""
{yearly_data} Financial Metrics
YTD data
{metrics_html}
""" return html # 构建右侧表格 # def build_income_table(income_statement): # table_rows = "" # for i, row in enumerate(income_statement): # if i == 0: # row_style = "background-color: #f5f5f5; font-weight: 500;" # else: # row_style = "background-color: #f9f9f9;" # cells = "" # for j, cell in enumerate(row): # if j == 0: # cells += f"{cell}" # else: # # 添加增长箭头(模拟数据) # growth = None # if i == 1 and j == 1: growth = "+3.05%" # elif i == 1 and j == 2: growth = "-6.00%" # elif i == 2 and j == 1: growth = "+3.05%" # elif i == 2 and j == 2: growth = "-6.00%" # elif i == 3 and j == 1: growth = "+3.05%" # elif i == 3 and j == 2: growth = "-6.00%" # elif i == 4 and j == 1: growth = "+29.17%" # elif i == 4 and j == 2: growth = "+29.17%" # elif i == 5 and j == 1: growth = "-13.05%" # elif i == 5 and j == 2: growth = "+29.17%" # if growth: # arrow = "▲" if growth.startswith("+") else "▼" # color = "green" if growth.startswith("+") else "red" # cells += f""" #
{cell}
#
{arrow}{growth}
# """ # else: # cells += f"{cell}" # table_rows += f"{cells}" # html = f""" #
#
# # # #
Income Statement and Cash Flow
#
# # {table_rows} #
#
# """ # return html # 返回三个HTML组件的内容 return build_stock_card(company_info), build_financial_metrics(yearly_data), build_income_table(table_data) def create_tab_content(tab_name, company_name): """创建Tab内容组件""" if tab_name == "summary": print(f"company_name: {company_name}") # content = get_invest_suggest(company_name) gr.Markdown("# 11111", elem_classes=["invest-suggest-md-box"]) # gr.Markdown(content, elem_classes=["invest-suggest-md-box"]) # gr.Markdown(""" # ## Investment Suggestions # ### Company Overview # GlobalTech inc. is a leading technology company with strong performance in the Q3 2025 period. The companyshows consistent revenue growth and maintains a healthy fnancial position. # ### Key Strengths # - Revenue Growth: 12.4% year-over-year increase demonstrates strong market demandDiversifed Portfolio: Multiple revenue streams reduce business risk # - Innovation Focus: Continued investment in R&D drives future growth potential # ### Financial Health Indicators # - Liquidity: Current ratio of 1.82 indicates good short-term fnancial health # - Proftability: Net income of $685M, though down slightly quarter-over-quarter0 # - Cash Flow: Strong operating cash flow of $982M supports operations and growth initiatives # ### Investment Recommendation # BUY - GlobalTech Inc. presents a solid investment opportunity with: # - Consistent revenue growth trajectory # - Strong market position in key technology segments # - Healthy balance sheet and cash flow generation # ### Risk Considerations # Quarterly net income decline warrants monitoring # | Category | Q3 2025 | Q2 2025 | YoY % | # |--------------------|-----------|-----------|----------| # | Total Revenue | $2,842M | $2,712M | +12.4% | # | Gross Profit | $1,203M | $1,124M | +7.0% | # | Operating Income | $742M | $798M | -7.0% | # | Net Income | $685M | $708M | -3.2% | # | Earnings Per Share | $2.15 | $2.22 | -3.2% | # """, elem_classes=["invest-suggest-md-box"]) elif tab_name == "detailed": with gr.Column(elem_classes=["tab-content"]): gr.Markdown("Financial Statements", elem_classes=["text-xl", "font-semibold", "text-gray-900", "mb-6"]) with gr.Row(elem_classes=["gap-6"]): # 收入报表 (3/5宽度) with gr.Column(elem_classes=["w-3/5", "bg-gray-50", "rounded-xl", "p-4"]): gr.Markdown("Income Statement", elem_classes=["font-medium", "mb-3"]) # 这里将显示收入报表表格 # 资产负债表和现金流量表 (2/5宽度) with gr.Column(elem_classes=["w-2/5", "flex", "flex-col", "gap-6"]): # 资产负债表 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]): gr.Markdown("Balance Sheet Summary", elem_classes=["font-medium", "mb-3"]) # 这里将显示资产负债表图表 # 现金流量表 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]): with gr.Row(elem_classes=["justify-between", "items-start"]): gr.Markdown("Cash Flow Statement", elem_classes=["font-medium"]) gr.Markdown("View Detailed", elem_classes=["text-xs", "text-blue-600", "font-medium"]) with gr.Column(elem_classes=["mt-4", "space-y-3"]): # 经营现金流 with gr.Column(): with gr.Row(elem_classes=["justify-between"]): gr.Markdown("Operating Cash Flow") gr.Markdown("$982M", elem_classes=["font-medium"]) with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]): with gr.Column(elem_classes=["bg-green-500", "h-1.5", "rounded-full"], scale=85): gr.Markdown("") # 投资现金流 with gr.Column(): with gr.Row(elem_classes=["justify-between"]): gr.Markdown("Investing Cash Flow") gr.Markdown("-$415M", elem_classes=["font-medium"]) with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]): with gr.Column(elem_classes=["bg-blue-500", "h-1.5", "rounded-full"], scale=42): gr.Markdown("") # 融资现金流 with gr.Column(): with gr.Row(elem_classes=["justify-between"]): gr.Markdown("Financing Cash Flow") gr.Markdown("-$212M", elem_classes=["font-medium"]) with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]): with gr.Column(elem_classes=["bg-red-500", "h-1.5", "rounded-full"], scale=25): gr.Markdown("") elif tab_name == "comparative": with gr.Column(elem_classes=["tab-content"]): gr.Markdown("Industry Benchmarking", elem_classes=["text-xl", "font-semibold", "text-gray-900", "mb-6"]) # 收入增长对比 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4", "mb-6"]): gr.Markdown("Revenue Growth - Peer Comparison", elem_classes=["font-medium", "mb-3"]) # 这里将显示对比图表 # 利润率和报告预览网格 with gr.Row(elem_classes=["grid-cols-2", "gap-6"]): # 利润率表格 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]): gr.Markdown("Profitability Ratios", elem_classes=["font-medium", "mb-3"]) # 这里将显示利润率表格 # 报告预览 with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]): gr.Markdown("Report Preview", elem_classes=["font-medium", "mb-3"]) # 这里将显示报告预览 def create_chat_panel(): """创建聊天面板组件""" # with gr.Column(elem_classes=["chat-panel"]): # 聊天头部 # with gr.Row(elem_classes=["p-4", "border-b", "border-gray-200", "items-center", "gap-2"]): # gr.Markdown("🤖", elem_classes=["text-xl", "text-blue-600"]) # gr.Markdown("Financial Assistant", elem_classes=["font-medium"]) # 聊天区域 # 一行代码嵌入! # chat_component = create_financial_chatbot() # chat_component.render() # create_financial_chatbot() # gr.LoginButton() # chatbot = gr.Chatbot( # value=[ # {"role": "assistant", "content": "I'm your financial assistant, how can I help you today?"}, # # {"role": "assistant", "content": "Hello! I can help you analyze financial data. Ask questions like \"Show revenue trends\" or \"Compare profitability ratios\""}, # # {"role": "user", "content": "Show revenue trends for last 4 quarters"}, # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"}, # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"}, # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"}, # # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"} # ], # type="messages", # # elem_classes=["min-h-0", "overflow-y-auto", "space-y-4", "chat-content-box"], # show_label=False, # autoscroll=True, # show_copy_button=True, # height=400, # container=False, # ) # # 输入区域 # with gr.Row(elem_classes=["border-t", "border-gray-200", "gap-2"]): # msg = gr.Textbox( # placeholder="Ask a financial question...", # elem_classes=["flex-1", "border", "border-gray-300", "rounded-lg", "px-4", "py-2", "focus:border-blue-500"], # show_label=False, # lines=1, # submit_btn=True, # container=False, # ) # msg.submit( # chat_bot, # [msg, chatbot], # [msg, chatbot], # queue=True, # ) # def load_css_files(css_dir, filenames): # css_content = "" # for filename in filenames: # path = os.path.join(css_dir, filename) # if os.path.exists(path): # with open(path, "r", encoding="utf-8") as f: # css_content += f.read() + "\n" # else: # print(f"⚠️ CSS file not found: {path}") # return css_content def main(): # 获取当前目录 current_dir = os.path.dirname(os.path.abspath(__file__)) css_dir = os.path.join(current_dir, "css") # def load_css_files(css_dir, filenames): # """读取多个 CSS 文件并合并为一个字符串""" # css_content = "" # for filename in filenames: # path = os.path.join(css_dir, filename) # if os.path.exists(path): # with open(path, "r", encoding="utf-8") as f: # css_content += f.read() + "\n" # else: # print(f"Warning: CSS file not found: {path}") # return css_content # 设置CSS路径 css_paths = [ os.path.join(css_dir, "main.css"), os.path.join(css_dir, "components.css"), os.path.join(css_dir, "layout.css") ] # css_dir = "path/to/your/css/folder" # 替换为你的实际路径 # 自动定位 css 文件夹(与 app.py 同级) # BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # CSS_DIR = os.path.join(BASE_DIR, "css") # css_files = ["main.css", "components.css", "layout.css"] # combined_css = load_css_files(CSS_DIR, css_files) # print(combined_css) with gr.Blocks( title="Financial Analysis Dashboard", css_paths=css_paths, css=custom_css, # css=combined_css ) as demo: # 添加处理公司点击事件的路由 # 创建一个状态组件来跟踪选中的公司 selected_company_state = gr.State("") with gr.Column(elem_classes=["container", "container-h"]): # 头部 create_header() # 创建主布局 with gr.Row(elem_classes=["main-content-box"]): # 左侧边栏 with gr.Column(scale=1, min_width=350): # 获取company_list组件的引用 company_list_component, report_section_component, report_display_component, news_display_component = create_sidebar() # 主内容区域 with gr.Column(scale=9): # 指标仪表板 create_metrics_dashboard() with gr.Row(elem_classes=["main-content-box"]): with gr.Column(scale=8): # Tab内容 with gr.Tabs(): with gr.TabItem("Invest Suggest", elem_classes=["tab-item"]): # 创建一个用于显示公司名称的组件 # company_display = gr.Markdown("# Please select a company") # 创建一个占位符用于显示tab内容 tab_content = gr.Markdown(elem_classes=["invest-suggest-md-box"]) # 当选中的公司改变时,更新显示 # selected_company_state.change( # fn=lambda company: f"# Investment Suggestions for {company}" if company else "# Please select a company", # inputs=[selected_company_state], # outputs=[company_display] # ) # 当选中的公司改变时,重新加载tab内容 def update_tab_content(company): if company: # 显示loading状态 loading_html = f'''

Loading investment suggestions for {company}...

''' yield loading_html # 获取投资建议数据 try: content = get_invest_suggest(company) yield content except Exception as e: error_html = f'''

Error loading investment suggestions: {str(e)}

Please try again later.

''' yield error_html else: yield "
Please select a company
" selected_company_state.change( fn=update_tab_content, inputs=[selected_company_state], outputs=[tab_content], ) with gr.TabItem("Analysis Report", elem_classes=["tab-item"]): # 创建一个用于显示公司名称的组件 # analysis_company_display = gr.Markdown("# Please select a company") # 创建一个占位符用于显示tab内容 analysis_tab_content = gr.Markdown(elem_classes=["analysis-report-md-box"]) # 当选中的公司改变时,更新显示 # selected_company_state.change( # fn=lambda company: f"# Analysis Report for {company}" if company else "# Please select a company", # inputs=[selected_company_state], # outputs=[analysis_company_display] # ) # 当选中的公司改变时,重新加载tab内容 def update_analysis_tab_content(company): if company: # 显示loading状态 loading_html = f'''

Loading analysis report for {company}...

''' yield loading_html # 获取分析报告数据 try: # 这里应该调用获取详细分析报告的函数 # 暂时使用占位内容,您需要替换为实际的函数调用 # content = f"# Analysis Report for {company}\n\nDetailed financial analysis for {company} will be displayed here." yield get_analysis_report(company) except Exception as e: error_html = f'''

Error loading analysis report: {str(e)}

Please try again later.

''' yield error_html else: yield "
Please select a company
" selected_company_state.change( fn=update_analysis_tab_content, inputs=[selected_company_state], outputs=[analysis_tab_content] ) # with gr.TabItem("Comparison", elem_classes=["tab-item"]): # create_tab_content("comparison") with gr.Column(scale=2, min_width=400): # 聊天面板 # gr.ChatInterface( # respond, # title="Easy Financial Report", # # label="Easy Financial Report", # additional_inputs=[ # # gr.Textbox(value="You are a financial analysis assistant. Provide concise investment insights from company financial reports.", label="System message"), # # gr.Slider(minimum=1, maximum=4096, value=1024, step=1, label="Max new tokens"), # # gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), # # gr.Slider( # # minimum=0.1, # # maximum=1.0, # # value=0.95, # # step=0.05, # # label="Top-p (nucleus sampling)", # # ), # gr.State(value="") # CRITICAL: Add State to store session URL across turns # ], # ) # chatbot.render() # gr.LoginButton() gr.ChatInterface( respond, title="Easy Financial Report", additional_inputs=[ gr.State(value=""), # CRITICAL: Store session URL across turns (hidden from UI) gr.State(value={}) # CRITICAL: Store agent context across turns (hidden from UI) ], additional_inputs_accordion=gr.Accordion(label="Settings", open=False, visible=False), # Hide the accordion completely ) # chatbot.render() # with gr.Blocks() as demo: # # Add custom CSS for Agent Plan styling # gr.Markdown(""" # # """) # chatbot.render() # 在页面加载时自动刷新公司列表,确保显示最新的数据 # demo.load( # fn=get_company_list_choices, # inputs=[], # outputs=[company_list_component], # concurrency_limit=None, # ) # 绑定公司选择事件到状态更新 # 注意:这里需要确保create_sidebar中没有重复绑定相同的事件 company_list_component.change( fn=lambda x: x, # 直接返回选中的公司名称 inputs=[company_list_component], outputs=[selected_company_state], concurrency_limit=None ) # 绑定公司选择事件到指标仪表板更新 def update_metrics_dashboard_wrapper(company_name): if company_name: # 显示loading状态 loading_html = f'''

Loading financial data for {company_name}...

''' yield loading_html, loading_html, loading_html # 获取更新后的数据 try: stock_card_html, financial_metrics_html, income_table_html = update_metrics_dashboard(company_name) yield stock_card_html, financial_metrics_html, income_table_html except Exception as e: error_html = f'''

Error loading financial data: {str(e)}

Please try again later.

''' yield error_html, error_html, error_html else: # 如果没有选择公司,返回空内容 empty_html = "
Please select a company
" yield empty_html, empty_html, empty_html selected_company_state.change( fn=update_metrics_dashboard_wrapper, inputs=[selected_company_state], outputs=list(metrics_dashboard_components), concurrency_limit=None ) return demo if __name__ == "__main__": demo = main() demo.launch(share=True)