import json
import re
from collections import defaultdict

# Fiscal years examined by default, newest first.
_DEFAULT_FISCAL_YEARS = (2025, 2024, 2023)

# Reporting levels in fallback order: the annual report first, then the
# quarters from latest to earliest.
_LEVEL_PRIORITY = ("FY", "Q4", "Q3", "Q2", "Q1")


def _period_label(level_name, year):
    """Return the period key used in the input data (``FY2025`` / ``2025Q4``)."""
    return f"FY{year}" if level_name == "FY" else f"{year}{level_name}"


def _clean_record(item, period, level_name, fiscal_year):
    """Project one raw report onto the fields the rest of this module reads."""
    return {
        "period": period,
        "fiscal_year": fiscal_year,
        "level": level_name,
        "total_revenue": item["total_revenue"],
        "net_income": item.get("net_income"),
        "earnings_per_share": item.get("earnings_per_share"),
        "operating_expenses": item.get("operating_expenses"),
        "operating_cash_flow": item.get("operating_cash_flow"),
        "source_url": item.get("source_url"),
    }


def extract_last_three_with_fallback(data_list, years=None):
    """Pick one reporting level and return its records for the given years.

    Levels are tried in the order FY, Q4, Q3, Q2, Q1. The first level that
    has a non-null ``total_revenue`` for *every* requested year wins. When no
    level is complete, the records of the highest-priority level that has at
    least one usable report are returned instead (conservative fallback).

    Args:
        data_list: Iterable of report dicts keyed by ``"period"``
            (``"FY2025"`` or ``"2025Q4"`` style). Entries that are not dicts
            or lack a ``"period"`` key are ignored.
        years: Fiscal years to look up, newest first. Defaults to
            2025, 2024, 2023.

    Returns:
        A list of cleaned record dicts in the order of ``years``; an empty
        list when nothing usable exists.
    """
    fiscal_years = list(_DEFAULT_FISCAL_YEARS if years is None else years)
    data_map = {
        item["period"]: item
        for item in data_list
        if isinstance(item, dict) and "period" in item
    }

    first_partial = []
    for level_name in _LEVEL_PRIORITY:
        records = []
        for year in fiscal_years:
            period = _period_label(level_name, year)
            item = data_map.get(period)
            if item is not None and item.get("total_revenue") is not None:
                records.append(
                    _clean_record(item, period, level_name, int(year))
                )
        if records and len(records) == len(fiscal_years):
            # Complete coverage at this level: best possible answer.
            return records
        if records and not first_partial:
            # Remember the highest-priority incomplete level as a fallback.
            first_partial = records
    return first_partial


def format_number(value):
    """Format a monetary amount as ``$X.XXB``, ``$X.XM`` or ``$X,XXX``.

    The unit is chosen from the magnitude, so negative amounts are scaled the
    same way as positive ones (``-5e9`` -> ``$-5B``). Whole billions/millions
    drop their trailing zeros (``$2B``, ``$3M``).
    """
    magnitude = abs(value)
    if magnitude >= 1_000_000_000:
        return f"${value / 1_000_000_000:.2f}B".replace(".00B", "B")
    if magnitude >= 1_000_000:
        return f"${value / 1_000_000:.1f}M".replace(".0M", "M")
    return f"${value:,.0f}"


def format_eps(value):
    """Format earnings per share with two decimals, e.g. ``$1.25``."""
    return f"${value:.2f}"


def safe_int(val):
    """Convert ``val`` to ``int`` via ``float``; return 0 when that fails.

    Accepts numbers and numeric strings. ``None``, non-numeric input, NaN and
    infinities all yield 0.
    """
    if val is None:
        return 0
    try:
        return int(float(val))
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")); ValueError also covers NaN.
        return 0


def _safe_float(val):
    """Convert ``val`` to ``float``; return 0.0 for missing/non-numeric input."""
    if val is None:
        return 0.0
    try:
        return float(val)
    except (ValueError, TypeError):
        return 0.0


def calculate_change(current, previous):
    """Return the relative change from ``previous`` to ``current`` as text.

    The result looks like ``'+12.4%'`` or ``'-3.2%'``. When ``previous`` is 0
    no ratio exists, so ``'+0.0%'`` is returned for a non-negative
    ``current`` and ``'-0.0%'`` otherwise.
    """
    if previous == 0:
        return "+0.0%" if current >= 0 else "-0.0%"
    change = (current - previous) / abs(previous) * 100
    sign = "+" if change >= 0 else "-"
    return f"{sign}{abs(change):.1f}%"


def build_financial_metrics_three_year_data(three_year_data):
    """Build headline metric cards comparing the two most recent years.

    Args:
        three_year_data: Records carrying ``"fiscal_year"`` plus the metric
            fields, e.g. the output of ``extract_last_three_with_fallback``.

    Returns:
        A list of dicts with ``label``, ``value``, ``change`` and ``color``.
        ``color`` is ``"green"`` when the latest value is greater than or
        equal to the previous one and ``"red"`` otherwise.

    Raises:
        ValueError: If fewer than two records are supplied.
    """
    # Newest fiscal year first.
    sorted_data = sorted(
        three_year_data, key=lambda rec: rec["fiscal_year"], reverse=True
    )
    if len(sorted_data) < 2:
        raise ValueError("至少需要两年数据来计算同比变化")
    latest, previous = sorted_data[0], sorted_data[1]

    # (label, source key, converter, formatter)
    specs = [
        ("Total Revenue", "total_revenue", safe_int, format_number),
        ("Net Income", "net_income", safe_int, format_number),
        ("Earnings Per Share", "earnings_per_share", _safe_float, format_eps),
        ("Operating Expenses", "operating_expenses", safe_int, format_number),
        ("Cash Flow", "operating_cash_flow", safe_int, format_number),
    ]

    metrics = []
    for label, key, convert, fmt in specs:
        curr = convert(latest.get(key))
        prev = convert(previous.get(key))
        metrics.append(
            {
                "label": label,
                "value": fmt(curr),
                "change": calculate_change(curr, prev),
                "color": "green" if curr >= prev else "red",
            }
        )
    return metrics


# Example usage, assuming ``raw_data`` holds the list of report dicts:
#     result = extract_last_three_with_fallback(raw_data)
#     print(json.dumps(result, indent=2))


def parse_period(period):
    """Parse a period string into ``(year, type, quarter)``.

    Supported formats are ``FY2025``, ``Q1-2025`` and ``2025Q1``. ``type`` is
    ``'FY'`` or ``'Q'``; ``quarter`` is ``None`` for fiscal-year periods.

    Raises:
        ValueError: If ``period`` matches none of the supported formats.
    """
    text = period.strip()
    if text.startswith("FY"):
        try:
            return int(text[2:]), "FY", None
        except ValueError:
            raise ValueError(f"Unknown period format: {period}") from None

    # fullmatch so that trailing digits/text cannot be silently absorbed.
    quarter_first = re.fullmatch(r"Q([1-4])-(\d{4})", text)
    if quarter_first:
        return int(quarter_first.group(2)), "Q", int(quarter_first.group(1))

    year_first = re.fullmatch(r"(\d{4})Q([1-4])", text)
    if year_first:
        return int(year_first.group(1)), "Q", int(year_first.group(2))

    raise ValueError(f"Unknown period format: {period}")


def get_best_value_for_year(year_data, key):
    """Return the best available value for one fiscal year.

    Args:
        year_data: Mapping such as ``{'FY': value, 'Q1': value, ...}``.
        key: Unused; kept so existing callers keep working.

    Returns:
        The first non-``None`` value in the order FY, Q4, Q3, Q2, Q1, or
        ``None`` when every level is missing.
    """
    for level in _LEVEL_PRIORITY:
        value = year_data.get(level)
        if value is not None:
            return value
    return None
import json
import re
from collections import defaultdict


def parse_period_year_data(period):
    """Parse a period string into ``(year, type, quarter)``.

    Supported formats:
        - ``FY2025``
        - ``Q1-2025``
        - ``2025Q1``

    Surrounding whitespace is ignored. Trailing non-digit text after a
    quarter pattern is tolerated, but extra digits are not, so ``2025Q10``
    and ``Q1-20255`` are rejected instead of being misread.

    Returns:
        ``(year, 'FY', None)`` or ``(year, 'Q', quarter)``;
        ``(None, None, None)`` when ``period`` is not a string or cannot be
        parsed.
    """
    if not isinstance(period, str):
        return None, None, None
    text = period.strip()

    # Format 1: FY2025
    if text.startswith("FY"):
        try:
            return int(text[2:]), "FY", None
        except ValueError:
            pass  # Fall through to the remaining formats.

    # Format 2: Q1-2025
    found = re.match(r"Q([1-4])-(\d{4})(?!\d)", text)
    if found:
        return int(found.group(2)), "Q", int(found.group(1))

    # Format 3: 2025Q1
    found = re.match(r"(\d{4})Q([1-4])(?!\d)", text)
    if found:
        return int(found.group(1)), "Q", int(found.group(2))

    return None, None, None


def get_yearly_data(data_json):
    """Describe the period of the last parseable entry in ``metrics``.

    The list order is taken to be chronological, so the last entry with a
    parseable ``period`` is treated as the latest one. Entries that are not
    dicts, have no period, or have an unparseable period are skipped.

    Returns:
        ``"<year> FY"`` or ``"<year> Q<n>"``; ``"N/A"`` when nothing parses.
    """
    latest_desc = "N/A"
    for metric in data_json.get("metrics") or []:
        if not isinstance(metric, dict):
            continue
        period = metric.get("period")
        if not period:
            continue
        year, ptype, quarter = parse_period_year_data(period)
        if year is None:
            continue
        latest_desc = f"{year} FY" if ptype == "FY" else f"{year} Q{quarter}"
    return latest_desc


def parse_period_yoy(period):
    """Parse ``period`` into ``(year, type, quarter)`` for YoY grouping.

    Delegates to ``parse_period_year_data`` so the same formats are accepted
    everywhere; anything unparseable yields ``(None, None, None)`` instead of
    raising.
    """
    return parse_period_year_data(period)


def get_best_value_for_year_yoy(values_dict, key):
    """Return the best available value of ``key`` within one year.

    Args:
        values_dict: Mapping like ``{'FY': {...}, 'Q1': {...}, ...}``.
        key: Metric field to read from each report dict.

    Returns:
        The first non-``None`` value in the order FY, Q4, Q3, Q2, Q1, or
        ``None`` when no report carries the field.
    """
    for level in ("FY", "Q4", "Q3", "Q2", "Q1"):
        report = values_dict.get(level)
        if isinstance(report, dict):
            value = report.get(key)
            if value is not None:
                return value
    return None


def _to_float(value):
    """Return ``value`` as a float, or ``None`` if missing or non-numeric."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _format_compact_currency(value):
    """Format a number as ``$1.23B`` / ``$4.5M`` / ``$6.7K`` / ``$8.90``."""
    number = _to_float(value)
    if number is None:
        return "N/A"
    magnitude = abs(number)
    if magnitude >= 1e9:
        return f"${number / 1e9:.2f}B"
    if magnitude >= 1e6:
        return f"${number / 1e6:.1f}M"
    if magnitude >= 1e3:
        return f"${number / 1e3:.1f}K"
    return f"${number:.2f}"


def _decode_yoy_metrics(metrics_list):
    """Return ``metrics_list`` with JSON-encoded entries decoded to dicts.

    Raises:
        ValueError: If a string entry is not valid JSON.
        TypeError: If an entry is not (or does not decode to) a dict.
    """
    cleaned = []
    for i, metric in enumerate(metrics_list):
        if isinstance(metric, str):
            try:
                metric = json.loads(metric)
            except ValueError as e:
                raise ValueError(
                    f"Failed to parse metrics[{i}] as JSON string: {metric}"
                ) from e
        if not isinstance(metric, dict):
            raise TypeError(
                f"metrics[{i}] is not a dictionary or valid JSON string. Type: {type(metric)}"
            )
        cleaned.append(metric)
    return cleaned


def calculate_yoy_comparison(data_json):
    """Compare the two most recent fiscal years for the headline indicators.

    Reports are grouped by year; for each indicator the best available value
    per year is used (FY first, then Q4 down to Q1).

    NOTE(review): the two years may resolve to different levels (e.g. a
    quarter versus a full year), which is not a like-for-like comparison.
    Confirm with the data owner whether that can occur.

    Args:
        data_json: Dict whose ``"metrics"`` list holds report dicts, or
            JSON-encoded report strings (double-encoded payloads).

    Returns:
        A list of dicts with ``label``, ``value``, ``change`` and ``color``.
        ``change``/``color`` are ``"N/A"`` when either value is missing or
        non-numeric, or when the previous value is zero. Returns ``[]`` when
        ``metrics`` is empty, not a list, or starts with an entry that is
        neither a dict nor a string.

    Raises:
        ValueError: If a string entry is not valid JSON, or fewer than two
            distinct years are present.
        TypeError: If an entry is not (or does not decode to) a dict.
    """
    metrics_list = data_json.get("metrics", [])
    if not metrics_list or not isinstance(metrics_list, list):
        return []
    # JSON strings must pass this guard, otherwise the decoding below could
    # never run for a fully double-encoded payload.
    if not isinstance(metrics_list[0], (dict, str)):
        return []

    # year -> {'FY': report, 'Q1': report, ...}
    yearly_data = defaultdict(dict)
    for metric in _decode_yoy_metrics(metrics_list):
        period = metric.get("period")
        if not period:
            continue
        year, ptype, quarter = parse_period_yoy(period)
        if year is None:
            continue
        if ptype == "FY":
            yearly_data[year]["FY"] = metric
        elif ptype == "Q":
            yearly_data[year][f"Q{quarter}"] = metric

    years = sorted(yearly_data, reverse=True)
    if len(years) < 2:
        raise ValueError("至少需要两个财年的数据")
    latest_reports = yearly_data[years[0]]
    previous_reports = yearly_data[years[1]]

    indicators = [
        ("Total Revenue", "total_revenue"),
        ("Net Income", "net_income"),
        ("Earnings Per Share", "earnings_per_share"),
        ("Operating Expenses", "operating_expenses"),
        ("Cash Flow", "operating_cash_flow"),
    ]

    result = []
    for label, key in indicators:
        current_raw = get_best_value_for_year_yoy(latest_reports, key)
        previous_raw = get_best_value_for_year_yoy(previous_reports, key)

        # Convert before the zero test so that "0" cannot reach the division.
        current_num = _to_float(current_raw)
        previous_num = _to_float(previous_raw)

        if current_num is None or previous_num is None or previous_num == 0:
            change_str, color = "N/A", "N/A"
        else:
            change_pct = (current_num - previous_num) / abs(previous_num) * 100
            if change_pct > 0:
                change_str, color = f"+{change_pct:.1f}%", "green"
            elif change_pct < 0:
                change_str, color = f"{change_pct:.1f}%", "red"
            else:
                change_str, color = "0.0%", "N/A"

        result.append(
            {
                "label": label,
                "value": _format_compact_currency(current_raw),
                "change": change_str,
                "color": color,
            }
        )
    return result
import re
import json
import math
from collections import defaultdict


def parse_period_three_year(period):
    """Parse a period string into ``(year, type, quarter)``.

    Accepts ``FY2025``, ``Q1-2025`` and ``2025Q1`` (surrounding whitespace is
    ignored). ``type`` is ``'FY'`` or ``'Q'``; ``quarter`` is ``None`` for
    fiscal-year periods.

    Returns:
        The parsed triple, or ``(None, None, None)`` when ``period`` is not a
        string or matches none of the formats.
    """
    if not isinstance(period, str):
        return None, None, None
    text = period.strip()

    if text.startswith("FY"):
        try:
            return int(text[2:]), "FY", None
        except ValueError:
            return None, None, None

    # fullmatch so that extra digits/text are rejected instead of misread.
    quarter_first = re.fullmatch(r"Q([1-4])-(\d{4})", text)
    if quarter_first:
        return int(quarter_first.group(2)), "Q", int(quarter_first.group(1))

    year_first = re.fullmatch(r"(\d{4})Q([1-4])", text)
    if year_first:
        return int(year_first.group(1)), "Q", int(year_first.group(2))

    return None, None, None


def _as_finite_number(value):
    """Return ``value`` as a finite float, or ``None`` if that is impossible."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    # NaN/inf cannot be rounded or compared meaningfully; treat as missing.
    return number if math.isfinite(number) else None


def _format_table_millions(value):
    """Format an amount in millions: ``1500M`` when whole, else ``2.5M``."""
    number = _as_finite_number(value)
    if number is None:
        return "N/A"
    millions = number / 1e6
    nearest = round(millions)
    if abs(millions - nearest) < 1e-6:
        return f"{nearest}M"
    return f"{millions:.1f}M"


def _format_table_eps(value):
    """Format earnings per share with two decimals, without a currency sign."""
    number = _as_finite_number(value)
    return "N/A" if number is None else f"{number:.2f}"


def _decode_table_metrics(metrics_list):
    """Return ``metrics_list`` with JSON-encoded entries decoded to dicts.

    Raises:
        ValueError: If a string entry is not valid JSON.
        TypeError: If an entry is not (or does not decode to) a dict.
    """
    cleaned = []
    for i, metric in enumerate(metrics_list):
        if isinstance(metric, str):
            try:
                metric = json.loads(metric)
            except ValueError as e:
                raise ValueError(
                    f"Failed to parse metrics[{i}] as JSON string: {metric}"
                ) from e
        if not isinstance(metric, dict):
            raise TypeError(
                f"metrics[{i}] is not a dictionary or valid JSON string. Type: {type(metric)}"
            )
        cleaned.append(metric)
    return cleaned


def extract_financial_table(data_json):
    """Build a three-year table and its year-over-year rates.

    Reports are grouped by year. For each of the three most recent years and
    each indicator, the best available value is used (FY first, then Q4 down
    to Q1).

    Args:
        data_json: Dict whose ``"metrics"`` list holds report dicts, or
            JSON-encoded report strings.

    Returns:
        ``{"list_data": rows, "yoy_rates": rows}``. ``list_data`` always has
        three year columns; missing years show ``"N/A"`` in the header and in
        every row, so all rows have the same width. ``yoy_rates`` has one
        column per adjacent pair of available years. Returns ``[]`` when
        ``metrics`` is empty, not a list, or starts with an entry that is
        neither a dict nor a string.

    Raises:
        ValueError: If a string entry is not valid JSON, or no entry has a
            parseable period.
        TypeError: If an entry is not (or does not decode to) a dict.
    """
    metrics_list = data_json.get("metrics", [])
    if not metrics_list or not isinstance(metrics_list, list):
        return []
    # JSON strings must pass this guard so that they can be decoded below.
    if not isinstance(metrics_list[0], (dict, str)):
        return []

    # year -> {'FY': report, 'Q1': report, ...}
    yearly_reports = defaultdict(dict)
    for metric in _decode_table_metrics(metrics_list):
        period = metric.get("period")
        if not period:
            continue
        year, ptype, quarter = parse_period_three_year(period)
        if year is None:
            continue
        level = "FY" if ptype == "FY" else f"Q{quarter}"
        yearly_reports[year][level] = metric

    if not yearly_reports:
        raise ValueError("未找到任何有效报告期")

    # Most recent three years, newest first, padded with None up to three.
    valid_years = sorted(yearly_reports, reverse=True)[:3]
    padded_years = valid_years + [None] * (3 - len(valid_years))

    def get_best_value(year, key):
        """Return the best available value of ``key`` for ``year``."""
        reports = yearly_reports.get(year, {})
        for level in ("FY", "Q4", "Q3", "Q2", "Q1"):
            report = reports.get(level)
            if isinstance(report, dict):
                value = report.get(key)
                if value is not None:
                    return value
        return None

    indicators = [
        ("Total", "total_revenue"),
        ("Net Income", "net_income"),
        ("Earnings Per Share", "earnings_per_share"),
        ("Operating Expenses", "operating_expenses"),
        ("Cash Flow", "operating_cash_flow"),
    ]

    # Header and rows are both built from padded_years so their widths match.
    header = ["Category"] + [
        "N/A" if year is None else f"{year}/FY" for year in padded_years
    ]
    list_data = [header]
    for label, key in indicators:
        formatter = (
            _format_table_eps
            if label == "Earnings Per Share"
            else _format_table_millions
        )
        row = [label]
        for year in padded_years:
            row.append(
                "N/A" if year is None else formatter(get_best_value(year, key))
            )
        list_data.append(row)

    # Adjacent (current, previous) pairs: (y0, y1) and, if present, (y1, y2).
    yoy_pairs = list(zip(valid_years, valid_years[1:]))
    yoy_rates = [["Category"] + [f"{curr}/FY" for curr, _ in yoy_pairs]]
    for label, key in indicators:
        row = [label]
        for curr_year, prev_year in yoy_pairs:
            # Convert before the zero test so "0" cannot reach the division.
            curr_val = _as_finite_number(get_best_value(curr_year, key))
            prev_val = _as_finite_number(get_best_value(prev_year, key))
            if curr_val is None or prev_val is None or prev_val == 0:
                row.append("N/A")
                continue
            pct = (curr_val - prev_val) / abs(prev_val) * 100
            row.append(f"+{pct:.2f}%" if pct >= 0 else f"{pct:.2f}%")
        yoy_rates.append(row)

    return {"list_data": list_data, "yoy_rates": yoy_rates}