import gradio as gr
import os
import datetime
import re
import pandas as pd
from sqlalchemy import true
from chatbot.chat_main import respond
import globals as g
from service.mysql_service import get_companys, insert_company, get_company_by_name
from service.chat_service import get_analysis_report, get_stock_price_from_bailian, search_company, search_news, get_invest_suggest, chat_bot
from service.company import check_company_exists
from service.hf_upload import get_hf_files_with_links
from MarketandStockMCP.news_quote_mcp import get_company_news, get_quote
from EasyReportDataMCP.report_mcp import query_financial_data
from service.three_year_table_tool import build_table_format
from service.three_year_tool import process_financial_data_with_metadata
from service.tool_processor import get_stock_price
get_companys_state = True
# JavaScript代码用于读取和存储数据
js_code = """
function handleStorage(operation, key, value) {
if (operation === 'set') {
localStorage.setItem(key, value);
return `已存储: ${key} = ${value}`;
} else if (operation === 'get') {
let storedValue = localStorage.getItem(key);
if (storedValue === null) {
return `未找到键: ${key}`;
}
return `读取到: ${key} = ${storedValue}`;
} else if (operation === 'clear') {
localStorage.removeItem(key);
return `已清除: ${key}`;
} else if (operation === 'clearAll') {
localStorage.clear();
return '已清除所有数据';
}
}
"""
custom_css = """
/* 匹配所有以 gradio-container- 开头的类 */
div[class^="gradio-container-"],
div[class*=" gradio-container-"] {
-webkit-text-size-adjust: 100% !important;
line-height: 1.5 !important;
font-family: unset !important;
-moz-tab-size: 4 !important;
tab-size: 4 !important;
}
.company-list-container {
background-color: white;
border-radius: 0.5rem;
padding: 0.75rem;
margin-bottom: 0.75rem;
border: 1px solid #e5e7eb;
box-shadow: 0 1px 2px 0 rgba(0, 0, 0, 0.05);
width: 100%;
}
/* 隐藏单选框 */
.company-list-container input[type="radio"] {
display: none;
}
/* 自定义选项样式 */
.company-list-container label {
display: block;
padding: 0.75rem 1rem;
margin: 0.25rem 0;
border-radius: 0.375rem;
cursor: pointer;
transition: all 0.2s ease;
background-color: #f9fafb;
border: 1px solid #e5e7eb;
font-size: 1rem;
text-align: left;
width: 100%;
box-sizing: border-box;
}
/* 悬停效果 */
.company-list-container label:hover {
background-color: #f3f4f6;
border-color: #d1d5db;
}
/* 选中效果 - 确保背景色充满整个选项 */
.company-list-container input[type="radio"]:checked + span {
# background: #3b82f6 !important;
color: white !important;
font-weight: 600 !important;
display: block;
width: 100%;
height: 100%;
padding: 0.75rem 1rem;
margin: -0.75rem -1rem;
border-radius: 0.375rem;
}
.company-list-container span {
display: block;
padding: 0;
border-radius: 0.375rem;
width: 100%;
}
/* 确保每行只有一个选项 */
.company-list-container .wrap {
display: block !important;
}
.company-list-container .wrap li {
display: block !important;
width: 100% !important;
}
label.selected {
background: #3b82f6 !important;
color: white !important;
}
"""
# 全局变量用于存储公司映射关系
companies_map = {}
# 根据公司名称获取股票代码的函数
def get_stock_code_by_company_name(company_name):
"""根据公司名称获取股票代码"""
if company_name in companies_map and "CODE" in companies_map[company_name]:
return companies_map[company_name]["CODE"]
return "" # 默认返回
# 创建一个简单的函数来获取公司列表
def get_company_list_choices():
choices = []
print(f"Getting init add company list choices...{get_companys_state}")
if not get_companys_state:
return gr.update(choices=choices)
try:
companies_data = get_companys()
print(f"Getting init add company list choices...companies_data: {companies_data}")
if isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()]
else:
choices = []
except:
choices = []
return gr.update(choices=choices)
# Sidebar service functions
# 处理公司点击事件的函数
def handle_company_click(company_name):
"""处理公司点击事件,先判断是否已经入库,如果没有则进行入库操作,然后刷新公司列表"""
print(f"Handling click for company: {company_name}")
# 1. 判断是否已经入库
if not check_company_exists(company_name):
# 2. 如果没有入库,则进行入库操作
# 获取股票代码(如果有的话)
stock_code = companies_map.get(company_name, {}).get("CODE", "Unknown")
print(f"Inserting company {company_name} with code {stock_code}")
# 插入公司到数据库
success = insert_company(company_name, stock_code)
if success:
print(f"Successfully inserted company: {company_name}")
# 直接更新companies_map,而不是重新加载整个映射
companies_map[company_name] = {"NAME": company_name, "CODE": stock_code}
# 使用Gradio的成功提示
gr.Info(f"Successfully added company: {company_name}")
# 返回True表示添加成功,需要刷新列表
return True
else:
print(f"Failed to insert company: {company_name}")
# 使用Gradio的错误提示
gr.Error(f"Failed to insert company: {company_name}")
return False
else:
print(f"Company {company_name} already exists in database")
# 使用Gradio的警告提示
gr.Warning(f"Company '{company_name}' already exists")
# 3. 返回成功响应
return None
def get_company_list_html(selected_company=""):
try:
# 从数据库获取所有公司
companies_data = get_companys()
# 检查是否为错误信息
if isinstance(companies_data, str):
if "查询执行失败" in companies_data:
return "
获取公司列表失败
"
else:
# 如果是字符串但不是错误信息,可能需要特殊处理
return ""
# 检查是否为DataFrame且为空
if not isinstance(companies_data, pd.DataFrame) or companies_data.empty:
return ""
# 生成HTML列表
html_items = []
for _, row in companies_data.iterrows():
company_name = row.get('company_name', 'Unknown')
# 根据是否选中添加不同的样式类
css_class = "company-item"
if company_name == selected_company:
css_class += " selected-company"
# 使用button元素来确保可点击性
html_items.append(f'{company_name} ')
return "\n".join(html_items)
except Exception as e:
return f"生成公司列表失败: {str(e)}
"
def initialize_company_list(selected_company=""):
return get_company_list_html(selected_company)
def refresh_company_list(selected_company=""):
"""刷新公司列表,返回最新的HTML内容,带loading效果"""
# 先返回loading状态
loading_html = '''
'''
yield loading_html
# 然后返回实际的数据
yield get_company_list_html(selected_company)
# 新增函数:处理公司选择事件
def select_company(company_name):
"""处理公司选择事件,更新全局状态并返回更新后的公司列表"""
# 更新全局变量
g.SELECT_COMPANY = company_name if company_name else ""
# 对于Radio组件,我们只需要返回更新后的选项列表
try:
companies_data = get_companys()
if isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()]
else:
choices = []
except:
choices = []
return gr.update(choices=choices, value=company_name)
def initialize_companies_map():
"""初始化 companies_map 字典"""
global companies_map
companies_map = {} # 清空之前的映射
print("Initializing companies map...")
try:
# 获取预定义的公司列表
predefined_companies = [
{ "NAME": "Alibaba", "CODE": "BABA" },
{ "NAME": "阿里巴巴-W", "CODE": "09988" },
{ "NAME": "NVIDIA", "CODE": "NVDA" },
{ "NAME": "Amazon", "CODE": "AMZN" },
{ "NAME": "Intel", "CODE": "INTC" },
{ "NAME": "Meta", "CODE": "META" },
{ "NAME": "Google", "CODE": "GOOGL" },
{ "NAME": "Apple", "CODE": "AAPL" },
{ "NAME": "Tesla", "CODE": "TSLA" },
{ "NAME": "AMD", "CODE": "AMD" },
{ "NAME": "Microsoft", "CODE": "MSFT" },
{ "NAME": "ASML", "CODE": "ASML" }
]
# 将预定义公司添加到映射中
for company in predefined_companies:
companies_map[company["NAME"]] = {"NAME": company["NAME"], "CODE": company["CODE"]}
print(f"Predefined companies added: {len(predefined_companies)}")
# 从数据库获取公司数据
companies_data = get_companys()
# companies_data = window.cachedCompanies or []
print(f"Companies data from DB: {companies_data}")
# 如果数据库中有公司数据,则添加到映射中(去重)
if isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
print(f"Adding {len(companies_data)} companies from database")
for _, row in companies_data.iterrows():
company_name = row.get('company_name', 'Unknown')
stock_code = row.get('stock_code', '')
# 确保company_name和stock_code都是字符串类型
company_name = str(company_name) if company_name is not None else 'Unknown'
stock_code = str(stock_code) if stock_code is not None else ''
# 检查是否已存在于映射中(通过股票代码判断)
is_duplicate = False
for existing_company in companies_map.values():
if existing_company["CODE"] == stock_code:
is_duplicate = True
break
# 如果不重复,则添加到映射中
if not is_duplicate:
companies_map[company_name] = {"NAME": company_name, "CODE": stock_code}
# print(f"Added company: {company_name}")
else:
print("No companies found in database")
print(f"Final companies map: {companies_map}")
except Exception as e:
# 错误处理
print(f"Error initializing companies map: {str(e)}")
pass
# Sidebar company selector functions
def update_company_choices(user_input: str):
"""更新公司选择列表"""
# 第一次 yield:立即显示 modal + loading 提示
yield gr.update(
choices=["Searching..."],
visible=True
), gr.update(visible=False, value="") # 添加第二个返回值
# 第二次:执行耗时操作(调用 LLM)
choices = search_company(user_input) # 这是你原来的同步函数
# 检查choices是否为错误信息
if len(choices) > 0 and isinstance(choices[0], str) and not choices[0].startswith("Searching"):
# 如果是错误信息或非正常格式,显示提示消息
error_message = choices[0] if len(choices) > 0 else "未知错误"
# 使用Ant Design风格的错误提示
error_html = f'''
'''
yield gr.update(choices=["No results found"], visible=True), gr.update(visible=True, value=error_html)
else:
# 第三次:更新为真实结果
yield gr.update(
choices=choices,
visible=len(choices) > 0
), gr.update(visible=False, value="")
def add_company(selected, current_list):
"""添加选中的公司"""
if selected == "No results found":
return gr.update(visible=False), current_list, gr.update(visible=False, value="")
if selected:
# print(f"Selected company====: {selected}")
# 从选择的文本中提取公司名称和股票代码
# 假设格式为 "公司名称 (股票代码)"
selected_clean = selected.strip()
match = re.match(r"^(.+?)\s*\(([^)]+)\)$", selected_clean)
if match:
company_name = match.group(1)
stock_code = match.group(2)
elif companies_map.get(selected_clean):
company_name = selected_clean
stock_code = companies_map[selected_clean]["CODE"]
else:
company_name = selected_clean
stock_code = "Unknown"
# print(f"Company name: {company_name}, Stock code: {stock_code}")
# print(f"Company exists: {check_company_exists(company_name)}")
if not check_company_exists(company_name):
# 入库
success = insert_company(company_name, stock_code)
if success:
# 从数据库获取更新后的公司列表
try:
companies_data = get_companys()
if isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
updated_list = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()]
else:
updated_list = []
except:
updated_list = []
# 添加默认公司选项
if not updated_list:
updated_list = ['Alibaba', '腾讯控股', 'Tencent', '阿里巴巴-W', 'Apple']
# 成功插入后清除状态消息,并更新Radio组件的选项,同时默认选中刚添加的公司
# 通过设置value参数,会自动触发change事件来加载数据
return gr.update(visible=False), gr.update(choices=updated_list, value=company_name), gr.update(visible=False, value="")
else:
# 插入失败显示错误消息,使用Gradio内置的错误提示
gr.Error("插入公司失败")
return gr.update(visible=False), current_list, gr.update(visible=False, value="")
else:
# 公司已存在,使用Gradio内置的警告消息
gr.Warning(f"公司 '{company_name}' 已存在")
return gr.update(visible=False), current_list, gr.update(visible=False, value="")
return gr.update(visible=False), current_list, gr.update(visible=False, value="")
# Sidebar report section functions
# 创建一个全局变量来存储公司按钮组件
company_buttons = {}
def create_company_buttons():
"""创建公司按钮组件"""
# 确保companies_map已被初始化
if not companies_map:
initialize_companies_map()
# 显示companies_map中的公司列表
companies = list(companies_map.keys())
# 添加调试信息
print(f"Companies in map: {companies}")
# 清空之前的按钮
company_buttons.clear()
if not companies:
# 如果没有公司,返回一个空的列
with gr.Column():
gr.Markdown("暂无公司数据")
else:
# 使用Gradio按钮组件创建公司列表
with gr.Column(elem_classes=["home-company-list"]):
# 按每行两个公司进行分组
for i in range(0, len(companies), 2):
# 检查是否是最后一行且只有一个元素
if i + 1 < len(companies):
# 有两个元素
with gr.Row(elem_classes=["home-company-item-box"]):
btn1 = gr.Button(companies[i], elem_classes=["home-company-item", "gradio-button"])
btn2 = gr.Button(companies[i + 1], elem_classes=["home-company-item", "gradio-button"])
# 保存按钮引用
company_buttons[companies[i]] = btn1
company_buttons[companies[i + 1]] = btn2
else:
# 只有一个元素
with gr.Row(elem_classes=["home-company-item-box", "single-item"]):
btn = gr.Button(companies[i], elem_classes=["home-company-item", "gradio-button"])
# 保存按钮引用
company_buttons[companies[i]] = btn
# 返回按钮字典
return company_buttons
def update_report_section(selected_company, report_data, stock_code):
"""根据选中的公司更新报告部分"""
print(f"Updating report (报告部分): {selected_company}") # 添加调试信息
if selected_company == "" or selected_company is None or selected_company == "Unknown":
# 没有选中的公司,显示公司列表
# html_content = get_initial_company_list_content()
# 暂时返回空内容,稍后会用Gradio组件替换
html_content = ""
return gr.update(value=html_content, visible=True)
else:
# 有选中的公司,显示相关报告
# try:
# # 尝试从Hugging Face获取文件列表
# report_data = get_hf_files_with_links("JC321/files-world")
# except Exception as e:
# # 如果获取失败,使用模拟数据并显示错误消息
# print(f"获取Hugging Face文件列表失败: {str(e)}")
# report_data = []
stock_code = get_stock_code_by_company_name(selected_company)
report_data = query_financial_data(stock_code, "5-Year")
# report_data = process_financial_data_with_metadata(financial_metrics_pre)
html_content = ''
html_content += '
Financial Reports '
for report in report_data:
html_content += f'''
{report['period']}-{stock_code}-{report['source_form']}
'''
html_content += f''
html_content += '
'
return gr.update(value=html_content, visible=True)
def update_news_section(selected_company):
"""根据选中的公司更新报告部分"""
html_content = ""
if selected_company == "" or selected_company is None:
# 没有选中的公司,显示公司列表
# html_content = get_initial_company_list_content()
# 暂时返回空内容,稍后会用Gradio组件替换
return gr.update(value=html_content, visible=True)
else:
try:
stock_code = get_stock_code_by_company_name(selected_company)
report_data = get_company_news(stock_code, None, None)
# print(f"新闻列表: {report_data['articles']}")
# report_data = search_news(selected_company)
if (report_data['articles']):
report_data = report_data['articles']
news_html = ""
news_html += '
News '
from datetime import datetime
for news in report_data:
published_at = news['published']
# 解析 ISO 8601 时间字符串(注意:strptime 不直接支持 'Z',需替换或使用 fromisoformat)
dt = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
# 格式化为 YYYY.MM.DD
formatted_date = dt.strftime("%Y.%m.%d")
news_html += f'''
[{formatted_date}]
{news['headline']}
'''
news_html += f''
news_html += '
'
html_content += news_html
except Exception as e:
print(f"Error updating report section: {str(e)}")
return gr.update(value=html_content, visible=True)
# Component creation functions
def create_header():
"""创建头部组件"""
# 获取当前时间
current_time = datetime.datetime.now().strftime("%B %d, %Y - Market Data Updated Today")
with gr.Row(elem_classes=["header"]):
# 左侧:图标和标题
with gr.Column(scale=8):
# 使用圆柱体SVG图标表示数据库
gr.HTML('''
Easy Financial Report Dashboard
''', elem_classes=["text-2xl"])
# 右侧:时间信息
with gr.Column(scale=2):
gr.Markdown(current_time, elem_classes=["text-sm-top-time"])
def create_company_list(get_companys_state):
company_list = gr.Radio(
choices=[],
label="",
interactive=True,
elem_classes=["company-list-container"],
container=False, # 不显示外部容器边框
visible=True
)
if (get_companys_state == False):
return company_list
else:
"""创建公司列表组件"""
# 获取公司列表数据
try:
companies_data = get_companys()
print(f"创建公司列表组件 - Companies data: {companies_data}")
if isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()]
else:
choices = []
except:
choices = []
# 添加默认公司选项
if not choices:
choices = []
# 使用Radio组件显示公司列表,不显示标签
company_list = gr.Radio(
choices=choices,
label="",
interactive=True,
elem_classes=["company-list-container"],
container=False, # 不显示外部容器边框
visible=True
)
return company_list
def create_company_selector():
"""创建公司选择器组件"""
company_input = gr.Textbox(
show_label=False,
placeholder="Add Company",
elem_classes=["company-input-box"],
lines=1,
max_lines=1,
# container=False
)
# 状态消息显示区域
status_message = gr.HTML(
"",
elem_classes=["status-message"],
visible=False
)
# 弹窗选择列表
company_modal = gr.Radio(
show_label=False,
choices=[],
visible=False,
elem_classes=["company-modal"]
)
return company_input, status_message, company_modal
def create_report_section():
"""创建报告部分组件"""
# 创建一个用于显示报告列表的组件,初始显示公司列表
# initial_content = get_initial_company_list_content()
# 暂时返回空内容,稍后会用Gradio组件替换
initial_content = ""
# print(f"Initial content: {initial_content}") # 添加调试信息
report_display = gr.HTML(initial_content)
return report_display
def create_news_section():
"""创建新闻部分组件"""
initial_content = ""
news_display = gr.HTML(initial_content)
return news_display
def format_financial_metrics(data: dict, prev_data: dict = None) -> list: # pyright: ignore[reportArgumentType]
"""
将原始财务数据转换为 financial_metrics 格式。
Args:
data (dict): 当前财年数据(必须包含 total_revenue, net_income 等字段)
prev_data (dict, optional): 上一财年数据,用于计算 change。若未提供,change 设为 "--"
Returns:
list[dict]: 符合 financial_metrics 格式的列表
"""
def format_currency(value: float) -> str:
"""将数字格式化为 $XB / $XM / $XK"""
if value >= 1e9:
return f"${value / 1e9:.2f}B"
elif value >= 1e6:
return f"${value / 1e6:.2f}M"
elif value >= 1e3:
return f"${value / 1e3:.2f}K"
else:
return f"${value:.2f}"
def calculate_change(current: float, previous: float) -> tuple:
"""计算变化百分比和颜色"""
if previous == 0:
return "--", "gray"
change_pct = (current - previous) / abs(previous) * 100
sign = "+" if change_pct >= 0 else ""
color = "green" if change_pct >= 0 else "red"
return f"{sign}{change_pct:.1f}%", color
# 定义指标映射
metrics_config = [
{
"key": "total_revenue",
"label": "Total Revenue",
"is_currency": True,
"eps_like": False
},
{
"key": "net_income",
"label": "Net Income",
"is_currency": True,
"eps_like": False
},
{
"key": "earnings_per_share",
"label": "Earnings Per Share",
"is_currency": False, # EPS 不用 B/M 单位
"eps_like": True
},
{
"key": "operating_expenses",
"label": "Operating Expenses",
"is_currency": True,
"eps_like": False
},
{
"key": "operating_cash_flow",
"label": "Cash Flow",
"is_currency": True,
"eps_like": False
}
]
result = []
for item in metrics_config:
key = item["key"]
current_val = data.get(key)
if current_val is None:
continue
# 格式化 value
if item["eps_like"]:
value_str = f"${current_val:.2f}"
elif item["is_currency"]:
value_str = format_currency(current_val)
else:
value_str = str(current_val)
# 计算 change(如果有上期数据)
if prev_data and key in prev_data:
prev_val = prev_data[key]
change_str, color = calculate_change(current_val, prev_val)
else:
change_str = "--"
color = "gray"
result.append({
"label": item["label"],
"value": value_str,
"change": change_str,
"color": color
})
return result
def create_sidebar():
"""创建侧边栏组件"""
# 初始化 companies_map
initialize_companies_map()
with gr.Column(elem_classes=["sidebar"]):
# 公司选择
with gr.Group(elem_classes=["card"]):
gr.Markdown("### Select Company", elem_classes=["card-title", "left-card-title"])
with gr.Column():
company_list = create_company_list(get_companys_state)
# 创建公司列表
# if not get_companys_state:
# getCompanyFromStorage = gr.Button("读取")
# getCompanyFromStorage.click(
# fn=create_company_list(True),
# inputs=[],
# outputs=[company_list, status_message]
# )
# 创建公司选择器
company_input, status_message, company_modal = create_company_selector()
# 绑定事件
company_input.submit(
fn=update_company_choices,
inputs=[company_input],
outputs=[company_modal, status_message]
)
company_modal.change(
fn=add_company,
inputs=[company_modal, company_list],
outputs=[company_modal, company_list, status_message]
)
# 创建公司按钮组件
company_buttons = create_company_buttons()
# 为每个公司按钮绑定点击事件
def make_click_handler(company_name):
def handler():
result = handle_company_click(company_name)
# 如果添加成功,刷新Select Company列表并默认选中刚添加的公司
if result is True:
# 正确地刷新通过create_company_list()创建的Radio组件
try:
companies_data = get_companys()
if isinstance(companies_data, pd.DataFrame) and not companies_data.empty:
updated_choices = [str(row.get('company_name', 'Unknown')) for _, row in companies_data.iterrows()]
else:
updated_choices = []
except:
updated_choices = []
# 使用gr.update来正确更新Radio组件,并默认选中刚添加的公司
# 同时触发change事件来加载数据
return gr.update(choices=updated_choices, value=company_name)
return None
return handler
for company_name, button in company_buttons.items():
button.click(
fn=make_click_handler(company_name),
inputs=[],
outputs=[company_list]
)
# 创建一个容器来容纳报告部分,初始时隐藏
with gr.Group(elem_classes=["report-news-box"]) as report_section_group:
# gr.Markdown("### Financial Reports", elem_classes=["card-title", "left-card-title"])
report_display = create_report_section()
news_display = create_news_section()
# 处理公司选择事件
def select_company_handler(company_name):
"""处理公司选择事件的处理器"""
# 更新全局变量
g.SELECT_COMPANY = company_name if company_name else ""
# 更新报告部分的内容
updated_report_display = update_report_section(company_name, None, None)
updated_news_display = update_news_section(company_name)
# 根据是否选择了公司来决定显示/隐藏报告部分
if company_name:
# 有选中的公司,显示报告部分
return gr.update(visible=True), updated_report_display, updated_news_display
else:
# 没有选中的公司,隐藏报告部分
return gr.update(visible=False), updated_report_display, updated_news_display
company_list.change(
fn=select_company_handler,
inputs=[company_list],
outputs=[report_section_group, report_display, news_display]
)
# 返回公司列表组件和报告部分组件
return company_list, report_section_group, report_display, news_display
def build_income_table(table_data):
# 兼容两种数据结构:
# 1. 新结构:包含 list_data 和 yoy_rates 的字典
# 2. 旧结构:直接是二维数组
if isinstance(table_data, dict) and "list_data" in table_data:
# 新结构
income_statement = table_data["list_data"]
yoy_rates = table_data["yoy_rates"] or []
else:
# 旧结构,直接使用传入的数据
income_statement = table_data
yoy_rates = []
# 创建一个映射,将年份列索引映射到增长率
yoy_map = {}
if len(yoy_rates) > 1 and len(yoy_rates[0]) > 1:
# 获取增长率表头(跳过第一列"Category")
yoy_headers = yoy_rates[0][1:]
# 为每个指标行创建增长率映射
for i, yoy_row in enumerate(yoy_rates[1:], 1): # 跳过标题行
category = yoy_row[0]
yoy_map[category] = {}
for j, rate in enumerate(yoy_row[1:]):
if j < len(yoy_headers):
yoy_map[category][yoy_headers[j]] = rate
table_rows = ""
header_row = income_statement[0]
for i, row in enumerate(income_statement):
if i == 0:
row_style = "background-color: #f5f5f5; font-weight: 500;"
else:
row_style = "background-color: #f9f9f9;"
cells = ""
for j, cell in enumerate(row):
if j == 0:
cells += f"{cell} "
else:
# 添加增长率箭头(如果有的话)
growth = None
category = row[0]
# j是当前单元格索引,0是类别列,1,2,3...是数据列
# yoy_map的键是年份,例如"2024/FY"
if i > 0 and category in yoy_map and j > 0 and j < len(header_row):
year_header = header_row[j]
if year_header in yoy_map[category]:
growth = yoy_map[category][year_header]
if growth and growth != "N/A":
arrow = "▲" if growth.startswith("+") else "▼"
color = "green" if growth.startswith("+") else "red"
cells += f"""
{cell}
{arrow}{growth}
"""
else:
cells += f"{cell} "
table_rows += f"{cells} "
html = f"""
Income Statement and Cash Flow
"""
return html
def create_metrics_dashboard():
"""创建指标仪表板组件"""
with gr.Row(elem_classes=["metrics-dashboard"]):
card_custom_style = '''
background-color: white;
border-radius: 0.5rem;
box-shadow: rgba(0, 0, 0, 0.1) 0px 1px 3px 0px, rgba(0, 0, 0, 0.1) 0px 1px 2px -1px;
padding: 1.25rem;
min-height: 250px !important;
text-align: center;
'''
# 模拟数据
company_info = {
"name": "N/A",
"symbol": "NYSE:N/A",
"price": 0,
"change": 0,
"change_percent": 0.41,
"open": 165.20,
"high": 166.37,
"low": 156.15,
"prev_close": 157.01,
"volume": "27.10M"
}
# financial_metrics = query_financial_data("NVDA", "最新财务数据")
# print(f"最新财务数据: {financial_metrics}")
financial_metrics = [
{"label": "Total Revenue", "value": "N/A", "change": "N/A", "color": "grey"},
{"label": "Net Income", "value": "N/A", "change": "N/A", "color": "grey"},
{"label": "Earnings Per Share", "value": "N/A", "change": "N/A", "color": "grey"},
{"label": "Operating Expenses", "value": "N/A", "change": "N/A", "color": "grey"},
{"label": "Cash Flow", "value": "N/A", "change": "N/A", "color": "grey"}
]
# income_statement = [
# ["Category", "2024/FY", "2023/FY", "2022/FY"],
# ["Total", "130350M", "126491M", "134567M"],
# ["Net Income", "11081", "10598M", "9818.4M"],
# ["Earnings Per Share", "4.38", "4.03", "3.62"],
# ["Operating Expenses", "31990.9M", "31439.6M", "34516.2M"],
# ["Cash Flow", "25289.9M", "29086M", "22517.2M"]
# ]
income_statement = {
"list_data": [
["Category", "N/A/FY", "N/A/FY", "N/A/FY"],
["Total", "N/A", "N/A", "N/A"],
["Net Income", "N/A", "N/A", "N/A.4M"],
["Earnings Per Share", "N/A", "N/A", "N/A"],
["Operating Expenses", "N/A", "N/A", "N/A"],
["Cash Flow", "N/A", "N/A", "N/A"]
],
"yoy_rates": []
# "yoy_rates": [
# ["Category", "N/A/FY", "N/A/FY"],
# ["Total", "N/A", "N/A"],
# ["Net Income", "+3.05%", "-6.00%"],
# ["Earnings Per Share", "+3.05%", "-6.00%"],
# ["Operating Expenses", "+29.17%", "-6.00%"],
# ["Cash Flow", "-13.05%", "-6.00%"]
# ]
}
yearly_data = 'N/A'
# 增长变化的 HTML 字符(箭头+百分比)
def render_change(change: str, color: str):
if change.startswith("+"):
return f'▲{change} '
else:
return f'▼{change} '
# 构建左侧卡片
def build_stock_card():
html = f"""
N/A
N/A
N/A
N/A
Open
N/A
High
N/A
Low
N/A
Prev Close
N/A
"""
return html
# Vol
N/A
# 构建中间卡片
def build_financial_metrics():
metrics_html = ""
for item in financial_metrics:
change_html = render_change(item["change"], item["color"])
metrics_html += f"""
{item['label']}
{item['value']} {change_html}
"""
html = f"""
{yearly_data} Financial Metrics
YTD data
{metrics_html}
"""
return html
# 主函数:返回所有 HTML 片段
def get_dashboard():
with gr.Row():
with gr.Column(scale=1, min_width=250, elem_classes=["metric-card-col-left"]):
stock_card_html = gr.HTML(build_stock_card(), elem_classes=["metric-card-left"])
with gr.Column(scale=1, min_width=300, elem_classes=["metric-card-col-middle"]):
financial_metrics_html = gr.HTML(build_financial_metrics(), elem_classes=["metric-card-middle"])
with gr.Column(scale=1, min_width=450, elem_classes=["metric-card-col-right"]):
# 传递income_statement参数
income_table_html = gr.HTML(build_income_table(income_statement), elem_classes=["metric-card-right"])
return stock_card_html, financial_metrics_html, income_table_html
# 创建指标仪表板并保存引用
stock_card_component, financial_metrics_component, income_table_component = get_dashboard()
# 将组件引用保存到全局变量,以便在其他地方使用
global metrics_dashboard_components
metrics_dashboard_components = (stock_card_component, financial_metrics_component, income_table_component)
# 更新指标仪表板的函数
def update_metrics_dashboard(company_name):
"""根据选择的公司更新指标仪表板"""
# 模拟数据
# company_info = {
# "name": company_name,
# "symbol": "NYSE:BABA",
# "price": 157.65,
# "change": 0.64,
# "change_percent": 0.41,
# "open": 165.20,
# "high": 166.37,
# "low": 156.15,
# "prev_close": 157.01,
# "volume": "27.10M"
# }
company_info = {}
# 尝试获取股票价格数据,但不中断程序执行
stock_code = ""
try:
# 根据选择的公司获取股票代码
stock_code = get_stock_code_by_company_name(company_name)
# result = get_quote(company_name.strip())
# company_info2 = get_stock_price(stock_code)
# company_info2 = get_stock_price_from_bailian(stock_code)
# print(f"股票价格数据: {company_info2}")
company_info = get_quote(stock_code.strip())
company_info['company'] = company_name
print(f"股票价格数据====: {company_info}")
# 查询结果:{
# "company": "阿里巴巴",
# "symbol": "BABA",
# "open": "159.09",
# "high": "161.46",
# "low": "150.00",
# "price": "157.60",
# "volume": "21453064",
# "latest trading day": "2025-11-27",
# "previous close": "157.01",
# "change": "+0.59",
# "change_percent": "+0.38%"
# }BABA
# 如果成功获取数据,则用实际数据替换模拟数据
# if company_info2 and "content" in company_info2 and len(company_info2["content"]) > 0:
# import json
# # 解析返回的JSON数据
# data_text = company_info2["content"][0]["text"]
# stock_data = json.loads(data_text)
# # 提取数据
# quote = stock_data["Global Quote"]
# # 转换交易量单位
# volume = int(quote['06. volume'])
# if volume >= 1000000:
# volume_str = f"{volume / 1000000:.2f}M"
# elif volume >= 1000:
# volume_str = f"{volume / 1000:.2f}K"
# else:
# volume_str = str(volume)
# company_info = {
# "name": company_name,
# "symbol": f"NYSE:{quote['01. symbol']}",
# "price": float(quote['05. price']),
# "change": float(quote['09. change']),
# "change_percent": float(quote['10. change percent'].rstrip('%')),
# "open": float(quote['02. open']),
# "high": float(quote['03. high']),
# "low": float(quote['04. low']),
# "prev_close": float(quote['08. previous close']),
# "volume": volume_str
# }
except Exception as e:
print(f"获取股票价格数据失败: {e}")
company_info2 = None
# financial_metrics = [
# {"label": "Total Revenue", "value": "$2.84B", "change": "+12.4%", "color": "green"},
# {"label": "Net Income", "value": "$685M", "change": "-3.2%", "color": "red"},
# {"label": "Earnings Per Share", "value": "$2.15", "change": "-3.2%", "color": "red"},
# {"label": "Operating Expenses", "value": "$1.2B", "change": "+5.1%", "color": "green"},
# {"label": "Cash Flow", "value": "$982M", "change": "+8.7%", "color": "green"}
# ]
financial_metrics_pre = query_financial_data(stock_code, "5-Year")
# financial_metrics_pre = query_financial_data(company_name, "5年趋势")
# print(f"最新财务数据: {financial_metrics_pre}")
# financial_metrics = format_financial_metrics(financial_metrics_pre)
# financial_metrics_pre_2 = extract_last_three_with_fallback(financial_metrics_pre)
# print(f"提取的3年数据: {financial_metrics_pre_2}")
# financial_metrics_pre = {
# "metrics": financial_metrics_pre_2
# }
financial_metrics = []
# try:
# # financial_metrics = calculate_yoy_comparison(financial_metrics_pre)
# financial_metrics = build_financial_metrics_three_year_data(financial_metrics_pre)
# print(f"格式化后的财务数据: {financial_metrics}")
# except Exception as e:
# print(f"Error calculating YOY comparison: {e}")
year_data = None
three_year_data = None
try:
# financial_metrics = process_financial_data_with_metadata(financial_metrics_pre)
result = process_financial_data_with_metadata(financial_metrics_pre)
# 按需提取字段
financial_metrics = result["financial_metrics"]
year_data = result["year_data"]
three_year_data = result["three_year_data"]
print(f"格式化后的财务数据: {financial_metrics}")
# 拿report数据
# try:
# # 从 result 中获取报告数据
# if 'report_data' in result: # 假设 result 中包含 report_data 键
# report_data = result['report_data']
# else:
# # 如果 result 中没有直接包含 report_data,则从其他键中获取
# # 这需要根据实际的 result 数据结构来调整
# report_data = result.get('reports', []) # 示例:假设数据在 'reports' 键下
# 更新报告部分的内容
# 这里需要调用 update_report_section 函数并传入 report_data
# 注意:update_report_section 可能需要修改以接受 report_data 参数
# updated_report_content = update_report_section(company_name, report_data, stock_code)
# 然后将 updated_report_content 返回,以便在 UI 中更新
# 这需要修改函数的返回值以包含报告内容
# except Exception as e:
# print(f"Error updating report section with result data: {e}")
# updated_report_content = "Failed to load report data
"
except Exception as e:
print(f"Error process_financial_data: {e}")
# income_statement = [
# ["Category", "2024/FY", "2023/FY", "2022/FY"],
# ["Total", "130350M", "126491M", "134567M"],
# ["Net Income", "11081", "10598M", "9818.4M"],
# ["Earnings Per Share", "4.38", "4.03", "3.62"],
# ["Operating Expenses", "31990.9M", "31439.6M", "34516.2M"],
# ["Cash Flow", "25289.9M", "29086M", "22517.2M"]
# ]
# table_data = None
# try:
# table_data = extract_financial_table(financial_metrics_pre)
# print(table_data)
# except Exception as e:
# print(f"Error extract_financial_table: {e}")
# yearly_data = None
# try:
# yearly_data = get_yearly_data(financial_metrics_pre)
# except Exception as e:
# print(f"Error get_yearly_data: {e}")
# ======
# table_data = [
# ["Category", "2024/FY", "2023/FY", "2022/FY"],
# ["Total", "130350M", "126491M", "134567M"],
# ["Net Income", "11081", "10598M", "9818.4M"],
# ["Earnings Per Share", "4.38", "4.03", "3.62"],
# ["Operating Expenses", "31990.9M", "31439.6M", "34516.2M"],
# ["Cash Flow", "25289.9M", "29086M", "22517.2M"]
# ]
yearly_data = year_data
table_data = build_table_format(three_year_data)
# print(f"table_data: {table_data}")
# yearly_data = None
# try:
# yearly_data = get_yearly_data(financial_metrics_pre)
# except Exception as e:
# print(f"Error get_yearly_data: {e}")
#=======
# exp = {
# "list_data": [
# ["Category", "2024/FY", "2023/FY", "2022/FY"],
# ["Total", "130350M", "126491M", "134567M"],
# ["Net Income", "11081", "10598M", "9818.4M"],
# ["Earnings Per Share", "4.38", "4.03", "3.62"],
# ["Operating Expenses", "31990.9M", "31439.6M", "34516.2M"],
# ["Cash Flow", "25289.9M", "29086M", "22517.2M"]
# ],
# "yoy_rates": [
# ["Category", "2024/FY", "2023/FY"],
# ["Total", "+3.05%", "-6.00%"],
# ["Net Income", "+3.05%", "-6.00%"],
# ["Earnings Per Share", "+3.05%", "-6.00%"],
# ["Operating Expenses", "+29.17%", "-6.00%"],
# ["Cash Flow", "-13.05%", "-6.00%"]
# ]
# }
# 增长变化的 HTML 字符(箭头+百分比)
def render_change(change: str, color: str):
if change.startswith("+"):
return f'▲{change} '
else:
return f'▼{change} '
# 构建左侧卡片
def build_stock_card(company_info):
try:
if not company_info or not isinstance(company_info, dict):
company_name = "N/A"
symbol = "N/A"
price = "N/A"
change_html = 'N/A '
open_val = high_val = low_val = prev_close_val = volume_display = "N/A"
else:
company_name = company_info.get("company", "N/A")
symbol = company_info.get("symbol", "N/A")
price = company_info.get("current_price", "N/A")
# 解析 change
change_str = company_info.get("change", "0")
try:
change = float(change_str)
except (ValueError, TypeError):
change = 0.0
# 解析 change_percent
change_percent = company_info.get("percent_change", "0%")
# try:
# change_percent = float(change_percent_str.rstrip('%'))
# except (ValueError, TypeError):
# change_percent = 0.0
change_color = "green" if change >= 0 else "red"
sign = "+" if change >= 0 else ""
change_html = f'{sign}{change:.2f} ({change_percent:+.2f}%) '
# 其他价格字段(可选:也可格式化为 2 位小数)
open_val = company_info.get("open", "N/A")
high_val = company_info.get("high", "N/A")
low_val = company_info.get("low", "N/A")
prev_close_val = company_info.get("previous_close", "N/A")
# raw_volume = company_info.get("volume", "N/A")
# volume_display = format_volume(raw_volume)
html = f"""
{company_name}
NYSE:{symbol}
Open
{open_val}
High
{high_val}
Low
{low_val}
Prev Close
{prev_close_val}
"""
# Vol
{volume_display}
return html
except Exception as e:
print(f"Error building stock card: {e}")
return 'Error loading stock data
'
# 构建中间卡片
def build_financial_metrics(yearly_data):
metrics_html = ""
for item in financial_metrics:
change_html = render_change(item["change"], item["color"])
metrics_html += f"""
{item['label']}
{item['value']} {change_html}
"""
html = f"""
{yearly_data} Financial Metrics
YTD data
{metrics_html}
"""
return html
# 构建右侧表格
# def build_income_table(income_statement):
# table_rows = ""
# for i, row in enumerate(income_statement):
# if i == 0:
# row_style = "background-color: #f5f5f5; font-weight: 500;"
# else:
# row_style = "background-color: #f9f9f9;"
# cells = ""
# for j, cell in enumerate(row):
# if j == 0:
# cells += f"{cell} "
# else:
# # 添加增长箭头(模拟数据)
# growth = None
# if i == 1 and j == 1: growth = "+3.05%"
# elif i == 1 and j == 2: growth = "-6.00%"
# elif i == 2 and j == 1: growth = "+3.05%"
# elif i == 2 and j == 2: growth = "-6.00%"
# elif i == 3 and j == 1: growth = "+3.05%"
# elif i == 3 and j == 2: growth = "-6.00%"
# elif i == 4 and j == 1: growth = "+29.17%"
# elif i == 4 and j == 2: growth = "+29.17%"
# elif i == 5 and j == 1: growth = "-13.05%"
# elif i == 5 and j == 2: growth = "+29.17%"
# if growth:
# arrow = "▲" if growth.startswith("+") else "▼"
# color = "green" if growth.startswith("+") else "red"
# cells += f"""
# {cell}
# {arrow}{growth}
# """
# else:
# cells += f"{cell} "
# table_rows += f"{cells} "
# html = f"""
#
#
#
#
#
#
Income Statement and Cash Flow
#
#
#
# """
# return html
# 返回三个HTML组件的内容
return build_stock_card(company_info), build_financial_metrics(yearly_data), build_income_table(table_data)
def create_tab_content(tab_name, company_name):
"""创建Tab内容组件"""
if tab_name == "summary":
print(f"company_name: {company_name}")
# content = get_invest_suggest(company_name)
gr.Markdown("# 11111", elem_classes=["invest-suggest-md-box"])
# gr.Markdown(content, elem_classes=["invest-suggest-md-box"])
# gr.Markdown("""
# ## Investment Suggestions
# ### Company Overview
# GlobalTech inc. is a leading technology company with strong performance in the Q3 2025 period. The companyshows consistent revenue growth and maintains a healthy fnancial position.
# ### Key Strengths
# - Revenue Growth: 12.4% year-over-year increase demonstrates strong market demandDiversifed Portfolio: Multiple revenue streams reduce business risk
# - Innovation Focus: Continued investment in R&D drives future growth potential
# ### Financial Health Indicators
# - Liquidity: Current ratio of 1.82 indicates good short-term fnancial health
# - Proftability: Net income of $685M, though down slightly quarter-over-quarter0
# - Cash Flow: Strong operating cash flow of $982M supports operations and growth initiatives
# ### Investment Recommendation
# BUY - GlobalTech Inc. presents a solid investment opportunity with:
# - Consistent revenue growth trajectory
# - Strong market position in key technology segments
# - Healthy balance sheet and cash flow generation
# ### Risk Considerations
# Quarterly net income decline warrants monitoring
# | Category | Q3 2025 | Q2 2025 | YoY % |
# |--------------------|-----------|-----------|----------|
# | Total Revenue | $2,842M | $2,712M | +12.4% |
# | Gross Profit | $1,203M | $1,124M | +7.0% |
# | Operating Income | $742M | $798M | -7.0% |
# | Net Income | $685M | $708M | -3.2% |
# | Earnings Per Share | $2.15 | $2.22 | -3.2% |
# """, elem_classes=["invest-suggest-md-box"])
elif tab_name == "detailed":
with gr.Column(elem_classes=["tab-content"]):
gr.Markdown("Financial Statements", elem_classes=["text-xl", "font-semibold", "text-gray-900", "mb-6"])
with gr.Row(elem_classes=["gap-6"]):
# 收入报表 (3/5宽度)
with gr.Column(elem_classes=["w-3/5", "bg-gray-50", "rounded-xl", "p-4"]):
gr.Markdown("Income Statement", elem_classes=["font-medium", "mb-3"])
# 这里将显示收入报表表格
# 资产负债表和现金流量表 (2/5宽度)
with gr.Column(elem_classes=["w-2/5", "flex", "flex-col", "gap-6"]):
# 资产负债表
with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]):
gr.Markdown("Balance Sheet Summary", elem_classes=["font-medium", "mb-3"])
# 这里将显示资产负债表图表
# 现金流量表
with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]):
with gr.Row(elem_classes=["justify-between", "items-start"]):
gr.Markdown("Cash Flow Statement", elem_classes=["font-medium"])
gr.Markdown("View Detailed", elem_classes=["text-xs", "text-blue-600", "font-medium"])
with gr.Column(elem_classes=["mt-4", "space-y-3"]):
# 经营现金流
with gr.Column():
with gr.Row(elem_classes=["justify-between"]):
gr.Markdown("Operating Cash Flow")
gr.Markdown("$982M", elem_classes=["font-medium"])
with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]):
with gr.Column(elem_classes=["bg-green-500", "h-1.5", "rounded-full"], scale=85):
gr.Markdown("")
# 投资现金流
with gr.Column():
with gr.Row(elem_classes=["justify-between"]):
gr.Markdown("Investing Cash Flow")
gr.Markdown("-$415M", elem_classes=["font-medium"])
with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]):
with gr.Column(elem_classes=["bg-blue-500", "h-1.5", "rounded-full"], scale=42):
gr.Markdown("")
# 融资现金流
with gr.Column():
with gr.Row(elem_classes=["justify-between"]):
gr.Markdown("Financing Cash Flow")
gr.Markdown("-$212M", elem_classes=["font-medium"])
with gr.Row(elem_classes=["w-full", "bg-gray-200", "rounded-full", "h-1.5", "mt-1"]):
with gr.Column(elem_classes=["bg-red-500", "h-1.5", "rounded-full"], scale=25):
gr.Markdown("")
elif tab_name == "comparative":
with gr.Column(elem_classes=["tab-content"]):
gr.Markdown("Industry Benchmarking", elem_classes=["text-xl", "font-semibold", "text-gray-900", "mb-6"])
# 收入增长对比
with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4", "mb-6"]):
gr.Markdown("Revenue Growth - Peer Comparison", elem_classes=["font-medium", "mb-3"])
# 这里将显示对比图表
# 利润率和报告预览网格
with gr.Row(elem_classes=["grid-cols-2", "gap-6"]):
# 利润率表格
with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]):
gr.Markdown("Profitability Ratios", elem_classes=["font-medium", "mb-3"])
# 这里将显示利润率表格
# 报告预览
with gr.Column(elem_classes=["bg-gray-50", "rounded-xl", "p-4"]):
gr.Markdown("Report Preview", elem_classes=["font-medium", "mb-3"])
# 这里将显示报告预览
def create_chat_panel():
"""创建聊天面板组件"""
# with gr.Column(elem_classes=["chat-panel"]):
# 聊天头部
# with gr.Row(elem_classes=["p-4", "border-b", "border-gray-200", "items-center", "gap-2"]):
# gr.Markdown("🤖", elem_classes=["text-xl", "text-blue-600"])
# gr.Markdown("Financial Assistant", elem_classes=["font-medium"])
# 聊天区域
# 一行代码嵌入!
# chat_component = create_financial_chatbot()
# chat_component.render()
# create_financial_chatbot()
# gr.LoginButton()
# chatbot = gr.Chatbot(
# value=[
# {"role": "assistant", "content": "I'm your financial assistant, how can I help you today?"},
# # {"role": "assistant", "content": "Hello! I can help you analyze financial data. Ask questions like \"Show revenue trends\" or \"Compare profitability ratios\""},
# # {"role": "user", "content": "Show revenue trends for last 4 quarters"},
# # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"},
# # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"},
# # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"},
# # {"role": "assistant", "content": "Revenue trend for GlobalTech Inc.:\n\nQ4 2024: $2.53B (+8.2%)\nQ1 2025: $2.61B (+9.8%)\nQ2 2025: $2.71B (+11.6%)\nQ3 2025: $2.84B (+12.4%)"}
# ],
# type="messages",
# # elem_classes=["min-h-0", "overflow-y-auto", "space-y-4", "chat-content-box"],
# show_label=False,
# autoscroll=True,
# show_copy_button=True,
# height=400,
# container=False,
# )
# # 输入区域
# with gr.Row(elem_classes=["border-t", "border-gray-200", "gap-2"]):
# msg = gr.Textbox(
# placeholder="Ask a financial question...",
# elem_classes=["flex-1", "border", "border-gray-300", "rounded-lg", "px-4", "py-2", "focus:border-blue-500"],
# show_label=False,
# lines=1,
# submit_btn=True,
# container=False,
# )
# msg.submit(
# chat_bot,
# [msg, chatbot],
# [msg, chatbot],
# queue=True,
# )
# def load_css_files(css_dir, filenames):
# css_content = ""
# for filename in filenames:
# path = os.path.join(css_dir, filename)
# if os.path.exists(path):
# with open(path, "r", encoding="utf-8") as f:
# css_content += f.read() + "\n"
# else:
# print(f"⚠️ CSS file not found: {path}")
# return css_content
def main():
# 获取当前目录
current_dir = os.path.dirname(os.path.abspath(__file__))
css_dir = os.path.join(current_dir, "css")
# def load_css_files(css_dir, filenames):
# """读取多个 CSS 文件并合并为一个字符串"""
# css_content = ""
# for filename in filenames:
# path = os.path.join(css_dir, filename)
# if os.path.exists(path):
# with open(path, "r", encoding="utf-8") as f:
# css_content += f.read() + "\n"
# else:
# print(f"Warning: CSS file not found: {path}")
# return css_content
# 设置CSS路径
css_paths = [
os.path.join(css_dir, "main.css"),
os.path.join(css_dir, "components.css"),
os.path.join(css_dir, "layout.css")
]
# css_dir = "path/to/your/css/folder" # 替换为你的实际路径
# 自动定位 css 文件夹(与 app.py 同级)
# BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# CSS_DIR = os.path.join(BASE_DIR, "css")
# css_files = ["main.css", "components.css", "layout.css"]
# combined_css = load_css_files(CSS_DIR, css_files)
# print(combined_css)
with gr.Blocks(
title="Financial Analysis Dashboard",
css_paths=css_paths,
css=custom_css,
# css=combined_css
) as demo:
# 添加处理公司点击事件的路由
# 创建一个状态组件来跟踪选中的公司
selected_company_state = gr.State("")
with gr.Column(elem_classes=["container", "container-h"]):
# 头部
create_header()
# 创建主布局
with gr.Row(elem_classes=["main-content-box"]):
# 左侧边栏
with gr.Column(scale=1, min_width=350):
# 获取company_list组件的引用
company_list_component, report_section_component, report_display_component, news_display_component = create_sidebar()
# 主内容区域
with gr.Column(scale=9):
# 指标仪表板
create_metrics_dashboard()
with gr.Row(elem_classes=["main-content-box"]):
with gr.Column(scale=8):
# Tab内容
with gr.Tabs():
with gr.TabItem("Invest Suggest", elem_classes=["tab-item"]):
# 创建一个用于显示公司名称的组件
# company_display = gr.Markdown("# Please select a company")
# 创建一个占位符用于显示tab内容
tab_content = gr.Markdown(elem_classes=["invest-suggest-md-box"])
# 当选中的公司改变时,更新显示
# selected_company_state.change(
# fn=lambda company: f"# Investment Suggestions for {company}" if company else "# Please select a company",
# inputs=[selected_company_state],
# outputs=[company_display]
# )
# 当选中的公司改变时,重新加载tab内容
def update_tab_content(company):
if company:
# 显示loading状态
loading_html = f'''
Loading investment suggestions for {company}...
'''
yield loading_html
# 获取投资建议数据
try:
content = get_invest_suggest(company)
yield content
except Exception as e:
error_html = f'''
Error loading investment suggestions: {str(e)}
Please try again later.
'''
yield error_html
else:
yield "Please select a company
"
selected_company_state.change(
fn=update_tab_content,
inputs=[selected_company_state],
outputs=[tab_content],
)
with gr.TabItem("Analysis Report", elem_classes=["tab-item"]):
# 创建一个用于显示公司名称的组件
# analysis_company_display = gr.Markdown("# Please select a company")
# 创建一个占位符用于显示tab内容
analysis_tab_content = gr.Markdown(elem_classes=["analysis-report-md-box"])
# 当选中的公司改变时,更新显示
# selected_company_state.change(
# fn=lambda company: f"# Analysis Report for {company}" if company else "# Please select a company",
# inputs=[selected_company_state],
# outputs=[analysis_company_display]
# )
# 当选中的公司改变时,重新加载tab内容
def update_analysis_tab_content(company):
if company:
# 显示loading状态
loading_html = f'''
Loading analysis report for {company}...
'''
yield loading_html
# 获取分析报告数据
try:
# 这里应该调用获取详细分析报告的函数
# 暂时使用占位内容,您需要替换为实际的函数调用
# content = f"# Analysis Report for {company}\n\nDetailed financial analysis for {company} will be displayed here."
yield get_analysis_report(company)
except Exception as e:
error_html = f'''
Error loading analysis report: {str(e)}
Please try again later.
'''
yield error_html
else:
yield "Please select a company
"
selected_company_state.change(
fn=update_analysis_tab_content,
inputs=[selected_company_state],
outputs=[analysis_tab_content]
)
# with gr.TabItem("Comparison", elem_classes=["tab-item"]):
# create_tab_content("comparison")
with gr.Column(scale=2, min_width=400):
# 聊天面板
# gr.ChatInterface(
# respond,
# title="Easy Financial Report",
# # label="Easy Financial Report",
# additional_inputs=[
# # gr.Textbox(value="You are a financial analysis assistant. Provide concise investment insights from company financial reports.", label="System message"),
# # gr.Slider(minimum=1, maximum=4096, value=1024, step=1, label="Max new tokens"),
# # gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
# # gr.Slider(
# # minimum=0.1,
# # maximum=1.0,
# # value=0.95,
# # step=0.05,
# # label="Top-p (nucleus sampling)",
# # ),
# gr.State(value="") # CRITICAL: Add State to store session URL across turns
# ],
# )
# chatbot.render()
# gr.LoginButton()
gr.ChatInterface(
respond,
title="Easy Financial Report",
additional_inputs=[
gr.State(value=""), # CRITICAL: Store session URL across turns (hidden from UI)
gr.State(value={}) # CRITICAL: Store agent context across turns (hidden from UI)
],
additional_inputs_accordion=gr.Accordion(label="Settings", open=False, visible=False), # Hide the accordion completely
)
# chatbot.render()
# with gr.Blocks() as demo:
# # Add custom CSS for Agent Plan styling
# gr.Markdown("""
#
# """)
# chatbot.render()
# 在页面加载时自动刷新公司列表,确保显示最新的数据
# demo.load(
# fn=get_company_list_choices,
# inputs=[],
# outputs=[company_list_component],
# concurrency_limit=None,
# )
# 绑定公司选择事件到状态更新
# 注意:这里需要确保create_sidebar中没有重复绑定相同的事件
company_list_component.change(
fn=lambda x: x, # 直接返回选中的公司名称
inputs=[company_list_component],
outputs=[selected_company_state],
concurrency_limit=None
)
# 绑定公司选择事件到指标仪表板更新
def update_metrics_dashboard_wrapper(company_name):
if company_name:
# 显示loading状态
loading_html = f'''
Loading financial data for {company_name}...
'''
yield loading_html, loading_html, loading_html
# 获取更新后的数据
try:
stock_card_html, financial_metrics_html, income_table_html = update_metrics_dashboard(company_name)
yield stock_card_html, financial_metrics_html, income_table_html
except Exception as e:
error_html = f'''
Error loading financial data: {str(e)}
Please try again later.
'''
yield error_html, error_html, error_html
else:
# 如果没有选择公司,返回空内容
empty_html = "Please select a company
"
yield empty_html, empty_html, empty_html
selected_company_state.change(
fn=update_metrics_dashboard_wrapper,
inputs=[selected_company_state],
outputs=list(metrics_dashboard_components),
concurrency_limit=None
)
return demo
if __name__ == "__main__":
demo = main()
demo.launch(share=True)