"""
审稿人推荐系统主应用
基于AgentReview框架风格实现的Gradio界面
"""
import json
import os
import time
from datetime import datetime
from typing import List, Dict, Any
import gradio as gr
from reviewer_recommendation import (
PaperInfo, Reviewer, RecommendationRequest, RecommendationResponse, AppState,
AcademicSearcher, DynamicAcademicSearcher, LLMRecommendationEngine, OpenAlexSearcher
)
# 设置API密钥
# 从环境变量读取API密钥,如果没有设置则使用默认值
openai_api_key = os.environ.get('OPENAI_API_KEY', 'sk-cnaekqCE8TTvhJrY8q4GljfzLiHHbgR5nlzwgc14FG8iH3uj')
os.environ['OPENAI_API_KEY'] = openai_api_key
# CSS样式
css = """
/* 调整列间距 */
.gradio-container .row {
gap: 40px !important; /* 可以调整这个数值来改变间距 */
}
/* 或者更精确地控制特定行的间距 */
.gradio-container .row > div {
margin-right: 20px !important; /* 右侧边距 */
}
.gradio-container .row > div:last-child {
margin-right: 0 !important; /* 最后一列不需要右边距 */
}
/* 对齐推荐结果标题和内容 */
.gradio-container .row > div:last-child h2 {
margin-top: 0 !important; /* 移除标题顶部边距 */
}
.gradio-container .row > div:last-child .html-content {
margin-top: 0 !important; /* 移除内容顶部边距 */
}
/* 隐藏标题下方的默认边框 */
.gradio-container h1 {
border-bottom: none !important;
margin-bottom: 20px !important;
}
/* 隐藏可能的默认分隔线 */
.gradio-container hr {
display: none !important;
}
"""
# 全局状态
app_state = AppState()
def format_reviewers_output(reviewers: List[Dict[str, Any]]) -> str:
"""格式化审稿人输出"""
if not reviewers:
return "
未找到合适的审稿人推荐,请尝试调整关键词或查询条件。
"
output_lines = []
for i, reviewer in enumerate(reviewers, 1):
# 处理字典格式的reviewer
if isinstance(reviewer, dict):
name = reviewer.get('name', 'N/A')
affiliation = reviewer.get('affiliation', 'N/A')
email = reviewer.get('email', 'N/A')
expertise_areas = reviewer.get('expertise_areas', [])
reason = reviewer.get('reason', 'N/A')
relevance_score = reviewer.get('relevance_score', 0)
else:
# 处理Reviewer对象
name = getattr(reviewer, 'name', 'N/A')
affiliation = getattr(reviewer, 'affiliation', 'N/A')
email = getattr(reviewer, 'email', 'N/A')
expertise_areas = getattr(reviewer, 'expertise_areas', [])
reason = getattr(reviewer, 'reason', 'N/A')
relevance_score = getattr(reviewer, 'relevance_score', 0)
output_lines.append(f"审稿人 {i}
")
output_lines.append(f"姓名: {name}
")
output_lines.append(f"单位: {affiliation}
")
output_lines.append(f"邮箱: {email}
")
if expertise_areas:
output_lines.append(f"专业领域: {', '.join(expertise_areas)}
")
output_lines.append(f"推荐理由: {reason}
")
# 确保relevance_score是数字类型
try:
score_display = f"{float(relevance_score):.2f}"
except (ValueError, TypeError):
score_display = "0.00"
output_lines.append(f"匹配度: {score_display}
")
output_lines.append("
")
return "".join(output_lines)
def recommend_reviewers(paper_title: str, paper_abstract: str, paper_authors: str, paper_affiliations: str, reviewer_count) -> str:
"""推荐审稿人的主要函数"""
try:
# 更新状态
app_state.is_processing = True
app_state.last_error = None
# 确保reviewer_count是整数类型
try:
reviewer_count = int(reviewer_count)
except (ValueError, TypeError):
return "错误:推荐审稿人数量必须是有效的数字。"
# 验证输入
if not paper_title.strip() or not paper_abstract.strip():
return "错误:论文标题和摘要不能为空。"
if reviewer_count < 1 or reviewer_count > 10:
return "错误:推荐审稿人数量应在1-10之间。"
# 解析作者和单位
authors = [author.strip() for author in paper_authors.split(',') if author.strip()]
affiliations = [aff.strip() for aff in paper_affiliations.split(',') if aff.strip()]
# 创建论文信息对象
paper = PaperInfo(
title=paper_title.strip(),
abstract=paper_abstract.strip(),
keywords=[], # 不再使用关键词输入
authors=authors,
affiliations=affiliations
)
# 创建推荐请求
request = RecommendationRequest(
paper=paper,
reviewer_count=reviewer_count
)
app_state.current_request = request
# 开始计时
start_time = time.time()
# 初始化推荐系统
print("初始化推荐系统...")
# 使用OpenAlex检索器(提供更准确的引用量数据)
openalex_searcher = OpenAlexSearcher(limit=reviewer_count * 3)
dynamic_searcher = DynamicAcademicSearcher(openalex_searcher=openalex_searcher)
engine = LLMRecommendationEngine()
# 执行推荐流程
print("开始双数据源检索候选文献...")
# 设置年份过滤(例如:只检索近5年发表的论文)
years_after = 3 # 只检索近10年发表的论文
channel1_candidates, channel2_candidates, channel3_candidates = dynamic_searcher.search_with_dynamic_queries(
paper=paper,
num_reviewers=reviewer_count,
years_after=years_after
)
if not channel1_candidates and not channel2_candidates and not channel3_candidates:
return "未找到相关候选文献,请尝试调整关键词或查询条件。"
print(f"找到候选文献:{len(channel1_candidates)} 篇,开始分析...")
# 分析候选者(使用第一个通道的候选者)
found_reviewers = engine.analyze_candidates(paper, channel1_candidates, reviewer_count)
# 计算耗时
search_time = time.time() - start_time
# 创建响应
response = RecommendationResponse(
reviewers=found_reviewers[:reviewer_count],
search_time=search_time,
total_candidates=len(channel1_candidates) + len(channel2_candidates) + len(channel3_candidates),
success=True
)
app_state.current_response = response
app_state.is_processing = False
# 格式化输出
output = format_reviewers_output(response.reviewers)
return output
except Exception as e:
app_state.is_processing = False
app_state.last_error = str(e)
error_msg = f"推荐过程中发生错误: {str(e)}"
print(error_msg)
return error_msg
def clear_form():
"""清空表单"""
return "", "", "", "", "3"
# 构建Gradio界面
def create_interface():
"""创建Gradio界面"""
with gr.Blocks(css=css, title="审稿人推荐系统") as demo:
# 标题
gr.Markdown("# 审稿人推荐系统")
with gr.Row():
with gr.Column(scale=1):
# 输入区域
gr.Markdown("## 论文信息")
paper_title = gr.Textbox(
lines=2,
label="论文标题",
placeholder="请输入论文标题..."
)
paper_abstract = gr.Textbox(
lines=8,
label="论文摘要",
placeholder="请输入论文摘要..."
)
paper_authors = gr.Textbox(
lines=2,
label="作者姓名(用逗号分隔)",
placeholder="必填"
)
paper_affiliations = gr.Textbox(
lines=2,
label="作者所属单位(用逗号分隔)",
placeholder="必填"
)
reviewer_count = gr.Dropdown(
choices=["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"],
value="3",
label="推荐审稿人数量"
)
with gr.Row():
recommend_btn = gr.Button("开始推荐", variant="primary")
clear_btn = gr.Button("清空表单")
with gr.Column(scale=1):
# 输出区域
gr.Markdown("## 推荐结果")
output_text = gr.HTML(
value="请输入论文信息并点击'开始推荐'按钮
",
label="推荐结果"
)
# 事件处理
recommend_btn.click(
fn=recommend_reviewers,
inputs=[paper_title, paper_abstract, paper_authors, paper_affiliations, reviewer_count],
outputs=[output_text]
)
clear_btn.click(
fn=clear_form,
outputs=[paper_title, paper_abstract, paper_authors, paper_affiliations, reviewer_count]
)
return demo
if __name__ == "__main__":
# 创建并启动应用
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
# share=True, # 在 Hugging Face Spaces 上不支持
debug=False, # 生产环境关闭调试模式
show_error=True,
quiet=False # 减少输出,加快启动
)