"""Engineer ~486 ENTSO-E features for FBMC forecasting.
Transforms 8 ENTSO-E datasets into model-ready features:
1. Generation by PSR type (~228 features):
- Individual PSR types (8 types × 12 zones × 2 = 192 features with lags)
- Aggregates (total + shares = 36 features)
2. Demand/Load (24 features)
3. Day-ahead prices (24 features)
4. Hydro reservoir storage (12 features)
5. Pumped storage (10 features)
6. Load forecasts (12 features)
7. Transmission outages (176 features - ALL CNECs with EIC mapping)
Total: ~486 features (generation outages not available in raw data)
Author: Claude
Date: 2025-11-10
"""
from pathlib import Path
from typing import Tuple, List
import polars as pl
import numpy as np
# =========================================================================
# Feature Category 1: Generation by PSR Type
# =========================================================================
def engineer_generation_features(generation_df: pl.DataFrame) -> pl.DataFrame:
"""Engineer ~228 generation features from PSR type data.
Features per zone:
    - Individual PSR type generation (8 types × 12 zones × 2, value + lag): 192 features
- Total generation (sum across PSR types): 12 features
- Renewable/Thermal shares: 24 features
PSR Types:
- B04: Fossil Gas
- B05: Fossil Hard coal
- B06: Fossil Oil
- B11: Hydro Run-of-river
- B12: Hydro Reservoir
- B14: Nuclear
- B16: Solar
- B19: Wind Onshore
Args:
generation_df: Generation by PSR type (12 zones × 8 PSR types)
Returns:
DataFrame with generation features, indexed by timestamp
"""
print("\n[1/8] Engineering generation features...")
# PSR type name mapping for clean feature names
psr_name_map = {
'Fossil Gas': 'fossil_gas',
'Fossil Hard coal': 'fossil_coal',
'Fossil Oil': 'fossil_oil',
'Hydro Run-of-river and poundage': 'hydro_ror',
'Hydro Water Reservoir': 'hydro_reservoir',
'Nuclear': 'nuclear',
'Solar': 'solar',
'Wind Onshore': 'wind_onshore'
}
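    # Resulting naming convention (zone code illustrative), e.g. 'Nuclear' in zone FR
    # becomes columns 'gen_nuclear_FR' and 'gen_nuclear_FR_lag1'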
# Create individual PSR type features (8 PSR types × 12 zones = 96 base features)
psr_features_list = []
for psr_name, psr_clean in psr_name_map.items():
# Filter data for this PSR type
psr_data = generation_df.filter(pl.col('psr_name') == psr_name)
if len(psr_data) > 0:
# Pivot to wide format (one column per zone)
psr_wide = psr_data.pivot(
values='generation_mw',
index='timestamp',
on='zone',
aggregate_function='sum'
            ).sort('timestamp')  # keep rows in time order so the t-1 lag below is valid
# Rename columns with PSR type prefix
psr_cols = [c for c in psr_wide.columns if c != 'timestamp']
psr_wide = psr_wide.rename({c: f'gen_{psr_clean}_{c}' for c in psr_cols})
# Add lag features (t-1) for this PSR type
lag_features = {}
for col in psr_wide.columns:
if col.startswith('gen_'):
lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
psr_wide = psr_wide.with_columns(**lag_features)
psr_features_list.append(psr_wide)
# Aggregate features: Total generation per zone
zone_total = generation_df.group_by(['timestamp', 'zone']).agg([
pl.col('generation_mw').sum().alias('total_gen')
])
total_gen_wide = zone_total.pivot(
values='total_gen',
index='timestamp',
on='zone',
aggregate_function='first'
).rename({c: f'gen_total_{c}' for c in zone_total['zone'].unique() if c != 'timestamp'})
# Aggregate features: Renewable and thermal shares
zone_shares = generation_df.group_by(['timestamp', 'zone']).agg([
pl.col('generation_mw').sum().alias('total_gen'),
pl.col('generation_mw').filter(
pl.col('psr_name').is_in(['Wind Onshore', 'Solar', 'Hydro Run-of-river and poundage', 'Hydro Water Reservoir'])
).sum().alias('renewable_gen'),
pl.col('generation_mw').filter(
pl.col('psr_name').is_in(['Fossil Gas', 'Fossil Hard coal', 'Fossil Oil', 'Nuclear'])
).sum().alias('thermal_gen')
])
zone_shares = zone_shares.with_columns([
(pl.col('renewable_gen') / pl.col('total_gen').clip(lower_bound=1)).round(4).alias('renewable_share'),
(pl.col('thermal_gen') / pl.col('total_gen').clip(lower_bound=1)).round(4).alias('thermal_share')
])
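    # e.g. renewable_gen = 500 MW and total_gen = 2000 MW -> renewable_share = 0.25;
    # the clip to >= 1 only guards against division by zero when total_gen is 0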
renewable_share_wide = zone_shares.pivot(
values='renewable_share',
index='timestamp',
on='zone',
aggregate_function='first'
).rename({c: f'gen_renewable_share_{c}' for c in zone_shares['zone'].unique() if c != 'timestamp'})
thermal_share_wide = zone_shares.pivot(
values='thermal_share',
index='timestamp',
on='zone',
aggregate_function='first'
).rename({c: f'gen_thermal_share_{c}' for c in zone_shares['zone'].unique() if c != 'timestamp'})
# Merge all generation features
features = total_gen_wide
features = features.join(renewable_share_wide, on='timestamp', how='left')
features = features.join(thermal_share_wide, on='timestamp', how='left')
for psr_features in psr_features_list:
features = features.join(psr_features, on='timestamp', how='left')
print(f" Created {len(features.columns) - 1} generation features")
print(f" - Individual PSR types: {sum([len(pf.columns) - 1 for pf in psr_features_list])} features (8 types x 12 zones x 2)")
print(f" - Aggregates: {len(total_gen_wide.columns) + len(renewable_share_wide.columns) + len(thermal_share_wide.columns) - 3} features")
return features
# =========================================================================
# Feature Category 2: Demand/Load
# =========================================================================
def engineer_demand_features(demand_df: pl.DataFrame) -> pl.DataFrame:
"""Engineer ~24 demand features.
Features per zone:
- Actual demand
- Demand lag (t-1)
Args:
demand_df: Actual demand data (12 zones)
Returns:
DataFrame with demand features, indexed by timestamp
"""
print("\n[2/8] Engineering demand features...")
# FIX: Resample to hourly (some zones have 15-min data for 2025)
demand_df = demand_df.with_columns([
pl.col('timestamp').dt.truncate('1h').alias('timestamp')
])
# Aggregate by hour (mean of sub-hourly values)
demand_df = demand_df.group_by(['timestamp', 'zone']).agg([
pl.col('load_mw').mean().alias('load_mw')
])
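    # e.g. four 15-min loads of 100/110/120/130 MW within one hour average to 115 MW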
# Pivot demand to wide format
demand_wide = demand_df.pivot(
values='load_mw',
index='timestamp',
on='zone',
aggregate_function='first'
    ).sort('timestamp')  # group_by does not guarantee row order; sort so the t-1 lag is valid
# Rename to demand_<zone>
demand_cols = [c for c in demand_wide.columns if c != 'timestamp']
demand_wide = demand_wide.rename({c: f'demand_{c}' for c in demand_cols})
# Add lag features (t-1)
lag_features = {}
for col in demand_wide.columns:
if col.startswith('demand_'):
lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
features = demand_wide.with_columns(**lag_features)
print(f" Created {len(features.columns) - 1} demand features")
return features
# =========================================================================
# Feature Category 3: Day-Ahead Prices
# =========================================================================
def engineer_price_features(prices_df: pl.DataFrame) -> pl.DataFrame:
"""Engineer ~24 price features.
Features per zone:
- Day-ahead price
- Price lag (t-1)
Args:
prices_df: Day-ahead prices (12 zones)
Returns:
DataFrame with price features, indexed by timestamp
"""
print("\n[3/8] Engineering price features...")
# Pivot prices to wide format
price_wide = prices_df.pivot(
values='price_eur_mwh',
index='timestamp',
on='zone',
aggregate_function='first'
    ).sort('timestamp')  # ensure time order before computing the t-1 lag
# Rename to price_<zone>
price_cols = [c for c in price_wide.columns if c != 'timestamp']
price_wide = price_wide.rename({c: f'price_{c}' for c in price_cols})
# Add lag features (t-1)
lag_features = {}
for col in price_wide.columns:
if col.startswith('price_'):
lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
features = price_wide.with_columns(**lag_features)
print(f" Created {len(features.columns) - 1} price features")
return features
# =========================================================================
# Feature Category 4: Hydro Reservoir Storage
# =========================================================================
def engineer_hydro_storage_features(hydro_df: pl.DataFrame) -> pl.DataFrame:
"""Engineer ~12 hydro storage features (weekly → hourly interpolation).
Features per zone with data (6 zones):
- Hydro storage level (interpolated to hourly)
- Storage change (week-over-week)
Args:
hydro_df: Weekly hydro storage data (6 zones)
Returns:
DataFrame with hydro storage features, indexed by timestamp
"""
print("\n[4/8] Engineering hydro storage features...")
# Pivot to wide format
hydro_wide = hydro_df.pivot(
values='storage_mwh',
index='timestamp',
on='zone',
aggregate_function='first'
)
# Rename to hydro_storage_<zone>
hydro_cols = [c for c in hydro_wide.columns if c != 'timestamp']
hydro_wide = hydro_wide.rename({c: f'hydro_storage_{c}' for c in hydro_cols})
    # Create an hourly range spanning the hydro data (weekly observations → hourly grid)
hourly_range = pl.DataFrame({
'timestamp': pl.datetime_range(
start=hydro_wide['timestamp'].min(),
end=hydro_wide['timestamp'].max(),
interval='1h',
eager=True
)
})
    # Cast timestamp to microsecond precision (datetime[μs]) to match the hourly range
hydro_wide = hydro_wide.with_columns(
pl.col('timestamp').cast(pl.Datetime('us'))
)
# Join and interpolate (forward fill for weekly → hourly)
features = hourly_range.join(hydro_wide, on='timestamp', how='left')
# Forward fill missing values (weekly data → hourly)
for col in features.columns:
if col.startswith('hydro_storage_'):
features = features.with_columns(
pl.col(col).forward_fill().alias(col)
)
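    # e.g. a weekly reading observed Monday 00:00 is carried forward unchanged to every
    # hour until the next weekly observation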
# Add week-over-week change (168 hours = 1 week)
change_features = {}
for col in features.columns:
if col.startswith('hydro_storage_'):
change_features[f'{col}_change_w'] = pl.col(col) - pl.col(col).shift(168)
features = features.with_columns(**change_features)
print(f" Created {len(features.columns) - 1} hydro storage features")
return features
# =========================================================================
# Feature Category 5: Pumped Storage
# =========================================================================
def engineer_pumped_storage_features(pumped_df: pl.DataFrame) -> pl.DataFrame:
"""Engineer ~10 pumped storage features.
Features per zone with data (5 zones):
- Pumped storage generation
- Generation lag (t-1)
Args:
pumped_df: Pumped storage generation (5 zones)
Returns:
DataFrame with pumped storage features, indexed by timestamp
"""
print("\n[5/8] Engineering pumped storage features...")
# Pivot to wide format
pumped_wide = pumped_df.pivot(
values='generation_mw',
index='timestamp',
on='zone',
aggregate_function='first'
    ).sort('timestamp')  # ensure time order before computing the t-1 lag
# Rename to pumped_storage_<zone>
pumped_cols = [c for c in pumped_wide.columns if c != 'timestamp']
pumped_wide = pumped_wide.rename({c: f'pumped_storage_{c}' for c in pumped_cols})
# Add lag features (t-1)
lag_features = {}
for col in pumped_wide.columns:
if col.startswith('pumped_storage_'):
lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
features = pumped_wide.with_columns(**lag_features)
print(f" Created {len(features.columns) - 1} pumped storage features")
return features
# =========================================================================
# Feature Category 6: Load Forecasts
# =========================================================================
def engineer_load_forecast_features(forecast_df: pl.DataFrame) -> pl.DataFrame:
"""Engineer ~24 load forecast features.
Features per zone:
- Load forecast
- Forecast error (forecast - actual, if available)
Args:
forecast_df: Load forecasts (12 zones)
Returns:
DataFrame with load forecast features, indexed by timestamp
"""
print("\n[6/8] Engineering load forecast features...")
# FIX: Resample to hourly (some zones have 15-min data for 2025)
forecast_df = forecast_df.with_columns([
pl.col('timestamp').dt.truncate('1h').alias('timestamp')
])
# Aggregate by hour (mean of sub-hourly values)
forecast_df = forecast_df.group_by(['timestamp', 'zone']).agg([
pl.col('forecast_mw').mean().alias('forecast_mw')
])
# Pivot to wide format
forecast_wide = forecast_df.pivot(
values='forecast_mw',
index='timestamp',
on='zone',
aggregate_function='first'
)
# Rename to load_forecast_<zone>
forecast_cols = [c for c in forecast_wide.columns if c != 'timestamp']
forecast_wide = forecast_wide.rename({c: f'load_forecast_{c}' for c in forecast_cols})
print(f" Created {len(forecast_wide.columns) - 1} load forecast features")
return forecast_wide
# =========================================================================
# Feature Category 7: Transmission Outages (ALL 176 CNECs)
# =========================================================================
def engineer_transmission_outage_features(
outages_df: pl.DataFrame,
cnec_master_df: pl.DataFrame,
hourly_range: pl.DataFrame
) -> pl.DataFrame:
"""Engineer 176 transmission outage features (ALL CNECs with EIC mapping).
Creates binary feature for each CNEC:
- 1 = Outage active on this CNEC at this timestamp
- 0 = No outage
Uses EIC codes from master CNEC list to map ENTSO-E outages to CNECs.
31 CNECs have historical outages, 145 are zero-filled (ready for future).
Args:
outages_df: ENTSO-E transmission outages with Asset_RegisteredResource.mRID (EIC)
cnec_master_df: Master CNEC list with cnec_eic column (176 rows)
hourly_range: Hourly timestamp range (Oct 2023 - Sept 2025)
Returns:
DataFrame with 176 transmission outage features, indexed by timestamp
"""
print("\n[7/8] Engineering transmission outage features (ALL 176 CNECs)...")
# Create EIC → CNEC mapping from master list
eic_to_cnec = dict(zip(cnec_master_df['cnec_eic'], cnec_master_df['cnec_name']))
all_cnec_eics = cnec_master_df['cnec_eic'].to_list()
print(f" Loaded {len(all_cnec_eics)} CNECs from master list")
# Process outages: expand start → end to hourly timestamps
if len(outages_df) == 0:
print(" WARNING: No transmission outages found in raw data")
# Create zero-filled features for all CNECs
features = hourly_range.clone()
for eic in all_cnec_eics:
features = features.with_columns(
pl.lit(0).alias(f'outage_cnec_{eic}')
)
print(f" Created {len(all_cnec_eics)} zero-filled outage features")
return features
# Parse outage periods (start_time → end_time)
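    # e.g. (hypothetical) an outage on one CNEC from 2024-01-01 06:00 to 09:00 expands
    # to four hourly rows (06:00-09:00 inclusive) with outage_active = 1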
outages_expanded = []
for row in outages_df.iter_rows(named=True):
eic = row.get('asset_eic', None)
start = row.get('start_time', None)
end = row.get('end_time', None)
if not eic or not start or not end:
continue
# Only process if EIC is in master CNEC list
if eic not in all_cnec_eics:
continue
# Create hourly range for this outage
outage_hours = pl.datetime_range(
start=start,
end=end,
interval='1h',
eager=True
)
for hour in outage_hours:
outages_expanded.append({
'timestamp': hour,
'cnec_eic': eic,
'outage_active': 1
})
print(f" Expanded {len(outages_expanded)} hourly outage events")
if len(outages_expanded) == 0:
# No outages matched to CNECs - create zero-filled features
features = hourly_range.clone()
for eic in all_cnec_eics:
features = features.with_columns(
pl.lit(0).alias(f'outage_cnec_{eic}')
)
print(f" Created {len(all_cnec_eics)} zero-filled outage features (no matches)")
return features
# Convert to Polars DataFrame
outages_hourly = pl.DataFrame(outages_expanded)
# Remove timezone from timestamp to match hourly_range
outages_hourly = outages_hourly.with_columns(
pl.col('timestamp').dt.replace_time_zone(None)
)
# Pivot to wide format (one column per CNEC)
outages_wide = outages_hourly.pivot(
values='outage_active',
index='timestamp',
on='cnec_eic',
aggregate_function='max' # If multiple outages, max = 1
)
# Rename columns
outage_cols = [c for c in outages_wide.columns if c != 'timestamp']
outages_wide = outages_wide.rename({c: f'outage_cnec_{c}' for c in outage_cols})
# Join with hourly range to ensure complete timestamp coverage
features = hourly_range.join(outages_wide, on='timestamp', how='left')
# Fill nulls with 0 (no outage)
for col in features.columns:
if col.startswith('outage_cnec_'):
features = features.with_columns(
pl.col(col).fill_null(0).alias(col)
)
# Add zero-filled features for CNECs with no historical outages
existing_cnecs = [c.replace('outage_cnec_', '') for c in features.columns if c.startswith('outage_cnec_')]
missing_cnecs = [eic for eic in all_cnec_eics if eic not in existing_cnecs]
for eic in missing_cnecs:
features = features.with_columns(
pl.lit(0).alias(f'outage_cnec_{eic}')
)
cnecs_with_data = len(existing_cnecs)
cnecs_zero_filled = len(missing_cnecs)
print(f" Created {len(features.columns) - 1} transmission outage features:")
print(f" - {cnecs_with_data} CNECs with historical outages")
print(f" - {cnecs_zero_filled} CNECs zero-filled (ready for future)")
return features
# =========================================================================
# Feature Category 8: Generation Outages
# =========================================================================
def engineer_generation_outage_features(gen_outages_df: pl.DataFrame, hourly_range: pl.DataFrame) -> pl.DataFrame:
"""Engineer ~45 generation outage features (nuclear, coal, lignite by zone).
Features per zone and PSR type:
- Nuclear capacity offline (MW)
- Coal capacity offline (MW)
- Lignite capacity offline (MW)
- Total outage count
- Binary outage indicator
Args:
gen_outages_df: Generation unavailability data with columns:
['timestamp', 'zone', 'psr_type', 'capacity_mw', 'unit_name']
hourly_range: Hourly timestamps for alignment
Returns:
DataFrame with generation outage features, indexed by timestamp
"""
print("\n[8/8] Engineering generation outage features...")
if len(gen_outages_df) == 0:
print(" WARNING: No generation outages found - creating zero-filled features")
# Create zero-filled features for all zones
features = hourly_range.select('timestamp')
zones = ['FR', 'BE', 'CZ', 'HU', 'RO', 'SI', 'SK', 'DE_LU', 'PL']
for zone in zones:
features = features.with_columns([
pl.lit(0.0).alias(f'gen_outage_nuclear_mw_{zone}'),
pl.lit(0.0).alias(f'gen_outage_coal_mw_{zone}'),
pl.lit(0.0).alias(f'gen_outage_lignite_mw_{zone}'),
pl.lit(0).alias(f'gen_outage_count_{zone}'),
pl.lit(0).alias(f'gen_outage_active_{zone}')
])
print(f" Created {len(features.columns) - 1} zero-filled generation outage features")
return features
# Expand outages to hourly granularity (outages span multiple hours)
print(" Expanding outages to hourly timestamps...")
# Create hourly records for each outage period
hourly_outages = []
for row in gen_outages_df.iter_rows(named=True):
start = row['start_time']
end = row['end_time']
# Generate hourly timestamps for outage period
outage_hours = pl.datetime_range(
start=start,
end=end,
interval='1h',
eager=True
).to_frame('timestamp')
# Add outage metadata
outage_hours = outage_hours.with_columns([
pl.lit(row['zone']).alias('zone'),
pl.lit(row['psr_type']).alias('psr_type'),
pl.lit(row['capacity_mw']).alias('capacity_mw'),
pl.lit(row['unit_name']).alias('unit_name')
])
hourly_outages.append(outage_hours)
    # Combine all hourly outages and drop any timezone so timestamps align with the
    # naive hourly_range (same handling as the transmission outages above)
    hourly_outages_df = pl.concat(hourly_outages).with_columns(
        pl.col('timestamp').dt.replace_time_zone(None)
    )
print(f" Expanded to {len(hourly_outages_df):,} hourly outage records")
# Map PSR types to clean names
psr_map = {
'B14': 'nuclear',
'B05': 'coal',
'B02': 'lignite'
}
hourly_outages_df = hourly_outages_df.with_columns(
pl.col('psr_type').replace(psr_map).alias('psr_clean')
)
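    # e.g. PSR code 'B14' (nuclear) offline in zone FR ends up in column 'gen_outage_nuclear_mw_FR'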
# Create features for each PSR type
all_features = [hourly_range.select('timestamp')]
for psr_code, psr_name in psr_map.items():
psr_outages = hourly_outages_df.filter(pl.col('psr_type') == psr_code)
if len(psr_outages) > 0:
# Aggregate capacity by zone and timestamp
psr_agg = psr_outages.group_by(['timestamp', 'zone']).agg(
pl.col('capacity_mw').sum().alias('capacity_mw')
)
# Pivot to wide format
psr_wide = psr_agg.pivot(
values='capacity_mw',
index='timestamp',
on='zone'
)
# Rename columns
rename_map = {
col: f'gen_outage_{psr_name}_mw_{col}'
for col in psr_wide.columns if col != 'timestamp'
}
psr_wide = psr_wide.rename(rename_map)
all_features.append(psr_wide)
# Create aggregate count and binary indicator features
total_agg = hourly_outages_df.group_by(['timestamp', 'zone']).agg([
pl.col('unit_name').n_unique().alias('outage_count'),
pl.lit(1).alias('outage_active')
])
# Pivot count
count_wide = total_agg.pivot(
values='outage_count',
index='timestamp',
on='zone'
).rename({
col: f'gen_outage_count_{col}'
for col in total_agg['zone'].unique() if col != 'timestamp'
})
# Pivot binary indicator
active_wide = total_agg.pivot(
values='outage_active',
index='timestamp',
on='zone'
).rename({
col: f'gen_outage_active_{col}'
for col in total_agg['zone'].unique() if col != 'timestamp'
})
all_features.extend([count_wide, active_wide])
# Join all features
features = all_features[0]
for feat_df in all_features[1:]:
features = features.join(feat_df, on='timestamp', how='left')
# Fill nulls with zeros (no outage)
feature_cols = [col for col in features.columns if col != 'timestamp']
features = features.with_columns([
pl.col(col).fill_null(0) for col in feature_cols
])
print(f" Created {len(features.columns) - 1} generation outage features")
print(f" - Nuclear: capacity offline per zone")
print(f" - Coal: capacity offline per zone")
print(f" - Lignite: capacity offline per zone")
print(f" - Count: number of units offline per zone")
print(f" - Active: binary indicator per zone")
return features
# =========================================================================
# Main Pipeline
# =========================================================================
def engineer_all_entsoe_features(
data_dir: Path = Path("data/raw"),
output_path: Path = Path("data/processed/features_entsoe_24month.parquet")
) -> pl.DataFrame:
"""Engineer all ENTSO-E features from 8 raw datasets.
Args:
data_dir: Directory containing raw ENTSO-E data
output_path: Path to save engineered features
Returns:
        DataFrame with ~486 ENTSO-E features, indexed by timestamp
"""
print("\n" + "="*70)
print("ENTSO-E FEATURE ENGINEERING PIPELINE")
print("="*70)
# Load master CNEC list (for transmission outage mapping)
cnec_master = pl.read_csv("data/processed/cnecs_master_176.csv")
print(f"\nLoaded master CNEC list: {len(cnec_master)} CNECs")
# Create hourly timestamp range (Oct 2023 - Sept 2025)
hourly_range = pl.DataFrame({
'timestamp': pl.datetime_range(
start=pl.datetime(2023, 10, 1, 0, 0),
end=pl.datetime(2025, 9, 30, 23, 0),
interval='1h',
eager=True
)
})
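    # 731 days (Oct 2023 - Sep 2025, incl. the 2024 leap day) x 24 h = 17,544 hourly timestamps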
print(f"Created hourly range: {len(hourly_range)} timestamps")
# Load raw ENTSO-E datasets
generation_df = pl.read_parquet(data_dir / "entsoe_generation_by_psr_24month.parquet")
demand_df = pl.read_parquet(data_dir / "entsoe_demand_24month.parquet")
prices_df = pl.read_parquet(data_dir / "entsoe_prices_24month.parquet")
hydro_df = pl.read_parquet(data_dir / "entsoe_hydro_storage_24month.parquet")
pumped_df = pl.read_parquet(data_dir / "entsoe_pumped_storage_24month.parquet")
forecast_df = pl.read_parquet(data_dir / "entsoe_load_forecast_24month.parquet")
transmission_outages_df = pl.read_parquet(data_dir / "entsoe_transmission_outages_24month.parquet")
# Check for generation outages file
gen_outages_path = data_dir / "entsoe_generation_outages_24month.parquet"
if gen_outages_path.exists():
gen_outages_df = pl.read_parquet(gen_outages_path)
else:
print("\nWARNING: Generation outages file not found, skipping...")
gen_outages_df = pl.DataFrame()
print(f"\nLoaded 8 ENTSO-E datasets:")
print(f" - Generation: {len(generation_df):,} rows")
print(f" - Demand: {len(demand_df):,} rows")
print(f" - Prices: {len(prices_df):,} rows")
print(f" - Hydro storage: {len(hydro_df):,} rows")
print(f" - Pumped storage: {len(pumped_df):,} rows")
print(f" - Load forecast: {len(forecast_df):,} rows")
print(f" - Transmission outages: {len(transmission_outages_df):,} rows")
print(f" - Generation outages: {len(gen_outages_df):,} rows")
# Engineer features for each category
gen_features = engineer_generation_features(generation_df)
demand_features = engineer_demand_features(demand_df)
price_features = engineer_price_features(prices_df)
hydro_features = engineer_hydro_storage_features(hydro_df)
pumped_features = engineer_pumped_storage_features(pumped_df)
forecast_features = engineer_load_forecast_features(forecast_df)
transmission_outage_features = engineer_transmission_outage_features(
transmission_outages_df, cnec_master, hourly_range
)
    # engineer_generation_outage_features handles the empty-DataFrame case itself
    gen_outage_features = engineer_generation_outage_features(gen_outages_df, hourly_range)
# Merge all features on timestamp
print("\n" + "="*70)
print("MERGING ALL FEATURES")
print("="*70)
features = hourly_range.clone()
# Standardize timestamps (remove timezone, cast to datetime[μs])
def standardize_timestamp(df):
"""Remove timezone and cast timestamp to datetime[μs]."""
if 'timestamp' in df.columns:
# Check if timestamp has timezone
if hasattr(df['timestamp'].dtype, 'time_zone') and df['timestamp'].dtype.time_zone is not None:
df = df.with_columns(pl.col('timestamp').dt.replace_time_zone(None))
# Cast to datetime[μs]
df = df.with_columns(pl.col('timestamp').cast(pl.Datetime('us')))
return df
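    # Polars joins need identical key dtypes; mixing tz-aware with naive timestamps
    # (or ns with us precision) can otherwise cause join errors or empty matches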
gen_features = standardize_timestamp(gen_features)
demand_features = standardize_timestamp(demand_features)
price_features = standardize_timestamp(price_features)
hydro_features = standardize_timestamp(hydro_features)
pumped_features = standardize_timestamp(pumped_features)
forecast_features = standardize_timestamp(forecast_features)
    transmission_outage_features = standardize_timestamp(transmission_outage_features)
    gen_outage_features = standardize_timestamp(gen_outage_features)
# Join each feature set
features = features.join(gen_features, on='timestamp', how='left')
features = features.join(demand_features, on='timestamp', how='left')
features = features.join(price_features, on='timestamp', how='left')
features = features.join(hydro_features, on='timestamp', how='left')
features = features.join(pumped_features, on='timestamp', how='left')
features = features.join(forecast_features, on='timestamp', how='left')
features = features.join(transmission_outage_features, on='timestamp', how='left')
features = features.join(gen_outage_features, on='timestamp', how='left')
print(f"\nFinal feature matrix shape: {features.shape}")
print(f" - Timestamps: {len(features):,}")
print(f" - Features: {len(features.columns) - 1:,}")
# Check for nulls
null_counts = features.null_count()
total_nulls = sum([null_counts[col][0] for col in features.columns if col != 'timestamp'])
null_pct = (total_nulls / (len(features) * (len(features.columns) - 1))) * 100
print(f"\nData quality:")
print(f" - Total nulls: {total_nulls:,} ({null_pct:.2f}%)")
print(f" - Completeness: {100 - null_pct:.2f}%")
# Clean up redundant features
print("\n" + "="*70)
print("CLEANING REDUNDANT FEATURES")
print("="*70)
features_before = len(features.columns) - 1
# Remove 100% null features
null_pcts = {}
for col in features.columns:
if col != 'timestamp':
null_count = features[col].null_count()
null_pct_col = (null_count / len(features)) * 100
if null_pct_col >= 100.0:
null_pcts[col] = null_pct_col
if null_pcts:
print(f"\nRemoving {len(null_pcts)} features with 100% nulls:")
for col in null_pcts:
print(f" - {col}")
features = features.drop(list(null_pcts.keys()))
# Remove zero-variance features (constants)
# EXCEPT transmission outage features - keep them even if zero-filled for future inference
zero_var_cols = []
for col in features.columns:
if col != 'timestamp':
# Skip transmission outage features (needed for future inference)
if col.startswith('outage_cnec_'):
continue
# Check if all values are the same (excluding nulls)
non_null = features[col].drop_nulls()
if len(non_null) > 0 and non_null.n_unique() == 1:
zero_var_cols.append(col)
if zero_var_cols:
print(f"\nRemoving {len(zero_var_cols)} zero-variance features (keeping transmission outages)")
features = features.drop(zero_var_cols)
# Remove duplicate columns
# EXCEPT transmission outage features - keep all CNECs even if identical (all zeros)
dup_groups = {}
cols_to_check = [c for c in features.columns if c != 'timestamp']
for i, col1 in enumerate(cols_to_check):
        if col1 in dup_groups:  # Already marked as a duplicate of an earlier column
continue
# Skip transmission outage features (each CNEC needs its own column for inference)
if col1.startswith('outage_cnec_'):
continue
for col2 in cols_to_check[i+1:]:
            if col2 in dup_groups:  # Already marked as a duplicate
continue
# Skip transmission outage features
if col2.startswith('outage_cnec_'):
continue
# Check if columns are identical
if features[col1].equals(features[col2]):
dup_groups[col2] = col1 # col2 is duplicate of col1
if dup_groups:
print(f"\nRemoving {len(dup_groups)} duplicate columns (keeping transmission outages)")
features = features.drop(list(dup_groups.keys()))
features_after = len(features.columns) - 1
features_removed = features_before - features_after
print(f"\n[CLEANUP SUMMARY]")
print(f" Features before: {features_before}")
print(f" Features after: {features_after}")
print(f" Features removed: {features_removed} ({features_removed/features_before*100:.1f}%)")
# Save features
output_path.parent.mkdir(parents=True, exist_ok=True)
features.write_parquet(output_path)
file_size_mb = output_path.stat().st_size / (1024 * 1024)
print(f"\nSaved ENTSO-E features to: {output_path}")
print(f"File size: {file_size_mb:.2f} MB")
print("\n" + "="*70)
print("ENTSO-E FEATURE ENGINEERING COMPLETE")
print("="*70)
return features
if __name__ == "__main__":
# Run feature engineering pipeline
features = engineer_all_entsoe_features()
print("\n[SUCCESS] ENTSO-E features engineered and saved!")
print(f"Feature count: {len(features.columns) - 1}")
print(f"Timestamp range: {features['timestamp'].min()} to {features['timestamp'].max()}")