# fbmc-chronos2/src/data_processing/unify_jao_data.py
"""Unify JAO datasets into single timeline.
Combines MaxBEX, CNEC/PTDF, LTA, and Net Positions data into a single
unified dataset with proper timestamp alignment.
Author: Claude
Date: 2025-11-06
"""
from pathlib import Path
from typing import Tuple
import polars as pl
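
# Note: this module assumes a fairly recent Polars release; in particular,
# DataFrame.drop(..., strict=False) and DataFrame.sum_horizontal() used below
# are not available in older versions.
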
def validate_timeline(df: pl.DataFrame, name: str) -> None:
"""Validate timeline is hourly with no gaps."""
print(f"\nValidating {name} timeline...")
# Check sorted
if not df['mtu'].is_sorted():
raise ValueError(f"{name}: Timeline not sorted")
# Check for gaps (should be hourly)
time_diffs = df['mtu'].diff().drop_nulls()
most_common = time_diffs.mode()[0]
# Most common should be 1 hour (allow for DST transitions)
if most_common.total_seconds() != 3600:
print(f" [WARNING] Most common time diff: {most_common} (expected 1 hour)")
print(f" [OK] {name} timeline validated: {len(df)} records, sorted")
def add_timestamp_to_maxbex(
maxbex: pl.DataFrame,
master_timeline: pl.DataFrame
) -> pl.DataFrame:
"""Add mtu timestamp to MaxBEX via row alignment."""
print("\nAdding timestamp to MaxBEX...")
# Verify same length
if len(maxbex) != len(master_timeline):
raise ValueError(
f"MaxBEX ({len(maxbex)}) and timeline ({len(master_timeline)}) "
"have different lengths"
)
# Add mtu column via hstack
maxbex_with_time = maxbex.hstack(master_timeline)
print(f" [OK] MaxBEX timestamp added: {len(maxbex_with_time)} records")
return maxbex_with_time
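
# Note: unify_jao_data() below does not call this helper; it performs the same hstack
# inline (step 2) so that MaxBEX and NetPos can be deduplicated together while they are
# still row-aligned.
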
def fill_lta_gaps(
lta: pl.DataFrame,
master_timeline: pl.DataFrame
) -> pl.DataFrame:
"""Fill LTA gaps using forward-fill strategy."""
print("\nFilling LTA gaps...")
# Report initial state
initial_records = len(lta)
expected_records = len(master_timeline)
missing_hours = expected_records - initial_records
print(f" Initial LTA records: {initial_records:,}")
print(f" Expected records: {expected_records:,}")
print(f" Missing hours: {missing_hours:,} ({missing_hours/expected_records*100:.1f}%)")
# Remove metadata columns
lta_clean = lta.drop(['is_masked', 'masking_method'], strict=False)
# Left join master timeline with LTA
lta_complete = master_timeline.join(
lta_clean,
on='mtu',
how='left'
)
# Get border columns
border_cols = [c for c in lta_complete.columns if c.startswith('border_')]
# Forward-fill gaps (LTA changes rarely)
lta_complete = lta_complete.with_columns([
pl.col(col).forward_fill().alias(col)
for col in border_cols
])
# Fill any remaining nulls at start with 0
lta_complete = lta_complete.fill_null(0)
# Verify no nulls remain
null_count = lta_complete.null_count().sum_horizontal()[0]
if null_count > 0:
raise ValueError(f"LTA still has {null_count} nulls after filling")
print(f" [OK] LTA complete: {len(lta_complete)} records, 0 nulls")
return lta_complete
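
# Illustration of the gap-filling above (column name and values are illustrative only):
#   00:00  border_DE_FR = 500   (observed)
#   01:00  null                 -> forward-filled to 500
#   02:00  null                 -> forward-filled to 500
# Hours before the first observation remain null after forward_fill and are set to 0
# by fill_null(0).
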
def broadcast_cnec_to_hourly(
cnec: pl.DataFrame,
master_timeline: pl.DataFrame
) -> pl.DataFrame:
"""Broadcast daily CNEC snapshots to hourly timeline."""
print("\nBroadcasting CNEC from daily to hourly...")
# Report initial state
unique_days = cnec['collection_date'].dt.date().n_unique()
print(f" CNEC unique days: {unique_days}")
print(f" Target hours: {len(master_timeline):,}")
# Extract date from master timeline
master_with_date = master_timeline.with_columns([
pl.col('mtu').dt.date().alias('date')
])
# Extract date from CNEC collection_date
cnec_with_date = cnec.with_columns([
pl.col('collection_date').dt.date().alias('date')
])
# Drop collection_date, keep date for join
cnec_with_date = cnec_with_date.drop('collection_date')
    # Join: each day's CNEC snapshot broadcasts to 23-25 hours (DST days have 23 or 25)
    # Use a left join to keep all hours even if no CNEC data is present
cnec_hourly = master_with_date.join(
cnec_with_date,
on='date',
how='left'
)
# Drop the date column used for join
cnec_hourly = cnec_hourly.drop('date')
print(f" [OK] CNEC hourly: {len(cnec_hourly)} records")
print(f" [INFO] CNEC in long format - multiple rows per timestamp (one per CNEC)")
return cnec_hourly
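
# The broadcast above is a many-to-one join on the calendar date: every CNEC row for a
# given collection_date is repeated once per hour of that date, so the result has roughly
# (CNEC rows per day) x (hours per day) rows and stays in long format.
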
def join_datasets(
master_timeline: pl.DataFrame,
maxbex_with_time: pl.DataFrame,
lta_complete: pl.DataFrame,
netpos: pl.DataFrame,
cnec_hourly: pl.DataFrame
) -> pl.DataFrame:
"""Join all datasets on mtu timestamp."""
print("\nJoining all datasets...")
# Start with MaxBEX (already has mtu via hstack)
# MaxBEX is already aligned by row, so we can use it directly
unified = maxbex_with_time.clone()
print(f" Starting with MaxBEX: {unified.shape}")
# Join LTA
unified = unified.join(
lta_complete,
on='mtu',
how='left',
suffix='_lta'
)
# Drop duplicate mtu if created
if 'mtu_lta' in unified.columns:
unified = unified.drop('mtu_lta')
print(f" After LTA: {unified.shape}")
# Join NetPos
netpos_clean = netpos.drop(['collection_date'], strict=False)
unified = unified.join(
netpos_clean,
on='mtu',
how='left',
suffix='_netpos'
)
# Drop duplicate mtu if created
if 'mtu_netpos' in unified.columns:
unified = unified.drop('mtu_netpos')
print(f" After NetPos: {unified.shape}")
    # Note: CNEC is in long format; joining it here would explode the dataset.
    # CNEC is handled separately in the feature engineering phase.
print(f" [INFO] CNEC not joined (long format - handle in feature engineering)")
# Sort by timestamp (joins may have shuffled rows)
print(f"\nSorting by timestamp...")
unified = unified.sort('mtu')
print(f" [OK] Unified dataset: {unified.shape}")
print(f" [OK] Timeline sorted: {unified['mtu'].is_sorted()}")
return unified
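
# Note: master_timeline and cnec_hourly are accepted above but not joined; the unified
# frame is built from MaxBEX + LTA + NetPos only. Because both joins key on 'mtu',
# Polars does not emit a duplicated key column, so the 'mtu_lta'/'mtu_netpos' drops are
# defensive no-ops. The left joins keep one row per MaxBEX timestamp as long as each
# right-hand frame has a unique 'mtu'; the final sort restores chronological order.
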
def unify_jao_data(
maxbex_path: Path,
cnec_path: Path,
lta_path: Path,
netpos_path: Path,
output_dir: Path
) -> Tuple[pl.DataFrame, pl.DataFrame]:
"""Unify all JAO datasets into single timeline.
Args:
maxbex_path: Path to MaxBEX parquet file
cnec_path: Path to CNEC/PTDF parquet file
lta_path: Path to LTA parquet file
netpos_path: Path to Net Positions parquet file
output_dir: Directory to save unified data
Returns:
Tuple of (unified_wide, cnec_hourly) DataFrames
"""
print("\n" + "=" * 80)
print("JAO DATA UNIFICATION")
print("=" * 80)
# 1. Load datasets
print("\nLoading datasets...")
maxbex = pl.read_parquet(maxbex_path)
cnec = pl.read_parquet(cnec_path)
lta = pl.read_parquet(lta_path)
netpos = pl.read_parquet(netpos_path)
print(f" MaxBEX: {maxbex.shape}")
print(f" CNEC: {cnec.shape}")
print(f" LTA: {lta.shape}")
print(f" NetPos (raw): {netpos.shape}")
# 2. Deduplicate NetPos and align MaxBEX
# MaxBEX has no timestamp - it's row-aligned with NetPos
# Need to deduplicate both together to maintain alignment
print("\nDeduplicating NetPos and aligning MaxBEX...")
# Verify same length (must be row-aligned)
if len(maxbex) != len(netpos):
raise ValueError(
f"MaxBEX ({len(maxbex)}) and NetPos ({len(netpos)}) "
"have different lengths - cannot align"
)
# Add mtu column to MaxBEX via hstack (before deduplication)
maxbex_with_time = maxbex.hstack(netpos.select(['mtu']))
print(f" MaxBEX + NetPos aligned: {maxbex_with_time.shape}")
# Deduplicate MaxBEX based on mtu timestamp
maxbex_before = len(maxbex_with_time)
maxbex_with_time = maxbex_with_time.unique(subset=['mtu'], keep='first')
maxbex_after = len(maxbex_with_time)
maxbex_duplicates = maxbex_before - maxbex_after
if maxbex_duplicates > 0:
print(f" MaxBEX deduplicated: {maxbex_with_time.shape} ({maxbex_duplicates:,} duplicates removed)")
# Deduplicate NetPos
netpos_before = len(netpos)
netpos = netpos.unique(subset=['mtu'], keep='first')
netpos_after = len(netpos)
netpos_duplicates = netpos_before - netpos_after
if netpos_duplicates > 0:
print(f" NetPos deduplicated: {netpos.shape} ({netpos_duplicates:,} duplicates removed)")
# 3. Create master timeline from deduplicated NetPos
print("\nCreating master timeline from Net Positions...")
master_timeline = netpos.select(['mtu']).sort('mtu')
validate_timeline(master_timeline, "Master")
# 4. Fill LTA gaps
lta_complete = fill_lta_gaps(lta, master_timeline)
# 5. Broadcast CNEC to hourly
cnec_hourly = broadcast_cnec_to_hourly(cnec, master_timeline)
# 6. Join datasets (wide format: MaxBEX + LTA + NetPos)
unified_wide = join_datasets(
master_timeline,
maxbex_with_time,
lta_complete,
netpos,
cnec_hourly
)
# 7. Save outputs
print("\nSaving unified data...")
output_dir.mkdir(parents=True, exist_ok=True)
unified_wide_path = output_dir / 'unified_jao_24month.parquet'
cnec_hourly_path = output_dir / 'cnec_hourly_24month.parquet'
unified_wide.write_parquet(unified_wide_path)
cnec_hourly.write_parquet(cnec_hourly_path)
print(f" [OK] Unified wide: {unified_wide_path}")
print(f" Size: {unified_wide_path.stat().st_size / (1024**2):.2f} MB")
print(f" [OK] CNEC hourly: {cnec_hourly_path}")
print(f" Size: {cnec_hourly_path.stat().st_size / (1024**2):.2f} MB")
# 8. Validation summary
print("\n" + "=" * 80)
print("UNIFICATION COMPLETE")
print("=" * 80)
print(f"Unified wide dataset: {unified_wide.shape}")
print(f" - mtu timestamp: 1 column")
print(f" - MaxBEX borders: 132 columns")
print(f" - LTA borders: 38 columns")
print(f" - Net Positions: 28 columns")
print(f" Total: {unified_wide.shape[1]} columns")
print()
print(f"CNEC hourly dataset: {cnec_hourly.shape}")
print(f" - Long format (one row per CNEC per hour)")
print(f" - Used in feature engineering phase")
print("=" * 80)
print()
return unified_wide, cnec_hourly

def main():
    """Main execution."""
# Paths
base_dir = Path.cwd()
data_dir = base_dir / 'data' / 'raw' / 'phase1_24month'
output_dir = base_dir / 'data' / 'processed'
maxbex_path = data_dir / 'jao_maxbex.parquet'
cnec_path = data_dir / 'jao_cnec_ptdf.parquet'
lta_path = data_dir / 'jao_lta.parquet'
netpos_path = data_dir / 'jao_net_positions.parquet'
# Verify files exist
for path in [maxbex_path, cnec_path, lta_path, netpos_path]:
if not path.exists():
raise FileNotFoundError(f"Required file not found: {path}")
# Unify
unified_wide, cnec_hourly = unify_jao_data(
maxbex_path,
cnec_path,
lta_path,
netpos_path,
output_dir
)
print("SUCCESS: JAO data unified and saved to data/processed/")

if __name__ == '__main__':
    main()