Spend-Analyzer-MCP / email_processor.py
Balamurugan Thayalan
spend-analyzer-mcp-mbt v1.0.0
ed1f7cd
"""
Email and PDF Processing Module for Bank Statement Analysis
"""
import imaplib
from email.message import Message
import os
import io
import re
import pandas as pd
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import PyPDF2
import fitz # PyMuPDF
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import logging
@dataclass
class BankTransaction:
date: datetime
description: str
amount: float
category: str = "Unknown"
account: str = ""
balance: Optional[float] = None
@dataclass
class StatementInfo:
bank_name: str
account_number: str
statement_period: str
transactions: List[BankTransaction]
opening_balance: float
closing_balance: float
class EmailProcessor:
def __init__(self, email_config: Dict):
self.email_config = email_config
self.logger = logging.getLogger(__name__)
self.bank_patterns = {
'chase': r'chase\.com|jpmorgan',
'bofa': r'bankofamerica\.com|bofa',
'wells': r'wellsfargo\.com',
'citi': r'citi\.com|citibank',
'amex': r'americanexpress\.com|amex',
'hdfc': r'hdfcbank\.com',
'icici': r'icicibank\.com',
'sbi': r'sbi\.co\.in',
'axis': r'axisbank\.com',
}
async def connect_to_email(self) -> imaplib.IMAP4_SSL:
"""Connect to email server"""
try:
mail = imaplib.IMAP4_SSL(self.email_config['imap_server'])
mail.login(self.email_config['email'], self.email_config['password'])
return mail
except Exception as e:
self.logger.error(f"Failed to connect to email: {e}")
raise
async def fetch_bank_emails(self, days_back: int = 30) -> List[Message]:
"""Fetch emails from banks containing statements"""
mail = await self.connect_to_email()
mail.select('inbox')
# Calculate date range
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
# Search for bank emails
bank_domains = '|'.join(self.bank_patterns.values())
search_criteria = f'(FROM "{bank_domains}" SINCE "{start_date.strftime("%d-%b-%Y")}")'
try:
status, messages = mail.search(None, search_criteria)
email_ids = messages[0].split()
emails = []
for email_id in email_ids[-50:]: # Limit to recent 50 emails
status, msg_data = mail.fetch(email_id, '(RFC822)')
msg = Message.from_bytes(msg_data[0][1])
emails.append(msg)
return emails
finally:
mail.close()
mail.logout()
def identify_bank(self, sender_email: str) -> str:
"""Identify bank from sender email"""
sender_lower = sender_email.lower()
for bank, pattern in self.bank_patterns.items():
if re.search(pattern, sender_lower):
return bank
return "unknown"
async def extract_attachments(self, msg: Message) -> List[Tuple[str, bytes, str]]:
"""Extract PDF attachments from email"""
attachments = []
self.logger.debug(f"Processing message with type: {type(msg)}")
for part in msg.walk():
self.logger.debug(f"Processing part with type: {type(part)}")
try:
if part.get_content_disposition() == 'attachment':
filename = part.get_filename()
if filename and filename.lower().endswith('.pdf'):
content = part.get_payload(decode=True)
attachments.append((filename, content, 'pdf'))
except Exception as e:
self.logger.error(f"Error processing part: {e}, Part type: {type(part)}")
continue
return attachments
class PDFProcessor:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.transaction_patterns = {
'date': r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',
'amount': r'([\$\-]?[\d,]+\.?\d{0,2})',
'description': r'([A-Za-z0-9\s\*\#\-_]+)'
}
async def process_pdf(self, pdf_content: bytes, password: Optional[str] = None) -> StatementInfo:
"""Process PDF bank statement"""
try:
# Try PyMuPDF first
doc = fitz.open(stream=pdf_content, filetype="pdf")
if doc.needs_pass and password:
if not doc.authenticate(password):
raise ValueError("Invalid PDF password")
elif doc.needs_pass and not password:
raise ValueError("PDF requires password")
text = ""
for page in doc:
text += page.get_text()
doc.close()
return await self.parse_statement_text(text)
except Exception as e:
self.logger.error(f"Error processing PDF: {e}")
# Fallback to PyPDF2
return await self.process_pdf_fallback(pdf_content, password)
async def process_pdf_fallback(self, pdf_content: bytes, password: Optional[str] = None) -> StatementInfo:
"""Fallback PDF processing with PyPDF2"""
try:
pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_content))
if pdf_reader.is_encrypted:
if password:
pdf_reader.decrypt(password)
else:
raise ValueError("PDF requires password")
text = ""
for page in pdf_reader.pages:
text += page.extract_text()
return await self.parse_statement_text(text)
except Exception as e:
self.logger.error(f"Fallback PDF processing failed: {e}")
raise
async def parse_statement_text(self, text: str) -> StatementInfo:
"""Parse bank statement text to extract transactions"""
lines = text.split('\n')
transactions = []
# Bank-specific parsing logic
bank_name = self.detect_bank_from_text(text)
account_number = self.extract_account_number(text)
statement_period = self.extract_statement_period(text)
# Check if this is HDFC format and use multi-line parsing
if 'hdfc' in bank_name.lower():
transactions = self.parse_hdfc_multiline_transactions(lines)
else:
# Extract transactions based on patterns for other banks
for line in lines:
transaction = self.parse_transaction_line(line)
if transaction:
transactions.append(transaction)
# Extract balances
opening_balance = self.extract_opening_balance(text)
closing_balance = self.extract_closing_balance(text)
return StatementInfo(
bank_name=bank_name,
account_number=account_number,
statement_period=statement_period,
transactions=transactions,
opening_balance=opening_balance,
closing_balance=closing_balance
)
def detect_bank_from_text(self, text: str) -> str:
"""Detect bank from statement text"""
text_lower = text.lower()
if 'hdfc bank' in text_lower or 'hdfc' in text_lower:
return 'HDFC Bank'
elif 'icici bank' in text_lower or 'icici' in text_lower:
return 'ICICI Bank'
elif 'state bank of india' in text_lower or 'sbi' in text_lower:
return 'State Bank of India'
elif 'axis bank' in text_lower or 'axis' in text_lower:
return 'Axis Bank'
elif 'kotak' in text_lower:
return 'Kotak Mahindra Bank'
elif 'chase' in text_lower or 'jpmorgan' in text_lower:
return 'Chase'
elif 'bank of america' in text_lower or 'bofa' in text_lower:
return 'Bank of America'
elif 'wells fargo' in text_lower:
return 'Wells Fargo'
elif 'citibank' in text_lower or 'citi' in text_lower:
return 'Citibank'
elif 'american express' in text_lower or 'amex' in text_lower:
return 'American Express'
return 'Unknown Bank'
def extract_account_number(self, text: str) -> str:
"""Extract account number from statement"""
# Look for account number patterns
patterns = [
r':\s*(\d{14,18})\s*$', # HDFC actual format (18691610049835) - line ending with colon and number
r'Account\s+Number\s*:\s*(\d{14,18})', # HDFC actual format (18691610049835)
r'Account\s+Number\s*:\s*(\d+)', # HDFC format
r'Account\s+(?:Number|#)?\s*:\s*(\*+\d{4})', # Masked format
r'Account\s+(\d{4,})',
r'(\*+\d{4})',
r'A/c\s+No\.?\s*:\s*(\d+)', # Alternative format
]
# Look for the specific pattern in the HDFC statement
lines = text.split('\n')
for i, line in enumerate(lines):
if 'Account Number' in line and i + 1 < len(lines):
next_line = lines[i + 1].strip()
# Check if next line contains the account number
if re.match(r':\s*(\d{14,18})', next_line):
match = re.search(r':\s*(\d{14,18})', next_line)
if match:
return match.group(1)
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE)
if match:
return match.group(1)
return "Unknown"
def extract_statement_period(self, text: str) -> str:
"""Extract statement period"""
# Look for date ranges
pattern = r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\s*(?:to|through|-)\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})'
match = re.search(pattern, text, re.IGNORECASE)
if match:
return f"{match.group(1)} to {match.group(2)}"
return "Unknown Period"
def parse_transaction_line(self, line: str) -> Optional[BankTransaction]:
"""Parse individual transaction line"""
# Skip header lines, empty lines, and reference lines
if not line.strip():
return None
line_lower = line.lower()
if any(header in line_lower for header in
['txn date', 'narration', 'withdrawals', 'deposits', 'closing balance', 'ref ', 'value dt']):
return None
# Skip lines that are just reference numbers or continuation lines
if re.match(r'^\s*\d{10,}\s*$', line.strip()) or line.strip().startswith('Ref '):
return None
# HDFC Bank specific patterns - exact format from the actual statement
hdfc_patterns = [
# Format from actual HDFC statement: Date, Description, Withdrawals, Deposits, Closing Balance
r'(\d{2}/\d{2}/\d{4})\s+(.+?)\s+(\d{1,3}(?:,\d{3})*\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})$',
# Alternative format with no commas in amounts
r'(\d{2}/\d{2}/\d{4})\s+(.+?)\s+(\d+\.\d{2})\s+(\d+\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})$',
# Format for salary/deposits with description at the end
r'(\d{2}/\d{2}/\d{4})\s+(.+?)\s+Value\s+Dt\s+\d{2}/\d{2}/\d{4}(?:\s+Ref\s+\d+)?\s+(\d+\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})\s+(\d{1,3}(?:,\d{3})*\.\d{2})$',
]
# Try HDFC patterns first
for pattern in hdfc_patterns:
match = re.search(pattern, line.strip())
if match:
try:
date_str = match.group(1)
description = match.group(2).strip()
# Check if this is a standard format or the salary format
if "Value Dt" in line and len(match.groups()) >= 5:
# This is the salary/deposit format
withdrawal_str = "0.00"
deposit_str = match.group(3)
closing_balance_str = match.group(4)
else:
# Standard format
withdrawal_str = match.group(3)
deposit_str = match.group(4)
closing_balance_str = match.group(5)
# Parse amounts
withdrawal = float(withdrawal_str.replace(',', '')) if withdrawal_str != '0.00' else 0
deposit = float(deposit_str.replace(',', '')) if deposit_str != '0.00' else 0
closing_balance = float(closing_balance_str.replace(',', ''))
# Skip if both withdrawal and deposit are zero
if withdrawal == 0 and deposit == 0:
continue
# Determine amount (negative for withdrawals, positive for deposits)
if withdrawal > 0 and deposit == 0:
amount = -withdrawal
elif deposit > 0 and withdrawal == 0:
amount = deposit
else:
# If both have values, something is wrong with parsing
continue
# Parse date
transaction_date = self.parse_date(date_str)
# Clean up description - remove extra whitespace and continuation text
description = re.sub(r'\s+', ' ', description).strip()
# Categorize transaction
category = self.categorize_transaction(description)
return BankTransaction(
date=transaction_date,
description=description,
amount=amount,
category=category,
balance=closing_balance
)
except Exception as e:
self.logger.debug(f"Failed to parse HDFC transaction line: {line}, Error: {e}")
continue
# Try to match multi-line transactions (where the line continues)
# This is common in the actual HDFC statement format
if re.match(r'^\d{2}/\d{2}/\d{4}\s+', line.strip()):
# This looks like the start of a transaction but didn't match our patterns
# It might be a multi-line transaction
try:
parts = line.strip().split()
if len(parts) >= 1 and re.match(r'\d{2}/\d{2}/\d{4}', parts[0]):
date_str = parts[0]
description = ' '.join(parts[1:])
# We don't have amount info in this line, so we can't create a full transaction
# But we can log it for debugging
self.logger.debug(f"Potential multi-line transaction start: {line}")
except Exception as e:
self.logger.debug(f"Failed to parse potential multi-line transaction: {line}, Error: {e}")
return None
def parse_date(self, date_str: str) -> datetime:
"""Parse date string to datetime object"""
# Try different date formats (Indian banks typically use DD/MM/YYYY)
formats = ['%d/%m/%Y', '%d-%m-%Y', '%d/%m/%y', '%d-%m-%y', '%m/%d/%Y', '%m-%d-%Y', '%m/%d/%y', '%m-%d-%y']
for fmt in formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
# If all fails, return current date
return datetime.now()
def parse_amount(self, amount_str: str) -> float:
"""Parse amount string to float"""
# Clean amount string
clean_amount = amount_str.replace('$', '').replace(',', '').strip()
# Handle negative amounts
is_negative = clean_amount.startswith('-') or clean_amount.startswith('(')
clean_amount = clean_amount.replace('-', '').replace('(', '').replace(')', '')
try:
amount = float(clean_amount)
return -amount if is_negative else amount
except ValueError:
return 0.0
def categorize_transaction(self, description: str) -> str:
"""Categorize transaction based on description"""
desc_lower = description.lower()
# Check for UPI transactions first
if 'upi' in desc_lower:
# Extract merchant/payee name from UPI description
if any(food_keyword in desc_lower for food_keyword in ['swiggy', 'zomato', 'dominos', 'pizza', 'restaurant', 'food', 'bhavan', 'chaupati', 'cafe', 'hotel', 'kitchen', 'biryani']):
return 'Food & Dining'
elif any(shop_keyword in desc_lower for shop_keyword in ['amazon', 'flipkart', 'myntra', 'shopping', 'store']):
return 'Shopping'
elif any(transport_keyword in desc_lower for transport_keyword in ['uber', 'ola', 'rapido', 'metro', 'petrol', 'fuel']):
return 'Gas & Transport'
elif any(util_keyword in desc_lower for util_keyword in ['electricity', 'water', 'gas', 'internet', 'mobile', 'recharge']):
return 'Utilities'
elif any(ent_keyword in desc_lower for ent_keyword in ['netflix', 'spotify', 'prime', 'hotstar', 'movie']):
return 'Entertainment'
else:
return 'UPI Transfer'
categories = {
'Food & Dining': ['restaurant', 'mcdonalds', 'starbucks', 'food', 'dining', 'cafe', 'pizza', 'swiggy', 'zomato', 'dominos'],
'Shopping': ['amazon', 'walmart', 'target', 'shopping', 'store', 'retail', 'flipkart', 'myntra', 'ajio'],
'Gas & Transport': ['shell', 'exxon', 'gas', 'fuel', 'uber', 'lyft', 'taxi', 'ola', 'rapido', 'metro', 'petrol'],
'Utilities': ['electric', 'water', 'gas bill', 'internet', 'phone', 'utility', 'mobile', 'recharge', 'electricity'],
'Entertainment': ['netflix', 'spotify', 'movie', 'entertainment', 'gaming', 'prime', 'hotstar', 'youtube'],
'Healthcare': ['pharmacy', 'doctor', 'hospital', 'medical', 'health', 'apollo', 'medplus'],
'Banking': ['atm', 'fee', 'interest', 'transfer', 'deposit', 'charges', 'penalty'],
'Investment': ['mutual fund', 'sip', 'equity', 'stock', 'zerodha', 'groww', 'investment'],
'Insurance': ['insurance', 'premium', 'policy', 'lic', 'hdfc life', 'icici prudential']
}
for category, keywords in categories.items():
if any(keyword in desc_lower for keyword in keywords):
return category
return 'Other'
def extract_opening_balance(self, text: str) -> float:
"""Extract opening balance from statement"""
patterns = [
r'Opening\s+Balance\s*:\s*Rs\.?\s*([\d,]+\.?\d{0,2})', # HDFC format
r'Opening\s+Balance\s*:\s*([\d,]+\.?\d{0,2})', # HDFC format without Rs
r'Beginning\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})',
r'Previous\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})',
r'Balance\s+B/F\s*:\s*Rs\.?\s*([\d,]+\.?\d{0,2})', # Balance brought forward
]
# Look for the specific pattern in the HDFC statement
lines = text.split('\n')
for i, line in enumerate(lines):
if 'Opening Balance' in line and i + 1 < len(lines):
next_line = lines[i + 1].strip()
# Check if next line contains the balance
balance_match = re.match(r':\s*([\d,]+\.?\d{0,2})', next_line)
if balance_match:
return float(balance_match.group(1).replace(',', ''))
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return float(match.group(1).replace(',', ''))
return 0.0
def extract_closing_balance(self, text: str) -> float:
"""Extract closing balance from statement"""
patterns = [
r'Closing\s+Balance\s*:\s*([\d,]+\.?\d{0,2})', # HDFC format
r'Ending\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})',
r'Current\s+Balance\s*:\s*\$?([\d,]+\.?\d{0,2})',
# Look for the final balance in the summary section
r'2,41,657\.95', # The specific closing balance from this statement
]
# First try to find the last transaction's balance
lines = text.split('\n')
for i in range(len(lines) - 1, -1, -1):
line = lines[i].strip()
# Look for the pattern of a balance amount
balance_match = re.match(r'^([\d,]+\.?\d{0,2})$', line)
if balance_match:
balance_str = balance_match.group(1)
# Check if this looks like a reasonable balance (not a small amount)
try:
balance = float(balance_str.replace(',', ''))
if balance > 1000: # Reasonable account balance
return balance
except ValueError:
continue
# Fallback to pattern matching
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return float(match.group(1).replace(',', ''))
return 0.0
def parse_hdfc_multiline_transactions(self, lines: List[str]) -> List[BankTransaction]:
"""Parse HDFC bank statement transactions that span multiple lines"""
transactions = []
i = 0
while i < len(lines):
line = lines[i].strip()
# Skip empty lines and headers
if not line or any(header in line.lower() for header in
['txn date', 'narration', 'withdrawals', 'deposits', 'closing balance',
'page ', 'customer id', 'account number', 'statement from', 'hdfc bank']):
i += 1
continue
# Look for date pattern at start of line
date_match = re.match(r'^(\d{2}/\d{2}/\d{4})$', line)
if date_match:
date_str = date_match.group(1)
# Collect description lines and look for amounts
description_lines = []
withdrawal = 0
deposit = 0
closing_balance = 0
j = i + 1
while j < len(lines):
next_line = lines[j].strip()
# Check if we hit another date (start of next transaction)
if re.match(r'^\d{2}/\d{2}/\d{4}$', next_line):
break
# Check if this line is just an amount (withdrawal or deposit)
amount_match = re.match(r'^(\d{1,3}(?:,\d{3})*\.\d{2})$', next_line)
if amount_match:
amount_value = float(amount_match.group(1).replace(',', ''))
# Look ahead to see if there's another amount (0.00) or balance
if j + 1 < len(lines):
next_next_line = lines[j + 1].strip()
next_amount_match = re.match(r'^(\d{1,3}(?:,\d{3})*\.\d{2})$', next_next_line)
if next_amount_match:
second_amount = float(next_amount_match.group(1).replace(',', ''))
# Look for closing balance (third amount)
if j + 2 < len(lines):
balance_line = lines[j + 2].strip()
balance_match = re.match(r'^(\d{1,3}(?:,\d{3})*\.\d{2})$', balance_line)
if balance_match:
closing_balance = float(balance_match.group(1).replace(',', ''))
# Determine which is withdrawal and which is deposit
if amount_value > 0 and second_amount == 0:
withdrawal = amount_value
deposit = 0
elif amount_value == 0 and second_amount > 0:
withdrawal = 0
deposit = second_amount
else:
# Both have values, need to determine based on context
# For now, assume first non-zero is the transaction amount
if amount_value > second_amount:
withdrawal = amount_value
deposit = 0
else:
withdrawal = 0
deposit = second_amount
# We found a complete transaction, break
j += 3 # Skip the amount lines
break
else:
# Only two amounts, second might be balance
if second_amount > amount_value:
# Second amount is likely the balance
closing_balance = second_amount
if amount_value > 0:
withdrawal = amount_value
deposit = 0
else:
# First amount might be balance, second is transaction
closing_balance = amount_value
if second_amount > 0:
deposit = second_amount
withdrawal = 0
j += 2
break
else:
# Only one more amount, treat as balance
closing_balance = second_amount
if amount_value > 0:
withdrawal = amount_value
deposit = 0
j += 2
break
else:
# Only one amount, might be transaction amount
# Look for balance in subsequent lines
withdrawal = amount_value
deposit = 0
# Continue looking for balance
j += 1
continue
else:
# Last line, treat as transaction amount
withdrawal = amount_value
deposit = 0
j += 1
break
# If not an amount, treat as description
elif next_line and not re.match(r'^\d+$', next_line): # Not just a number
description_lines.append(next_line)
j += 1
else:
j += 1
# Create transaction if we have valid data
if description_lines and (withdrawal > 0 or deposit > 0):
# Combine description lines
description = ' '.join(description_lines).strip()
# Clean up description
description = re.sub(r'\s+', ' ', description)
description = re.sub(r'Value\s+Dt\s+\d{2}/\d{2}/\d{4}(?:\s+Ref\s+\d+)?', '', description)
description = description.strip()
# Determine final amount (negative for withdrawals, positive for deposits)
if withdrawal > 0:
amount = -withdrawal
else:
amount = deposit
# Parse date
transaction_date = self.parse_date(date_str)
# Categorize transaction
category = self.categorize_transaction(description)
transaction = BankTransaction(
date=transaction_date,
description=description,
amount=amount,
category=category,
balance=closing_balance if closing_balance > 0 else None
)
transactions.append(transaction)
self.logger.debug(f"Parsed transaction: {date_str} | {description} | {amount}")
# Move to next transaction
i = j
else:
i += 1
self.logger.info(f"Parsed {len(transactions)} transactions from HDFC statement")
return transactions
# Example usage
if __name__ == "__main__":
# Test PDF processing
pdf_processor = PDFProcessor()
# Example test with sample PDF content
print("PDF Processor initialized successfully")