import json

# Fiscal years searched by default, newest first (latest year + 4 historical).
_DEFAULT_YEARS = (2025, 2024, 2023, 2022, 2021)

# Reporting levels in order of preference: full year first, then latest quarter.
_LEVELS = ("FY", "Q4", "Q3", "Q2", "Q1")

# Financial fields copied verbatim from a source item into an extracted record.
_RECORD_FIELDS = (
    "total_revenue",
    "net_income",
    "earnings_per_share",
    "operating_expenses",
    "operating_cash_flow",
)


# -----------------------------
# Helpers: safe numeric conversion
# -----------------------------
def safe_int(val, default=0):
    """Convert *val* to ``int``, returning *default* when it cannot be converted.

    Numeric strings such as ``"1e3"`` or ``"12.7"`` are accepted (truncated
    toward zero). ``None``, non-numeric values, NaN and infinities yield
    *default*.
    """
    if val is None:
        return default
    if isinstance(val, int):
        # Avoid the float round trip, which loses precision above 2**53.
        return int(val)
    try:
        return int(float(val))
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")); ValueError also covers NaN.
        return default


def safe_float(val, default=0.0):
    """Convert *val* to ``float``, returning *default* when it cannot be converted."""
    if val is None:
        return default
    try:
        return float(val)
    except (ValueError, TypeError):
        return default


# -----------------------------
# Step 1: extract data with fallback (latest year + 4 historical = 5 years)
# -----------------------------
def _period_label(level, year):
    """Return the period key for a level/year pair, e.g. ``FY2025`` or ``2025Q3``."""
    return f"FY{year}" if level == "FY" else f"{year}{level}"


def _make_record(period, year, level, item):
    """Build one extracted record from a raw source *item*."""
    record = {"period": period, "fiscal_year": int(year), "level": level}
    for field in _RECORD_FIELDS:
        record[field] = item.get(field)
    return record


def extract_last_three_with_fallback(data_list, years=None):
    """Extract per-year records for the requested fiscal years.

    Despite the historical name, five years are searched by default
    (the latest year plus four historical years).

    Level priority is FY > Q4 > Q3 > Q2 > Q1. The first level that has a
    non-``None`` ``total_revenue`` for *every* requested year wins. If no
    level is complete, the first level with *any* usable year is returned
    (possibly with fewer records than requested).

    Args:
        data_list: Iterable of dicts, each expected to carry a ``"period"``
            key such as ``"FY2025"`` or ``"2025Q3"``. Non-dict items and
            items without a string period are ignored.
        years: Optional iterable of fiscal years, newest first. Defaults to
            2025 down to 2021.

    Returns:
        A list of record dicts (``period``, ``fiscal_year``, ``level`` and the
        five financial fields) in the order of *years*, or ``[]`` when nothing
        usable is found.
    """
    wanted_years = list(_DEFAULT_YEARS if years is None else years)

    # Index by period; requiring a str period also avoids unhashable keys.
    data_map = {
        item["period"]: item
        for item in data_list
        if isinstance(item, dict) and isinstance(item.get("period"), str)
    }

    # Collect the usable records of every level once, in priority order.
    per_level = []
    for level in _LEVELS:
        records = []
        for year in wanted_years:
            period = _period_label(level, year)
            item = data_map.get(period)
            if item is not None and item.get("total_revenue") is not None:
                records.append(_make_record(period, year, level, item))
        per_level.append(records)

    # Preferred: the highest-priority level covering every requested year.
    for records in per_level:
        if len(records) == len(wanted_years):
            return records

    # Fallback: the highest-priority level with any data, even if incomplete.
    for records in per_level:
        if records:
            return records
    return []


# -----------------------------
# Step 2: number formatting (B / M)
# -----------------------------
def _scaled(magnitude, divisor, decimals, suffix):
    """Format ``magnitude / divisor`` with *suffix*, dropping trailing zeros."""
    num = magnitude / divisor
    if num == int(num):
        return f"${int(num)}{suffix}"
    # Strip zeros BEFORE appending the suffix; stripping afterwards is a no-op.
    text = f"{num:.{decimals}f}".rstrip("0").rstrip(".")
    return f"${text}{suffix}"


def format_number(value):
    """Format a monetary amount as ``$1.5B`` / ``$12.3M`` / ``$2,500`` / ``$999``.

    Negative amounts are formatted by magnitude with a leading minus sign
    (``-$2B``) so losses use the same units as gains.
    """
    sign = "-" if value < 0 else ""
    magnitude = abs(value)
    if magnitude >= 1_000_000_000:
        body = _scaled(magnitude, 1_000_000_000, 2, "B")
    elif magnitude >= 1_000_000:
        body = _scaled(magnitude, 1_000_000, 1, "M")
    elif magnitude >= 1_000:
        body = f"${magnitude:,.0f}"
    else:
        body = f"${magnitude}"
    return f"{sign}{body}"


def format_eps(value):
    """Format earnings per share with two decimals, e.g. ``$1.50`` or ``-$0.25``."""
    if value < 0:
        return f"-${abs(value):.2f}"
    return f"${value:.2f}"


def calculate_change(current, previous):
    """Return the relative change from *previous* to *current* as ``+12.3%``.

    When *previous* is zero the change is undefined; a signed ``0.0%``
    placeholder is returned instead (sign taken from *current*).
    """
    if previous == 0:
        return "+0.0%" if current >= 0 else "-0.0%"
    change = (current - previous) / abs(previous) * 100
    sign = "+" if change >= 0 else "-"
    return f"{sign}{abs(change):.1f}%"


# -----------------------------
# Step 3: build the final metrics
# -----------------------------
# (label, record key, converter, formatter) for every reported metric, in
# display order.
_METRIC_SPECS = (
    ("Total Revenue", "total_revenue", safe_int, format_number),
    ("Net Income", "net_income", safe_int, format_number),
    ("Earnings Per Share", "earnings_per_share", safe_float, format_eps),
    ("Operating Expenses", "operating_expenses", safe_int, format_number),
    ("Operating Cash Flow", "operating_cash_flow", safe_int, format_number),
)


def build_financial_metrics(three_year_data):
    """Build the metric cards comparing the latest year with the one before it.

    Args:
        three_year_data: List of at least two records carrying ``fiscal_year``
            and the financial fields; only the two most recent years are used.

    Returns:
        A list of dicts with ``label``, ``value``, ``change`` and ``color``
        (``"green"`` when the value did not decrease, else ``"red"``).

    Raises:
        ValueError: If fewer than two records are supplied.
    """
    if len(three_year_data) < 2:
        raise ValueError("至少需要两年数据来计算同比变化")

    ordered = sorted(three_year_data, key=lambda rec: rec["fiscal_year"], reverse=True)
    latest, previous = ordered[0], ordered[1]

    metrics = []
    for label, key, convert, render in _METRIC_SPECS:
        current_value = convert(latest.get(key))
        previous_value = convert(previous.get(key))
        metrics.append({
            "label": label,
            "value": render(current_value),
            "change": calculate_change(current_value, previous_value),
            # NOTE(review): rising Operating Expenses is also colored green;
            # confirm whether that metric should use inverted coloring.
            "color": "green" if current_value >= previous_value else "red",
        })
    return metrics


# -----------------------------
# Main flow: raw_data in, financial_metrics out
# -----------------------------
# Legacy entry point, kept for reference (disabled):
# def process_financial_data(raw_data):
#     # If it is a string, parse the JSON first
#     if isinstance(raw_data, str):
#         raw_data = json.loads(raw_data)
#     # Make sure it is a list
#     if not isinstance(raw_data, list):
#         raise TypeError("raw_data 必须是列表或 JSON 字符串表示的列表")
#     # Extract the yearly data
#     three_years = extract_last_three_with_fallback(raw_data)
#     if not three_years:
#         raise ValueError("无法提取有效的三年财务数据")
#     # Build the metrics
#     return build_financial_metrics(three_years)


def _empty_result():
    """Return a fresh "no data" result structure."""
    return {"financial_metrics": [], "year_data": "N/A", "three_year_data": []}


def process_financial_data_with_metadata(raw_data):
    """Return ``financial_metrics`` + ``year_data`` + ``three_year_data``.

    Args:
        raw_data: A list of period dicts, or a JSON string encoding such a
            list.

    Returns:
        A dict with:
          * ``financial_metrics``: metrics of the latest year compared with
            the previous year;
          * ``year_data``: label of the latest period (``"FY 2025"`` or
            ``"2025 Q3"``);
          * ``three_year_data``: up to three records preceding the latest
            year (used for the "Latest 3 Years" table).
        Invalid, empty or insufficient input yields the empty structure
        (empty lists and ``"N/A"``) instead of raising.
    """
    # 1. Parse the input. This must happen before the list check, otherwise
    #    JSON strings are rejected before they can be decoded.
    if isinstance(raw_data, str):
        try:
            raw_data = json.loads(raw_data)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return _empty_result()

    if not raw_data or not isinstance(raw_data, list):
        return _empty_result()
    if not isinstance(raw_data[0], dict):
        return _empty_result()

    # 2. Extract up to five years (latest + four historical, with fallback).
    five_years = extract_last_three_with_fallback(raw_data)
    if not five_years:
        print("无法提取有效的财务数据")
        return _empty_result()

    five_years_sorted = sorted(
        five_years, key=lambda rec: rec["fiscal_year"], reverse=True
    )

    # 3. Split into the latest year and the historical years.
    if len(five_years_sorted) < 2:
        print("数据不足,至少需要2年数据")
        return _empty_result()

    latest_year = five_years_sorted[0]
    previous_year = five_years_sorted[1]

    # Table rows are the (up to) three years after the latest one, e.g.
    # [2025, 2024, 2023, 2022, 2021] -> [2024, 2023, 2022]. Slicing already
    # copes with fewer than four records.
    three_years_for_table = five_years_sorted[1:4]

    # 4. Label of the latest period.
    year = latest_year["fiscal_year"]
    level = latest_year["level"]
    if level == "FY":
        year_data = f"FY {year}"
    else:  # Q1, Q2, Q3, Q4
        year_data = f"{year} {level}"

    # 5. Metrics of the latest year versus the previous year.
    financial_metrics = build_financial_metrics([latest_year, previous_year])

    # 6. Full result structure.
    return {
        "financial_metrics": financial_metrics,
        "year_data": year_data,
        "three_year_data": three_years_for_table,
    }


# -----------------------------
# Example usage (replace with your real data)
# -----------------------------
# if __name__ == "__main__":
#     # Paste your raw data here (a string or a variable),
#     # for example read it from a file or assign it directly.
#     with open("financial_data.json", "r", encoding="utf-8") as f:
#         raw_input = f.read()
#     try:
#         financial_metrics = process_financial_data(raw_input)
#         print(json.dumps(financial_metrics, indent=2))
#     except Exception as e:
#         print("处理失败:", e)