| """Engineer ~486 ENTSO-E features for FBMC forecasting. | |
| Transforms 8 ENTSO-E datasets into model-ready features: | |
| 1. Generation by PSR type (~228 features): | |
| - Individual PSR types (8 types × 12 zones × 2 = 192 features with lags) | |
| - Aggregates (total + shares = 36 features) | |
| 2. Demand/Load (24 features) | |
| 3. Day-ahead prices (24 features) | |
| 4. Hydro reservoir storage (12 features) | |
| 5. Pumped storage (10 features) | |
| 6. Load forecasts (12 features) | |
| 7. Transmission outages (176 features - ALL CNECs with EIC mapping) | |
| Total: ~486 features (generation outages not available in raw data) | |
| Author: Claude | |
| Date: 2025-11-10 | |
| """ | |
| from pathlib import Path | |
| from typing import Tuple, List | |
| import polars as pl | |
| import numpy as np | |
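
# Assumed long-format schemas of the raw inputs, inferred from the column references in the
# functions below (not verified against the parquet files themselves); kept only as a reference
# constant and not used by the pipeline.
_ASSUMED_INPUT_COLUMNS = {
    "entsoe_generation_by_psr_24month.parquet": ["timestamp", "zone", "psr_name", "generation_mw"],
    "entsoe_demand_24month.parquet": ["timestamp", "zone", "load_mw"],
    "entsoe_prices_24month.parquet": ["timestamp", "zone", "price_eur_mwh"],
    "entsoe_hydro_storage_24month.parquet": ["timestamp", "zone", "storage_mwh"],
    "entsoe_pumped_storage_24month.parquet": ["timestamp", "zone", "generation_mw"],
    "entsoe_load_forecast_24month.parquet": ["timestamp", "zone", "forecast_mw"],
    "entsoe_transmission_outages_24month.parquet": ["asset_eic", "start_time", "end_time"],
    "entsoe_generation_outages_24month.parquet": ["start_time", "end_time", "zone", "psr_type", "capacity_mw", "unit_name"],
}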

# =========================================================================
# Feature Category 1: Generation by PSR Type
# =========================================================================

def engineer_generation_features(generation_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~228 generation features from PSR type data.

    Features per zone:
    - Individual PSR type generation (8 types × 2 = value + lag): 192 features
    - Total generation (sum across PSR types): 12 features
    - Renewable/Thermal shares: 24 features

    PSR Types:
    - B04: Fossil Gas
    - B05: Fossil Hard coal
    - B06: Fossil Oil
    - B11: Hydro Run-of-river
    - B12: Hydro Reservoir
    - B14: Nuclear
    - B16: Solar
    - B19: Wind Onshore

    Args:
        generation_df: Generation by PSR type (12 zones × 8 PSR types)

    Returns:
        DataFrame with generation features, indexed by timestamp
    """
    print("\n[1/8] Engineering generation features...")

    # PSR type name mapping for clean feature names
    psr_name_map = {
        'Fossil Gas': 'fossil_gas',
        'Fossil Hard coal': 'fossil_coal',
        'Fossil Oil': 'fossil_oil',
        'Hydro Run-of-river and poundage': 'hydro_ror',
        'Hydro Water Reservoir': 'hydro_reservoir',
        'Nuclear': 'nuclear',
        'Solar': 'solar',
        'Wind Onshore': 'wind_onshore'
    }

    # Create individual PSR type features (8 PSR types × 12 zones = 96 base features)
    psr_features_list = []
    for psr_name, psr_clean in psr_name_map.items():
        # Filter data for this PSR type
        psr_data = generation_df.filter(pl.col('psr_name') == psr_name)
        if len(psr_data) > 0:
            # Pivot to wide format (one column per zone)
            psr_wide = psr_data.pivot(
                values='generation_mw',
                index='timestamp',
                on='zone',
                aggregate_function='sum'
            )
            # Rename columns with PSR type prefix
            psr_cols = [c for c in psr_wide.columns if c != 'timestamp']
            psr_wide = psr_wide.rename({c: f'gen_{psr_clean}_{c}' for c in psr_cols})
            # Add lag features (t-1) for this PSR type
            lag_features = {}
            for col in psr_wide.columns:
                if col.startswith('gen_'):
                    lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
            psr_wide = psr_wide.with_columns(**lag_features)
            psr_features_list.append(psr_wide)

    # Aggregate features: Total generation per zone
    zone_total = generation_df.group_by(['timestamp', 'zone']).agg([
        pl.col('generation_mw').sum().alias('total_gen')
    ])
    total_gen_wide = zone_total.pivot(
        values='total_gen',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    ).rename({c: f'gen_total_{c}' for c in zone_total['zone'].unique() if c != 'timestamp'})

    # Aggregate features: Renewable and thermal shares
    zone_shares = generation_df.group_by(['timestamp', 'zone']).agg([
        pl.col('generation_mw').sum().alias('total_gen'),
        pl.col('generation_mw').filter(
            pl.col('psr_name').is_in(['Wind Onshore', 'Solar', 'Hydro Run-of-river and poundage', 'Hydro Water Reservoir'])
        ).sum().alias('renewable_gen'),
        pl.col('generation_mw').filter(
            pl.col('psr_name').is_in(['Fossil Gas', 'Fossil Hard coal', 'Fossil Oil', 'Nuclear'])
        ).sum().alias('thermal_gen')
    ])
    zone_shares = zone_shares.with_columns([
        (pl.col('renewable_gen') / pl.col('total_gen').clip(lower_bound=1)).round(4).alias('renewable_share'),
        (pl.col('thermal_gen') / pl.col('total_gen').clip(lower_bound=1)).round(4).alias('thermal_share')
    ])
    renewable_share_wide = zone_shares.pivot(
        values='renewable_share',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    ).rename({c: f'gen_renewable_share_{c}' for c in zone_shares['zone'].unique() if c != 'timestamp'})
    thermal_share_wide = zone_shares.pivot(
        values='thermal_share',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    ).rename({c: f'gen_thermal_share_{c}' for c in zone_shares['zone'].unique() if c != 'timestamp'})

    # Merge all generation features
    features = total_gen_wide
    features = features.join(renewable_share_wide, on='timestamp', how='left')
    features = features.join(thermal_share_wide, on='timestamp', how='left')
    for psr_features in psr_features_list:
        features = features.join(psr_features, on='timestamp', how='left')

    print(f" Created {len(features.columns) - 1} generation features")
    print(f" - Individual PSR types: {sum([len(pf.columns) - 1 for pf in psr_features_list])} features (8 types x 12 zones x 2)")
    print(f" - Aggregates: {len(total_gen_wide.columns) + len(renewable_share_wide.columns) + len(thermal_share_wide.columns) - 3} features")

    return features
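
# A minimal, self-contained sketch (not called by the pipeline) of the pivot-then-lag pattern
# used in engineer_generation_features, on toy data; the zone names and values here are
# illustrative assumptions, not taken from the real ENTSO-E extracts.
def _demo_psr_pivot_and_lag() -> None:
    from datetime import datetime
    import polars as pl

    toy = pl.DataFrame({
        'timestamp': [datetime(2023, 10, 1, 0), datetime(2023, 10, 1, 0),
                      datetime(2023, 10, 1, 1), datetime(2023, 10, 1, 1)],
        'zone': ['DE_LU', 'FR', 'DE_LU', 'FR'],
        'generation_mw': [100.0, 200.0, 110.0, 210.0],
    })

    # Long -> wide: one column per zone, then a t-1 lag per zone column.
    wide = toy.pivot(values='generation_mw', index='timestamp', on='zone', aggregate_function='sum')
    wide = wide.with_columns([
        pl.col(c).shift(1).alias(f'{c}_lag1') for c in wide.columns if c != 'timestamp'
    ])
    print(wide)  # 2 rows; the lag columns are null at the first hour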

# =========================================================================
# Feature Category 2: Demand/Load
# =========================================================================

def engineer_demand_features(demand_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~24 demand features.

    Features per zone:
    - Actual demand
    - Demand lag (t-1)

    Args:
        demand_df: Actual demand data (12 zones)

    Returns:
        DataFrame with demand features, indexed by timestamp
    """
    print("\n[2/8] Engineering demand features...")

    # FIX: Resample to hourly (some zones have 15-min data for 2025)
    demand_df = demand_df.with_columns([
        pl.col('timestamp').dt.truncate('1h').alias('timestamp')
    ])
    # Aggregate by hour (mean of sub-hourly values)
    demand_df = demand_df.group_by(['timestamp', 'zone']).agg([
        pl.col('load_mw').mean().alias('load_mw')
    ])

    # Pivot demand to wide format
    demand_wide = demand_df.pivot(
        values='load_mw',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )
    # Rename to demand_<zone>
    demand_cols = [c for c in demand_wide.columns if c != 'timestamp']
    demand_wide = demand_wide.rename({c: f'demand_{c}' for c in demand_cols})

    # Add lag features (t-1)
    lag_features = {}
    for col in demand_wide.columns:
        if col.startswith('demand_'):
            lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
    features = demand_wide.with_columns(**lag_features)

    print(f" Created {len(features.columns) - 1} demand features")
    return features
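
# A minimal, self-contained sketch (not called by the pipeline) of the sub-hourly -> hourly
# resampling used above: truncate timestamps to the hour, then average per (hour, zone).
# The zone and load values are illustrative assumptions.
def _demo_hourly_resample() -> None:
    from datetime import datetime
    import polars as pl

    toy = pl.DataFrame({
        'timestamp': [datetime(2025, 1, 1, 0, m) for m in (0, 15, 30, 45)],
        'zone': ['NL'] * 4,
        'load_mw': [100.0, 110.0, 120.0, 130.0],
    })

    hourly = (
        toy.with_columns(pl.col('timestamp').dt.truncate('1h'))
        .group_by(['timestamp', 'zone'])
        .agg(pl.col('load_mw').mean())
    )
    print(hourly)  # one row: 2025-01-01 00:00, NL, 115.0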

# =========================================================================
# Feature Category 3: Day-Ahead Prices
# =========================================================================

def engineer_price_features(prices_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~24 price features.

    Features per zone:
    - Day-ahead price
    - Price lag (t-1)

    Args:
        prices_df: Day-ahead prices (12 zones)

    Returns:
        DataFrame with price features, indexed by timestamp
    """
    print("\n[3/8] Engineering price features...")

    # Pivot prices to wide format
    price_wide = prices_df.pivot(
        values='price_eur_mwh',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )
    # Rename to price_<zone>
    price_cols = [c for c in price_wide.columns if c != 'timestamp']
    price_wide = price_wide.rename({c: f'price_{c}' for c in price_cols})

    # Add lag features (t-1)
    lag_features = {}
    for col in price_wide.columns:
        if col.startswith('price_'):
            lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
    features = price_wide.with_columns(**lag_features)

    print(f" Created {len(features.columns) - 1} price features")
    return features

# =========================================================================
# Feature Category 4: Hydro Reservoir Storage
# =========================================================================

def engineer_hydro_storage_features(hydro_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~12 hydro storage features (weekly → hourly interpolation).

    Features per zone with data (6 zones):
    - Hydro storage level (interpolated to hourly)
    - Storage change (week-over-week)

    Args:
        hydro_df: Weekly hydro storage data (6 zones)

    Returns:
        DataFrame with hydro storage features, indexed by timestamp
    """
    print("\n[4/8] Engineering hydro storage features...")

    # Pivot to wide format
    hydro_wide = hydro_df.pivot(
        values='storage_mwh',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )
    # Rename to hydro_storage_<zone>
    hydro_cols = [c for c in hydro_wide.columns if c != 'timestamp']
    hydro_wide = hydro_wide.rename({c: f'hydro_storage_{c}' for c in hydro_cols})

    # Create hourly date range (Oct 2023 - Sept 2025)
    hourly_range = pl.DataFrame({
        'timestamp': pl.datetime_range(
            start=hydro_wide['timestamp'].min(),
            end=hydro_wide['timestamp'].max(),
            interval='1h',
            eager=True
        )
    })
    # Cast timestamp to match the hourly range's precision (datetime[μs])
    hydro_wide = hydro_wide.with_columns(
        pl.col('timestamp').cast(pl.Datetime('us'))
    )
    # Join and interpolate (forward fill for weekly → hourly)
    features = hourly_range.join(hydro_wide, on='timestamp', how='left')

    # Forward fill missing values (weekly data → hourly)
    for col in features.columns:
        if col.startswith('hydro_storage_'):
            features = features.with_columns(
                pl.col(col).forward_fill().alias(col)
            )

    # Add week-over-week change (168 hours = 1 week)
    change_features = {}
    for col in features.columns:
        if col.startswith('hydro_storage_'):
            change_features[f'{col}_change_w'] = pl.col(col) - pl.col(col).shift(168)
    features = features.with_columns(**change_features)

    print(f" Created {len(features.columns) - 1} hydro storage features")
    return features
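
# A minimal sketch (not called by the pipeline) of the weekly -> hourly forward fill and the
# 168-hour (one week) difference used above; the zone name and storage values are illustrative
# assumptions.
def _demo_weekly_forward_fill() -> None:
    from datetime import datetime
    import polars as pl

    weekly = pl.DataFrame({
        'timestamp': [datetime(2023, 10, 2), datetime(2023, 10, 9)],
        'hydro_storage_AT': [5000.0, 5200.0],
    })
    hourly = pl.datetime_range(
        datetime(2023, 10, 2), datetime(2023, 10, 9), interval='1h', eager=True
    ).to_frame('timestamp')

    out = (
        hourly.join(weekly, on='timestamp', how='left')
        .with_columns(pl.col('hydro_storage_AT').forward_fill())
        .with_columns(
            (pl.col('hydro_storage_AT') - pl.col('hydro_storage_AT').shift(168)).alias('hydro_storage_AT_change_w')
        )
    )
    print(out.tail(1))  # last hour: level 5200.0, week-over-week change 200.0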

# =========================================================================
# Feature Category 5: Pumped Storage
# =========================================================================

def engineer_pumped_storage_features(pumped_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~10 pumped storage features.

    Features per zone with data (5 zones):
    - Pumped storage generation
    - Generation lag (t-1)

    Args:
        pumped_df: Pumped storage generation (5 zones)

    Returns:
        DataFrame with pumped storage features, indexed by timestamp
    """
    print("\n[5/8] Engineering pumped storage features...")

    # Pivot to wide format
    pumped_wide = pumped_df.pivot(
        values='generation_mw',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )
    # Rename to pumped_storage_<zone>
    pumped_cols = [c for c in pumped_wide.columns if c != 'timestamp']
    pumped_wide = pumped_wide.rename({c: f'pumped_storage_{c}' for c in pumped_cols})

    # Add lag features (t-1)
    lag_features = {}
    for col in pumped_wide.columns:
        if col.startswith('pumped_storage_'):
            lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
    features = pumped_wide.with_columns(**lag_features)

    print(f" Created {len(features.columns) - 1} pumped storage features")
    return features

# =========================================================================
# Feature Category 6: Load Forecasts
# =========================================================================

def engineer_load_forecast_features(forecast_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~12 load forecast features.

    Features per zone:
    - Load forecast (hourly; sub-hourly values are averaged)
    - Forecast error (forecast - actual) is not computed in this version

    Args:
        forecast_df: Load forecasts (12 zones)

    Returns:
        DataFrame with load forecast features, indexed by timestamp
    """
| print("\n[6/8] Engineering load forecast features...") | |
| # FIX: Resample to hourly (some zones have 15-min data for 2025) | |
| forecast_df = forecast_df.with_columns([ | |
| pl.col('timestamp').dt.truncate('1h').alias('timestamp') | |
| ]) | |
| # Aggregate by hour (mean of sub-hourly values) | |
| forecast_df = forecast_df.group_by(['timestamp', 'zone']).agg([ | |
| pl.col('forecast_mw').mean().alias('forecast_mw') | |
| ]) | |
| # Pivot to wide format | |
| forecast_wide = forecast_df.pivot( | |
| values='forecast_mw', | |
| index='timestamp', | |
| on='zone', | |
| aggregate_function='first' | |
| ) | |
| # Rename to load_forecast_<zone> | |
| forecast_cols = [c for c in forecast_wide.columns if c != 'timestamp'] | |
| forecast_wide = forecast_wide.rename({c: f'load_forecast_{c}' for c in forecast_cols}) | |
| print(f" Created {len(forecast_wide.columns) - 1} load forecast features") | |
| return forecast_wide | |

# =========================================================================
# Feature Category 7: Transmission Outages (ALL 176 CNECs)
# =========================================================================

def engineer_transmission_outage_features(
    outages_df: pl.DataFrame,
    cnec_master_df: pl.DataFrame,
    hourly_range: pl.DataFrame
) -> pl.DataFrame:
    """Engineer 176 transmission outage features (ALL CNECs with EIC mapping).

    Creates a binary feature for each CNEC:
    - 1 = Outage active on this CNEC at this timestamp
    - 0 = No outage

    Uses EIC codes from the master CNEC list to map ENTSO-E outages to CNECs.
    31 CNECs have historical outages, 145 are zero-filled (ready for future).

    Args:
        outages_df: ENTSO-E transmission outages with Asset_RegisteredResource.mRID (EIC)
        cnec_master_df: Master CNEC list with cnec_eic column (176 rows)
        hourly_range: Hourly timestamp range (Oct 2023 - Sept 2025)

    Returns:
        DataFrame with 176 transmission outage features, indexed by timestamp
    """
    print("\n[7/8] Engineering transmission outage features (ALL 176 CNECs)...")

    # Create EIC → CNEC mapping from master list
    eic_to_cnec = dict(zip(cnec_master_df['cnec_eic'], cnec_master_df['cnec_name']))
    all_cnec_eics = cnec_master_df['cnec_eic'].to_list()
    print(f" Loaded {len(all_cnec_eics)} CNECs from master list")

    # Process outages: expand start → end to hourly timestamps
    if len(outages_df) == 0:
        print(" WARNING: No transmission outages found in raw data")
        # Create zero-filled features for all CNECs
        features = hourly_range.clone()
        for eic in all_cnec_eics:
            features = features.with_columns(
                pl.lit(0).alias(f'outage_cnec_{eic}')
            )
        print(f" Created {len(all_cnec_eics)} zero-filled outage features")
        return features

    # Parse outage periods (start_time → end_time)
    outages_expanded = []
    for row in outages_df.iter_rows(named=True):
        eic = row.get('asset_eic', None)
        start = row.get('start_time', None)
        end = row.get('end_time', None)
        if not eic or not start or not end:
            continue
        # Only process if EIC is in master CNEC list
        if eic not in all_cnec_eics:
            continue
        # Create hourly range for this outage
        outage_hours = pl.datetime_range(
            start=start,
            end=end,
            interval='1h',
            eager=True
        )
        for hour in outage_hours:
            outages_expanded.append({
                'timestamp': hour,
                'cnec_eic': eic,
                'outage_active': 1
            })

    print(f" Expanded {len(outages_expanded)} hourly outage events")

    if len(outages_expanded) == 0:
        # No outages matched to CNECs - create zero-filled features
        features = hourly_range.clone()
        for eic in all_cnec_eics:
            features = features.with_columns(
                pl.lit(0).alias(f'outage_cnec_{eic}')
            )
        print(f" Created {len(all_cnec_eics)} zero-filled outage features (no matches)")
        return features

    # Convert to Polars DataFrame
    outages_hourly = pl.DataFrame(outages_expanded)
    # Remove timezone from timestamp to match hourly_range
    outages_hourly = outages_hourly.with_columns(
        pl.col('timestamp').dt.replace_time_zone(None)
    )

    # Pivot to wide format (one column per CNEC)
    outages_wide = outages_hourly.pivot(
        values='outage_active',
        index='timestamp',
        on='cnec_eic',
        aggregate_function='max'  # If multiple outages, max = 1
    )
    # Rename columns
    outage_cols = [c for c in outages_wide.columns if c != 'timestamp']
    outages_wide = outages_wide.rename({c: f'outage_cnec_{c}' for c in outage_cols})

    # Join with hourly range to ensure complete timestamp coverage
    features = hourly_range.join(outages_wide, on='timestamp', how='left')

    # Fill nulls with 0 (no outage)
    for col in features.columns:
        if col.startswith('outage_cnec_'):
            features = features.with_columns(
                pl.col(col).fill_null(0).alias(col)
            )

    # Add zero-filled features for CNECs with no historical outages
    existing_cnecs = [c.replace('outage_cnec_', '') for c in features.columns if c.startswith('outage_cnec_')]
    missing_cnecs = [eic for eic in all_cnec_eics if eic not in existing_cnecs]
    for eic in missing_cnecs:
        features = features.with_columns(
            pl.lit(0).alias(f'outage_cnec_{eic}')
        )

    cnecs_with_data = len(existing_cnecs)
    cnecs_zero_filled = len(missing_cnecs)
    print(f" Created {len(features.columns) - 1} transmission outage features:")
    print(f" - {cnecs_with_data} CNECs with historical outages")
    print(f" - {cnecs_zero_filled} CNECs zero-filled (ready for future)")

    return features
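
# A minimal sketch (not called by the pipeline) of how one outage window becomes a per-hour
# binary flag for a single CNEC; the 'DEMO_EIC' suffix is a made-up placeholder, not a real EIC.
def _demo_outage_expansion() -> None:
    from datetime import datetime
    import polars as pl

    # One outage from 06:00 to 09:00 (inclusive) expanded to hourly rows.
    outage_hours = pl.datetime_range(
        datetime(2024, 3, 1, 6), datetime(2024, 3, 1, 9), interval='1h', eager=True
    ).to_frame('timestamp')
    outage = outage_hours.with_columns(pl.lit(1).alias('outage_cnec_DEMO_EIC'))

    # Full-day hourly skeleton; left join + fill_null(0) marks the remaining hours as "no outage".
    full_day = pl.datetime_range(
        datetime(2024, 3, 1, 0), datetime(2024, 3, 1, 23), interval='1h', eager=True
    ).to_frame('timestamp')
    flags = full_day.join(outage, on='timestamp', how='left').with_columns(
        pl.col('outage_cnec_DEMO_EIC').fill_null(0)
    )
    print(flags['outage_cnec_DEMO_EIC'].sum())  # 4 flagged hours out of 24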

# =========================================================================
# Feature Category 8: Generation Outages
# =========================================================================

def engineer_generation_outage_features(gen_outages_df: pl.DataFrame, hourly_range: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~45 generation outage features (nuclear, coal, lignite by zone).

    Features per zone and PSR type:
    - Nuclear capacity offline (MW)
    - Coal capacity offline (MW)
    - Lignite capacity offline (MW)
    - Total outage count
    - Binary outage indicator

    Args:
        gen_outages_df: Generation unavailability data with columns:
            ['timestamp', 'zone', 'psr_type', 'capacity_mw', 'unit_name']
        hourly_range: Hourly timestamps for alignment

    Returns:
        DataFrame with generation outage features, indexed by timestamp
    """
    print("\n[8/8] Engineering generation outage features...")

    if len(gen_outages_df) == 0:
        print(" WARNING: No generation outages found - creating zero-filled features")
        # Create zero-filled features for all zones
        features = hourly_range.select('timestamp')
        zones = ['FR', 'BE', 'CZ', 'HU', 'RO', 'SI', 'SK', 'DE_LU', 'PL']
        for zone in zones:
            features = features.with_columns([
                pl.lit(0.0).alias(f'gen_outage_nuclear_mw_{zone}'),
                pl.lit(0.0).alias(f'gen_outage_coal_mw_{zone}'),
                pl.lit(0.0).alias(f'gen_outage_lignite_mw_{zone}'),
                pl.lit(0).alias(f'gen_outage_count_{zone}'),
                pl.lit(0).alias(f'gen_outage_active_{zone}')
            ])
        print(f" Created {len(features.columns) - 1} zero-filled generation outage features")
        return features

    # Expand outages to hourly granularity (outages span multiple hours)
    print(" Expanding outages to hourly timestamps...")

    # Create hourly records for each outage period
    hourly_outages = []
    for row in gen_outages_df.iter_rows(named=True):
        start = row['start_time']
        end = row['end_time']
        # Generate hourly timestamps for outage period
        outage_hours = pl.datetime_range(
            start=start,
            end=end,
            interval='1h',
            eager=True
        ).to_frame('timestamp')
        # Add outage metadata
        outage_hours = outage_hours.with_columns([
            pl.lit(row['zone']).alias('zone'),
            pl.lit(row['psr_type']).alias('psr_type'),
            pl.lit(row['capacity_mw']).alias('capacity_mw'),
            pl.lit(row['unit_name']).alias('unit_name')
        ])
        hourly_outages.append(outage_hours)

    # Combine all hourly outages
    hourly_outages_df = pl.concat(hourly_outages)
    print(f" Expanded to {len(hourly_outages_df):,} hourly outage records")

    # Map PSR types to clean names
    psr_map = {
        'B14': 'nuclear',
        'B05': 'coal',
        'B02': 'lignite'
    }
    hourly_outages_df = hourly_outages_df.with_columns(
        pl.col('psr_type').replace(psr_map).alias('psr_clean')
    )

    # Create features for each PSR type
    all_features = [hourly_range.select('timestamp')]
    for psr_code, psr_name in psr_map.items():
        psr_outages = hourly_outages_df.filter(pl.col('psr_type') == psr_code)
        if len(psr_outages) > 0:
            # Aggregate capacity by zone and timestamp
            psr_agg = psr_outages.group_by(['timestamp', 'zone']).agg(
                pl.col('capacity_mw').sum().alias('capacity_mw')
            )
            # Pivot to wide format
            psr_wide = psr_agg.pivot(
                values='capacity_mw',
                index='timestamp',
                on='zone'
            )
            # Rename columns
            rename_map = {
                col: f'gen_outage_{psr_name}_mw_{col}'
                for col in psr_wide.columns if col != 'timestamp'
            }
            psr_wide = psr_wide.rename(rename_map)
            all_features.append(psr_wide)

    # Create aggregate count and binary indicator features
    total_agg = hourly_outages_df.group_by(['timestamp', 'zone']).agg([
        pl.col('unit_name').n_unique().alias('outage_count'),
        pl.lit(1).alias('outage_active')
    ])
    # Pivot count
    count_wide = total_agg.pivot(
        values='outage_count',
        index='timestamp',
        on='zone'
    ).rename({
        col: f'gen_outage_count_{col}'
        for col in total_agg['zone'].unique() if col != 'timestamp'
    })
    # Pivot binary indicator
    active_wide = total_agg.pivot(
        values='outage_active',
        index='timestamp',
        on='zone'
    ).rename({
        col: f'gen_outage_active_{col}'
        for col in total_agg['zone'].unique() if col != 'timestamp'
    })
    all_features.extend([count_wide, active_wide])

    # Join all features
    features = all_features[0]
    for feat_df in all_features[1:]:
        features = features.join(feat_df, on='timestamp', how='left')

    # Fill nulls with zeros (no outage)
    feature_cols = [col for col in features.columns if col != 'timestamp']
    features = features.with_columns([
        pl.col(col).fill_null(0) for col in feature_cols
    ])

    print(f" Created {len(features.columns) - 1} generation outage features")
    print(f" - Nuclear: capacity offline per zone")
    print(f" - Coal: capacity offline per zone")
    print(f" - Lignite: capacity offline per zone")
    print(f" - Count: number of units offline per zone")
    print(f" - Active: binary indicator per zone")

    return features
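
# A minimal sketch (not called by the pipeline) of summing offline capacity per (hour, zone)
# across overlapping unit outages, as done for each PSR type above; the unit capacities are
# illustrative assumptions.
def _demo_offline_capacity() -> None:
    from datetime import datetime
    import polars as pl

    toy = pl.DataFrame({
        'timestamp': [datetime(2024, 1, 1, 0), datetime(2024, 1, 1, 0), datetime(2024, 1, 1, 1)],
        'zone': ['FR', 'FR', 'FR'],
        'capacity_mw': [900.0, 1300.0, 1300.0],  # two units offline at 00:00, one still offline at 01:00
    })
    offline = toy.group_by(['timestamp', 'zone']).agg(
        pl.col('capacity_mw').sum().alias('gen_outage_nuclear_mw')
    ).sort('timestamp')
    print(offline)  # 2200.0 MW offline at 00:00, 1300.0 MW at 01:00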

# =========================================================================
# Main Pipeline
# =========================================================================

def engineer_all_entsoe_features(
    data_dir: Path = Path("data/raw"),
    output_path: Path = Path("data/processed/features_entsoe_24month.parquet")
) -> pl.DataFrame:
    """Engineer all ENTSO-E features from 8 raw datasets.

    Args:
        data_dir: Directory containing raw ENTSO-E data
        output_path: Path to save engineered features

    Returns:
        DataFrame with ~486 ENTSO-E features (before cleanup), indexed by timestamp
    """
| print("\n" + "="*70) | |
| print("ENTSO-E FEATURE ENGINEERING PIPELINE") | |
| print("="*70) | |
| # Load master CNEC list (for transmission outage mapping) | |
| cnec_master = pl.read_csv("data/processed/cnecs_master_176.csv") | |
| print(f"\nLoaded master CNEC list: {len(cnec_master)} CNECs") | |
| # Create hourly timestamp range (Oct 2023 - Sept 2025) | |
| hourly_range = pl.DataFrame({ | |
| 'timestamp': pl.datetime_range( | |
| start=pl.datetime(2023, 10, 1, 0, 0), | |
| end=pl.datetime(2025, 9, 30, 23, 0), | |
| interval='1h', | |
| eager=True | |
| ) | |
| }) | |
| print(f"Created hourly range: {len(hourly_range)} timestamps") | |
| # Load raw ENTSO-E datasets | |
| generation_df = pl.read_parquet(data_dir / "entsoe_generation_by_psr_24month.parquet") | |
| demand_df = pl.read_parquet(data_dir / "entsoe_demand_24month.parquet") | |
| prices_df = pl.read_parquet(data_dir / "entsoe_prices_24month.parquet") | |
| hydro_df = pl.read_parquet(data_dir / "entsoe_hydro_storage_24month.parquet") | |
| pumped_df = pl.read_parquet(data_dir / "entsoe_pumped_storage_24month.parquet") | |
| forecast_df = pl.read_parquet(data_dir / "entsoe_load_forecast_24month.parquet") | |
| transmission_outages_df = pl.read_parquet(data_dir / "entsoe_transmission_outages_24month.parquet") | |
| # Check for generation outages file | |
| gen_outages_path = data_dir / "entsoe_generation_outages_24month.parquet" | |
| if gen_outages_path.exists(): | |
| gen_outages_df = pl.read_parquet(gen_outages_path) | |
| else: | |
| print("\nWARNING: Generation outages file not found, skipping...") | |
| gen_outages_df = pl.DataFrame() | |
| print(f"\nLoaded 8 ENTSO-E datasets:") | |
| print(f" - Generation: {len(generation_df):,} rows") | |
| print(f" - Demand: {len(demand_df):,} rows") | |
| print(f" - Prices: {len(prices_df):,} rows") | |
| print(f" - Hydro storage: {len(hydro_df):,} rows") | |
| print(f" - Pumped storage: {len(pumped_df):,} rows") | |
| print(f" - Load forecast: {len(forecast_df):,} rows") | |
| print(f" - Transmission outages: {len(transmission_outages_df):,} rows") | |
| print(f" - Generation outages: {len(gen_outages_df):,} rows") | |
| # Engineer features for each category | |
| gen_features = engineer_generation_features(generation_df) | |
| demand_features = engineer_demand_features(demand_df) | |
| price_features = engineer_price_features(prices_df) | |
| hydro_features = engineer_hydro_storage_features(hydro_df) | |
| pumped_features = engineer_pumped_storage_features(pumped_df) | |
| forecast_features = engineer_load_forecast_features(forecast_df) | |
| transmission_outage_features = engineer_transmission_outage_features( | |
| transmission_outages_df, cnec_master, hourly_range | |
| ) | |
| if len(gen_outages_df) > 0: | |
| gen_outage_features = engineer_generation_outage_features(gen_outages_df, hourly_range) | |
| else: | |
| gen_outage_features = engineer_generation_outage_features(pl.DataFrame(), hourly_range) | |
| # Merge all features on timestamp | |
| print("\n" + "="*70) | |
| print("MERGING ALL FEATURES") | |
| print("="*70) | |
| features = hourly_range.clone() | |
| # Standardize timestamps (remove timezone, cast to datetime[μs]) | |
| def standardize_timestamp(df): | |
| """Remove timezone and cast timestamp to datetime[μs].""" | |
| if 'timestamp' in df.columns: | |
| # Check if timestamp has timezone | |
| if hasattr(df['timestamp'].dtype, 'time_zone') and df['timestamp'].dtype.time_zone is not None: | |
| df = df.with_columns(pl.col('timestamp').dt.replace_time_zone(None)) | |
| # Cast to datetime[μs] | |
| df = df.with_columns(pl.col('timestamp').cast(pl.Datetime('us'))) | |
| return df | |
    gen_features = standardize_timestamp(gen_features)
    demand_features = standardize_timestamp(demand_features)
    price_features = standardize_timestamp(price_features)
    hydro_features = standardize_timestamp(hydro_features)
    pumped_features = standardize_timestamp(pumped_features)
    forecast_features = standardize_timestamp(forecast_features)
    transmission_outage_features = standardize_timestamp(transmission_outage_features)

    # Join each feature set
    features = features.join(gen_features, on='timestamp', how='left')
    features = features.join(demand_features, on='timestamp', how='left')
    features = features.join(price_features, on='timestamp', how='left')
    features = features.join(hydro_features, on='timestamp', how='left')
    features = features.join(pumped_features, on='timestamp', how='left')
    features = features.join(forecast_features, on='timestamp', how='left')
    features = features.join(transmission_outage_features, on='timestamp', how='left')
    features = features.join(gen_outage_features, on='timestamp', how='left')

    print(f"\nFinal feature matrix shape: {features.shape}")
    print(f" - Timestamps: {len(features):,}")
    print(f" - Features: {len(features.columns) - 1:,}")

    # Check for nulls
    null_counts = features.null_count()
    total_nulls = sum([null_counts[col][0] for col in features.columns if col != 'timestamp'])
    null_pct = (total_nulls / (len(features) * (len(features.columns) - 1))) * 100
    print(f"\nData quality:")
    print(f" - Total nulls: {total_nulls:,} ({null_pct:.2f}%)")
    print(f" - Completeness: {100 - null_pct:.2f}%")

    # Clean up redundant features
    print("\n" + "="*70)
    print("CLEANING REDUNDANT FEATURES")
    print("="*70)

    features_before = len(features.columns) - 1

    # Remove 100% null features
    null_pcts = {}
    for col in features.columns:
        if col != 'timestamp':
            null_count = features[col].null_count()
            null_pct_col = (null_count / len(features)) * 100
            if null_pct_col >= 100.0:
                null_pcts[col] = null_pct_col
    if null_pcts:
        print(f"\nRemoving {len(null_pcts)} features with 100% nulls:")
        for col in null_pcts:
            print(f" - {col}")
        features = features.drop(list(null_pcts.keys()))

    # Remove zero-variance features (constants)
    # EXCEPT transmission outage features - keep them even if zero-filled for future inference
    zero_var_cols = []
    for col in features.columns:
        if col != 'timestamp':
            # Skip transmission outage features (needed for future inference)
            if col.startswith('outage_cnec_'):
                continue
            # Check if all values are the same (excluding nulls)
            non_null = features[col].drop_nulls()
            if len(non_null) > 0 and non_null.n_unique() == 1:
                zero_var_cols.append(col)
    if zero_var_cols:
        print(f"\nRemoving {len(zero_var_cols)} zero-variance features (keeping transmission outages)")
        features = features.drop(zero_var_cols)

    # Remove duplicate columns
    # EXCEPT transmission outage features - keep all CNECs even if identical (all zeros)
    dup_groups = {}
    cols_to_check = [c for c in features.columns if c != 'timestamp']
    for i, col1 in enumerate(cols_to_check):
        if col1 in dup_groups:  # Already marked as a duplicate of an earlier column
            continue
        # Skip transmission outage features (each CNEC needs its own column for inference)
        if col1.startswith('outage_cnec_'):
            continue
        for col2 in cols_to_check[i+1:]:
            if col2 in dup_groups:  # Already marked as duplicate
                continue
            # Skip transmission outage features
            if col2.startswith('outage_cnec_'):
                continue
            # Check if columns are identical
            if features[col1].equals(features[col2]):
                dup_groups[col2] = col1  # col2 is duplicate of col1
    if dup_groups:
        print(f"\nRemoving {len(dup_groups)} duplicate columns (keeping transmission outages)")
        features = features.drop(list(dup_groups.keys()))

    features_after = len(features.columns) - 1
    features_removed = features_before - features_after
    print(f"\n[CLEANUP SUMMARY]")
    print(f" Features before: {features_before}")
    print(f" Features after: {features_after}")
    print(f" Features removed: {features_removed} ({features_removed/features_before*100:.1f}%)")

    # Save features
    output_path.parent.mkdir(parents=True, exist_ok=True)
    features.write_parquet(output_path)
    file_size_mb = output_path.stat().st_size / (1024 * 1024)
    print(f"\nSaved ENTSO-E features to: {output_path}")
    print(f"File size: {file_size_mb:.2f} MB")

    print("\n" + "="*70)
    print("ENTSO-E FEATURE ENGINEERING COMPLETE")
    print("="*70)

    return features


if __name__ == "__main__":
    # Run feature engineering pipeline
    features = engineer_all_entsoe_features()
    print("\n[SUCCESS] ENTSO-E features engineered and saved!")
    print(f"Feature count: {len(features.columns) - 1}")
    print(f"Timestamp range: {features['timestamp'].min()} to {features['timestamp'].max()}")