import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

warnings.filterwarnings('ignore')
|
|
class TabularModel(nn.Module):
    """Feed-forward network (MLP) for tabular classification."""

    def __init__(self, input_size, hidden_sizes, output_size, dropout_rate=0.2):
        super().__init__()

        # Stack one Linear -> BatchNorm -> ReLU -> Dropout block per hidden
        # layer, followed by a final linear classification head.
        layers = []
        prev_size = input_size
        for hidden_size in hidden_sizes:
            layers.extend([
                nn.Linear(prev_size, hidden_size),
                nn.BatchNorm1d(hidden_size),
                nn.ReLU(),
                nn.Dropout(dropout_rate),
            ])
            prev_size = hidden_size

        layers.append(nn.Linear(prev_size, output_size))

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)
|
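# Quick shape check (illustrative sketch, not executed): a model with two
# hidden layers maps a batch of 4 rows with 9 features to 2 logits. The
# sizes below are examples, not requirements.
#
#   m = TabularModel(input_size=9, hidden_sizes=[128, 64], output_size=2)
#   m.eval()
#   print(m(torch.randn(4, 9)).shape)  # torch.Size([4, 2])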
|
def preprocess_data(df, target_column, test_size=0.2):
    """
    Preprocess tabular data for neural network training: label-encode
    categorical columns, standardize numerical columns, optionally encode a
    string target, and return a stratified train/test split.
    """
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Identify column types.
    categorical_columns = X.select_dtypes(include=['object']).columns
    numerical_columns = X.select_dtypes(include=['int64', 'float64']).columns

    # Label-encode categorical features, keeping the fitted encoders so the
    # same mapping can be reapplied at inference time.
    label_encoders = {}
    for col in categorical_columns:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col].astype(str))
        label_encoders[col] = le

    # Standardize numerical features.
    scaler = StandardScaler()
    X[numerical_columns] = scaler.fit_transform(X[numerical_columns])

    # Encode a string-valued target if necessary. fit_transform returns a
    # NumPy array, so we normalize y with np.asarray below rather than
    # calling .values on it (which would fail on an ndarray).
    target_encoder = None
    if y.dtype == 'object':
        target_encoder = LabelEncoder()
        y = target_encoder.fit_transform(y)

    y = np.asarray(y)
    X_train, X_test, y_train, y_test = train_test_split(
        X.values, y, test_size=test_size, random_state=42, stratify=y
    )

    return X_train, X_test, y_train, y_test, scaler, label_encoders, target_encoder
|
|
def train_model(model, train_loader, val_loader, epochs=100, lr=0.001):
    """
    Train the tabular model, tracking per-epoch train/validation losses.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    # Reduce the LR (by the default factor of 0.1) when the validation loss
    # has plateaued for 10 epochs.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10)

    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        # Training pass.
        model.train()
        train_loss = 0.0
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # Validation pass.
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                val_loss += criterion(output, target).item()

        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)

        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)

        scheduler.step(avg_val_loss)

        if (epoch + 1) % 20 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')

    return train_losses, val_losses
|
|
def evaluate_model(model, test_loader, target_encoder=None):
    """
    Evaluate the trained model on a held-out loader.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)  # no-op if train_model already moved it
    model.eval()

    all_predictions = []
    all_targets = []

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            predictions = torch.argmax(output, dim=1)

            all_predictions.extend(predictions.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    # Map encoded class indices back to their original labels if the target
    # was label-encoded during preprocessing.
    if target_encoder:
        all_predictions = target_encoder.inverse_transform(all_predictions)
        all_targets = target_encoder.inverse_transform(all_targets)

    accuracy = accuracy_score(all_targets, all_predictions)
    report = classification_report(all_targets, all_predictions)

    return accuracy, report, all_predictions, all_targets
|
|
def plot_training_history(train_losses, val_losses):
    """
    Plot training and validation losses (linear and log scale).
    """
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss', color='blue')
    plt.plot(val_losses, label='Validation Loss', color='red')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.plot(train_losses, label='Training Loss', color='blue')
    plt.plot(val_losses, label='Validation Loss', color='red')
    plt.xlabel('Epoch')
    plt.ylabel('Loss (Log Scale)')
    plt.title('Training and Validation Loss (Log Scale)')
    plt.yscale('log')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()
|
|
def plot_confusion_matrix(y_true, y_pred, labels=None):
    """
    Plot confusion matrix
    """
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=labels, yticklabels=labels)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()
|
|
def save_model(model, filepath, scaler, label_encoders, target_encoder=None):
    """
    Save the trained model weights and preprocessing objects in one checkpoint.
    """
    # The sklearn objects are pickled alongside the weights, so the file must
    # be loaded with weights_only=False (see load_model).
    torch.save({
        'model_state_dict': model.state_dict(),
        'scaler': scaler,
        'label_encoders': label_encoders,
        'target_encoder': target_encoder
    }, filepath)
    print(f"Model saved to {filepath}")
|
|
def load_model(filepath, input_size, hidden_sizes, output_size, dropout_rate=0.2):
    """
    Load the trained model and preprocessing objects.
    """
    # weights_only=False is needed because the checkpoint contains pickled
    # sklearn objects, not just tensors (recent PyTorch defaults weights_only
    # to True).
    checkpoint = torch.load(filepath, weights_only=False)

    model = TabularModel(input_size, hidden_sizes, output_size, dropout_rate)
    model.load_state_dict(checkpoint['model_state_dict'])

    return model, checkpoint['scaler'], checkpoint['label_encoders'], checkpoint['target_encoder']
|
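# Illustrative reload (a sketch, not executed here; it assumes the
# architecture values match the training run in main() -- adjust them if you
# trained with a different configuration):
#
#   model, scaler, label_encoders, target_encoder = load_model(
#       'ev_classifier_model.pth',
#       input_size=9, hidden_sizes=[128, 64, 32], output_size=2,
#   )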
|
def main():
    # Load the electric-vehicle population dataset.
    df = pd.read_csv("Electric_Vehicle_Population.csv")

    print(f"Original dataset shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")

    # Drop rows missing fields we cannot sensibly impute.
    df = df.dropna(subset=['Make', 'Model', 'Electric Vehicle Type', 'Model Year'])

    df_clean = df.copy()

    # Coerce numeric-looking columns, turning bad values into NaN.
    df_clean['Model Year'] = pd.to_numeric(df_clean['Model Year'], errors='coerce')
    df_clean['Electric Range'] = pd.to_numeric(df_clean['Electric Range'], errors='coerce')
    df_clean['Base MSRP'] = pd.to_numeric(df_clean['Base MSRP'], errors='coerce')
    df_clean['Legislative District'] = pd.to_numeric(df_clean['Legislative District'], errors='coerce')

    # Impute remaining numeric gaps: median for range and MSRP, 0 for district.
    df_clean['Electric Range'] = df_clean['Electric Range'].fillna(df_clean['Electric Range'].median())
    df_clean['Base MSRP'] = df_clean['Base MSRP'].fillna(df_clean['Base MSRP'].median())
    df_clean['Legislative District'] = df_clean['Legislative District'].fillna(0)

    # Binary target: 1 for battery-electric (BEV), 0 for plug-in hybrid (PHEV).
    df_clean['target'] = (df_clean['Electric Vehicle Type'] == 'Battery Electric Vehicle (BEV)').astype(int)

    feature_columns = [
        'Model Year', 'Make', 'Model', 'Electric Range', 'Base MSRP',
        'Legislative District', 'County', 'State', 'Clean Alternative Fuel Vehicle (CAFV) Eligibility'
    ]

    df_final = df_clean[feature_columns + ['target']].copy()

    # Rename to snake_case for convenience.
    df_final.columns = [
        'model_year', 'make', 'model', 'electric_range', 'base_msrp',
        'legislative_district', 'county', 'state', 'cafv_eligibility', 'target'
    ]

    # Cap categorical cardinality: keep only the most frequent categories and
    # bucket the rest as 'OTHER'.
    top_makes = df_final['make'].value_counts().head(10).index
    df_final['make'] = df_final['make'].apply(lambda x: x if x in top_makes else 'OTHER')

    top_models = df_final['model'].value_counts().head(15).index
    df_final['model'] = df_final['model'].apply(lambda x: x if x in top_models else 'OTHER')

    top_counties = df_final['county'].value_counts().head(20).index
    df_final['county'] = df_final['county'].apply(lambda x: x if x in top_counties else 'OTHER')

    df_final = df_final.dropna()

    df = df_final
    print(f"Processed dataset shape: {df.shape}")
    print("Target distribution:")
    print(f"BEV (1): {(df['target'] == 1).sum()}")
    print(f"PHEV (0): {(df['target'] == 0).sum()}")

    # Preprocess: encode categoricals, scale numerics, split train/test.
    target_column = 'target'
    X_train, X_test, y_train, y_test, scaler, label_encoders, target_encoder = preprocess_data(
        df, target_column
    )

    # Convert to tensors.
    X_train_tensor = torch.FloatTensor(X_train)
    y_train_tensor = torch.LongTensor(y_train)
    X_test_tensor = torch.FloatTensor(X_test)
    y_test_tensor = torch.LongTensor(y_test)

    # Carve a stratified validation split out of the training set.
    X_train_split, X_val_split, y_train_split, y_val_split = train_test_split(
        X_train_tensor, y_train_tensor, test_size=0.2, random_state=42, stratify=y_train_tensor
    )

    # Wrap everything in DataLoaders.
    batch_size = 64
    train_dataset = TensorDataset(X_train_split, y_train_split)
    val_dataset = TensorDataset(X_val_split, y_val_split)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Build the model.
    input_size = X_train.shape[1]
    hidden_sizes = [128, 64, 32]
    output_size = len(np.unique(y_train))

    model = TabularModel(
        input_size=input_size,
        hidden_sizes=hidden_sizes,
        output_size=output_size,
        dropout_rate=0.3
    )

    print("\nModel architecture:")
    print(f"Input size: {input_size}")
    print(f"Hidden layers: {hidden_sizes}")
    print(f"Output size: {output_size}")
    print(f"Total parameters: {sum(p.numel() for p in model.parameters())}")

    # Train.
    print("\nStarting training...")
    epochs = 100
    learning_rate = 0.001

    train_losses, val_losses = train_model(
        model, train_loader, val_loader, epochs=epochs, lr=learning_rate
    )

    plot_training_history(train_losses, val_losses)

    # Evaluate on the held-out test set.
    print("\nEvaluating model on test set...")
    accuracy, report, predictions, targets = evaluate_model(model, test_loader, target_encoder)

    print(f"Test Accuracy: {accuracy:.4f}")
    print("\nClassification Report:")
    print(report)

    # With an integer 0/1 target there is no target encoder, so label the
    # confusion-matrix axes manually.
    labels = ['PHEV', 'BEV'] if target_encoder is None else None
    plot_confusion_matrix(targets, predictions, labels)

    # Persist the weights plus the fitted preprocessing objects.
    model_filepath = 'ev_classifier_model.pth'
    save_model(model, model_filepath, scaler, label_encoders, target_encoder)

    print("\nTraining completed successfully!")
    print(f"Final test accuracy: {accuracy:.4f}")

    return model, scaler, label_encoders, target_encoder
|
|
def predict_new_data(model, new_data, scaler, label_encoders, target_encoder=None):
    """
    Make predictions on new data. Columns must match the training features
    (same names and order as in preprocess_data).
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    new_data_processed = new_data.copy()

    # Apply the training-time label encoders. Categories the encoder never
    # saw are mapped to 'OTHER' when that bucket exists (make/model/county),
    # else to the first known class, so transform() cannot fail on unseen
    # values.
    for col, encoder in label_encoders.items():
        if col in new_data_processed.columns:
            known = set(encoder.classes_)
            fallback = 'OTHER' if 'OTHER' in known else encoder.classes_[0]
            new_data_processed[col] = new_data_processed[col].astype(str).apply(
                lambda x: x if x in known else fallback
            )
            new_data_processed[col] = encoder.transform(new_data_processed[col])

    # Scale only the columns the scaler was fitted on; selecting by dtype at
    # this point would wrongly include the freshly label-encoded categoricals.
    # (sklearn records feature names when fit on a DataFrame, which
    # preprocess_data does.)
    numerical_columns = list(scaler.feature_names_in_)
    new_data_processed[numerical_columns] = scaler.transform(new_data_processed[numerical_columns])

    X_new = torch.FloatTensor(new_data_processed.values)
    X_new = X_new.to(device)

    with torch.no_grad():
        outputs = model(X_new)
        probabilities = torch.softmax(outputs, dim=1)
        predictions = torch.argmax(outputs, dim=1)

    # Decode class indices back to the original labels when a target encoder
    # was fitted during preprocessing.
    if target_encoder:
        predictions = target_encoder.inverse_transform(predictions.cpu().numpy())
    else:
        predictions = predictions.cpu().numpy()

    return predictions, probabilities.cpu().numpy()
|
|
if __name__ == "__main__": |
model, scaler, label_encoders, target_encoder = main() |
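    # Illustrative follow-up (a sketch, left commented out): score one new
    # vehicle with the trained artifacts. The row below is made up; values
    # and category spellings must match what appears in the real dataset.
    #
    #   sample = pd.DataFrame([{
    #       'model_year': 2022, 'make': 'TESLA', 'model': 'MODEL 3',
    #       'electric_range': 250.0, 'base_msrp': 0.0,
    #       'legislative_district': 1.0, 'county': 'King', 'state': 'WA',
    #       'cafv_eligibility': 'Clean Alternative Fuel Vehicle Eligible',
    #   }])
    #   preds, probs = predict_new_data(model, sample, scaler,
    #                                   label_encoders, target_encoder)
    #   print(preds, probs)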