JC321's picture
Upload news_quote_mcp.py
8a124cf verified
raw
history blame
28.3 kB
import os
import requests
import gradio as gr
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from mcp.server.fastmcp import FastMCP
# from dotenv import load_dotenv
# # 加载.env文件中的环境变量
# # 修复:使用更可靠的方式加载.env文件
# dotenv_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.env')
# if os.path.exists(dotenv_path):
# load_dotenv(dotenv_path)
# print(f"Loaded .env file from: {dotenv_path}")
# else:
# # 如果在当前目录找不到,尝试在项目根目录查找
# root_dotenv_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')
# if os.path.exists(root_dotenv_path):
# load_dotenv(root_dotenv_path)
# print(f"Loaded .env file from root: {root_dotenv_path}")
# else:
# print("No .env file found, checking environment variables...")
# Initialize FastMCP server with standard MCP protocol (JSON-RPC over SSE)
mcp = FastMCP("finnhub-market-info")
# Global variable to store API key
API_KEY = None
def set_api_key(key: str):
"""Set the Finnhub API key"""
global API_KEY
API_KEY = key
def get_api_key() -> Optional[str]:
"""Get the Finnhub API key"""
global API_KEY
# 修复:按优先级顺序获取API密钥
api_key = (
API_KEY or # 全局变量
os.getenv("FINNHUB_API_KEY") # or # 环境变量
# os.environ.get('FINNHUB_API_KEY') # 备用环境变量获取方式
)
# 添加调试信息
print(f"API Key sources:")
print(f" Global API_KEY: {API_KEY}")
print(f" os.getenv('FINNHUB_API_KEY'): {os.getenv('FINNHUB_API_KEY')}")
# print(f" os.environ.get('FINNHUB_API_KEY'): {os.environ.get('FINNHUB_API_KEY')}")
print(f" Final API key: {'*' * len(api_key) if api_key else 'None'}")
return api_key
def make_finnhub_request(endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Make a request to Finnhub API
Args:
endpoint: API endpoint path
params: Query parameters
Returns:
API response as dictionary
"""
api_key = get_api_key()
if not api_key:
return {"error": "API key not configured. Please set your Finnhub API key."}
params["token"] = api_key
base_url = "https://finnhub.io/api/v1"
url = f"{base_url}/{endpoint}"
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": f"API request failed: {str(e)}"}
@mcp.tool()
def get_quote(symbol: str) -> dict:
"""
Get real-time quote data for US stocks. Use this tool when you need current stock price
information and market performance metrics for any US-listed stock.
When to use:
- User asks "What's the current price of [stock]?"
- Need real-time stock quote data
- User mentions "stock price", "current value", "how is [stock] trading?"
- Want to check latest market price and daily changes
Examples:
- "What's Apple's stock price?" → get_quote(symbol="AAPL")
- "How is Tesla trading today?" → get_quote(symbol="TSLA")
- "Show me Microsoft's current quote" → get_quote(symbol="MSFT")
Args:
symbol: Stock ticker symbol (e.g., 'AAPL', 'MSFT', 'TSLA', 'GOOGL')
Returns:
dict: Real-time quote data containing:
- symbol: Stock ticker symbol
- current_price: Current trading price (c)
- change: Price change in dollars (d)
- percent_change: Price change in percentage (dp)
- high: Today's high price (h)
- low: Today's low price (l)
- open: Opening price (o)
- previous_close: Previous trading day's closing price (pc)
- timestamp: Quote timestamp
"""
result = make_finnhub_request("quote", {"symbol": symbol.upper()})
if "error" in result:
return {
"error": result["error"],
"symbol": symbol.upper()
}
# Return structured data
return {
"symbol": symbol.upper(),
"current_price": result.get('c'),
"change": result.get('d'),
"percent_change": result.get('dp'),
"high": result.get('h'),
"low": result.get('l'),
"open": result.get('o'),
"previous_close": result.get('pc'),
"timestamp": datetime.fromtimestamp(result.get('t', 0)).strftime('%Y-%m-%d %H:%M:%S') if result.get('t') else None
}
@mcp.tool()
def get_market_news(category: str = "general", min_id: int = 0) -> dict:
"""
Get latest market news across different categories. Use this tool when you need current market
news, trends, and developments in general markets, forex, cryptocurrency, or mergers.
When to use:
- User asks "What's the latest market news?"
- Need current financial news and market updates
- User mentions "news", "market trends", "what's happening in the market?"
- Want to get news for specific categories (forex, crypto, M&A)
Categories explained:
- general: General market news, stocks, economy, major companies
- forex: Foreign exchange and currency market news
- crypto: Cryptocurrency and blockchain news
- merger: Mergers & acquisitions, corporate deals
Examples:
- "What's the latest market news?" → get_market_news(category="general")
- "Show me crypto news" → get_market_news(category="crypto")
- "Any forex updates?" → get_market_news(category="forex")
- "Recent merger news" → get_market_news(category="merger")
Args:
category: News category - "general", "forex", "crypto", or "merger" (default: "general")
min_id: Minimum news ID for pagination (default: 0, use 0 to get latest news)
Returns:
dict: Market news data containing:
- category: News category requested
- total_articles: Total number of articles returned
- articles: List of news articles (max 10), each with:
* id: Article ID
* headline: News headline
* summary: Brief summary of the article
* source: News source
* url: Link to full article
* published: Publication timestamp
* image: Article image URL (if available)
"""
params = {"category": category}
if min_id > 0:
params["minId"] = min_id # pyright: ignore[reportArgumentType]
result = make_finnhub_request("news", params)
if isinstance(result, dict) and "error" in result:
return {
"error": result["error"],
"category": category
}
if not result or len(result) == 0:
return {
"category": category,
"total_articles": 0,
"articles": [],
"message": "No news articles found for this category"
}
# Format the news articles
articles = []
for article in result[:10]: # Limit to 10 articles # pyright: ignore[reportArgumentType]
articles.append({
"id": article.get('id'),
"headline": article.get('headline', 'No headline'),
"summary": article.get('summary', ''),
"source": article.get('source', 'Unknown'),
"url": article.get('url'),
"published": datetime.fromtimestamp(article.get('datetime', 0)).strftime('%Y-%m-%d %H:%M:%S') if article.get('datetime') else None,
"image": article.get('image')
})
return {
"category": category,
"total_articles": len(articles),
"articles": articles
}
@mcp.tool()
def get_company_news(symbol: str, from_date: Optional[str] = None, to_date: Optional[str] = None) -> dict:
"""
Get latest news for a specific company by stock symbol. This endpoint provides company-specific
news, press releases, and announcements. Only available for North American companies.
When to use:
- User asks about a specific company's news (e.g., "Apple news", "Tesla updates")
- Need company-specific announcements or press releases
- User mentions "[company name] news", "recent [company] developments"
- Want to filter news by date range
Date range tips:
- Default: Last 7 days if no dates specified
- Can go back up to several years
- Use YYYY-MM-DD format (e.g., "2024-01-01")
Examples:
- "What's the latest Apple news?" → get_company_news(symbol="AAPL")
- "Tesla news from last month" → get_company_news(symbol="TSLA", from_date="2024-10-01", to_date="2024-10-31")
- "Microsoft announcements this week" → get_company_news(symbol="MSFT")
- "Show me Amazon news from January 2024" → get_company_news(symbol="AMZN", from_date="2024-01-01", to_date="2024-01-31")
Args:
symbol: Company stock ticker symbol (e.g., 'AAPL', 'MSFT', 'TSLA', 'GOOGL')
Must be a North American (US/Canada) listed company
from_date: Start date in YYYY-MM-DD format (default: 7 days ago)
to_date: End date in YYYY-MM-DD format (default: today)
Returns:
dict: Company news data containing:
- symbol: Stock ticker symbol
- from_date: Start date of news search
- to_date: End date of news search
- total_articles: Number of articles found
- articles: List of news articles (max 10), each with:
* id: Article ID
* headline: News headline
* summary: Article summary
* source: News source
* url: Link to full article
* published: Publication date and time
* image: Article image URL (if available)
* related_symbols: Other stock symbols mentioned
"""
# Set default dates if not provided
if not to_date:
to_date = datetime.now().strftime('%Y-%m-%d')
if not from_date:
from_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
params = {
"symbol": symbol.upper(),
"from": from_date,
"to": to_date
}
result = make_finnhub_request("company-news", params)
if isinstance(result, dict) and "error" in result:
return {
"error": result["error"],
"symbol": symbol.upper(),
"from_date": from_date,
"to_date": to_date
}
if not result or len(result) == 0:
# Provide helpful suggestions based on the symbol
suggestion = "Try expanding the date range or check if the symbol is correct."
if symbol.upper() in ["BABA", "BIDU", "JD", "PDD", "NIO"]:
suggestion = "Note: Chinese ADRs may have limited news coverage. Try US companies like AAPL, MSFT, TSLA, or GOOGL for better results."
return {
"symbol": symbol.upper(),
"from_date": from_date,
"to_date": to_date,
"total_articles": 0,
"articles": [],
"message": f"No news articles found for {symbol.upper()} between {from_date} and {to_date}.",
"suggestion": suggestion,
"note": "Company news is only available for North American companies. Some companies may have limited news coverage during certain periods."
}
# Format the news articles
articles = []
for article in result[:10]: # Limit to 10 articles # pyright: ignore[reportArgumentType]
articles.append({
"id": article.get('id'),
"headline": article.get('headline', 'No headline'),
"summary": article.get('summary', ''),
"source": article.get('source', 'Unknown'),
"url": article.get('url'),
"published": datetime.fromtimestamp(article.get('datetime', 0)).strftime('%Y-%m-%d %H:%M:%S') if article.get('datetime') else None,
"image": article.get('image'),
"related_symbols": article.get('related', [])
})
return {
"symbol": symbol.upper(),
"from_date": from_date,
"to_date": to_date,
"total_articles": len(articles),
"articles": articles
}
# Gradio Interface Functions
def configure_api_key(api_key: str) -> str:
"""Configure the Finnhub API key"""
if not api_key or api_key.strip() == "":
return "❌ Please provide a valid API key"
set_api_key(api_key.strip())
return "✅ API Key configured successfully! You can now use the MCP tools."
def test_quote_tool(symbol: str) -> str:
"""Test the Quote tool"""
if not symbol or symbol.strip() == "":
return "❌ Please provide a stock symbol"
result = get_quote(symbol.strip())
if "error" in result:
return f"❌ Error: {result['error']}"
# Format for display
output = f"""📊 Real-time Quote for {result['symbol']}
Current Price: ${result.get('current_price', 'N/A')}
Change: ${result.get('change', 'N/A')}
Percent Change: {result.get('percent_change', 'N/A')}%
High: ${result.get('high', 'N/A')}
Low: ${result.get('low', 'N/A')}
Open: ${result.get('open', 'N/A')}
Previous Close: ${result.get('previous_close', 'N/A')}
Timestamp: {result.get('timestamp', 'N/A')}
"""
return output.strip()
def test_market_news_tool(category: str) -> str:
"""Test the Market News tool"""
result = get_market_news(category)
if "error" in result:
return f"❌ Error: {result['error']}"
if result.get('total_articles', 0) == 0:
return result.get('message', 'No news articles found')
# Format for display
output = f"📰 Latest Market News ({result['category']})\n"
output += f"Total Articles: {result['total_articles']}\n\n"
for idx, article in enumerate(result['articles'], 1):
output += f"{idx}. {article['headline']}\n"
output += f" Source: {article['source']}\n"
if article.get('summary'):
summary = article['summary'][:200] + "..." if len(article['summary']) > 200 else article['summary']
output += f" Summary: {summary}\n"
output += f" URL: {article.get('url', 'N/A')}\n"
output += f" Published: {article.get('published', 'N/A')}\n\n"
return output.strip()
def test_company_news_tool(symbol: str, from_date: str, to_date: str) -> str:
"""Test the Company News tool"""
if not symbol or symbol.strip() == "":
return "❌ Please provide a stock symbol"
# Use None if dates are empty
from_d = from_date.strip() if from_date and from_date.strip() else None
to_d = to_date.strip() if to_date and to_date.strip() else None
result = get_company_news(symbol.strip(), from_d, to_d)
if "error" in result:
return f"❌ Error: {result['error']}"
if result.get('total_articles', 0) == 0:
# Show detailed message with suggestions
output = f"⚠️ {result.get('message', 'No news articles found')}\n\n"
if 'suggestion' in result:
output += f"💡 {result['suggestion']}\n\n"
if 'note' in result:
output += f"📝 {result['note']}"
return output
# Format for display
output = f"📰 Company News for {result['symbol']}\n"
output += f"Period: {result['from_date']} to {result['to_date']}\n"
output += f"Total Articles: {result['total_articles']}\n\n"
for idx, article in enumerate(result['articles'], 1):
output += f"{idx}. {article['headline']}\n"
output += f" Source: {article['source']}\n"
if article.get('summary'):
summary = article['summary'][:200] + "..." if len(article['summary']) > 200 else article['summary']
output += f" Summary: {summary}\n"
output += f" URL: {article.get('url', 'N/A')}\n"
output += f" Published: {article.get('published', 'N/A')}\n\n"
return output.strip()
# def check_server_health() -> str:
# """Check server health status and API connectivity"""
# status_parts = []
# # Check API key configuration
# api_key = get_api_key()
# if not api_key:
# status_parts.append("❌ API Key: Not configured")
# status_parts.append("\n⚠️ Please configure your Finnhub API key in the Configuration tab or set FINNHUB_API_KEY environment variable.")
# return "\n".join(status_parts)
# else:
# status_parts.append("✅ API Key: Configured")
# # Test Finnhub API connectivity
# status_parts.append("\n🔍 Testing Finnhub API connectivity...")
# try:
# test_result = make_finnhub_request("quote", {"symbol": "AAPL"})
# if "error" in test_result:
# status_parts.append(f"❌ Finnhub API: {test_result['error']}")
# status_parts.append("\n⚠️ Possible issues:")
# status_parts.append(" • Invalid API key")
# status_parts.append(" • Network connectivity problem")
# status_parts.append(" • Finnhub API service is down")
# status_parts.append(" • Rate limit exceeded")
# else:
# status_parts.append(f"✅ Finnhub API: Online and reachable")
# status_parts.append(f" Test query result: AAPL @ ${test_result.get('c', 'N/A')}")
# except Exception as e:
# status_parts.append(f"❌ Finnhub API: Connection failed")
# status_parts.append(f" Error: {str(e)}")
# return "\n".join(status_parts)
# # Load HTML content from file
# def load_html_content():
# """Load the HTML welcome page"""
# try:
# # 优先使用精简版
# if os.path.exists('index_simple.html'):
# with open('index_simple.html', 'r', encoding='utf-8') as f:
# return f.read()
# # 后备:Gradio 优化版
# elif os.path.exists('index_gradio.html'):
# with open('index_gradio.html', 'r', encoding='utf-8') as f:
# return f.read()
# # 后备:完整版
# elif os.path.exists('index.html'):
# with open('index.html', 'r', encoding='utf-8') as f:
# return f.read()
# else:
# return "<h1>📈 Finnhub MCP Server</h1><p>欢迎使用</p>"
# except Exception as e:
# print(f"Error loading HTML: {e}")
# return "<h1>📈 Finnhub MCP Server</h1><p>欢迎使用</p>"
# # Create Gradio Interface
# with gr.Blocks(title="Finnhub Market Info MCP Server") as demo:
# # Welcome tab with HTML content
# with gr.Tab("🏠 Home"):
# gr.HTML(load_html_content())
# # Health Check tab
# with gr.Tab("🩺 Health Check"):
# gr.Markdown("### Server Health Status")
# gr.Markdown("Check if the MCP server and Finnhub API are working properly.")
# health_check_btn = gr.Button("Check Health", variant="primary", size="lg")
# health_status_output = gr.Textbox(
# label="Health Status",
# lines=15,
# interactive=False,
# placeholder="Click 'Check Health' to test the server and API connectivity..."
# )
# health_check_btn.click(
# fn=check_server_health,
# inputs=[],
# outputs=[health_status_output]
# )
# gr.Markdown("""
# ### What this checks:
# 1. **API Key Configuration** - Verifies if Finnhub API key is set
# 2. **Finnhub API Connectivity** - Tests connection to Finnhub servers
# 3. **API Response** - Validates that the API returns data correctly
# ### Troubleshooting:
# If health check fails:
# - ❌ **API Key Not Configured**: Go to API Key Config tab and enter your key
# - ❌ **Invalid API Key**: Check if your key is correct at [finnhub.io](https://finnhub.io/dashboard)
# - ❌ **Network Error**: Check your internet connection
# - ❌ **Rate Limit**: Wait a minute before trying again (free tier: 60 calls/min)
# """)
# with gr.Tab("📊 Quote Tool"):
# gr.Markdown("### Test Real-time Stock Quote")
# gr.Markdown("Get real-time quote data for US stocks. Example symbols: AAPL, MSFT, TSLA, GOOGL")
# with gr.Row():
# quote_symbol = gr.Textbox(
# label="Stock Symbol",
# placeholder="AAPL",
# value="AAPL"
# )
# quote_btn = gr.Button("Get Quote", variant="primary")
# quote_output = gr.Textbox(label="Quote Data", lines=12, interactive=False)
# quote_btn.click(
# fn=test_quote_tool,
# inputs=[quote_symbol],
# outputs=[quote_output]
# )
# with gr.Tab("📰 Market News Tool"):
# gr.Markdown("### Test Market News")
# gr.Markdown("Get latest market news by category")
# with gr.Row():
# news_category = gr.Dropdown(
# choices=["general", "forex", "crypto", "merger"],
# label="News Category",
# value="general"
# )
# market_news_btn = gr.Button("Get Market News", variant="primary")
# market_news_output = gr.Textbox(label="Market News", lines=15, interactive=False)
# market_news_btn.click(
# fn=test_market_news_tool,
# inputs=[news_category],
# outputs=[market_news_output]
# )
# with gr.Tab("🏢 Company News Tool"):
# gr.Markdown("### Test Company News")
# gr.Markdown("Get latest company news by symbol. Only available for North American companies.")
# with gr.Row():
# company_symbol = gr.Textbox(
# label="Stock Symbol",
# placeholder="AAPL",
# value="AAPL"
# )
# with gr.Row():
# company_from_date = gr.Textbox(
# label="From Date (YYYY-MM-DD)",
# placeholder="Leave empty for 7 days ago",
# value=""
# )
# company_to_date = gr.Textbox(
# label="To Date (YYYY-MM-DD)",
# placeholder="Leave empty for today",
# value=""
# )
# company_news_btn = gr.Button("Get Company News", variant="primary")
# company_news_output = gr.Textbox(label="Company News", lines=15, interactive=False)
# company_news_btn.click(
# fn=test_company_news_tool,
# inputs=[company_symbol, company_from_date, company_to_date],
# outputs=[company_news_output]
# )
# with gr.Tab("ℹ️ MCP Connection"):
# gr.Markdown("""
# ### Connect via MCP Client
# This server implements the **standard MCP protocol (JSON-RPC over SSE)**.
# #### 📥 Step 1: Clone Repository
# ```bash
# git clone https://huggingface.co/spaces/JC321/MarketandStockMCP
# cd MarketandStockMCP
# pip install -r requirements.txt
# ```
# #### 🔑 Step 2: Get API Key
# Get your free Finnhub API key at [finnhub.io/register](https://finnhub.io/register)
# #### ⚙️ Step 3: Configure MCP Client
# **For Claude Desktop:**
# Edit config file:
# - macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
# - Windows: `%APPDATA%\Claude\claude_desktop_config.json`
# Add this configuration:
# ```json
# {
# "mcpServers": {
# "finnhub": {
# "command": "python",
# "args": ["C:\\path\\to\\MarketandStockMCP\\mcp_server.py"],
# "env": {
# "FINNHUB_API_KEY": "your_finnhub_api_key_here"
# }
# }
# }
# }
# ```
# **For Cursor IDE:**
# Use the same JSON configuration in MCP settings.
# #### 🚀 Step 4: Test
# Restart your MCP client and try:
# - "What's the current price of Apple stock?"
# - "Show me the latest market news"
# - "Get Tesla company news from last week"
# #### 🛠️ Available Tools:
# - `get_quote(symbol: str)` - Real-time stock quote
# - `get_market_news(category: str, min_id: int)` - Market news by category
# - `get_company_news(symbol: str, from_date: str, to_date: str)` - Company-specific news
# #### 📖 Documentation:
# See [README.md](https://huggingface.co/spaces/JC321/MarketandStockMCP/blob/main/README.md) for detailed instructions.
# """)
# # API Key Config tab (moved to last)
# with gr.Tab("🔑 API Key Config"):
# gr.Markdown("### Configure Finnhub API Key for Web Testing")
# gr.Markdown("""
# 💡 **Note:** This is only for testing tools in the web UI above.
# For MCP client use, configure the API key in your MCP client settings (see "MCP Connection" tab).
# """)
# with gr.Row():
# api_key_input = gr.Textbox(
# label="Finnhub API Key",
# placeholder="Enter your Finnhub API key here...",
# type="password"
# )
# config_btn = gr.Button("Configure", variant="primary")
# config_output = gr.Textbox(label="Status", interactive=False)
# config_btn.click(
# fn=configure_api_key,
# inputs=[api_key_input],
# outputs=[config_output]
# )
# if __name__ == "__main__":
# import sys
# # Check for API key in environment variable
# env_api_key = os.getenv("FINNHUB_API_KEY")
# if env_api_key:
# set_api_key(env_api_key)
# print("✅ API Key loaded from environment variable")
# # Set port from environment (HF Space sets PORT=7860)
# port = int(os.getenv("PORT", "7870"))
# host = os.getenv("HOST", "0.0.0.0")
# # Check if running as MCP server or Gradio UI
# if "--mcp" in sys.argv or os.getenv("RUN_MCP_SERVER") == "true":
# # Run as MCP server (standalone mode)
# print("▶️ Starting MCP Server (standalone mode)...")
# mcp.run(transport="sse")
# else:
# # Launch Gradio interface
# print("▶️ Starting Gradio Interface...")
# print("🌐 Gradio UI available at: /")
# print("📡 MCP tools available through Gradio tabs")
# demo.launch(
# server_name=host,
# server_port=port,
# share=False
# )
if __name__ == "__main__":
import sys
# Set port from environment
port = int(os.getenv("PORT", "7870"))
host = os.getenv("HOST", "0.0.0.0")
# Check for API key in environment variable
env_api_key = os.getenv("FINNHUB_API_KEY")
if env_api_key:
set_api_key(env_api_key)
print("✅ API Key loaded from environment variable")
# Run as MCP server (standalone mode for chat_direct.py)
print("▶️ Starting MarketandStockMCP Server...")
print(f"📡 MCP server will listen on {host}:{port}")
print("✅ Available tools: get_quote, get_market_news, get_company_news")
print(f"🔗 MCP endpoint: http://{host}:{port}/mcp")
# ✅ Monkeypatch uvicorn.Config to use our port
import uvicorn
original_config_init = uvicorn.Config.__init__
def patched_init(self, *args, **kwargs):
kwargs['host'] = host
kwargs['port'] = port
return original_config_init(self, *args, **kwargs)
uvicorn.Config.__init__ = patched_init
# ✅ Run FastMCP with HTTP transport (stateless mode)
# HTTP transport provides /mcp endpoint for JSON-RPC POST requests
mcp.run(transport="http")