"""FastAPI entry point for the BMS AI Assistant.

Serves the single-page front end, a health probe and the ``/api/chat``
endpoint that routes a parsed user intent to the data loader, the forecaster,
the PDF generators or the LLM fallback.

NOTE(review): the HTML markup embedded in the answer strings (``<br>``,
``<table>``, ``<a>``, ``<h1>``) was missing from the reviewed copy of this
file and has been reconstructed here. Confirm the tags, attributes and the
report download URL against what the front end expects.
"""
import html
import logging
import os

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Configure logging. This stays ahead of the ``app.*`` imports so that any
# records those modules log while being imported use this configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

from app.intent_parser import parser  # noqa: E402
from app.forecasting import forecast_demand  # noqa: E402
from app.data_loader import loader  # noqa: E402
from app.config import DEFAULT_HORIZON  # noqa: E402
from app.llm_engine import llm  # noqa: E402
from app.pdf_generator import generate_forecast_pdf, generate_general_pdf  # noqa: E402

app = FastAPI(title="BMS AI Assistant")

# CORS
# NOTE(review): a wildcard origin combined with allow_credentials=True lets any
# site make credentialed requests; restrict allow_origins before exposing this
# service beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files so assets (CSS/JS) referenced by the HTML can be served.
static_dir = "/app/static"
if not os.path.exists(static_dir):
    # Fallback for local testing: <parent of this file's directory>/static
    static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")

if os.path.exists(static_dir):
    app.mount("/static", StaticFiles(directory=static_dir), name="static")
    logger.info(f"Mounted static files from: {static_dir}")
else:
    logger.warning(f"Static directory not found at {static_dir}")


class ChatRequest(BaseModel):
    """Request body of ``POST /api/chat``."""

    # Raw text typed by the user.
    message: str


def _html_table(headers, rows, row_styles=None):
    """Render ``headers`` and ``rows`` as a minimal HTML table.

    Args:
        headers: Iterable of column titles.
        rows: Iterable of rows, each an iterable of cell values.
        row_styles: Optional sequence of inline CSS strings, one per row; an
            empty string means "no style attribute".

    Returns:
        The table markup as a single string. Every header and cell value is
        passed through ``html.escape``.
    """
    parts = ["<table>", "<tr>"]
    parts.extend(f"<th>{html.escape(str(title))}</th>" for title in headers)
    parts.append("</tr>")
    for index, row in enumerate(rows):
        style = row_styles[index] if row_styles else ""
        parts.append(f"<tr style='{style}'>" if style else "<tr>")
        parts.extend(f"<td>{html.escape(str(cell))}</td>" for cell in row)
        parts.append("</tr>")
    parts.append("</table>")
    return "".join(parts)


def _report_link(filename):
    """Return the download anchor for a generated PDF report.

    NOTE(review): assumes the generators return a bare file name that is
    reachable under the ``/static`` mount - confirm against app.pdf_generator.
    """
    return f"<a href='/static/{filename}' target='_blank'>Download PDF</a>"


# Root Endpoint
@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Return ``index.html`` from the first known location that exists.

    Responds with HTTP 500 and a short HTML error when none of the candidate
    paths exists.
    """
    possible_paths = [
        "/app/static/index.html",  # Docker absolute path
        os.path.join(os.path.dirname(os.path.dirname(__file__)), "static", "index.html"),  # Local relative
        "static/index.html",  # Simple relative
    ]

    for path in possible_paths:
        if os.path.exists(path):
            logger.info(f"Serving HTML from: {path}")
            with open(path, "r", encoding="utf-8") as f:
                return HTMLResponse(content=f.read())

    logger.error("Could not find index.html in any known location.")
    return HTMLResponse(content="<h1>Error: index.html not found</h1>", status_code=500)


@app.get("/api/health")
async def health_check():
    """Liveness probe; always returns a static OK payload."""
    return {"status": "ok", "message": "BMS AI Assistant is running"}


# Chat Endpoint
# NOTE(review): this dict is module-level, so conversation state, history and
# the last context are shared by every client of this process.
chat_context = {
    "last_forecast": None,
    "last_answer": None,
    "history": []
}


@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    """Answer one chat message.

    Flow:
      1. If a previous turn left the conversation in ``waiting_for_location``
         or ``waiting_for_item_code``, the message is consumed as that answer.
      2. Otherwise (or after an item code was supplied) the message is parsed
         and dispatched on its intent.
      3. Unknown intents fall back to the LLM.

    Returns:
        A ``JSONResponse`` with at least ``intent`` and ``answer``; the main
        dispatch path also includes ``forecast``. Any exception raised during
        dispatch is logged and reported as HTTP 500.
    """
    user_text = request.message

    # Always defined, so the dispatch below never has to probe locals().
    override_intent = None
    override_item_code = None

    # 0. Check State (Conversational Flow)
    state = chat_context.get("state")
    if state == "waiting_for_location":
        # The user's message is treated as the location.
        location = user_text
        items = chat_context.get("pending_items", [])

        # Create ONE order for all items.
        req_id, messages = loader.create_requisition(items, location)

        # Clear state
        chat_context["state"] = None
        chat_context["pending_items"] = []

        msg = (
            f"I've created order **{req_id}** for **{location}** containing all your items."
            "<br>Status: **Created**"
        )
        if messages:
            msg += "<br><br>" + "<br>".join(messages)

        return JSONResponse(content={
            "intent": "create_requisition",
            "answer": f"{msg}\n\n**Order Details:**\n- Priority: Normal\n- Shipping: Ground\n- Terms: Net 30"
        })

    elif state == "waiting_for_item_code":
        # The user's message is expected to contain the item code.
        pending_item_code = parser.parse(user_text).get("item_code")
        if not pending_item_code:
            # Still no item code found; the waiting state is left in place.
            return JSONResponse(content={
                "intent": "general_chat",
                "answer": "I still didn't catch the item code. Please specify it like 'BMS0001'."
            })

        # Resume the intent that was pending, now with its item code.
        override_intent = chat_context.get("pending_intent")
        override_item_code = pending_item_code
        chat_context["state"] = None
        chat_context["pending_intent"] = None

    # Add user message to history (Global tracking)
    chat_context["history"].append({"role": "user", "content": user_text})

    try:
        parsed = parser.parse(user_text)

        # Override intent/item code if we came from a waiting state; location
        # and horizon are still taken from the current message.
        if override_intent:
            intent = override_intent
            item_code = override_item_code
        else:
            intent = parsed["intent"]
            item_code = parsed["item_code"]
        location = parsed["location"]
        horizon = parsed["horizon_days"]

        response_data = {
            "intent": intent,
            "answer": "",
            "forecast": []
        }

        # Context carry-over: reuse the item (and location) of the previous
        # answer when the current message names none.
        if not item_code and chat_context.get("last_context"):
            last_ctx = chat_context["last_context"]
            if last_ctx.get("item_code"):
                item_code = last_ctx["item_code"]
                if not location and last_ctx.get("location"):
                    location = last_ctx["location"]

        if intent == "demand_forecast":
            if not item_code:
                response_data["answer"] = "I can help with forecasting, but I need to know which item you are interested in."
            else:
                if not horizon:
                    horizon = DEFAULT_HORIZON

                forecast = forecast_demand(item_code, horizon, location)
                response_data["forecast"] = forecast
                chat_context["last_context"] = {"type": "forecast", "item_code": item_code, "data": forecast, "location": location}

                total = sum(d['qty'] for d in forecast) if forecast else 0
                response_data["answer"] = f"Forecast for {item_code} over next {horizon} days is {total} units."

        elif intent == "list_items":
            if location:
                items = loader.get_items_by_location(location)
                if not items:
                    response_data["answer"] = f"No items found in **{location}**."
                else:
                    msg = f"**Items available in {location}:**\n"
                    msg += "".join(f"- {item['item_code']}: {item['description']}\n" for item in items[:10])
                    if len(items) > 10:
                        msg += f"\n*(Showing 10 of {len(items)} items)*"
                    response_data["answer"] = msg
            else:
                items = loader.get_items()
                msg = "**Available Items:**\n"
                msg += "".join(f"- {item['item_code']}: {item['description']}\n" for item in items[:10])
                response_data["answer"] = msg

        elif intent == "list_locations":
            # Hardcoded for now based on the requested data change
            locations = ["USA", "CANADA", "CHICAGO RDC", "TORONTO BRANCH", "NEW YORK BRANCH", "LOS ANGELES RDC", "DALLAS RDC"]
            response_data["answer"] = "**Available Locations:**\n" + "".join(f"- {loc}\n" for loc in locations)

        elif intent == "check_inventory":
            if not item_code:
                response_data["answer"] = "Please specify an item code to check inventory for."
            else:
                inv_data = loader.get_inventory(item_code, location)
                if not inv_data:
                    response_data["answer"] = f"No inventory data found for {item_code}."
                else:
                    # Bin/lot/expiry details are deliberately left out of the
                    # chat answer to keep it clean.
                    msg = f"**Inventory Status for {item_code}:**\n"
                    msg += "".join(
                        f"- **{record['region']}**: {record['qty_on_hand']} units ({record['status']})\n"
                        for record in inv_data
                    )
                    response_data["answer"] = msg

        elif intent == "create_requisition":
            # Check for multi-items first. Copy the list so the parser's own
            # result is not mutated by the append below.
            items_to_order = list(parsed["extra"].get("items", []))

            # If no multi-items found, try single item
            if not items_to_order and item_code:
                qty = parsed["extra"].get("qty", 1)
                items_to_order.append({"item_code": item_code, "qty": qty})

            if not items_to_order:
                response_data["answer"] = (
                    "I can help create an order, but I need to know which items and quantities."
                    "<br>(e.g., 'Order 5 of BMS0001')"
                )
            elif not location:
                # Missing location -> remember the items and ask for it.
                chat_context["state"] = "waiting_for_location"
                chat_context["pending_items"] = items_to_order
                response_data["answer"] = "I can certainly help with that order. Could you please specify the destination location? (e.g., Chicago RDC)"
            else:
                # Have everything -> Create Order
                req_id, messages = loader.create_requisition(items_to_order, location)
                msg = f"Order **{req_id}** has been created for **{location}**.<br>Status: **Created**"
                if messages:
                    msg += "<br><br>" + "<br>".join(messages)
                response_data["answer"] = msg

        elif intent == "competitor_analysis":
            if not item_code:
                response_data["answer"] = "Please specify an item code for competitor analysis."
            else:
                analysis = loader.get_competitor_analysis(item_code)
                if not analysis:
                    response_data["answer"] = f"No competitor data found for {item_code}."
                else:
                    # Calculate Expert Insights from the most recent row.
                    latest_year = analysis[-1]
                    # A trend needs a previous row; a single-row result has none.
                    previous_year = analysis[-2] if len(analysis) > 1 else None
                    competitor_price = latest_year['competitor_price']

                    if not competitor_price:
                        # Avoid dividing by a zero/missing competitor price.
                        insight_text = (
                            "**Expert Insight:** Competitor price is unavailable for the latest year, "
                            "so no price comparison can be made."
                        )
                    else:
                        price_gap = ((latest_year['our_price'] - competitor_price) / competitor_price) * 100
                        if price_gap > 0:
                            insight_text = f"**Expert Insight:** We are currently priced **{price_gap:.1f}% higher** than the competition. "
                            if previous_year is not None:
                                if latest_year['sales_qty'] > previous_year['sales_qty']:
                                    insight_text += "However, our sales volume is still growing, suggesting strong brand loyalty or superior product quality."
                                else:
                                    insight_text += "This premium might be affecting our sales growth. Consider a promotional campaign."
                        else:
                            insight_text = f"**Expert Insight:** We are priced **{abs(price_gap):.1f}% lower** than the competition. "
                            insight_text += "This competitive pricing is likely driving our volume growth."

                    # Format as Table
                    msg = f"**Competitor Analysis for {item_code} (Last 5 Years):**<br><br>"
                    msg += _html_table(
                        ["Year", "Competitor Price", "Our Price", "Sales Qty"],
                        [
                            (
                                row['year'],
                                f"${row['competitor_price']:.2f}",
                                f"${row['our_price']:.2f}",
                                row['sales_qty'],
                            )
                            for row in analysis
                        ],
                    )
                    msg += f"<br>{insight_text}<br>"
                    response_data["answer"] = msg

                    # Save context for report generation
                    chat_context["last_context"] = {
                        "type": "competitor",
                        "item_code": item_code,
                        "data": analysis,
                        "insights": insight_text
                    }

        elif intent == "market_share":
            if not item_code:
                response_data["answer"] = "Please specify an item code to see market share (e.g., 'Market share for BMS0001')."
            else:
                share_data = loader.get_market_share(item_code, location)
                if not share_data:
                    response_data["answer"] = f"No market share data available for {item_code}."
                else:
                    msg = f"**Market Share Analysis for {item_code}**"
                    if location:
                        msg += f" in **{location}**"
                    msg += ":<br><br>"

                    # Rows whose competitor name contains "Cummins" are "us"
                    # and get highlighted.
                    highlight = "font-weight:bold; color:#e53e3e;"
                    msg += _html_table(
                        ["Competitor", "Volume", "Share %"],
                        [(row['competitor_name'], row['qty'], f"{row['share']}%") for row in share_data],
                        row_styles=[
                            highlight if "Cummins" in row['competitor_name'] else ""
                            for row in share_data
                        ],
                    )

                    # Insight: the first row is treated as the market leader.
                    our_share = next((x['share'] for x in share_data if "Cummins" in x['competitor_name']), 0)
                    leader = share_data[0]
                    if "Cummins" in leader['competitor_name']:
                        msg += f"<br>We are the market leader with {our_share}% share!"
                    else:
                        msg += f"<br>{leader['competitor_name']} is leading with {leader['share']}%. We are at {our_share}%."

                    response_data["answer"] = msg

        elif intent == "lost_sales":
            loss_data = loader.get_lost_sales_summary(item_code, location)
            if not loss_data:
                response_data["answer"] = "No lost sales records found matching your criteria."
            else:
                msg = "**Lost Sales Analysis**"
                if item_code:
                    msg += f" for **{item_code}**"
                if location:
                    msg += f" in **{location}**"
                msg += ":<br><br>"

                # Largest lost quantity first.
                ordered = sorted(loss_data, key=lambda x: x['lost_qty'], reverse=True)
                total_rev = sum(row['estimated_revenue_lost'] for row in ordered)
                msg += _html_table(
                    ["Primary Reason", "Lost Qty", "Est. Revenue Lost"],
                    [
                        (row['reason'], row['lost_qty'], f"${row['estimated_revenue_lost']:,.2f}")
                        for row in ordered
                    ],
                )
                msg += f"<br>Total Estimated Loss: ${total_rev:,.2f}"
                response_data["answer"] = msg

                # Save context for report generation
                chat_context["last_context"] = {
                    "type": "lost_sales",
                    "item_code": item_code,
                    "data": loss_data,
                    "location": location
                }

        elif intent == "competitor_trend":
            if not item_code:
                response_data["answer"] = "Please specify an item code to see sales trends."
            else:
                trend_data = loader.get_competitor_trend(item_code)
                if not trend_data:
                    response_data["answer"] = f"No trend data found for {item_code}."
                else:
                    # Dynamic headers taken from the first row's keys, with
                    # 'year' moved to the front when present.
                    headers = list(trend_data[0].keys())
                    if 'year' in headers:
                        headers.remove('year')
                        headers.insert(0, 'year')

                    table_rows = []
                    for row in trend_data:
                        cells = []
                        for h in headers:
                            val = row.get(h, 0)
                            # Show quantities as whole numbers; leave the year as is.
                            if isinstance(val, (int, float)) and h != 'year':
                                val = int(val)
                            cells.append(val)
                        table_rows.append(cells)

                    msg = f"**Sales Trend (Qty) for {item_code}:**<br><br>"
                    msg += _html_table([h.replace('year', 'Year') for h in headers], table_rows)
                    response_data["answer"] = msg

        elif intent == "top_selling":
            top_items = loader.get_top_selling_items()
            if not top_items:
                response_data["answer"] = "No sales data available to determine top selling items."
            else:
                msg = "**Top Selling Items (All Time):**\n"
                msg += "".join(
                    f"{i}. **{item['item_code']}** ({item.get('description', 'N/A')}): {item['quantity']} units sold\n"
                    for i, item in enumerate(top_items, 1)
                )
                response_data["answer"] = msg

        elif intent == "order_status":
            req_id = parsed["extra"].get("req_id")
            if not req_id:
                response_data["answer"] = "Please provide the Order ID (e.g., 'Check status of REQ1234')."
            else:
                status = loader.get_order_status(req_id)
                if not status:
                    response_data["answer"] = f"Order {req_id} not found."
                else:
                    response_data["answer"] = (
                        f"**Order Status for {req_id}:**"
                        f"<br>Item: {status['item_code']}"
                        f"<br>Qty: {status['qty']}"
                        f"<br>Location: {status.get('location', 'Unknown')}"
                        f"<br>Status: **{status['status']}**"
                    )

        elif intent == "item_details":
            if not item_code:
                # Ask for the code and remember which intent to resume.
                chat_context["state"] = "waiting_for_item_code"
                chat_context["pending_intent"] = "item_details"
                response_data["answer"] = "Please specify an item code to get details."
            else:
                item = loader.get_item(item_code)
                if not item:
                    response_data["answer"] = f"Item {item_code} not found in our catalog."
                else:
                    response_data["answer"] = (
                        f"**Item Details for {item_code}:**"
                        f"<br>Description: {item['description']}"
                        f"<br>List Price: ${item['list_price']}"
                        f"<br>UOM: {item['uom']}"
                    )

        elif intent == "supplier_info":
            if not item_code:
                response_data["answer"] = "Please specify an item code to find supplier information."
            else:
                suppliers = loader.get_supplier(item_code)
                if not suppliers:
                    response_data["answer"] = f"No supplier information found for {item_code}."
                else:
                    msg = f"**Supplier Information for {item_code}:**<br>"
                    msg += "".join(
                        f"- **{sup['supplier_name']}** (Lead Time: {sup['lead_time_days']} days)"
                        f"<br>Contact: {sup['contact_email']}<br>"
                        for sup in suppliers
                    )
                    response_data["answer"] = msg

        elif intent == "generate_report":
            # Reports are always built from the last saved context; an item
            # code named in the current message does not trigger a re-fetch.
            ctx = chat_context.get("last_context")
            if ctx and ctx.get("item_code"):
                target_item = ctx["item_code"]

                if ctx["type"] == "forecast":
                    filename = generate_forecast_pdf(target_item, ctx.get("data", []), ctx.get("location"))
                    response_data["answer"] = f"Forecast Report generated for {target_item}: {_report_link(filename)}"
                elif ctx["type"] == "inventory":
                    # NOTE(review): no branch in this file stores a context of
                    # type "inventory", so this path looks unreachable today.
                    # Imported lazily; not part of the module-level import.
                    from app.pdf_generator import generate_inventory_pdf
                    filename = generate_inventory_pdf(target_item, ctx.get("data", []), ctx.get("location"))
                    response_data["answer"] = f"Inventory Report generated for {target_item}: {_report_link(filename)}"
                elif ctx["type"] == "competitor":
                    from app.pdf_generator import generate_competitor_pdf
                    filename = generate_competitor_pdf(target_item, ctx.get("data", []), ctx.get("insights", ""))
                    response_data["answer"] = f"Competitor Analysis Report generated for {target_item}: {_report_link(filename)}"
                elif ctx["type"] == "lost_sales":
                    from app.pdf_generator import generate_lost_sales_pdf
                    filename = generate_lost_sales_pdf(target_item, ctx.get("data", []), ctx.get("location"))
                    response_data["answer"] = f"Lost Sales Report generated for {target_item}: {_report_link(filename)}"
                else:
                    response_data["answer"] = "Unknown report type in context."
            else:
                response_data["answer"] = "I don't have enough context to generate a report. Please ask for a forecast or inventory check first."

        else:
            # Fallback to LLM: generate a response with history AND dynamic context.
            last_ctx = chat_context.get("last_context")
            response_data["answer"] = llm.generate_response(user_text, chat_context["history"], dynamic_context=last_ctx)

        chat_context["last_answer"] = response_data["answer"]

        # Add assistant response to history (Global tracking)
        chat_context["history"].append({"role": "assistant", "content": response_data["answer"]})

        return JSONResponse(content=response_data)

    except Exception as e:
        # Top-level boundary: log with traceback and report a 500 to the client.
        logger.exception("Error in chat endpoint")
        return JSONResponse(content={"answer": f"Error processing request: {str(e)}"}, status_code=500)