#!/usr/bin/env python3
"""
Pure Flask API for Hugging Face Spaces
No Gradio - Just Flask REST API
Uses local GPU models for inference
"""
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import logging
import sys
import os
import asyncio
from pathlib import Path
from logging.handlers import RotatingFileHandler

# Validate and set OMP_NUM_THREADS (must be a positive integer); this must
# happen before any library that reads it (e.g. torch/MKL) is imported.
omp_threads = os.getenv('OMP_NUM_THREADS', '4')
try:
    omp_int = int(omp_threads)
    if omp_int <= 0:
        omp_int = 4
        logger_basic = logging.getLogger(__name__)
        logger_basic.warning("OMP_NUM_THREADS must be positive, defaulting to 4")
    os.environ['OMP_NUM_THREADS'] = str(omp_int)
    os.environ['MKL_NUM_THREADS'] = str(omp_int)
except (ValueError, TypeError):
    os.environ['OMP_NUM_THREADS'] = '4'
    os.environ['MKL_NUM_THREADS'] = '4'
    logger_basic = logging.getLogger(__name__)
    logger_basic.warning("Invalid OMP_NUM_THREADS, defaulting to 4")

# Setup secure logging directory (0o700 so other users cannot read logs)
log_dir = os.getenv('LOG_DIR', '/tmp/logs')
try:
    os.makedirs(log_dir, exist_ok=True, mode=0o700)  # Secure permissions
except OSError:
    # Fallback if /tmp/logs not writable
    log_dir = os.path.expanduser('~/.logs') if os.path.expanduser('~') else '/tmp'
    os.makedirs(log_dir, exist_ok=True)

# Configure logging with file rotation
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)  # Console output
    ]
)
logger = logging.getLogger(__name__)

# Add file handler with rotation (if log directory is writable).
# NOTE: the handler is attached to this module's logger only, so the file
# receives this module's records (plus anything propagated to it).
try:
    log_file = os.path.join(log_dir, 'app.log')
    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    ))
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)

    # Set secure file permissions (Unix only)
    if os.name != 'nt':  # Not Windows
        try:
            os.chmod(log_file, 0o600)
        except OSError:
            pass  # Ignore permission errors

    logger.info(f"Logging to file: {log_file}")
except (OSError, PermissionError) as e:
    logger.warning(f"Could not create log file: {e}. Using console logging only.")


def sanitize_log_data(data):
    """Remove sensitive information from log data.

    Recursively walks dicts/lists and replaces any value whose key contains a
    sensitive substring (token/password/secret/key/auth/api_key) with a
    redaction marker. Non-container values are returned unchanged.
    """
    if isinstance(data, dict):
        sanitized = {}
        for key, value in data.items():
            if any(sensitive in key.lower() for sensitive in
                   ['token', 'password', 'secret', 'key', 'auth', 'api_key']):
                sanitized[key] = '***REDACTED***'
            else:
                # Only recurse into containers; scalars pass through as-is.
                sanitized[key] = sanitize_log_data(value) if isinstance(value, (dict, list)) else value
        return sanitized
    elif isinstance(data, list):
        return [sanitize_log_data(item) for item in data]
    return data


# Add project root to path so `src.*` imports resolve
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

# Create Flask app
app = Flask(__name__)
CORS(app)  # Enable CORS for all origins

# Initialize rate limiter (use Redis in production for distributed systems)
rate_limit_enabled = os.getenv('RATE_LIMIT_ENABLED', 'true').lower() == 'true'
if rate_limit_enabled:
    limiter = Limiter(
        app=app,
        key_func=get_remote_address,
        default_limits=["200 per day", "50 per hour", "10 per minute"],
        storage_uri="memory://",  # Use Redis in production: "redis://localhost:6379"
        headers_enabled=True
    )
    logger.info("Rate limiting enabled")
else:
    limiter = None
    logger.warning("Rate limiting disabled - NOT recommended for production")


def _rate_limit(spec):
    """Return the rate-limit decorator for `spec`, or a no-op when disabled.

    Replaces the previous conditional-expression decorator
    (`@limiter.limit(...) if limiter else lambda f: f`), which relies on
    PEP 614 relaxed decorator grammar (SyntaxError before Python 3.9) and is
    hard to read. Behavior is identical.
    """
    if limiter:
        return limiter.limit(spec)
    return lambda f: f


# Add security headers middleware
@app.after_request
def set_security_headers(response):
    """
    Add comprehensive security headers to all responses.

    Implements OWASP-recommended security headers for enhanced
    protection against common web vulnerabilities.
    """
    # Essential security headers (already implemented)
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    response.headers['X-XSS-Protection'] = '1; mode=block'
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    response.headers['Content-Security-Policy'] = "default-src 'self'"
    response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

    # Additional security headers (Phase 1 enhancement)
    response.headers['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
    response.headers['Cross-Origin-Resource-Policy'] = 'same-origin'
    response.headers['Cross-Origin-Opener-Policy'] = 'same-origin'
    response.headers['X-Permitted-Cross-Domain-Policies'] = 'none'

    return response


# Global orchestrator state (written by initialize_orchestrator, read by endpoints)
orchestrator = None
orchestrator_available = False
initialization_attempted = False
initialization_error = None


def initialize_orchestrator():
    """Initialize the AI orchestrator with ZeroGPU Chat API (RunPod).

    Sets the module-level `orchestrator`, `orchestrator_available`,
    `initialization_attempted` and `initialization_error` globals.

    Returns:
        bool: True on success, False when initialization failed (details are
        logged and recorded in `initialization_error`).
    """
    global orchestrator, orchestrator_available, initialization_attempted, initialization_error

    initialization_attempted = True
    initialization_error = None

    try:
        logger.info("=" * 60)
        logger.info("INITIALIZING AI ORCHESTRATOR (ZeroGPU Chat API - RunPod)")
        logger.info("=" * 60)

        # Imported lazily so the Flask app can still start (and report a
        # helpful /api/health status) when the project packages are broken.
        from src.agents.intent_agent import create_intent_agent
        from src.agents.synthesis_agent import create_synthesis_agent
        from src.agents.safety_agent import create_safety_agent
        from src.agents.skills_identification_agent import create_skills_identification_agent
        from src.llm_router import LLMRouter
        from src.orchestrator_engine import MVPOrchestrator
        from src.context_manager import EfficientContextManager

        logger.info("✓ Imports successful")

        # Initialize LLM Router - ZeroGPU Chat API
        logger.info("Initializing LLM Router (ZeroGPU Chat API)...")
        try:
            # Always use ZeroGPU Chat API (local models disabled)
            llm_router = LLMRouter(hf_token=None, use_local_models=False)
            logger.info("✓ LLM Router initialized (ZeroGPU Chat API)")
        except Exception as e:
            logger.error(f"❌ Failed to initialize LLM Router: {e}", exc_info=True)
            logger.error("This is a critical error - ZeroGPU Chat API is required")
            logger.error("Please ensure ZEROGPU_BASE_URL, ZEROGPU_EMAIL, and ZEROGPU_PASSWORD are set in environment variables")
            raise

        logger.info("Initializing Agents...")
        try:
            agents = {
                'intent_recognition': create_intent_agent(llm_router),
                'response_synthesis': create_synthesis_agent(llm_router),
                'safety_check': create_safety_agent(llm_router),
                'skills_identification': create_skills_identification_agent(llm_router)
            }
            logger.info("✓ All agents initialized")
        except Exception as e:
            logger.error(f"❌ Failed to initialize agents: {e}", exc_info=True)
            raise

        logger.info("Initializing Context Manager...")
        try:
            context_manager = EfficientContextManager(llm_router=llm_router)
            logger.info("✓ Context Manager initialized")
        except Exception as e:
            logger.error(f"❌ Failed to initialize Context Manager: {e}", exc_info=True)
            raise

        logger.info("Initializing Orchestrator...")
        try:
            orchestrator = MVPOrchestrator(llm_router, context_manager, agents)
            logger.info("✓ Orchestrator initialized")
        except Exception as e:
            logger.error(f"❌ Failed to initialize Orchestrator: {e}", exc_info=True)
            raise

        orchestrator_available = True
        logger.info("=" * 60)
        logger.info("✓ AI ORCHESTRATOR READY")
        logger.info("  - ZeroGPU Chat API enabled")
        logger.info("  - MAX_WORKERS: 4")
        logger.info("=" * 60)
        return True

    except ValueError as e:
        # Handle configuration errors (e.g., missing ZeroGPU credentials)
        if "ZEROGPU" in str(e) or "required" in str(e).lower():
            logger.error("=" * 60)
            logger.error("❌ CONFIGURATION ERROR")
            logger.error("=" * 60)
            logger.error(f"Error: {e}")
            logger.error("")
            logger.error("SOLUTION:")
            logger.error("1. Set ZEROGPU_BASE_URL in environment variables (e.g., http://your-pod-ip:8000)")
            logger.error("2. Set ZEROGPU_EMAIL in environment variables")
            logger.error("3. Set ZEROGPU_PASSWORD in environment variables")
            logger.error("4. Register your account first via the /register endpoint if needed")
            logger.error("=" * 60)
            orchestrator_available = False
            initialization_error = f"Configuration Error: {str(e)}"
        else:
            # Unrelated ValueErrors are genuine bugs; let them propagate.
            raise
        return False
    except Exception as e:
        logger.error("=" * 60)
        logger.error("❌ FAILED TO INITIALIZE ORCHESTRATOR")
        logger.error("=" * 60)
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Error message: {str(e)}")
        logger.error("=" * 60)
        logger.error("Full traceback:", exc_info=True)
        orchestrator_available = False
        initialization_error = f"{type(e).__name__}: {str(e)}"
        return False


# Root endpoint
@app.route('/', methods=['GET'])
def root():
    """API information"""
    return jsonify({
        'name': 'AI Assistant Flask API',
        'version': '1.0',
        'status': 'running',
        'orchestrator_ready': orchestrator_available,
        'features': {
            'local_gpu_models': True,
            'max_workers': 4,
            'hardware': 'NVIDIA T4 Medium'
        },
        'endpoints': {
            'health': 'GET /api/health',
            'chat': 'POST /api/chat',
            'initialize': 'POST /api/initialize',
            'context_mode_get': 'GET /api/context/mode',
            'context_mode_set': 'POST /api/context/mode'
        }
    })


# Health check
@app.route('/api/health', methods=['GET'])
def health_check():
    """Health check endpoint with detailed diagnostics"""
    status = {
        'status': 'healthy' if orchestrator_available else 'unhealthy',
        'orchestrator_ready': orchestrator_available,
        'initialization_attempted': initialization_attempted,
    }

    if not orchestrator_available:
        if initialization_error:
            status['error'] = initialization_error
            status['message'] = 'Initialization failed. Check logs for details.'
        elif initialization_attempted:
            status['message'] = 'Initialization completed but orchestrator not available'
        else:
            status['message'] = 'Initialization not yet attempted'
            status['help'] = 'Try POST /api/initialize to trigger initialization'

    return jsonify(status)


# Chat endpoint
@app.route('/api/chat', methods=['POST'])
@_rate_limit("10 per minute")  # Rate limit: 10 requests per minute per IP
def chat():
    """
    Process chat message

    POST /api/chat
    {
        "message": "user message",
        "history": [[user, assistant], ...],
        "session_id": "session-123",
        "user_id": "user-456"
    }

    Returns:
    {
        "success": true,
        "message": "AI response",
        "history": [...],
        "reasoning": {...},
        "performance": {...}
    }
    """
    try:
        data = request.get_json()

        if not data or 'message' not in data:
            return jsonify({
                'success': False,
                'error': 'Message is required'
            }), 400

        message = data['message']

        # Input validation
        if not isinstance(message, str):
            return jsonify({
                'success': False,
                'error': 'Message must be a string'
            }), 400

        # Strip whitespace and validate
        message = message.strip()
        if not message:
            return jsonify({
                'success': False,
                'error': 'Message cannot be empty'
            }), 400

        # Length limit (allow larger inputs for complex queries)
        MAX_MESSAGE_LENGTH = 100000  # 100KB limit (increased from 10KB)
        if len(message) > MAX_MESSAGE_LENGTH:
            return jsonify({
                'success': False,
                'error': f'Message too long. Maximum length is {MAX_MESSAGE_LENGTH} characters (approximately {MAX_MESSAGE_LENGTH // 4} tokens)'
            }), 400

        history = data.get('history', [])
        session_id = data.get('session_id')
        user_id = data.get('user_id', 'anonymous')
        context_mode = data.get('context_mode')  # Optional: 'fresh' or 'relevant'

        logger.info(f"Chat request - User: {user_id}, Session: {session_id}")
        # NOTE(review): logs the first 100 chars of the user message — may
        # contain PII; confirm this is acceptable for your log retention policy.
        logger.info(f"Message length: {len(message)} chars, preview: {message[:100]}...")

        if not orchestrator_available or orchestrator is None:
            logger.warning("Chat request received but orchestrator not ready")
            return jsonify({
                'success': False,
                'error': 'Orchestrator not ready',
                'message': 'AI system is initializing. Please try again in a moment.',
                'help': 'If this persists, check logs for initialization errors or try POST /api/initialize'
            }), 503

        # Process with orchestrator (async method)
        # Set user_id for session tracking
        if session_id:
            orchestrator.set_user_id(session_id, user_id)

        # Set context mode if provided
        if context_mode and hasattr(orchestrator.context_manager, 'set_context_mode'):
            if context_mode in ['fresh', 'relevant']:
                orchestrator.context_manager.set_context_mode(session_id, context_mode, user_id)
                logger.info(f"Context mode set to '{context_mode}' for session {session_id}")
            else:
                logger.warning(f"Invalid context_mode '{context_mode}', ignoring. Use 'fresh' or 'relevant'")

        # Run the async orchestrator call. asyncio.run() creates a fresh event
        # loop, runs the coroutine, shuts down async generators and closes the
        # loop — unlike the previous manual new_event_loop()/set_event_loop()
        # pattern, it does not leave the worker thread's event-loop slot
        # pointing at a closed loop.
        result = asyncio.run(
            orchestrator.process_request(
                session_id=session_id or f"session-{user_id}",
                user_input=message
            )
        )

        # Extract response
        if isinstance(result, dict):
            response_text = result.get('response', '') or result.get('final_response', '')
            reasoning = result.get('reasoning', {})
            performance = result.get('performance', {})

            # ENHANCED: Log performance metrics for debugging
            if performance:
                logger.info("=" * 60)
                logger.info("PERFORMANCE METRICS")
                logger.info("=" * 60)
                logger.info(f"Processing Time: {performance.get('processing_time', 0)}ms")
                logger.info(f"Tokens Used: {performance.get('tokens_used', 0)}")
                logger.info(f"Agents Used: {performance.get('agents_used', 0)}")
                logger.info(f"Confidence Score: {performance.get('confidence_score', 0)}%")
                agent_contribs = performance.get('agent_contributions', [])
                if agent_contribs:
                    logger.info("Agent Contributions:")
                    for contrib in agent_contribs:
                        logger.info(f"  - {contrib.get('agent', 'Unknown')}: {contrib.get('percentage', 0)}%")
                logger.info(f"Safety Score: {performance.get('safety_score', 0)}%")
                logger.info("=" * 60)
            else:
                logger.warning("⚠️ No performance metrics in response!")
                logger.debug(f"Result keys: {list(result.keys())}")
                logger.debug(f"Result metadata keys: {list(result.get('metadata', {}).keys())}")
                # Try to extract from metadata as fallback
                metadata = result.get('metadata', {})
                if 'performance_metrics' in metadata:
                    performance = metadata['performance_metrics']
                    logger.info("✓ Found performance metrics in metadata")
        else:
            # Unexpected result shape: degrade gracefully with stub metrics.
            response_text = str(result)
            reasoning = {}
            performance = {
                "processing_time": 0,
                "tokens_used": 0,
                "agents_used": 0,
                "confidence_score": 0,
                "agent_contributions": [],
                "safety_score": 80,
                "error": "Response format error"
            }

        updated_history = history + [[message, response_text]]

        logger.info(f"✓ Response generated (length: {len(response_text)})")

        return jsonify({
            'success': True,
            'message': response_text,
            'history': updated_history,
            'reasoning': reasoning,
            'performance': performance
        })

    except Exception as e:
        logger.error(f"Chat error: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': str(e),
            'message': 'Error processing your request. Please try again.'
        }), 500


# Manual initialization endpoint
@app.route('/api/initialize', methods=['POST'])
@_rate_limit("5 per minute")  # Rate limit: 5 requests per minute per IP
def initialize():
    """Manually trigger initialization"""
    success = initialize_orchestrator()

    if success:
        return jsonify({
            'success': True,
            'message': 'Orchestrator initialized successfully'
        })
    else:
        return jsonify({
            'success': False,
            'message': 'Initialization failed. Check logs for details.'
        }), 500


# Context mode management endpoints
@app.route('/api/context/mode', methods=['GET'])
def get_context_mode():
    """
    Get current context mode for a session

    GET /api/context/mode?session_id=session-123

    Returns:
    {
        "success": true,
        "session_id": "session-123",
        "context_mode": "fresh" | "relevant",
        "description": {
            "fresh": "No user context included - starts fresh each time",
            "relevant": "Only relevant user context included based on relevance classification"
        }
    }
    """
    try:
        session_id = request.args.get('session_id')
        if not session_id:
            return jsonify({
                'success': False,
                'error': 'session_id query parameter is required'
            }), 400

        if not orchestrator_available or orchestrator is None:
            return jsonify({
                'success': False,
                'error': 'Orchestrator not ready'
            }), 503

        if not hasattr(orchestrator.context_manager, 'get_context_mode'):
            return jsonify({
                'success': False,
                'error': 'Context mode not available'
            }), 503

        context_mode = orchestrator.context_manager.get_context_mode(session_id)

        return jsonify({
            'success': True,
            'session_id': session_id,
            'context_mode': context_mode,
            'description': {
                'fresh': 'No user context included - starts fresh each time',
                'relevant': 'Only relevant user context included based on relevance classification'
            }
        })

    except Exception as e:
        logger.error(f"Get context mode error: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500


@app.route('/api/context/mode', methods=['POST'])
def set_context_mode():
    """
    Set context mode for a session

    POST /api/context/mode
    {
        "session_id": "session-123",
        "mode": "fresh" | "relevant",
        "user_id": "user-456" (optional)
    }

    Returns:
    {
        "success": true,
        "session_id": "session-123",
        "context_mode": "fresh" | "relevant",
        "message": "Context mode set successfully"
    }
    """
    try:
        data = request.get_json()

        if not data:
            return jsonify({
                'success': False,
                'error': 'Request body is required'
            }), 400

        session_id = data.get('session_id')
        mode = data.get('mode')
        user_id = data.get('user_id', 'anonymous')

        if not session_id:
            return jsonify({
                'success': False,
                'error': 'session_id is required'
            }), 400

        if not mode:
            return jsonify({
                'success': False,
                'error': 'mode is required'
            }), 400

        if mode not in ['fresh', 'relevant']:
            return jsonify({
                'success': False,
                'error': "mode must be 'fresh' or 'relevant'"
            }), 400

        if not orchestrator_available or orchestrator is None:
            return jsonify({
                'success': False,
                'error': 'Orchestrator not ready'
            }), 503

        if not hasattr(orchestrator.context_manager, 'set_context_mode'):
            return jsonify({
                'success': False,
                'error': 'Context mode not available'
            }), 503

        success = orchestrator.context_manager.set_context_mode(session_id, mode, user_id)

        if success:
            return jsonify({
                'success': True,
                'session_id': session_id,
                'context_mode': mode,
                'message': 'Context mode set successfully'
            })
        else:
            return jsonify({
                'success': False,
                'error': 'Failed to set context mode'
            }), 500

    except Exception as e:
        logger.error(f"Set context mode error: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500


# Initialize orchestrator on module import (for Gunicorn compatibility)
# This ensures initialization happens even when running via Gunicorn
logger.info("=" * 60)
logger.info("FLASK API MODULE LOADED")
logger.info("=" * 60)
logger.info("Initializing orchestrator on module import...")

# Guard against multiple simultaneous initializations: the lock protects the
# started-flag check-and-set so only the first caller spawns the worker.
import threading

_init_lock = threading.Lock()
_init_started = False


def _start_initialization():
    """Kick off orchestrator initialization in a daemon thread (non-blocking).

    Safe to call multiple times: only the first call spawns the worker.
    """
    global _init_started

    with _init_lock:
        if _init_started:
            return
        _init_started = True

    def _init_worker():
        # Runs off the main thread so module import (and Gunicorn worker
        # boot) is never blocked by slow model/router setup.
        try:
            logger.info("Background initialization thread started")
            initialize_orchestrator()
        except Exception as exc:
            logger.error(f"Background initialization failed: {exc}", exc_info=True)

    worker = threading.Thread(target=_init_worker, daemon=True, name="OrchInit")
    worker.start()
    logger.info("Initialization thread started (non-blocking)")


# Start initialization when module is imported
_start_initialization()


# Initialize on startup (for direct execution)
if __name__ == '__main__':
    logger.info("=" * 60)
    logger.info("STARTING PURE FLASK API (Direct Execution)")
    logger.info("=" * 60)

    import time

    # Give the background thread a window to finish before serving traffic.
    if not orchestrator_available:
        logger.info("Waiting for background initialization to complete...")
        for elapsed in range(30):  # Wait up to 30 seconds
            if orchestrator_available:
                break
            time.sleep(1)
            if elapsed % 5 == 0:
                logger.info(f"Still waiting... ({elapsed}s)")

        if not orchestrator_available:
            logger.warning("Orchestrator not ready after wait, attempting direct initialization...")
            initialize_orchestrator()

    port = int(os.getenv('PORT', 7860))

    logger.info(f"Starting Flask on port {port}")
    logger.info("Endpoints available:")
    logger.info("  GET  /")
    logger.info("  GET  /api/health")
    logger.info("  POST /api/chat")
    logger.info("  POST /api/initialize")
    logger.info("  GET  /api/context/mode")
    logger.info("  POST /api/context/mode")
    logger.info("=" * 60)

    # Development mode only - Use Gunicorn for production
    logger.warning("⚠️ Using Flask development server - NOT for production!")
    logger.warning("⚠️ Use Gunicorn for production: gunicorn flask_api_standalone:app")
    logger.info("=" * 60)

    app.run(
        host='0.0.0.0',
        port=port,
        debug=False,
        threaded=True  # Enable threading for concurrent requests
    )