from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import os
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from app.intent_parser import parser
from app.forecasting import forecast_demand
from app.data_loader import loader
from app.config import DEFAULT_HORIZON
from app.llm_engine import llm
from app.pdf_generator import generate_forecast_pdf, generate_general_pdf
app = FastAPI(title="BMS AI Assistant")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount Static Files
# We mount this to serve assets like CSS/JS if they are referenced in the HTML
static_dir = "/app/static"
if not os.path.exists(static_dir):
# Fallback for local testing
static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")
if os.path.exists(static_dir):
app.mount("/static", StaticFiles(directory=static_dir), name="static")
logger.info(f"Mounted static files from: {static_dir}")
else:
logger.warning(f"Static directory not found at {static_dir}")
class ChatRequest(BaseModel):
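    """Request body for /api/chat: a single free-text user message."""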
message: str
# Root Endpoint
@app.get("/", response_class=HTMLResponse)
async def read_root():
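    """Serve the chat UI: locate index.html in one of several known paths and return it."""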
# Logic: Read the file content and return it.
possible_paths = [
"/app/static/index.html", # Docker absolute path
os.path.join(os.path.dirname(os.path.dirname(__file__)), "static", "index.html"), # Local relative
"static/index.html" # Simple relative
]
for path in possible_paths:
if os.path.exists(path):
logger.info(f"Serving HTML from: {path}")
with open(path, "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
logger.error("Could not find index.html in any known location.")
    return HTMLResponse(content="Error: index.html not found", status_code=500)
@app.get("/api/health")
async def health_check():
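    """Simple liveness check endpoint."""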
return {"status": "ok", "message": "BMS AI Assistant is running"}
# Simple in-memory conversation state, shared by all clients of this process
chat_context = {
    "last_forecast": None,
    "last_answer": None,
    "history": [],
    "state": None,             # e.g. "waiting_for_location" or "waiting_for_item_code"
    "pending_items": [],
    "pending_intent": None,
    "last_context": None
}

# Chat Endpoint
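# Example request against the chat endpoint (host/port are assumptions for a local
# run, e.g. uvicorn's default of localhost:8000):
#   curl -X POST http://localhost:8000/api/chat \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Forecast demand for BMS0001 for the next 30 days"}'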
@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
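    """
    Main conversational endpoint.

    Flow: (1) resolve any pending conversational state (waiting for a location or
    an item code), (2) parse the message into an intent plus entities, (3) dispatch
    to the matching data/forecast handler, and (4) fall back to the LLM for general
    chat. Successful responses include "intent", "answer" and, for forecasts, "forecast".
    """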
user_text = request.message
# 0. Check State (Conversational Flow)
if chat_context.get("state") == "waiting_for_location":
# The user's message is treated as the location
location = user_text
items = chat_context.get("pending_items", [])
# Create ONE order for all items
req_id, messages = loader.create_requisition(items, location)
# Clear state
chat_context["state"] = None
chat_context["pending_items"] = []
        msg = f"I've created order **{req_id}** for **{location}** containing all your items.\nStatus: **Created**"
        if messages:
            msg += "\n" + "\n".join(messages)
return JSONResponse(content={
"intent": "create_requisition",
"answer": f"{msg}\n\n**Order Details:**\n- Priority: Normal\n- Shipping: Ground\n- Terms: Net 30"
})
elif chat_context.get("state") == "waiting_for_item_code":
# The user's message is likely the item code
# Try to extract item code from the text
parsed_check = parser.parse(user_text)
item_code = parsed_check.get("item_code")
if item_code:
# We have the item code, now execute the pending intent
pending_intent = chat_context.get("pending_intent")
# Clear state
chat_context["state"] = None
chat_context["pending_intent"] = None
            # Override the freshly parsed result below with the pending intent and
            # the item code the user just supplied.
override_intent = pending_intent
override_item_code = item_code
else:
# Still no item code found
return JSONResponse(content={
"intent": "general_chat",
"answer": "I still didn't catch the item code. Please specify it like 'BMS0001'."
})
else:
override_intent = None
override_item_code = None
# Add user message to history (Global tracking)
chat_context["history"].append({"role": "user", "content": user_text})
try:
parsed = parser.parse(user_text)
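        # parser.parse() is expected to return at least: intent, item_code,
        # location, horizon_days, and an "extra" dict (qty, items, req_id, ...).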
# Override if we came from a waiting state
        if override_intent:
intent = override_intent
item_code = override_item_code
# Location/Horizon might need to be preserved or extracted
location = parsed["location"]
horizon = parsed["horizon_days"]
else:
intent = parsed["intent"]
item_code = parsed["item_code"]
location = parsed["location"]
horizon = parsed["horizon_days"]
response_data = {
"intent": intent,
"answer": "",
"forecast": []
}
# Context Carry-over Logic
if not item_code and chat_context.get("last_context"):
last_ctx = chat_context["last_context"]
# Only carry over if it makes sense (e.g., user is asking for details about the same item)
# For now, we'll apply it broadly but can refine if needed.
if last_ctx.get("item_code"):
item_code = last_ctx["item_code"]
# Also carry over location if relevant and missing
if not location and last_ctx.get("location"):
location = last_ctx["location"]
if intent == "demand_forecast":
if not item_code:
response_data["answer"] = "I can help with forecasting, but I need to know which item you are interested in."
else:
if not horizon: horizon = DEFAULT_HORIZON
forecast = forecast_demand(item_code, horizon, location)
response_data["forecast"] = forecast
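                # Remember this forecast so follow-ups and "generate report" can reuse it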
chat_context["last_context"] = {"type": "forecast", "item_code": item_code, "data": forecast, "location": location}
total = sum(d['qty'] for d in forecast) if forecast else 0
                response_data["answer"] = f"Forecast for {item_code} over the next {horizon} days is {total} units."
elif intent == "list_items":
if location:
items = loader.get_items_by_location(location)
if not items:
response_data["answer"] = f"No items found in **{location}**."
else:
msg = f"**Items available in {location}:**\n"
for item in items[:10]:
msg += f"- {item['item_code']}: {item['description']}\n"
if len(items) > 10:
msg += f"\n*(Showing 10 of {len(items)} items)*"
response_data["answer"] = msg
else:
items = loader.get_items()
msg = "**Available Items:**\n"
for item in items[:10]:
msg += f"- {item['item_code']}: {item['description']}\n"
response_data["answer"] = msg
elif intent == "list_locations":
# Hardcoded for now based on the requested data change
locations = ["USA", "CANADA", "CHICAGO RDC", "TORONTO BRANCH", "NEW YORK BRANCH", "LOS ANGELES RDC", "DALLAS RDC"]
msg = "**Available Locations:**\n"
for loc in locations:
msg += f"- {loc}\n"
response_data["answer"] = msg
elif intent == "check_inventory":
if not item_code:
response_data["answer"] = "Please specify an item code to check inventory for."
else:
inv_data = loader.get_inventory(item_code, location)
if not inv_data:
response_data["answer"] = f"No inventory data found for {item_code}."
else:
msg = f"**Inventory Status for {item_code}:**\n"
for record in inv_data:
msg += f"- **{record['region']}**: {record['qty_on_hand']} units ({record['status']})\n"
# User requested to hide Bin/Lot info in the main chat response to keep it clean
# if record.get('bin_location'):
# msg += f" - Bin: {record['bin_location']} | Lot: {record['lot_number']}\n"
# msg += f" - Expiry: {record['expiry_date']} | Cost: ${record['unit_cost']}\n"
response_data["answer"] = msg
elif intent == "create_requisition":
# Check for multi-items first
items_to_order = parsed["extra"].get("items", [])
# If no multi-items found, try single item
if not items_to_order and item_code:
qty = parsed["extra"].get("qty", 1)
items_to_order.append({"item_code": item_code, "qty": qty})
if not items_to_order:
response_data["answer"] = "I can help create an order, but I need to know which items and quantities. (e.g., 'Order 5 of BMS0001')"
elif not location:
# Missing Location -> Enter State
chat_context["state"] = "waiting_for_location"
chat_context["pending_items"] = items_to_order
# More natural prompt
response_data["answer"] = "I can certainly help with that order. Could you please specify the destination location? (e.g., Chicago RDC)"
else:
# Have everything -> Create Order
req_id, messages = loader.create_requisition(items_to_order, location)
                msg = f"Order **{req_id}** has been created for **{location}**.\nStatus: **Created**"
                if messages:
                    msg += "\n" + "\n".join(messages)
response_data["answer"] = msg
elif intent == "competitor_analysis":
if not item_code:
response_data["answer"] = "Please specify an item code for competitor analysis."
else:
analysis = loader.get_competitor_analysis(item_code)
if not analysis:
response_data["answer"] = f"No competitor data found for {item_code}."
else:
# Calculate Expert Insights
latest_year = analysis[-1]
price_gap = ((latest_year['our_price'] - latest_year['competitor_price']) / latest_year['competitor_price']) * 100
insight_text = ""
if price_gap > 0:
insight_text = f"**Expert Insight:** We are currently priced **{price_gap:.1f}% higher** than the competition. "
                        if len(analysis) > 1 and latest_year['sales_qty'] > analysis[-2]['sales_qty']:
insight_text += "However, our sales volume is still growing, suggesting strong brand loyalty or superior product quality."
else:
insight_text += "This premium might be affecting our sales growth. Consider a promotional campaign."
else:
insight_text = f"**Expert Insight:** We are priced **{abs(price_gap):.1f}% lower** than the competition. "
insight_text += "This competitive pricing is likely driving our volume growth."
# Format as Table
                    msg = f"**Competitor Analysis for {item_code} (Last 5 Years):**\n\n"
                    msg += "| Year | Competitor Price | Our Price | Sales Qty |\n"
                    msg += "|------|------------------|-----------|-----------|\n"
                    for row in analysis:
                        msg += f"| {row['year']} | ${row['competitor_price']:.2f} | ${row['our_price']:.2f} | {row['sales_qty']} |\n"
                    msg += f"\n{insight_text}\n"
response_data["answer"] = msg
# Save context for report generation
chat_context["last_context"] = {
"type": "competitor",
"item_code": item_code,
"data": analysis,
"insights": insight_text
}
elif intent == "market_share":
if not item_code:
response_data["answer"] = "Please specify an item code to see market share (e.g., 'Market share for BMS0001')."
else:
share_data = loader.get_market_share(item_code, location)
if not share_data:
response_data["answer"] = f"No market share data available for {item_code}."
else:
                    msg = f"**Market Share Analysis for {item_code}**"
                    if location: msg += f" in **{location}**"
                    msg += ":\n\n"
                    msg += "| Competitor | Volume | Share % |\n"
                    msg += "|------------|--------|---------|\n"
                    for row in share_data:
                        is_us = "Cummins" in row['competitor_name']
                        # Bold our own row so it stands out in the rendered table
                        name = f"**{row['competitor_name']}**" if is_us else row['competitor_name']
                        msg += f"| {name} | {row['qty']} | {row['share']}% |\n"
                    # Insight
                    our_share = next((x['share'] for x in share_data if "Cummins" in x['competitor_name']), 0)
                    leader = share_data[0]
                    if "Cummins" in leader['competitor_name']:
                        msg += f"\nWe are the market leader with {our_share}% share!"
                    else:
                        msg += f"\n{leader['competitor_name']} is leading with {leader['share']}%. We are at {our_share}%."
response_data["answer"] = msg
elif intent == "lost_sales":
loss_data = loader.get_lost_sales_summary(item_code, location)
if not loss_data:
response_data["answer"] = "No lost sales records found matching your criteria."
else:
msg = "**Lost Sales Analysis**"
if item_code: msg += f" for **{item_code}**"
if location: msg += f" in **{location}**"
                msg += ":\n\n"
                msg += "| Primary Reason | Lost Qty | Est. Revenue Lost |\n"
                msg += "|----------------|----------|-------------------|\n"
                total_rev = 0
                for row in sorted(loss_data, key=lambda x: x['lost_qty'], reverse=True):
                    msg += f"| {row['reason']} | {row['lost_qty']} | ${row['estimated_revenue_lost']:,.2f} |\n"
                    total_rev += row['estimated_revenue_lost']
                msg += f"\nTotal Estimated Loss: ${total_rev:,.2f}"
response_data["answer"] = msg
# Save context for report generation
chat_context["last_context"] = {
"type": "lost_sales",
"item_code": item_code,
"data": loss_data,
"location": location
}
elif intent == "competitor_trend":
if not item_code:
response_data["answer"] = "Please specify an item code to see sales trends."
else:
trend_data = loader.get_competitor_trend(item_code)
if not trend_data:
response_data["answer"] = f"No trend data found for {item_code}."
else:
                    msg = f"**Sales Trend (Qty) for {item_code}:**\n\n"
                    # Dynamic headers based on columns (Year + Competitors)
                    headers = list(trend_data[0].keys())
                    # Ensure Year is first
                    if 'year' in headers:
                        headers.remove('year')
                        headers.insert(0, 'year')
                    # Header row followed by a Markdown separator row
                    msg += "| " + " | ".join(h.replace('year', 'Year') for h in headers) + " |\n"
                    msg += "|" + "---|" * len(headers) + "\n"
                    for row in trend_data:
                        cells = []
                        for h in headers:
                            val = row.get(h, 0)
                            # Format numbers (leave the year untouched)
                            if isinstance(val, (int, float)) and h != 'year':
                                val = int(val)
                            cells.append(str(val))
                        msg += "| " + " | ".join(cells) + " |\n"
                    msg += "\n"
response_data["answer"] = msg
elif intent == "top_selling":
top_items = loader.get_top_selling_items()
if not top_items:
response_data["answer"] = "No sales data available to determine top selling items."
else:
msg = "**Top Selling Items (All Time):**\n"
for i, item in enumerate(top_items, 1):
desc = item.get('description', 'N/A')
msg += f"{i}. **{item['item_code']}** ({desc}): {item['quantity']} units sold\n"
response_data["answer"] = msg
elif intent == "order_status":
req_id = parsed["extra"].get("req_id")
if not req_id:
response_data["answer"] = "Please provide the Order ID (e.g., 'Check status of REQ1234')."
else:
status = loader.get_order_status(req_id)
if not status:
response_data["answer"] = f"Order {req_id} not found."
else:
                    response_data["answer"] = f"**Order Status for {req_id}:**\nItem: {status['item_code']}\nQty: {status['qty']}\nLocation: {status.get('location', 'Unknown')}\nStatus: **{status['status']}**"
elif intent == "item_details":
if not item_code:
chat_context["state"] = "waiting_for_item_code"
chat_context["pending_intent"] = "item_details"
response_data["answer"] = "Please specify an item code to get details."
else:
item = loader.get_item(item_code)
if not item:
response_data["answer"] = f"Item {item_code} not found in our catalog."
else:
                    response_data["answer"] = f"**Item Details for {item_code}:**\nDescription: {item['description']}\nList Price: ${item['list_price']}\nUOM: {item['uom']}"
elif intent == "supplier_info":
if not item_code:
response_data["answer"] = "Please specify an item code to find supplier information."
else:
suppliers = loader.get_supplier(item_code)
if not suppliers:
response_data["answer"] = f"No supplier information found for {item_code}."
else:
                    msg = f"**Supplier Information for {item_code}:**\n"
                    for sup in suppliers:
                        msg += f"- **{sup['supplier_name']}** (Lead Time: {sup['lead_time_days']} days)\n  Contact: {sup['contact_email']}\n"
response_data["answer"] = msg
elif intent == "generate_report":
# Check if we have a valid context
if chat_context.get("last_context") and chat_context["last_context"].get("item_code"):
ctx = chat_context["last_context"]
target_item = ctx["item_code"]
                # If the user explicitly mentioned a different item than the saved
                # context, we still report on the context item for now.
                if item_code and item_code != target_item:
                    pass
if ctx["type"] == "forecast":
filename = generate_forecast_pdf(target_item, ctx.get("data", []), ctx.get("location"))
                    # Link target assumes the generator returns a client-reachable path/URL
                    response_data["answer"] = f"Forecast Report generated for {target_item}: [Download PDF]({filename})"
elif ctx["type"] == "inventory":
                    # Lazy import: the inventory report generator is only needed here
from app.pdf_generator import generate_inventory_pdf
filename = generate_inventory_pdf(target_item, ctx.get("data", []), ctx.get("location"))
                    response_data["answer"] = f"Inventory Report generated for {target_item}: [Download PDF]({filename})"
elif ctx["type"] == "competitor":
from app.pdf_generator import generate_competitor_pdf
filename = generate_competitor_pdf(target_item, ctx.get("data", []), ctx.get("insights", ""))
                    response_data["answer"] = f"Competitor Analysis Report generated for {target_item}: [Download PDF]({filename})"
elif ctx["type"] == "lost_sales":
from app.pdf_generator import generate_lost_sales_pdf
filename = generate_lost_sales_pdf(target_item, ctx.get("data", []), ctx.get("location"))
                    response_data["answer"] = f"Lost Sales Report generated for {target_item}: [Download PDF]({filename})"
else:
response_data["answer"] = "Unknown report type in context."
else:
response_data["answer"] = "I don't have enough context to generate a report. Please ask for a forecast or inventory check first."
else:
# Fallback to LLM
# Generate response with history AND dynamic context
last_ctx = chat_context.get("last_context")
response_data["answer"] = llm.generate_response(user_text, chat_context["history"], dynamic_context=last_ctx)
chat_context["last_answer"] = response_data["answer"]
# Add assistant response to history (Global tracking)
chat_context["history"].append({"role": "assistant", "content": response_data["answer"]})
return JSONResponse(content=response_data)
except Exception as e:
        logger.exception(f"Error in chat endpoint: {e}")
return JSONResponse(content={"answer": f"Error processing request: {str(e)}"}, status_code=500)
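
# Local development entry point. This is a convenience sketch and assumes uvicorn is
# installed; in Docker the server is typically launched by the container command
# (e.g. `uvicorn app.main:app`) rather than by running this file directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)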