# fbmc-chronos2/src/data_processing/process_entsoe_features.py
"""
Process ENTSO-E Raw Data into Features
=======================================
Transforms raw ENTSO-E data into feature matrix:
1. Encode transmission outages: Event-based → Hourly binary (0/1 per CNEC)
2. Encode generation outages: Event-based → Hourly (binary + MW per zone-tech)
3. Interpolate hydro storage: Weekly → Hourly
4. Pivot generation/demand/prices: Long → Wide format
5. Align all timestamps to a common hourly MTU grid (UTC)
6. Merge into single feature matrix
Input: Raw parquet files from collect_entsoe_24month.py
Output: Unified ENTSO-E feature matrix (parquet)
"""
import polars as pl
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List
class EntsoEFeatureProcessor:
"""Process raw ENTSO-E data into feature matrix."""
def __init__(self, raw_data_dir: Path, output_dir: Path):
"""Initialize processor.
Args:
raw_data_dir: Directory containing raw ENTSO-E parquet files
output_dir: Directory to save processed features
"""
self.raw_data_dir = raw_data_dir
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
def encode_transmission_outages_to_hourly(
self,
outages_df: pl.DataFrame,
start_date: str,
end_date: str
) -> pl.DataFrame:
"""Encode event-based transmission outages to hourly binary features.
Converts outage events (start_time, end_time) to hourly time-series
with binary indicator (0 = no outage, 1 = outage active) for each CNEC.
Args:
outages_df: Outage events DataFrame with columns:
asset_eic, start_time, end_time
start_date: Start date for hourly range (YYYY-MM-DD)
end_date: End date for hourly range (YYYY-MM-DD)
Returns:
Polars DataFrame with hourly binary outage indicators
            Columns: timestamp, outage_<cnec_eic_1>, outage_<cnec_eic_2>, ...
"""
print("Encoding transmission outages to hourly binary features...")
        # Create the complete hourly timestamp range from the requested period
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(hours=23)
        hourly_range = pl.datetime_range(
            start=start_dt,
            end=end_dt,
            interval="1h",
            time_zone="UTC",
            eager=True
        )
# Initialize base DataFrame with hourly timestamps
hourly_df = pl.DataFrame({
'timestamp': hourly_range
})
if outages_df.is_empty():
print(" No outages to encode")
return hourly_df
# Get unique CNECs
unique_cnecs = outages_df.select('asset_eic').unique().sort('asset_eic')
cnec_list = unique_cnecs.to_series().to_list()
print(f" Encoding {len(cnec_list)} CNECs to hourly binary...")
print(f" Hourly range: {len(hourly_df):,} hours")
# For each CNEC, create binary indicator
for i, cnec_eic in enumerate(cnec_list, 1):
if i % 10 == 0:
print(f" Processing CNEC {i}/{len(cnec_list)}...")
# Filter outages for this CNEC
cnec_outages = outages_df.filter(pl.col('asset_eic') == cnec_eic)
# Initialize all hours as 0 (no outage)
outage_indicator = pl.Series([0] * len(hourly_df))
# For each outage event, mark affected hours as 1
for row in cnec_outages.iter_rows(named=True):
start_time = row['start_time']
end_time = row['end_time']
# Create mask for hours within outage period
mask = (
(hourly_df['timestamp'] >= start_time) &
(hourly_df['timestamp'] < end_time)
)
# Set outage indicator to 1 for affected hours
outage_indicator = pl.when(mask).then(1).otherwise(outage_indicator)
# Add column for this CNEC
col_name = f"outage_{cnec_eic}"
hourly_df = hourly_df.with_columns(outage_indicator.alias(col_name))
print(f" ✓ Encoded {len(cnec_list)} CNEC outage features")
print(f" Shape: {hourly_df.shape}")
return hourly_df
def encode_generation_outages_to_hourly(
self,
outages_df: pl.DataFrame,
start_date: str,
end_date: str
) -> pl.DataFrame:
"""Encode event-based generation outages to hourly features.
Converts generation unit outage events to hourly time-series with:
1. Binary indicator (0/1): Whether outages are active
2. Capacity offline (MW): Total capacity offline
Aggregates by zone-technology combination (e.g., FR_Nuclear, BE_Gas).
Args:
outages_df: Outage events DataFrame with columns:
zone, psr_type, psr_name, capacity_mw, start_time, end_time
start_date: Start date for hourly range (YYYY-MM-DD)
end_date: End date for hourly range (YYYY-MM-DD)
Returns:
Polars DataFrame with hourly generation outage features
            Columns: timestamp, gen_outage_<zone_tech>_binary, gen_outage_<zone_tech>_mw, ...
"""
print("Encoding generation outages to hourly features...")
        # Create the complete hourly timestamp range from the requested period
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(hours=23)
        hourly_range = pl.datetime_range(
            start=start_dt,
            end=end_dt,
            interval="1h",
            time_zone="UTC",
            eager=True
        )
# Initialize base DataFrame with hourly timestamps
hourly_df = pl.DataFrame({
'timestamp': hourly_range
})
if outages_df.is_empty():
print(" No generation outages to encode")
return hourly_df
# Create zone-technology combinations
outages_df = outages_df.with_columns(
(pl.col('zone') + "_" + pl.col('psr_name').str.replace_all(' ', '_')).alias('zone_tech')
)
# Get unique zone-technology combinations
unique_combos = outages_df.select('zone_tech').unique().sort('zone_tech')
combo_list = unique_combos.to_series().to_list()
print(f" Encoding {len(combo_list)} zone-technology combinations to hourly...")
print(f" Hourly range: {len(hourly_df):,} hours")
# For each zone-technology combination, create binary and capacity features
for i, zone_tech in enumerate(combo_list, 1):
if i % 5 == 0:
print(f" Processing {i}/{len(combo_list)}...")
# Filter outages for this zone-technology
combo_outages = outages_df.filter(pl.col('zone_tech') == zone_tech)
# Initialize all hours as 0 (no outage)
outage_binary = pl.Series([0] * len(hourly_df))
outage_capacity = pl.Series([0.0] * len(hourly_df))
# For each outage event, mark affected hours
for row in combo_outages.iter_rows(named=True):
start_time = row['start_time']
end_time = row['end_time']
capacity_mw = row['capacity_mw']
# Create mask for hours within outage period
mask = (
(hourly_df['timestamp'] >= start_time) &
(hourly_df['timestamp'] < end_time)
)
# Set binary indicator to 1 for affected hours
outage_binary = pl.when(mask).then(1).otherwise(outage_binary)
# Add capacity to total offline capacity (multiple outages may overlap)
outage_capacity = pl.when(mask).then(
outage_capacity + capacity_mw
).otherwise(outage_capacity)
# Add columns for this zone-technology combination
binary_col = f"gen_outage_{zone_tech}_binary"
capacity_col = f"gen_outage_{zone_tech}_mw"
hourly_df = hourly_df.with_columns([
outage_binary.alias(binary_col),
outage_capacity.alias(capacity_col)
])
print(f" ✓ Encoded {len(combo_list)} zone-technology outage features")
print(f" Features: {len(combo_list) * 2} (binary + MW for each)")
print(f" Shape: {hourly_df.shape}")
return hourly_df
def interpolate_hydro_storage_to_hourly(
self,
hydro_df: pl.DataFrame,
hourly_range: pl.Series
) -> pl.DataFrame:
"""Interpolate weekly hydro reservoir storage to hourly.
Args:
hydro_df: Weekly hydro storage DataFrame
Columns: timestamp, storage_mwh, zone
hourly_range: Hourly timestamp series to interpolate to
Returns:
Polars DataFrame with hourly interpolated storage
            Columns: timestamp, hydro_storage_<zone_1>, hydro_storage_<zone_2>, ...
"""
print("Interpolating hydro storage from weekly to hourly...")
hourly_df = pl.DataFrame({'timestamp': hourly_range})
if hydro_df.is_empty():
print(" No hydro storage data to interpolate")
return hourly_df
# Get unique zones
zones = hydro_df.select('zone').unique().sort('zone').to_series().to_list()
print(f" Interpolating {len(zones)} zones...")
for zone in zones:
# Filter to this zone
zone_df = hydro_df.filter(pl.col('zone') == zone).sort('timestamp')
# Convert to pandas for interpolation
zone_pd = zone_df.select(['timestamp', 'storage_mwh']).to_pandas()
zone_pd = zone_pd.set_index('timestamp')
# Reindex to hourly and interpolate
hourly_pd = zone_pd.reindex(hourly_range.to_pandas())
hourly_pd['storage_mwh'] = hourly_pd['storage_mwh'].interpolate(method='linear')
            # Fill any remaining NaNs (at the edges) with forward/backward fill
            # (Series.fillna(method=...) is deprecated in pandas 2.x)
            hourly_pd['storage_mwh'] = hourly_pd['storage_mwh'].ffill().bfill()
# Add to result
col_name = f"hydro_storage_{zone}"
hourly_df = hourly_df.with_columns(
pl.Series(col_name, hourly_pd['storage_mwh'].values)
)
print(f" ✓ Interpolated {len(zones)} hydro storage features to hourly")
return hourly_df
def pivot_to_wide_format(
self,
df: pl.DataFrame,
index_col: str,
pivot_col: str,
value_col: str,
prefix: str
) -> pl.DataFrame:
"""Pivot long-format data to wide format.
Args:
df: Input DataFrame in long format
index_col: Column to use as index (e.g., 'timestamp')
pivot_col: Column to pivot (e.g., 'zone' or 'psr_type')
value_col: Column with values (e.g., 'generation_mw')
prefix: Prefix for new column names
Returns:
Wide-format DataFrame
"""
# Group by timestamp and pivot column, aggregate to handle duplicates
df_agg = df.group_by([index_col, pivot_col]).agg(
pl.col(value_col).mean().alias(value_col)
)
# Pivot to wide format
df_wide = df_agg.pivot(
values=value_col,
index=index_col,
columns=pivot_col
)
# Rename columns with prefix
new_columns = {
col: f"{prefix}_{col}" if col != index_col else col
for col in df_wide.columns
}
df_wide = df_wide.rename(new_columns)
return df_wide
def process_all_features(
self,
start_date: str = '2023-10-01',
end_date: str = '2025-09-30'
) -> Dict[str, Path]:
"""Process all ENTSO-E raw data into features.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Dictionary mapping feature types to output file paths
"""
print("="*80)
print("ENTSO-E FEATURE PROCESSING")
print("="*80)
print()
print(f"Period: {start_date} to {end_date}")
print(f"Input: {self.raw_data_dir}")
print(f"Output: {self.output_dir}")
print()
results = {}
        # Create hourly timestamp range for alignment, built from the requested period
        start_dt = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(hours=23)
        hourly_range = pl.datetime_range(
            start=start_dt,
            end=end_dt,
            interval="1h",
            time_zone="UTC",
            eager=True
        )
# ====================================================================
# 1. Process Transmission Outages → Hourly Binary
# ====================================================================
print("-"*80)
print("[1/7] Processing Transmission Outages")
print("-"*80)
print()
outages_file = self.raw_data_dir / "entsoe_transmission_outages_24month.parquet"
if outages_file.exists():
outages_df = pl.read_parquet(outages_file)
print(f"Loaded: {len(outages_df):,} outage events")
outages_hourly = self.encode_transmission_outages_to_hourly(
outages_df, start_date, end_date
)
outages_path = self.output_dir / "entsoe_transmission_outages_hourly.parquet"
outages_hourly.write_parquet(outages_path)
results['transmission_outages'] = outages_path
print(f"✓ Saved: {outages_path}")
print(f" Shape: {outages_hourly.shape}")
else:
print(" Warning: Transmission outages file not found, skipping")
print()
# ====================================================================
# 2. Process Generation Outages → Hourly (Binary + MW)
# ====================================================================
print("-"*80)
print("[2/7] Processing Generation Outages")
print("-"*80)
print()
gen_outages_file = self.raw_data_dir / "entsoe_generation_outages_24month.parquet"
if gen_outages_file.exists():
gen_outages_df = pl.read_parquet(gen_outages_file)
print(f"Loaded: {len(gen_outages_df):,} generation outage events")
gen_outages_hourly = self.encode_generation_outages_to_hourly(
gen_outages_df, start_date, end_date
)
gen_outages_path = self.output_dir / "entsoe_generation_outages_hourly.parquet"
gen_outages_hourly.write_parquet(gen_outages_path)
results['generation_outages'] = gen_outages_path
print(f"✓ Saved: {gen_outages_path}")
print(f" Shape: {gen_outages_hourly.shape}")
else:
print(" Warning: Generation outages file not found, skipping")
print()
# ====================================================================
# 3. Process Generation by PSR Type → Wide Format
# ====================================================================
print("-"*80)
print("[3/7] Processing Generation by PSR Type")
print("-"*80)
print()
gen_file = self.raw_data_dir / "entsoe_generation_by_psr_24month.parquet"
if gen_file.exists():
gen_df = pl.read_parquet(gen_file)
print(f"Loaded: {len(gen_df):,} records")
# Create combined column: zone_psrname
gen_df = gen_df.with_columns(
(pl.col('zone') + "_" + pl.col('psr_name').str.replace_all(' ', '_')).alias('zone_psr')
)
gen_wide = self.pivot_to_wide_format(
gen_df,
index_col='timestamp',
pivot_col='zone_psr',
value_col='generation_mw',
prefix='gen'
)
gen_path = self.output_dir / "entsoe_generation_hourly.parquet"
gen_wide.write_parquet(gen_path)
results['generation'] = gen_path
print(f"✓ Saved: {gen_path}")
print(f" Shape: {gen_wide.shape}")
else:
print(" Warning: Generation file not found, skipping")
print()
# ====================================================================
# 4. Process Demand → Wide Format
# ====================================================================
print("-"*80)
print("[4/7] Processing Demand")
print("-"*80)
print()
demand_file = self.raw_data_dir / "entsoe_demand_24month.parquet"
if demand_file.exists():
demand_df = pl.read_parquet(demand_file)
print(f"Loaded: {len(demand_df):,} records")
demand_wide = self.pivot_to_wide_format(
demand_df,
index_col='timestamp',
pivot_col='zone',
value_col='load_mw',
prefix='demand'
)
demand_path = self.output_dir / "entsoe_demand_hourly.parquet"
demand_wide.write_parquet(demand_path)
results['demand'] = demand_path
print(f"✓ Saved: {demand_path}")
print(f" Shape: {demand_wide.shape}")
else:
print(" Warning: Demand file not found, skipping")
print()
# ====================================================================
# 5. Process Day-Ahead Prices → Wide Format
# ====================================================================
print("-"*80)
print("[5/7] Processing Day-Ahead Prices")
print("-"*80)
print()
prices_file = self.raw_data_dir / "entsoe_prices_24month.parquet"
if prices_file.exists():
prices_df = pl.read_parquet(prices_file)
print(f"Loaded: {len(prices_df):,} records")
prices_wide = self.pivot_to_wide_format(
prices_df,
index_col='timestamp',
pivot_col='zone',
value_col='price_eur_mwh',
prefix='price'
)
prices_path = self.output_dir / "entsoe_prices_hourly.parquet"
prices_wide.write_parquet(prices_path)
results['prices'] = prices_path
print(f"✓ Saved: {prices_path}")
print(f" Shape: {prices_wide.shape}")
else:
print(" Warning: Prices file not found, skipping")
print()
# ====================================================================
# 6. Process Hydro Storage → Interpolated Hourly
# ====================================================================
print("-"*80)
print("[6/7] Processing Hydro Reservoir Storage")
print("-"*80)
print()
hydro_file = self.raw_data_dir / "entsoe_hydro_storage_24month.parquet"
if hydro_file.exists():
hydro_df = pl.read_parquet(hydro_file)
print(f"Loaded: {len(hydro_df):,} weekly records")
hydro_hourly = self.interpolate_hydro_storage_to_hourly(
hydro_df, hourly_range
)
hydro_path = self.output_dir / "entsoe_hydro_storage_hourly.parquet"
hydro_hourly.write_parquet(hydro_path)
results['hydro_storage'] = hydro_path
print(f"✓ Saved: {hydro_path}")
print(f" Shape: {hydro_hourly.shape}")
else:
print(" Warning: Hydro storage file not found, skipping")
print()
# ====================================================================
# 7. Process Pumped Storage & Load Forecast → Wide Format
# ====================================================================
print("-"*80)
print("[7/7] Processing Pumped Storage & Load Forecast")
print("-"*80)
print()
# Pumped storage
pumped_file = self.raw_data_dir / "entsoe_pumped_storage_24month.parquet"
if pumped_file.exists():
pumped_df = pl.read_parquet(pumped_file)
print(f"Pumped storage loaded: {len(pumped_df):,} records")
pumped_wide = self.pivot_to_wide_format(
pumped_df,
index_col='timestamp',
pivot_col='zone',
value_col='generation_mw',
prefix='pumped'
)
pumped_path = self.output_dir / "entsoe_pumped_storage_hourly.parquet"
pumped_wide.write_parquet(pumped_path)
results['pumped_storage'] = pumped_path
print(f"✓ Saved: {pumped_path}")
print(f" Shape: {pumped_wide.shape}")
# Load forecast
forecast_file = self.raw_data_dir / "entsoe_load_forecast_24month.parquet"
if forecast_file.exists():
forecast_df = pl.read_parquet(forecast_file)
print(f"Load forecast loaded: {len(forecast_df):,} records")
forecast_wide = self.pivot_to_wide_format(
forecast_df,
index_col='timestamp',
pivot_col='zone',
value_col='forecast_mw',
prefix='load_forecast'
)
forecast_path = self.output_dir / "entsoe_load_forecast_hourly.parquet"
forecast_wide.write_parquet(forecast_path)
results['load_forecast'] = forecast_path
print(f"✓ Saved: {forecast_path}")
print(f" Shape: {forecast_wide.shape}")
print()
print("="*80)
print("PROCESSING COMPLETE")
print("="*80)
print()
print(f"Processed {len(results)} feature types:")
for feature_type, path in results.items():
file_size = path.stat().st_size / (1024**2)
print(f" {feature_type}: {file_size:.1f} MB")
print()
return results
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Process ENTSO-E raw data into features")
parser.add_argument(
'--raw-data-dir',
type=Path,
default=Path('data/raw'),
help='Directory containing raw ENTSO-E parquet files'
)
parser.add_argument(
'--output-dir',
type=Path,
default=Path('data/processed'),
help='Output directory for processed features'
)
parser.add_argument(
'--start-date',
default='2023-10-01',
help='Start date (YYYY-MM-DD)'
)
parser.add_argument(
'--end-date',
default='2025-09-30',
help='End date (YYYY-MM-DD)'
)
args = parser.parse_args()
# Initialize processor
processor = EntsoEFeatureProcessor(
raw_data_dir=args.raw_data_dir,
output_dir=args.output_dir
)
# Process all features
results = processor.process_all_features(
start_date=args.start_date,
end_date=args.end_date
)
print("Next steps:")
print(" 1. Merge all ENTSO-E features into single matrix")
print(" 2. Combine with JAO features (726) → ~952-1,037 total features")
print(" 3. Create ENTSO-E features EDA notebook for validation")