""" 审稿人推荐系统主应用 基于AgentReview框架风格实现的Gradio界面 """ import json import os import time from datetime import datetime from typing import List, Dict, Any import gradio as gr from reviewer_recommendation import ( PaperInfo, Reviewer, RecommendationRequest, RecommendationResponse, AppState, AcademicSearcher, DynamicAcademicSearcher, LLMRecommendationEngine, OpenAlexSearcher ) # 设置API密钥 # 从环境变量读取API密钥,如果没有设置则使用默认值 openai_api_key = os.environ.get('OPENAI_API_KEY', 'sk-cnaekqCE8TTvhJrY8q4GljfzLiHHbgR5nlzwgc14FG8iH3uj') os.environ['OPENAI_API_KEY'] = openai_api_key # CSS样式 css = """ /* 调整列间距 */ .gradio-container .row { gap: 40px !important; /* 可以调整这个数值来改变间距 */ } /* 或者更精确地控制特定行的间距 */ .gradio-container .row > div { margin-right: 20px !important; /* 右侧边距 */ } .gradio-container .row > div:last-child { margin-right: 0 !important; /* 最后一列不需要右边距 */ } /* 对齐推荐结果标题和内容 */ .gradio-container .row > div:last-child h2 { margin-top: 0 !important; /* 移除标题顶部边距 */ } .gradio-container .row > div:last-child .html-content { margin-top: 0 !important; /* 移除内容顶部边距 */ } /* 隐藏标题下方的默认边框 */ .gradio-container h1 { border-bottom: none !important; margin-bottom: 20px !important; } /* 隐藏可能的默认分隔线 */ .gradio-container hr { display: none !important; } """ # 全局状态 app_state = AppState() def format_reviewers_output(reviewers: List[Dict[str, Any]]) -> str: """格式化审稿人输出""" if not reviewers: return "

未找到合适的审稿人推荐,请尝试调整关键词或查询条件。

" output_lines = [] for i, reviewer in enumerate(reviewers, 1): # 处理字典格式的reviewer if isinstance(reviewer, dict): name = reviewer.get('name', 'N/A') affiliation = reviewer.get('affiliation', 'N/A') email = reviewer.get('email', 'N/A') expertise_areas = reviewer.get('expertise_areas', []) reason = reviewer.get('reason', 'N/A') relevance_score = reviewer.get('relevance_score', 0) else: # 处理Reviewer对象 name = getattr(reviewer, 'name', 'N/A') affiliation = getattr(reviewer, 'affiliation', 'N/A') email = getattr(reviewer, 'email', 'N/A') expertise_areas = getattr(reviewer, 'expertise_areas', []) reason = getattr(reviewer, 'reason', 'N/A') relevance_score = getattr(reviewer, 'relevance_score', 0) output_lines.append(f"

审稿人 {i}

") output_lines.append(f"

姓名: {name}

") output_lines.append(f"

单位: {affiliation}

") output_lines.append(f"

邮箱: {email}

") if expertise_areas: output_lines.append(f"

专业领域: {', '.join(expertise_areas)}

") output_lines.append(f"

推荐理由: {reason}

") # 确保relevance_score是数字类型 try: score_display = f"{float(relevance_score):.2f}" except (ValueError, TypeError): score_display = "0.00" output_lines.append(f"

匹配度: {score_display}

") output_lines.append("
") return "".join(output_lines) def recommend_reviewers(paper_title: str, paper_abstract: str, paper_authors: str, paper_affiliations: str, reviewer_count) -> str: """推荐审稿人的主要函数""" try: # 更新状态 app_state.is_processing = True app_state.last_error = None # 确保reviewer_count是整数类型 try: reviewer_count = int(reviewer_count) except (ValueError, TypeError): return "错误:推荐审稿人数量必须是有效的数字。" # 验证输入 if not paper_title.strip() or not paper_abstract.strip(): return "错误:论文标题和摘要不能为空。" if reviewer_count < 1 or reviewer_count > 10: return "错误:推荐审稿人数量应在1-10之间。" # 解析作者和单位 authors = [author.strip() for author in paper_authors.split(',') if author.strip()] affiliations = [aff.strip() for aff in paper_affiliations.split(',') if aff.strip()] # 创建论文信息对象 paper = PaperInfo( title=paper_title.strip(), abstract=paper_abstract.strip(), keywords=[], # 不再使用关键词输入 authors=authors, affiliations=affiliations ) # 创建推荐请求 request = RecommendationRequest( paper=paper, reviewer_count=reviewer_count ) app_state.current_request = request # 开始计时 start_time = time.time() # 初始化推荐系统 print("初始化推荐系统...") # 使用OpenAlex检索器(提供更准确的引用量数据) openalex_searcher = OpenAlexSearcher(limit=reviewer_count * 3) dynamic_searcher = DynamicAcademicSearcher(openalex_searcher=openalex_searcher) engine = LLMRecommendationEngine() # 执行推荐流程 print("开始双数据源检索候选文献...") # 设置年份过滤(例如:只检索近5年发表的论文) years_after = 3 # 只检索近10年发表的论文 channel1_candidates, channel2_candidates, channel3_candidates = dynamic_searcher.search_with_dynamic_queries( paper=paper, num_reviewers=reviewer_count, years_after=years_after ) if not channel1_candidates and not channel2_candidates and not channel3_candidates: return "未找到相关候选文献,请尝试调整关键词或查询条件。" print(f"找到候选文献:{len(channel1_candidates)} 篇,开始分析...") # 分析候选者(使用第一个通道的候选者) found_reviewers = engine.analyze_candidates(paper, channel1_candidates, reviewer_count) # 计算耗时 search_time = time.time() - start_time # 创建响应 response = RecommendationResponse( reviewers=found_reviewers[:reviewer_count], search_time=search_time, total_candidates=len(channel1_candidates) + len(channel2_candidates) + len(channel3_candidates), success=True ) app_state.current_response = response app_state.is_processing = False # 格式化输出 output = format_reviewers_output(response.reviewers) return output except Exception as e: app_state.is_processing = False app_state.last_error = str(e) error_msg = f"推荐过程中发生错误: {str(e)}" print(error_msg) return error_msg def clear_form(): """清空表单""" return "", "", "", "", "3" # 构建Gradio界面 def create_interface(): """创建Gradio界面""" with gr.Blocks(css=css, title="审稿人推荐系统") as demo: # 标题 gr.Markdown("# 审稿人推荐系统") with gr.Row(): with gr.Column(scale=1): # 输入区域 gr.Markdown("## 论文信息") paper_title = gr.Textbox( lines=2, label="论文标题", placeholder="请输入论文标题..." ) paper_abstract = gr.Textbox( lines=8, label="论文摘要", placeholder="请输入论文摘要..." ) paper_authors = gr.Textbox( lines=2, label="作者姓名(用逗号分隔)", placeholder="必填" ) paper_affiliations = gr.Textbox( lines=2, label="作者所属单位(用逗号分隔)", placeholder="必填" ) reviewer_count = gr.Dropdown( choices=["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"], value="3", label="推荐审稿人数量" ) with gr.Row(): recommend_btn = gr.Button("开始推荐", variant="primary") clear_btn = gr.Button("清空表单") with gr.Column(scale=1): # 输出区域 gr.Markdown("## 推荐结果") output_text = gr.HTML( value="

请输入论文信息并点击'开始推荐'按钮

", label="推荐结果" ) # 事件处理 recommend_btn.click( fn=recommend_reviewers, inputs=[paper_title, paper_abstract, paper_authors, paper_affiliations, reviewer_count], outputs=[output_text] ) clear_btn.click( fn=clear_form, outputs=[paper_title, paper_abstract, paper_authors, paper_affiliations, reviewer_count] ) return demo if __name__ == "__main__": # 创建并启动应用 demo = create_interface() demo.launch( server_name="0.0.0.0", # share=True, # 在 Hugging Face Spaces 上不支持 debug=False, # 生产环境关闭调试模式 show_error=True, quiet=False # 减少输出,加快启动 )