""" Process ENTSO-E Raw Data into Features ======================================= Transforms raw ENTSO-E data into feature matrix: 1. Encode transmission outages: Event-based → Hourly binary (0/1 per CNEC) 2. Encode generation outages: Event-based → Hourly (binary + MW per zone-tech) 3. Interpolate hydro storage: Weekly → Hourly 4. Pivot generation/demand/prices: Long → Wide format 5. Align all timestamps to MTU (Europe/Amsterdam timezone) 6. Merge into single feature matrix Input: Raw parquet files from collect_entsoe_24month.py Output: Unified ENTSO-E feature matrix (parquet) """ import polars as pl import pandas as pd from pathlib import Path from datetime import datetime, timedelta from typing import Dict, List class EntsoEFeatureProcessor: """Process raw ENTSO-E data into feature matrix.""" def __init__(self, raw_data_dir: Path, output_dir: Path): """Initialize processor. Args: raw_data_dir: Directory containing raw ENTSO-E parquet files output_dir: Directory to save processed features """ self.raw_data_dir = raw_data_dir self.output_dir = output_dir self.output_dir.mkdir(parents=True, exist_ok=True) def encode_transmission_outages_to_hourly( self, outages_df: pl.DataFrame, start_date: str, end_date: str ) -> pl.DataFrame: """Encode event-based transmission outages to hourly binary features. Converts outage events (start_time, end_time) to hourly time-series with binary indicator (0 = no outage, 1 = outage active) for each CNEC. Args: outages_df: Outage events DataFrame with columns: asset_eic, start_time, end_time start_date: Start date for hourly range (YYYY-MM-DD) end_date: End date for hourly range (YYYY-MM-DD) Returns: Polars DataFrame with hourly binary outage indicators Columns: timestamp, [cnec_eic_1], [cnec_eic_2], ... """ print("Encoding transmission outages to hourly binary features...") # Create complete hourly timestamp range hourly_range = pl.datetime_range( start=pl.datetime(2023, 10, 1, 0, 0, 0), end=pl.datetime(2025, 9, 30, 23, 0, 0), interval="1h", time_zone="UTC", eager=True ) # Initialize base DataFrame with hourly timestamps hourly_df = pl.DataFrame({ 'timestamp': hourly_range }) if outages_df.is_empty(): print(" No outages to encode") return hourly_df # Get unique CNECs unique_cnecs = outages_df.select('asset_eic').unique().sort('asset_eic') cnec_list = unique_cnecs.to_series().to_list() print(f" Encoding {len(cnec_list)} CNECs to hourly binary...") print(f" Hourly range: {len(hourly_df):,} hours") # For each CNEC, create binary indicator for i, cnec_eic in enumerate(cnec_list, 1): if i % 10 == 0: print(f" Processing CNEC {i}/{len(cnec_list)}...") # Filter outages for this CNEC cnec_outages = outages_df.filter(pl.col('asset_eic') == cnec_eic) # Initialize all hours as 0 (no outage) outage_indicator = pl.Series([0] * len(hourly_df)) # For each outage event, mark affected hours as 1 for row in cnec_outages.iter_rows(named=True): start_time = row['start_time'] end_time = row['end_time'] # Create mask for hours within outage period mask = ( (hourly_df['timestamp'] >= start_time) & (hourly_df['timestamp'] < end_time) ) # Set outage indicator to 1 for affected hours outage_indicator = pl.when(mask).then(1).otherwise(outage_indicator) # Add column for this CNEC col_name = f"outage_{cnec_eic}" hourly_df = hourly_df.with_columns(outage_indicator.alias(col_name)) print(f" ✓ Encoded {len(cnec_list)} CNEC outage features") print(f" Shape: {hourly_df.shape}") return hourly_df def encode_generation_outages_to_hourly( self, outages_df: pl.DataFrame, start_date: str, end_date: str ) -> pl.DataFrame: """Encode event-based generation outages to hourly features. Converts generation unit outage events to hourly time-series with: 1. Binary indicator (0/1): Whether outages are active 2. Capacity offline (MW): Total capacity offline Aggregates by zone-technology combination (e.g., FR_Nuclear, BE_Gas). Args: outages_df: Outage events DataFrame with columns: zone, psr_type, psr_name, capacity_mw, start_time, end_time start_date: Start date for hourly range (YYYY-MM-DD) end_date: End date for hourly range (YYYY-MM-DD) Returns: Polars DataFrame with hourly generation outage features Columns: timestamp, [zone_tech_binary], [zone_tech_mw], ... """ print("Encoding generation outages to hourly features...") # Create complete hourly timestamp range hourly_range = pl.datetime_range( start=pl.datetime(2023, 10, 1, 0, 0, 0), end=pl.datetime(2025, 9, 30, 23, 0, 0), interval="1h", time_zone="UTC", eager=True ) # Initialize base DataFrame with hourly timestamps hourly_df = pl.DataFrame({ 'timestamp': hourly_range }) if outages_df.is_empty(): print(" No generation outages to encode") return hourly_df # Create zone-technology combinations outages_df = outages_df.with_columns( (pl.col('zone') + "_" + pl.col('psr_name').str.replace_all(' ', '_')).alias('zone_tech') ) # Get unique zone-technology combinations unique_combos = outages_df.select('zone_tech').unique().sort('zone_tech') combo_list = unique_combos.to_series().to_list() print(f" Encoding {len(combo_list)} zone-technology combinations to hourly...") print(f" Hourly range: {len(hourly_df):,} hours") # For each zone-technology combination, create binary and capacity features for i, zone_tech in enumerate(combo_list, 1): if i % 5 == 0: print(f" Processing {i}/{len(combo_list)}...") # Filter outages for this zone-technology combo_outages = outages_df.filter(pl.col('zone_tech') == zone_tech) # Initialize all hours as 0 (no outage) outage_binary = pl.Series([0] * len(hourly_df)) outage_capacity = pl.Series([0.0] * len(hourly_df)) # For each outage event, mark affected hours for row in combo_outages.iter_rows(named=True): start_time = row['start_time'] end_time = row['end_time'] capacity_mw = row['capacity_mw'] # Create mask for hours within outage period mask = ( (hourly_df['timestamp'] >= start_time) & (hourly_df['timestamp'] < end_time) ) # Set binary indicator to 1 for affected hours outage_binary = pl.when(mask).then(1).otherwise(outage_binary) # Add capacity to total offline capacity (multiple outages may overlap) outage_capacity = pl.when(mask).then( outage_capacity + capacity_mw ).otherwise(outage_capacity) # Add columns for this zone-technology combination binary_col = f"gen_outage_{zone_tech}_binary" capacity_col = f"gen_outage_{zone_tech}_mw" hourly_df = hourly_df.with_columns([ outage_binary.alias(binary_col), outage_capacity.alias(capacity_col) ]) print(f" ✓ Encoded {len(combo_list)} zone-technology outage features") print(f" Features: {len(combo_list) * 2} (binary + MW for each)") print(f" Shape: {hourly_df.shape}") return hourly_df def interpolate_hydro_storage_to_hourly( self, hydro_df: pl.DataFrame, hourly_range: pl.Series ) -> pl.DataFrame: """Interpolate weekly hydro reservoir storage to hourly. Args: hydro_df: Weekly hydro storage DataFrame Columns: timestamp, storage_mwh, zone hourly_range: Hourly timestamp series to interpolate to Returns: Polars DataFrame with hourly interpolated storage Columns: timestamp, [zone_1_storage], [zone_2_storage], ... """ print("Interpolating hydro storage from weekly to hourly...") hourly_df = pl.DataFrame({'timestamp': hourly_range}) if hydro_df.is_empty(): print(" No hydro storage data to interpolate") return hourly_df # Get unique zones zones = hydro_df.select('zone').unique().sort('zone').to_series().to_list() print(f" Interpolating {len(zones)} zones...") for zone in zones: # Filter to this zone zone_df = hydro_df.filter(pl.col('zone') == zone).sort('timestamp') # Convert to pandas for interpolation zone_pd = zone_df.select(['timestamp', 'storage_mwh']).to_pandas() zone_pd = zone_pd.set_index('timestamp') # Reindex to hourly and interpolate hourly_pd = zone_pd.reindex(hourly_range.to_pandas()) hourly_pd['storage_mwh'] = hourly_pd['storage_mwh'].interpolate(method='linear') # Fill any remaining NaNs (at edges) with forward/backward fill hourly_pd['storage_mwh'] = hourly_pd['storage_mwh'].fillna(method='ffill').fillna(method='bfill') # Add to result col_name = f"hydro_storage_{zone}" hourly_df = hourly_df.with_columns( pl.Series(col_name, hourly_pd['storage_mwh'].values) ) print(f" ✓ Interpolated {len(zones)} hydro storage features to hourly") return hourly_df def pivot_to_wide_format( self, df: pl.DataFrame, index_col: str, pivot_col: str, value_col: str, prefix: str ) -> pl.DataFrame: """Pivot long-format data to wide format. Args: df: Input DataFrame in long format index_col: Column to use as index (e.g., 'timestamp') pivot_col: Column to pivot (e.g., 'zone' or 'psr_type') value_col: Column with values (e.g., 'generation_mw') prefix: Prefix for new column names Returns: Wide-format DataFrame """ # Group by timestamp and pivot column, aggregate to handle duplicates df_agg = df.group_by([index_col, pivot_col]).agg( pl.col(value_col).mean().alias(value_col) ) # Pivot to wide format df_wide = df_agg.pivot( values=value_col, index=index_col, columns=pivot_col ) # Rename columns with prefix new_columns = { col: f"{prefix}_{col}" if col != index_col else col for col in df_wide.columns } df_wide = df_wide.rename(new_columns) return df_wide def process_all_features( self, start_date: str = '2023-10-01', end_date: str = '2025-09-30' ) -> Dict[str, Path]: """Process all ENTSO-E raw data into features. Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: Dictionary mapping feature types to output file paths """ print("="*80) print("ENTSO-E FEATURE PROCESSING") print("="*80) print() print(f"Period: {start_date} to {end_date}") print(f"Input: {self.raw_data_dir}") print(f"Output: {self.output_dir}") print() results = {} # Create hourly timestamp range for alignment hourly_range = pl.datetime_range( start=pl.datetime(2023, 10, 1, 0, 0, 0), end=pl.datetime(2025, 9, 30, 23, 0, 0), interval="1h", time_zone="UTC", eager=True ) # ==================================================================== # 1. Process Transmission Outages → Hourly Binary # ==================================================================== print("-"*80) print("[1/7] Processing Transmission Outages") print("-"*80) print() outages_file = self.raw_data_dir / "entsoe_transmission_outages_24month.parquet" if outages_file.exists(): outages_df = pl.read_parquet(outages_file) print(f"Loaded: {len(outages_df):,} outage events") outages_hourly = self.encode_transmission_outages_to_hourly( outages_df, start_date, end_date ) outages_path = self.output_dir / "entsoe_transmission_outages_hourly.parquet" outages_hourly.write_parquet(outages_path) results['transmission_outages'] = outages_path print(f"✓ Saved: {outages_path}") print(f" Shape: {outages_hourly.shape}") else: print(" Warning: Transmission outages file not found, skipping") print() # ==================================================================== # 2. Process Generation Outages → Hourly (Binary + MW) # ==================================================================== print("-"*80) print("[2/7] Processing Generation Outages") print("-"*80) print() gen_outages_file = self.raw_data_dir / "entsoe_generation_outages_24month.parquet" if gen_outages_file.exists(): gen_outages_df = pl.read_parquet(gen_outages_file) print(f"Loaded: {len(gen_outages_df):,} generation outage events") gen_outages_hourly = self.encode_generation_outages_to_hourly( gen_outages_df, start_date, end_date ) gen_outages_path = self.output_dir / "entsoe_generation_outages_hourly.parquet" gen_outages_hourly.write_parquet(gen_outages_path) results['generation_outages'] = gen_outages_path print(f"✓ Saved: {gen_outages_path}") print(f" Shape: {gen_outages_hourly.shape}") else: print(" Warning: Generation outages file not found, skipping") print() # ==================================================================== # 3. Process Generation by PSR Type → Wide Format # ==================================================================== print("-"*80) print("[3/7] Processing Generation by PSR Type") print("-"*80) print() gen_file = self.raw_data_dir / "entsoe_generation_by_psr_24month.parquet" if gen_file.exists(): gen_df = pl.read_parquet(gen_file) print(f"Loaded: {len(gen_df):,} records") # Create combined column: zone_psrname gen_df = gen_df.with_columns( (pl.col('zone') + "_" + pl.col('psr_name').str.replace_all(' ', '_')).alias('zone_psr') ) gen_wide = self.pivot_to_wide_format( gen_df, index_col='timestamp', pivot_col='zone_psr', value_col='generation_mw', prefix='gen' ) gen_path = self.output_dir / "entsoe_generation_hourly.parquet" gen_wide.write_parquet(gen_path) results['generation'] = gen_path print(f"✓ Saved: {gen_path}") print(f" Shape: {gen_wide.shape}") else: print(" Warning: Generation file not found, skipping") print() # ==================================================================== # 4. Process Demand → Wide Format # ==================================================================== print("-"*80) print("[4/7] Processing Demand") print("-"*80) print() demand_file = self.raw_data_dir / "entsoe_demand_24month.parquet" if demand_file.exists(): demand_df = pl.read_parquet(demand_file) print(f"Loaded: {len(demand_df):,} records") demand_wide = self.pivot_to_wide_format( demand_df, index_col='timestamp', pivot_col='zone', value_col='load_mw', prefix='demand' ) demand_path = self.output_dir / "entsoe_demand_hourly.parquet" demand_wide.write_parquet(demand_path) results['demand'] = demand_path print(f"✓ Saved: {demand_path}") print(f" Shape: {demand_wide.shape}") else: print(" Warning: Demand file not found, skipping") print() # ==================================================================== # 5. Process Day-Ahead Prices → Wide Format # ==================================================================== print("-"*80) print("[5/7] Processing Day-Ahead Prices") print("-"*80) print() prices_file = self.raw_data_dir / "entsoe_prices_24month.parquet" if prices_file.exists(): prices_df = pl.read_parquet(prices_file) print(f"Loaded: {len(prices_df):,} records") prices_wide = self.pivot_to_wide_format( prices_df, index_col='timestamp', pivot_col='zone', value_col='price_eur_mwh', prefix='price' ) prices_path = self.output_dir / "entsoe_prices_hourly.parquet" prices_wide.write_parquet(prices_path) results['prices'] = prices_path print(f"✓ Saved: {prices_path}") print(f" Shape: {prices_wide.shape}") else: print(" Warning: Prices file not found, skipping") print() # ==================================================================== # 6. Process Hydro Storage → Interpolated Hourly # ==================================================================== print("-"*80) print("[6/7] Processing Hydro Reservoir Storage") print("-"*80) print() hydro_file = self.raw_data_dir / "entsoe_hydro_storage_24month.parquet" if hydro_file.exists(): hydro_df = pl.read_parquet(hydro_file) print(f"Loaded: {len(hydro_df):,} weekly records") hydro_hourly = self.interpolate_hydro_storage_to_hourly( hydro_df, hourly_range ) hydro_path = self.output_dir / "entsoe_hydro_storage_hourly.parquet" hydro_hourly.write_parquet(hydro_path) results['hydro_storage'] = hydro_path print(f"✓ Saved: {hydro_path}") print(f" Shape: {hydro_hourly.shape}") else: print(" Warning: Hydro storage file not found, skipping") print() # ==================================================================== # 7. Process Pumped Storage & Load Forecast → Wide Format # ==================================================================== print("-"*80) print("[7/7] Processing Pumped Storage & Load Forecast") print("-"*80) print() # Pumped storage pumped_file = self.raw_data_dir / "entsoe_pumped_storage_24month.parquet" if pumped_file.exists(): pumped_df = pl.read_parquet(pumped_file) print(f"Pumped storage loaded: {len(pumped_df):,} records") pumped_wide = self.pivot_to_wide_format( pumped_df, index_col='timestamp', pivot_col='zone', value_col='generation_mw', prefix='pumped' ) pumped_path = self.output_dir / "entsoe_pumped_storage_hourly.parquet" pumped_wide.write_parquet(pumped_path) results['pumped_storage'] = pumped_path print(f"✓ Saved: {pumped_path}") print(f" Shape: {pumped_wide.shape}") # Load forecast forecast_file = self.raw_data_dir / "entsoe_load_forecast_24month.parquet" if forecast_file.exists(): forecast_df = pl.read_parquet(forecast_file) print(f"Load forecast loaded: {len(forecast_df):,} records") forecast_wide = self.pivot_to_wide_format( forecast_df, index_col='timestamp', pivot_col='zone', value_col='forecast_mw', prefix='load_forecast' ) forecast_path = self.output_dir / "entsoe_load_forecast_hourly.parquet" forecast_wide.write_parquet(forecast_path) results['load_forecast'] = forecast_path print(f"✓ Saved: {forecast_path}") print(f" Shape: {forecast_wide.shape}") print() print("="*80) print("PROCESSING COMPLETE") print("="*80) print() print(f"Processed {len(results)} feature types:") for feature_type, path in results.items(): file_size = path.stat().st_size / (1024**2) print(f" {feature_type}: {file_size:.1f} MB") print() return results if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Process ENTSO-E raw data into features") parser.add_argument( '--raw-data-dir', type=Path, default=Path('data/raw'), help='Directory containing raw ENTSO-E parquet files' ) parser.add_argument( '--output-dir', type=Path, default=Path('data/processed'), help='Output directory for processed features' ) parser.add_argument( '--start-date', default='2023-10-01', help='Start date (YYYY-MM-DD)' ) parser.add_argument( '--end-date', default='2025-09-30', help='End date (YYYY-MM-DD)' ) args = parser.parse_args() # Initialize processor processor = EntsoEFeatureProcessor( raw_data_dir=args.raw_data_dir, output_dir=args.output_dir ) # Process all features results = processor.process_all_features( start_date=args.start_date, end_date=args.end_date ) print("Next steps:") print(" 1. Merge all ENTSO-E features into single matrix") print(" 2. Combine with JAO features (726) → ~952-1,037 total features") print(" 3. Create ENTSO-E features EDA notebook for validation")