"""Financial Data Analysis Module""" from EasyReportDataMCP.edgar_client import EdgarDataClient from datetime import datetime from functools import lru_cache import json class FinancialAnalyzer: def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"): """ Initialize financial analyzer Args: user_agent (str): User agent string for identifying request source """ self.edgar_client = EdgarDataClient(user_agent) # 新增:实例级缓存,进一步提升性能 self._search_cache = {} self._extract_metrics_cache = {} # 缓存 extract_financial_metrics 结果 def search_company(self, company_input): """ Search company information (by name or CIK) - Optimized version Args: company_input (str): Company name or CIK Returns: dict: Company information """ # 实例级缓存检查 if company_input in self._search_cache: return self._search_cache[company_input] # If input is numeric, assume it's a CIK if company_input.isdigit() and len(company_input) >= 8: # Get company information from cache (will use @lru_cache) company_info = self.edgar_client.get_company_info(company_input) if company_info: self._search_cache[company_input] = company_info return company_info else: return {"error": "Company not found for specified CIK"} else: # Search company by name/ticker (uses cached company_tickers.json) company = self.edgar_client.search_company_by_name(company_input) if company: # ✅ OPTIMIZATION: Return basic info directly without calling get_company_info # search_company_by_name already returns: cik, name, ticker # Only call get_company_info if we need SIC code or description # For basic searches, the ticker data is sufficient # This eliminates the 3-5 second delay from get_company_info result = { "cik": company['cik'], "name": company['name'], "tickers": [company['ticker']] if company.get('ticker') else [], "_source": "company_tickers_cache" # Debug info } self._search_cache[company_input] = result return result else: return {"error": "No matching company found"} def get_company_filings_list(self, cik, form_types=['10-K', '10-Q']): """ Get company filings list Args: cik (str): Company CIK form_types (list): List of form types Returns: list: Filings list """ filings = self.edgar_client.get_company_filings(cik, form_types) return filings def extract_financial_metrics(self, cik, years=3): """ Extract financial metrics for specified number of years (optimized) Args: cik (str): Company CIK years (int): Number of years to extract, default is 3 years Returns: list: List of financial data """ # 实例级缓存检查(避免重复计算) cache_key = f"{cik}_{years}" if cache_key in self._extract_metrics_cache: return self._extract_metrics_cache[cache_key] financial_data = [] # Step 1: Get company facts ONCE (will be cached) facts = self.edgar_client.get_company_facts(cik) if not facts: return [] # Step 2: Get company filings ONCE to determine available years # Use tuple for caching compatibility filings_10k = self.edgar_client.get_company_filings(cik, ('10-K',)) filings_20f = self.edgar_client.get_company_filings(cik, ('20-F',)) all_annual_filings = filings_10k + filings_20f if not all_annual_filings: return [] # Detect if company is a 20-F filer (foreign company) is_20f_filer = len(filings_20f) > 0 and len(filings_10k) == 0 has_quarterly = False # 20-F filers typically don't have quarterly reports # Step 3: Extract filing years from annual reports filing_year_map = {} # Map: filing_year -> list of filings for filing in all_annual_filings: filing_date = filing.get('filing_date', '') if filing_date and len(filing_date) >= 4: try: file_year = int(filing_date[:4]) if file_year not in filing_year_map: filing_year_map[file_year] = [] filing_year_map[file_year].append(filing) except ValueError: continue if not filing_year_map: return [] # Step 4: Sort years in descending order and take the most recent N years sorted_years = sorted(filing_year_map.keys(), reverse=True) target_years = sorted_years[:years] # Step 5: Map filing years to fiscal years using facts (already fetched) filing_to_fiscal_year = {} # Map: filing_year -> fiscal_year # Try to map filing years to fiscal years using Company Facts for data_source in ["us-gaap", "ifrs-full"]: if data_source in facts.get("facts", {}): source_data = facts["facts"][data_source] # Look for Revenue tag to get fiscal year mapping revenue_tags = ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "Revenue", "RevenueFromContractWithCustomer"] for tag in revenue_tags: if tag in source_data: units = source_data[tag].get("units", {}) if "USD" in units: for entry in units["USD"]: form = entry.get("form", "") fy = entry.get("fy", 0) filed = entry.get("filed", "") # Filing date fp = entry.get("fp", "") # Map filing year to fiscal year if form in ["10-K", "20-F"] and fy > 0 and filed and (fp == "FY" or not fp): if len(filed) >= 10: # Format: YYYY-MM-DD try: file_year = int(filed[:4]) # Store the mapping: filing_year -> fiscal_year if file_year not in filing_to_fiscal_year: filing_to_fiscal_year[file_year] = fy except ValueError: continue break # Found revenue tag, no need to check more # Step 6: Generate period list for target years # For each year: FY -> Q4 -> Q3 -> Q2 -> Q1 (descending order) # For 20-F filers: only FY (no quarterly data) periods = [] for file_year in target_years: # Try to get fiscal year from mapping, otherwise use filing year fiscal_year = filing_to_fiscal_year.get(file_year, file_year) # First add annual data for this fiscal year periods.append({ 'period': str(fiscal_year), 'type': 'annual', 'fiscal_year': fiscal_year, 'filing_year': file_year }) # Only add quarterly data for 10-K filers (not for 20-F filers) if not is_20f_filer: # Then add quarterly data in descending order: Q4, Q3, Q2, Q1 for quarter in range(4, 0, -1): periods.append({ 'period': f"{fiscal_year}Q{quarter}", 'type': 'quarterly', 'fiscal_year': fiscal_year, 'filing_year': file_year }) # Step 7: Get financial data for each period for idx, period_info in enumerate(periods): period = period_info['period'] fiscal_year = period_info['fiscal_year'] data = self.edgar_client.get_financial_data_for_period(cik, period) if data and "period" in data: # Add fiscal year prefix for annual data if period_info['type'] == 'annual': data["period"] = f"FY{fiscal_year}" # Add sequence number to maintain order data["_sequence"] = idx financial_data.append(data) # 缓存结果 if financial_data: self._extract_metrics_cache[cache_key] = financial_data return financial_data def get_latest_financial_data(self, cik): """ Get latest financial data (optimized) Args: cik (str): Company CIK Returns: dict: Latest financial data """ # Get latest filing year (supports 10-K and 20-F) # Use tuple for caching filings_10k = self.edgar_client.get_company_filings(cik, ('10-K',)) filings_20f = self.edgar_client.get_company_filings(cik, ('20-F',)) filings = filings_10k + filings_20f if not filings: return {} # Get latest filing year latest_filing_year = None for filing in filings: if 'filing_date' in filing and filing['filing_date']: try: filing_year = int(filing['filing_date'][:4]) if latest_filing_year is None or filing_year > latest_filing_year: latest_filing_year = filing_year except ValueError: continue if latest_filing_year is None: return {} # Get financial data for latest year return self.edgar_client.get_financial_data_for_period(cik, str(latest_filing_year)) def format_financial_data(self, financial_data): """ Format financial data for display Args: financial_data (dict or list): Financial data Returns: dict or list: Formatted financial data """ if isinstance(financial_data, list): # Sort by _sequence to maintain correct order (FY -> Q4 -> Q3 -> Q2 -> Q1) sorted_data = sorted(financial_data, key=lambda x: x.get("_sequence", 999)) formatted_data = [] for data in sorted_data: formatted_data.append(self._format_single_financial_data(data)) return formatted_data else: return self._format_single_financial_data(financial_data) def _format_single_financial_data(self, data): """ Format single financial data entry - optimized structure Args: data (dict): Financial data with new optimized structure Returns: dict: Formatted financial data """ formatted = { "period": data.get("period"), "_sequence": data.get("_sequence") } # Handle new optimized structure with metrics if "metrics" in data and isinstance(data["metrics"], dict): # Extract metrics to top level for backward compatibility for metric_key, metric_data in data["metrics"].items(): if isinstance(metric_data, dict): formatted[metric_key] = metric_data.get("value") else: # Fallback for old format formatted[metric_key] = metric_data # Add metadata to top level if "_metadata" in data: metadata = data["_metadata"] formatted["source_url"] = metadata.get("source_url") formatted["source_form"] = metadata.get("form") formatted["data_source"] = metadata.get("data_source") else: # Fallback: old format compatibility formatted.update(data) # Ensure all key fields exist, even if None key_fields = ['total_revenue', 'net_income', 'earnings_per_share', 'operating_expenses', 'operating_cash_flow', 'source_url', 'source_form'] for key in key_fields: if key not in formatted: formatted[key] = None # Format EPS, keep two decimal places if 'earnings_per_share' in formatted and isinstance(formatted['earnings_per_share'], (int, float)): formatted['earnings_per_share'] = round(formatted['earnings_per_share'], 2) return formatted