| """ | |
| Process ENTSO-E Raw Data into Features | |
| ======================================= | |
| Transforms raw ENTSO-E data into feature matrix: | |
| 1. Encode transmission outages: Event-based → Hourly binary (0/1 per CNEC) | |
| 2. Encode generation outages: Event-based → Hourly (binary + MW per zone-tech) | |
| 3. Interpolate hydro storage: Weekly → Hourly | |
| 4. Pivot generation/demand/prices: Long → Wide format | |
| 5. Align all timestamps to MTU (Europe/Amsterdam timezone) | |
| 6. Merge into single feature matrix | |
| Input: Raw parquet files from collect_entsoe_24month.py | |
| Output: Unified ENTSO-E feature matrix (parquet) | |
| """ | |

import polars as pl
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import Dict


class EntsoEFeatureProcessor:
    """Process raw ENTSO-E data into feature matrix."""

    def __init__(self, raw_data_dir: Path, output_dir: Path):
        """Initialize processor.

        Args:
            raw_data_dir: Directory containing raw ENTSO-E parquet files
            output_dir: Directory to save processed features
        """
        self.raw_data_dir = raw_data_dir
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def encode_transmission_outages_to_hourly(
        self,
        outages_df: pl.DataFrame,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Encode event-based transmission outages to hourly binary features.

        Converts outage events (start_time, end_time) to an hourly time series
        with a binary indicator (0 = no outage, 1 = outage active) for each CNEC.

        Args:
            outages_df: Outage events DataFrame with columns:
                asset_eic, start_time, end_time
            start_date: Start date for hourly range (YYYY-MM-DD)
            end_date: End date for hourly range (YYYY-MM-DD)

        Returns:
            Polars DataFrame with hourly binary outage indicators
            Columns: timestamp, [cnec_eic_1], [cnec_eic_2], ...
        """
        print("Encoding transmission outages to hourly binary features...")

        # Create complete hourly timestamp range covering [start_date, end_date]
        start_dt = datetime.fromisoformat(start_date)
        end_dt = datetime.fromisoformat(end_date)
        hourly_range = pl.datetime_range(
            start=pl.datetime(start_dt.year, start_dt.month, start_dt.day, 0, 0, 0),
            end=pl.datetime(end_dt.year, end_dt.month, end_dt.day, 23, 0, 0),
            interval="1h",
            time_zone="UTC",
            eager=True
        )

        # Initialize base DataFrame with hourly timestamps
        hourly_df = pl.DataFrame({'timestamp': hourly_range})

        if outages_df.is_empty():
            print("  No outages to encode")
            return hourly_df

        # Get unique CNECs
        unique_cnecs = outages_df.select('asset_eic').unique().sort('asset_eic')
        cnec_list = unique_cnecs.to_series().to_list()

        print(f"  Encoding {len(cnec_list)} CNECs to hourly binary...")
        print(f"  Hourly range: {len(hourly_df):,} hours")

        # For each CNEC, create a binary indicator column
        for i, cnec_eic in enumerate(cnec_list, 1):
            if i % 10 == 0:
                print(f"    Processing CNEC {i}/{len(cnec_list)}...")

            # Filter outages for this CNEC
            cnec_outages = outages_df.filter(pl.col('asset_eic') == cnec_eic)

            # Initialize all hours as 0 (no outage)
            outage_indicator = pl.Series([0] * len(hourly_df))

            # For each outage event, mark affected hours as 1
            for row in cnec_outages.iter_rows(named=True):
                start_time = row['start_time']
                end_time = row['end_time']

                # Mask for hours within the outage period (end_time exclusive)
                mask = (
                    (hourly_df['timestamp'] >= start_time) &
                    (hourly_df['timestamp'] < end_time)
                )

                # Set outage indicator to 1 for affected hours
                outage_indicator = pl.when(mask).then(1).otherwise(outage_indicator)

            # Add column for this CNEC
            col_name = f"outage_{cnec_eic}"
            hourly_df = hourly_df.with_columns(outage_indicator.alias(col_name))

        print(f"  ✓ Encoded {len(cnec_list)} CNEC outage features")
        print(f"    Shape: {hourly_df.shape}")

        return hourly_df
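
    # Illustrative example (hypothetical CNEC EIC and times): a single outage event on
    # asset "10T-XX-YY-000001" from 2024-01-05 06:00 to 2024-01-05 09:00 UTC produces
    # outage_10T-XX-YY-000001 = 1 for the hours 06:00, 07:00 and 08:00 (end_time is
    # exclusive) and 0 for every other hour in the range.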

    def encode_generation_outages_to_hourly(
        self,
        outages_df: pl.DataFrame,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Encode event-based generation outages to hourly features.

        Converts generation unit outage events to an hourly time series with:
        1. Binary indicator (0/1): Whether any outage is active
        2. Capacity offline (MW): Total capacity offline

        Aggregates by zone-technology combination (e.g., FR_Nuclear, BE_Gas).

        Args:
            outages_df: Outage events DataFrame with columns:
                zone, psr_type, psr_name, capacity_mw, start_time, end_time
            start_date: Start date for hourly range (YYYY-MM-DD)
            end_date: End date for hourly range (YYYY-MM-DD)

        Returns:
            Polars DataFrame with hourly generation outage features
            Columns: timestamp, [zone_tech_binary], [zone_tech_mw], ...
        """
        print("Encoding generation outages to hourly features...")

        # Create complete hourly timestamp range covering [start_date, end_date]
        start_dt = datetime.fromisoformat(start_date)
        end_dt = datetime.fromisoformat(end_date)
        hourly_range = pl.datetime_range(
            start=pl.datetime(start_dt.year, start_dt.month, start_dt.day, 0, 0, 0),
            end=pl.datetime(end_dt.year, end_dt.month, end_dt.day, 23, 0, 0),
            interval="1h",
            time_zone="UTC",
            eager=True
        )

        # Initialize base DataFrame with hourly timestamps
        hourly_df = pl.DataFrame({'timestamp': hourly_range})

        if outages_df.is_empty():
            print("  No generation outages to encode")
            return hourly_df

        # Create zone-technology combinations (e.g., "FR_Nuclear")
        outages_df = outages_df.with_columns(
            (pl.col('zone') + "_" + pl.col('psr_name').str.replace_all(' ', '_')).alias('zone_tech')
        )

        # Get unique zone-technology combinations
        unique_combos = outages_df.select('zone_tech').unique().sort('zone_tech')
        combo_list = unique_combos.to_series().to_list()

        print(f"  Encoding {len(combo_list)} zone-technology combinations to hourly...")
        print(f"  Hourly range: {len(hourly_df):,} hours")

        # For each zone-technology combination, create binary and capacity features
        for i, zone_tech in enumerate(combo_list, 1):
            if i % 5 == 0:
                print(f"    Processing {i}/{len(combo_list)}...")

            # Filter outages for this zone-technology
            combo_outages = outages_df.filter(pl.col('zone_tech') == zone_tech)

            # Initialize all hours as 0 (no outage, no capacity offline)
            outage_binary = pl.Series([0] * len(hourly_df))
            outage_capacity = pl.Series([0.0] * len(hourly_df))

            # For each outage event, mark affected hours
            for row in combo_outages.iter_rows(named=True):
                start_time = row['start_time']
                end_time = row['end_time']
                capacity_mw = row['capacity_mw']

                # Mask for hours within the outage period (end_time exclusive)
                mask = (
                    (hourly_df['timestamp'] >= start_time) &
                    (hourly_df['timestamp'] < end_time)
                )

                # Set binary indicator to 1 for affected hours
                outage_binary = pl.when(mask).then(1).otherwise(outage_binary)

                # Add capacity to total offline capacity (overlapping outages accumulate)
                outage_capacity = pl.when(mask).then(
                    outage_capacity + capacity_mw
                ).otherwise(outage_capacity)

            # Add columns for this zone-technology combination
            binary_col = f"gen_outage_{zone_tech}_binary"
            capacity_col = f"gen_outage_{zone_tech}_mw"
            hourly_df = hourly_df.with_columns([
                outage_binary.alias(binary_col),
                outage_capacity.alias(capacity_col)
            ])

        print(f"  ✓ Encoded {len(combo_list)} zone-technology outage features")
        print(f"    Features: {len(combo_list) * 2} (binary + MW for each)")
        print(f"    Shape: {hourly_df.shape}")

        return hourly_df
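
    # Illustrative example (hypothetical capacities): two overlapping outages in
    # zone-technology "FR_Nuclear" of 900 MW and 1300 MW covering the same hour yield
    # gen_outage_FR_Nuclear_binary = 1 and gen_outage_FR_Nuclear_mw = 2200.0 for that
    # hour, because the capacities of overlapping events are summed.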

    def interpolate_hydro_storage_to_hourly(
        self,
        hydro_df: pl.DataFrame,
        hourly_range: pl.Series
    ) -> pl.DataFrame:
        """Interpolate weekly hydro reservoir storage to hourly.

        Args:
            hydro_df: Weekly hydro storage DataFrame
                Columns: timestamp, storage_mwh, zone
            hourly_range: Hourly timestamp series to interpolate to

        Returns:
            Polars DataFrame with hourly interpolated storage
            Columns: timestamp, [zone_1_storage], [zone_2_storage], ...
        """
        print("Interpolating hydro storage from weekly to hourly...")

        hourly_df = pl.DataFrame({'timestamp': hourly_range})

        if hydro_df.is_empty():
            print("  No hydro storage data to interpolate")
            return hourly_df

        # Get unique zones
        zones = hydro_df.select('zone').unique().sort('zone').to_series().to_list()
        print(f"  Interpolating {len(zones)} zones...")

        for zone in zones:
            # Filter to this zone
            zone_df = hydro_df.filter(pl.col('zone') == zone).sort('timestamp')

            # Convert to pandas for time-series interpolation
            zone_pd = zone_df.select(['timestamp', 'storage_mwh']).to_pandas()
            zone_pd = zone_pd.set_index('timestamp')

            # Reindex to the hourly grid and interpolate linearly between weekly points
            hourly_pd = zone_pd.reindex(hourly_range.to_pandas())
            hourly_pd['storage_mwh'] = hourly_pd['storage_mwh'].interpolate(method='linear')

            # Fill any remaining NaNs (at the edges) with forward/backward fill
            hourly_pd['storage_mwh'] = hourly_pd['storage_mwh'].ffill().bfill()

            # Add to result
            col_name = f"hydro_storage_{zone}"
            hourly_df = hourly_df.with_columns(
                pl.Series(col_name, hourly_pd['storage_mwh'].values)
            )

        print(f"  ✓ Interpolated {len(zones)} hydro storage features to hourly")

        return hourly_df
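
    # Worked example (hypothetical values): two weekly points of 1000 MWh and 1168 MWh
    # spaced 168 hours apart interpolate linearly to +1 MWh per hour in between; hours
    # before the first or after the last weekly point are forward/backward filled with
    # the nearest observed value.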

    def pivot_to_wide_format(
        self,
        df: pl.DataFrame,
        index_col: str,
        pivot_col: str,
        value_col: str,
        prefix: str
    ) -> pl.DataFrame:
        """Pivot long-format data to wide format.

        Args:
            df: Input DataFrame in long format
            index_col: Column to use as index (e.g., 'timestamp')
            pivot_col: Column to pivot (e.g., 'zone' or 'psr_type')
            value_col: Column with values (e.g., 'generation_mw')
            prefix: Prefix for new column names

        Returns:
            Wide-format DataFrame
        """
        # Group by index and pivot column, aggregate to handle duplicates
        df_agg = df.group_by([index_col, pivot_col]).agg(
            pl.col(value_col).mean().alias(value_col)
        )

        # Pivot to wide format
        df_wide = df_agg.pivot(
            values=value_col,
            index=index_col,
            columns=pivot_col
        )

        # Rename columns with prefix
        new_columns = {
            col: f"{prefix}_{col}" if col != index_col else col
            for col in df_wide.columns
        }
        df_wide = df_wide.rename(new_columns)

        return df_wide
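
    # Illustrative example (hypothetical rows): long-format demand records
    #   (timestamp=2024-01-01 00:00, zone="FR", load_mw=61000)
    #   (timestamp=2024-01-01 00:00, zone="BE", load_mw=10500)
    # pivot to one row per timestamp with columns demand_FR and demand_BE; duplicate
    # (timestamp, zone) rows are averaged before pivoting.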

    def process_all_features(
        self,
        start_date: str = '2023-10-01',
        end_date: str = '2025-09-30'
    ) -> Dict[str, Path]:
        """Process all ENTSO-E raw data into features.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Dictionary mapping feature types to output file paths
        """
        print("="*80)
        print("ENTSO-E FEATURE PROCESSING")
        print("="*80)
        print()
        print(f"Period: {start_date} to {end_date}")
        print(f"Input: {self.raw_data_dir}")
        print(f"Output: {self.output_dir}")
        print()

        results = {}

        # Create hourly timestamp range for alignment, covering [start_date, end_date]
        start_dt = datetime.fromisoformat(start_date)
        end_dt = datetime.fromisoformat(end_date)
        hourly_range = pl.datetime_range(
            start=pl.datetime(start_dt.year, start_dt.month, start_dt.day, 0, 0, 0),
            end=pl.datetime(end_dt.year, end_dt.month, end_dt.day, 23, 0, 0),
            interval="1h",
            time_zone="UTC",
            eager=True
        )

        # ====================================================================
        # 1. Process Transmission Outages → Hourly Binary
        # ====================================================================
        print("-"*80)
        print("[1/7] Processing Transmission Outages")
        print("-"*80)
        print()

        outages_file = self.raw_data_dir / "entsoe_transmission_outages_24month.parquet"

        if outages_file.exists():
            outages_df = pl.read_parquet(outages_file)
            print(f"Loaded: {len(outages_df):,} outage events")

            outages_hourly = self.encode_transmission_outages_to_hourly(
                outages_df, start_date, end_date
            )

            outages_path = self.output_dir / "entsoe_transmission_outages_hourly.parquet"
            outages_hourly.write_parquet(outages_path)
            results['transmission_outages'] = outages_path

            print(f"✓ Saved: {outages_path}")
            print(f"  Shape: {outages_hourly.shape}")
        else:
            print("  Warning: Transmission outages file not found, skipping")

        print()

        # ====================================================================
        # 2. Process Generation Outages → Hourly (Binary + MW)
        # ====================================================================
        print("-"*80)
        print("[2/7] Processing Generation Outages")
        print("-"*80)
        print()

        gen_outages_file = self.raw_data_dir / "entsoe_generation_outages_24month.parquet"

        if gen_outages_file.exists():
            gen_outages_df = pl.read_parquet(gen_outages_file)
            print(f"Loaded: {len(gen_outages_df):,} generation outage events")

            gen_outages_hourly = self.encode_generation_outages_to_hourly(
                gen_outages_df, start_date, end_date
            )

            gen_outages_path = self.output_dir / "entsoe_generation_outages_hourly.parquet"
            gen_outages_hourly.write_parquet(gen_outages_path)
            results['generation_outages'] = gen_outages_path

            print(f"✓ Saved: {gen_outages_path}")
            print(f"  Shape: {gen_outages_hourly.shape}")
        else:
            print("  Warning: Generation outages file not found, skipping")

        print()

        # ====================================================================
        # 3. Process Generation by PSR Type → Wide Format
        # ====================================================================
        print("-"*80)
        print("[3/7] Processing Generation by PSR Type")
        print("-"*80)
        print()

        gen_file = self.raw_data_dir / "entsoe_generation_by_psr_24month.parquet"

        if gen_file.exists():
            gen_df = pl.read_parquet(gen_file)
            print(f"Loaded: {len(gen_df):,} records")

            # Create combined column: zone_psrname
            gen_df = gen_df.with_columns(
                (pl.col('zone') + "_" + pl.col('psr_name').str.replace_all(' ', '_')).alias('zone_psr')
            )

            gen_wide = self.pivot_to_wide_format(
                gen_df,
                index_col='timestamp',
                pivot_col='zone_psr',
                value_col='generation_mw',
                prefix='gen'
            )

            gen_path = self.output_dir / "entsoe_generation_hourly.parquet"
            gen_wide.write_parquet(gen_path)
            results['generation'] = gen_path

            print(f"✓ Saved: {gen_path}")
            print(f"  Shape: {gen_wide.shape}")
        else:
            print("  Warning: Generation file not found, skipping")

        print()

        # ====================================================================
        # 4. Process Demand → Wide Format
        # ====================================================================
        print("-"*80)
        print("[4/7] Processing Demand")
        print("-"*80)
        print()

        demand_file = self.raw_data_dir / "entsoe_demand_24month.parquet"

        if demand_file.exists():
            demand_df = pl.read_parquet(demand_file)
            print(f"Loaded: {len(demand_df):,} records")

            demand_wide = self.pivot_to_wide_format(
                demand_df,
                index_col='timestamp',
                pivot_col='zone',
                value_col='load_mw',
                prefix='demand'
            )

            demand_path = self.output_dir / "entsoe_demand_hourly.parquet"
            demand_wide.write_parquet(demand_path)
            results['demand'] = demand_path

            print(f"✓ Saved: {demand_path}")
            print(f"  Shape: {demand_wide.shape}")
        else:
            print("  Warning: Demand file not found, skipping")

        print()

        # ====================================================================
        # 5. Process Day-Ahead Prices → Wide Format
        # ====================================================================
        print("-"*80)
        print("[5/7] Processing Day-Ahead Prices")
        print("-"*80)
        print()

        prices_file = self.raw_data_dir / "entsoe_prices_24month.parquet"

        if prices_file.exists():
            prices_df = pl.read_parquet(prices_file)
            print(f"Loaded: {len(prices_df):,} records")

            prices_wide = self.pivot_to_wide_format(
                prices_df,
                index_col='timestamp',
                pivot_col='zone',
                value_col='price_eur_mwh',
                prefix='price'
            )

            prices_path = self.output_dir / "entsoe_prices_hourly.parquet"
            prices_wide.write_parquet(prices_path)
            results['prices'] = prices_path

            print(f"✓ Saved: {prices_path}")
            print(f"  Shape: {prices_wide.shape}")
        else:
            print("  Warning: Prices file not found, skipping")

        print()

        # ====================================================================
        # 6. Process Hydro Storage → Interpolated Hourly
        # ====================================================================
        print("-"*80)
        print("[6/7] Processing Hydro Reservoir Storage")
        print("-"*80)
        print()

        hydro_file = self.raw_data_dir / "entsoe_hydro_storage_24month.parquet"

        if hydro_file.exists():
            hydro_df = pl.read_parquet(hydro_file)
            print(f"Loaded: {len(hydro_df):,} weekly records")

            hydro_hourly = self.interpolate_hydro_storage_to_hourly(
                hydro_df, hourly_range
            )

            hydro_path = self.output_dir / "entsoe_hydro_storage_hourly.parquet"
            hydro_hourly.write_parquet(hydro_path)
            results['hydro_storage'] = hydro_path

            print(f"✓ Saved: {hydro_path}")
            print(f"  Shape: {hydro_hourly.shape}")
        else:
            print("  Warning: Hydro storage file not found, skipping")

        print()

        # ====================================================================
        # 7. Process Pumped Storage & Load Forecast → Wide Format
        # ====================================================================
        print("-"*80)
        print("[7/7] Processing Pumped Storage & Load Forecast")
        print("-"*80)
        print()

        # Pumped storage
        pumped_file = self.raw_data_dir / "entsoe_pumped_storage_24month.parquet"

        if pumped_file.exists():
            pumped_df = pl.read_parquet(pumped_file)
            print(f"Pumped storage loaded: {len(pumped_df):,} records")

            pumped_wide = self.pivot_to_wide_format(
                pumped_df,
                index_col='timestamp',
                pivot_col='zone',
                value_col='generation_mw',
                prefix='pumped'
            )

            pumped_path = self.output_dir / "entsoe_pumped_storage_hourly.parquet"
            pumped_wide.write_parquet(pumped_path)
            results['pumped_storage'] = pumped_path

            print(f"✓ Saved: {pumped_path}")
            print(f"  Shape: {pumped_wide.shape}")

        # Load forecast
        forecast_file = self.raw_data_dir / "entsoe_load_forecast_24month.parquet"

        if forecast_file.exists():
            forecast_df = pl.read_parquet(forecast_file)
            print(f"Load forecast loaded: {len(forecast_df):,} records")

            forecast_wide = self.pivot_to_wide_format(
                forecast_df,
                index_col='timestamp',
                pivot_col='zone',
                value_col='forecast_mw',
                prefix='load_forecast'
            )

            forecast_path = self.output_dir / "entsoe_load_forecast_hourly.parquet"
            forecast_wide.write_parquet(forecast_path)
            results['load_forecast'] = forecast_path

            print(f"✓ Saved: {forecast_path}")
            print(f"  Shape: {forecast_wide.shape}")

        print()
        print("="*80)
        print("PROCESSING COMPLETE")
        print("="*80)
        print()
        print(f"Processed {len(results)} feature types:")
        for feature_type, path in results.items():
            file_size = path.stat().st_size / (1024**2)
            print(f"  {feature_type}: {file_size:.1f} MB")
        print()

        return results
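

# A possible follow-up (not implemented in this script; see the "Next steps" printed by the
# main block below): join the individual hourly parquet outputs on 'timestamp' into a single
# feature matrix. Minimal sketch, assuming every output file carries the same hourly UTC
# 'timestamp' grid:
#
#   paths = sorted(Path('data/processed').glob('entsoe_*_hourly.parquet'))
#   merged = pl.read_parquet(paths[0])
#   for p in paths[1:]:
#       merged = merged.join(pl.read_parquet(p), on='timestamp', how='left')
#   merged.write_parquet(Path('data/processed') / 'entsoe_features_hourly.parquet')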


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Process ENTSO-E raw data into features")
    parser.add_argument(
        '--raw-data-dir',
        type=Path,
        default=Path('data/raw'),
        help='Directory containing raw ENTSO-E parquet files'
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        default=Path('data/processed'),
        help='Output directory for processed features'
    )
    parser.add_argument(
        '--start-date',
        default='2023-10-01',
        help='Start date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--end-date',
        default='2025-09-30',
        help='End date (YYYY-MM-DD)'
    )
    args = parser.parse_args()

    # Initialize processor
    processor = EntsoEFeatureProcessor(
        raw_data_dir=args.raw_data_dir,
        output_dir=args.output_dir
    )

    # Process all features
    results = processor.process_all_features(
        start_date=args.start_date,
        end_date=args.end_date
    )

    print("Next steps:")
    print("  1. Merge all ENTSO-E features into single matrix")
    print("  2. Combine with JAO features (726) → ~952-1,037 total features")
    print("  3. Create ENTSO-E features EDA notebook for validation")