from app.config import ITEMS_FILE, DEMAND_FILE import os import pandas as pd INVENTORY_FILE = os.path.join(os.path.dirname(ITEMS_FILE), "inventory.csv") SUPPLIERS_FILE = os.path.join(os.path.dirname(ITEMS_FILE), "suppliers.csv") REQUISITIONS_FILE = os.path.join(os.path.dirname(ITEMS_FILE), "requisitions.csv") class DataLoader: def __init__(self): self.items_df = None self.demand_df = None self.inventory_df = None self.suppliers_df = None self.requisitions_df = None self.load_data() def load_data(self): if os.path.exists(ITEMS_FILE): self.items_df = pd.read_csv(ITEMS_FILE) self.items_df['item_code'] = self.items_df['item_code'].str.upper() else: self.items_df = pd.DataFrame(columns=['item_code', 'description', 'uom', 'list_price']) if os.path.exists(DEMAND_FILE): self.demand_df = pd.read_csv(DEMAND_FILE, keep_default_na=False) self.demand_df['date'] = pd.to_datetime(self.demand_df['date']) self.demand_df['item_code'] = self.demand_df['item_code'].str.upper() if 'region' in self.demand_df.columns: self.demand_df['region'] = self.demand_df['region'].str.upper() else: self.demand_df = pd.DataFrame(columns=['item_code', 'date', 'quantity', 'region']) if os.path.exists(INVENTORY_FILE): self.inventory_df = pd.read_csv(INVENTORY_FILE) self.inventory_df.rename(columns={'on_hand': 'qty_on_hand'}, inplace=True) self.inventory_df['item_code'] = self.inventory_df['item_code'].str.upper() if 'region' in self.inventory_df.columns: self.inventory_df['region'] = self.inventory_df['region'].str.upper() # Calculate Status def get_status(row): if row['available'] < 50: return 'Critical' elif row['available'] < 100: return 'Low Stock' else: return 'In Stock' if 'available' in self.inventory_df.columns: self.inventory_df['status'] = self.inventory_df.apply(get_status, axis=1) else: self.inventory_df['status'] = 'Unknown' else: self.inventory_df = pd.DataFrame(columns=['item_code', 'region', 'qty_on_hand', 'reorder_point', 'status']) if os.path.exists(SUPPLIERS_FILE): self.suppliers_df = pd.read_csv(SUPPLIERS_FILE) # self.suppliers_df['item_code'] = self.suppliers_df['item_code'].str.upper() # Removed as item_code is not in suppliers.csv else: self.suppliers_df = pd.DataFrame(columns=['id', 'name', 'lead_time', 'email']) if os.path.exists(REQUISITIONS_FILE): self.requisitions_df = pd.read_csv(REQUISITIONS_FILE) else: self.requisitions_df = pd.DataFrame(columns=['req_id', 'item_code', 'qty', 'date', 'status']) def get_item(self, item_code): if self.items_df is None or item_code is None: return None item = self.items_df[self.items_df['item_code'] == item_code.upper()] if not item.empty: return item.iloc[0].to_dict() return None def get_time_series(self, item_code, location=None): if self.demand_df is None: return pd.DataFrame() mask = (self.demand_df['item_code'] == item_code.upper()) if location: mask &= (self.demand_df['region'] == location.upper()) df_filtered = self.demand_df[mask].copy() if df_filtered.empty: return pd.DataFrame() df_grouped = df_filtered.groupby('date')['quantity'].sum().reset_index() df_grouped = df_grouped.sort_values('date') df_grouped.set_index('date', inplace=True) df_grouped = df_grouped.asfreq('D', fill_value=0) return df_grouped def get_inventory(self, item_code, location=None): if self.inventory_df is None: return [] mask = (self.inventory_df['item_code'] == item_code.upper()) if location: mask &= (self.inventory_df['region'] == location.upper()) return self.inventory_df[mask].to_dict('records') def get_supplier(self, item_code): if self.suppliers_df is None or self.items_df is None: return [] # Find supplier_id from items item_row = self.items_df[self.items_df['item_code'] == item_code.upper()] if item_row.empty: return [] supplier_id = item_row.iloc[0]['supplier_id'] # Find supplier details mask = (self.suppliers_df['id'] == supplier_id) suppliers = self.suppliers_df[mask].copy() # Rename columns to match what main.py expects suppliers.rename(columns={ 'name': 'supplier_name', 'lead_time': 'lead_time_days', 'email': 'contact_email' }, inplace=True) return suppliers.to_dict('records') def create_requisition(self, item_code, qty): import datetime import random req_id = f"REQ{random.randint(1000, 9999)}" new_row = { "req_id": req_id, "item_code": item_code.upper(), "qty": qty, "date": datetime.date.today().strftime("%Y-%m-%d"), "status": "Pending" } # Append to DataFrame self.requisitions_df = pd.concat([self.requisitions_df, pd.DataFrame([new_row])], ignore_index=True) # Save to CSV self.requisitions_df.to_csv(REQUISITIONS_FILE, index=False) return req_id def get_alerts(self): if self.inventory_df is None: return [] # Filter for Critical or Low Stock mask = self.inventory_df['status'].isin(['Critical', 'Low Stock']) return self.inventory_df[mask].to_dict('records') def get_items(self): if self.items_df is None: return [] return self.items_df[['item_code', 'description']].to_dict('records') # Global instance loader = DataLoader()