""" 智能报告缓存管理器 用于管理 Investment Suggestion 和 Analysis Report 的后台生成和缓存 """ from datetime import datetime from typing import Callable, Dict import threading import time class ReportTask: """报告生成任务""" def __init__(self, company: str, report_type: str, generator_func: Callable): self.company = company self.report_type = report_type self.generator_func = generator_func self.status = "pending" # pending, running, completed, error self.result = None self.error = None self.start_time = None self.end_time = None self.thread = None def run(self): """执行任务""" self.status = "running" self.start_time = datetime.now() try: self.result = self.generator_func() self.status = "completed" except Exception as e: self.status = "error" self.error = str(e) import traceback self.error_trace = traceback.format_exc() finally: self.end_time = datetime.now() def get_age_seconds(self): """获取任务年龄(秒)""" if self.end_time: return (datetime.now() - self.end_time).total_seconds() elif self.start_time: return (datetime.now() - self.start_time).total_seconds() return 0 class ReportCacheManager: """智能报告缓存管理器""" def __init__(self, cache_ttl_seconds=3600, max_cache_size=50): self.cache_ttl = cache_ttl_seconds self.max_cache_size = max_cache_size self.tasks: Dict[tuple, ReportTask] = {} self.lock = threading.Lock() self.stats = { "cache_hits": 0, "cache_misses": 0, "background_completions": 0, "total_requests": 0 } def get_or_create_report(self, company: str, report_type: str, generator_func: Callable): """获取或创建报告""" # ✅ 不使用with lock包裹整个函数,而是在关键操作时锁定 self.lock.acquire() try: self.stats["total_requests"] += 1 key = (company, report_type) # 清理过期缓存 self._cleanup_expired_cache() task = self.tasks.get(key) # ✅ 场景1: 缓存已存在且已完成 - 立即返回,无需等待 if task and task.status == "completed": if task.get_age_seconds() < self.cache_ttl: self.stats["cache_hits"] += 1 print(f"✅ [Cache HIT] {company} - {report_type} (age: {task.get_age_seconds():.1f}s)") result = task.result self.lock.release() yield result # ✅ 立即返回缓存,不阻塞 return else: print(f"⏰ [Cache EXPIRED] {company} - {report_type}") del self.tasks[key] task = None # ✅ 场景2: 其他公司的任务正在运行 - 检查当前请求的公司是否有缓存 # 如果Intel正在生成,但用户切换到NVDA,应该先检查NVDA的缓存 if task and task.status == "running": # ✅ 关键优化: 检查当前请求的公司是否有缓存 # 如果有,立即返回缓存;如果没有,显示正在等待的loading self.stats["cache_hits"] += 1 print(f"🔄 [Task RUNNING] {task.company} - {report_type}, but {company} requested") # 如果请求的是同一个公司,等待任务完成 if task.company == company: self.lock.release() yield self._get_loading_html(company, report_type, task.get_age_seconds()) # 等待后台任务完成 max_wait = 90 waited = 0 while task.status == "running" and waited < max_wait: time.sleep(1) waited += 1 yield self._get_loading_html(company, report_type, task.get_age_seconds()) if task.status == "completed": with self.lock: self.stats["background_completions"] += 1 print(f"✅ [Background COMPLETED] {company} - {report_type}") yield task.result return elif task.status == "error": print(f"❌ [Background ERROR] {company} - {report_type}: {task.error}") yield self._get_error_html(company, report_type, task.error) return else: # ✅ 不同公司: 启动新任务,让旧任务在后台继续 print(f"🆕 [Different company] Starting {company} while {task.company} is running") # 旧任务继续在后台运行,不干扰 task = None # 重置task,下面会创建新任务 # 场景3: 之前失败了,重试 if task and task.status == "error": print(f"🔄 [Retry after ERROR] {company} - {report_type}") del self.tasks[key] task = None # ✅ 场景4: 缓存不存在,启动新任务 if not task: self.stats["cache_misses"] += 1 print(f"🆕 [Cache MISS] {company} - {report_type} - Starting background generation") task = ReportTask(company, report_type, generator_func) self.tasks[key] = task task.thread = threading.Thread(target=task.run, daemon=True) task.thread.start() self.lock.release() # ✅ 释放锁再 yield # ✅ 立即yield第一个loading状态,不要等待 yield self._get_loading_html(company, report_type, 0) # 等待任务完成 max_wait = 90 waited = 0 while task.status == "running" and waited < max_wait: time.sleep(1) waited += 1 yield self._get_loading_html(company, report_type, task.get_age_seconds()) if task.status == "completed": print(f"✅ [NEW COMPLETED] {company} - {report_type}") yield task.result return elif task.status == "error": print(f"❌ [NEW ERROR] {company} - {report_type}: {task.error}") yield self._get_error_html(company, report_type, task.error) return finally: # 确保锁被释放 if self.lock.locked(): self.lock.release() def _cleanup_expired_cache(self): """清理过期缓存""" keys_to_remove = [] for key, task in self.tasks.items(): if task.status == "completed" and task.get_age_seconds() > self.cache_ttl: keys_to_remove.append(key) for key in keys_to_remove: company, report_type = key print(f"🗑️ [Cache CLEANUP] {company} - {report_type}") del self.tasks[key] # 限制缓存大小 if len(self.tasks) > self.max_cache_size: completed_tasks = [(k, v) for k, v in self.tasks.items() if v.status == "completed"] completed_tasks.sort(key=lambda x: x[1].end_time or datetime.min) to_remove = len(self.tasks) - self.max_cache_size for i in range(to_remove): key, task = completed_tasks[i] company, report_type = key print(f"🗑️ [Cache SIZE LIMIT] {company} - {report_type}") del self.tasks[key] def _get_loading_html(self, company: str, report_type: str, elapsed_seconds: float): """生成加载状态HTML""" report_name = "Investment Suggestion" if report_type == "suggestion" else "Analysis Report" elapsed_str = f"{elapsed_seconds:.0f}s" if elapsed_seconds > 0 else "just started" return f'''

🤖 Generating {report_name} for {company}...
Elapsed: {elapsed_str}

''' def _get_error_html(self, company: str, report_type: str, error: str): """生成错误状态HTML""" report_name = "Investment Suggestion" if report_type == "suggestion" else "Analysis Report" return f'''

⚠️ Generation Failed

Report: {report_name}

Company: {company}

Error: {error}

''' def get_stats(self): """获取缓存统计""" with self.lock: total = self.stats["total_requests"] hits = self.stats["cache_hits"] misses = self.stats["cache_misses"] hit_rate = (hits / total * 100) if total > 0 else 0 return { **self.stats, "hit_rate": f"{hit_rate:.1f}%", "active_tasks": len([t for t in self.tasks.values() if t.status == "running"]), "cached_reports": len([t for t in self.tasks.values() if t.status == "completed"]) }