"""Unify JAO datasets into single timeline. Combines MaxBEX, CNEC/PTDF, LTA, and Net Positions data into a single unified dataset with proper timestamp alignment. Author: Claude Date: 2025-11-06 """ from pathlib import Path from typing import Tuple import polars as pl def validate_timeline(df: pl.DataFrame, name: str) -> None: """Validate timeline is hourly with no gaps.""" print(f"\nValidating {name} timeline...") # Check sorted if not df['mtu'].is_sorted(): raise ValueError(f"{name}: Timeline not sorted") # Check for gaps (should be hourly) time_diffs = df['mtu'].diff().drop_nulls() most_common = time_diffs.mode()[0] # Most common should be 1 hour (allow for DST transitions) if most_common.total_seconds() != 3600: print(f" [WARNING] Most common time diff: {most_common} (expected 1 hour)") print(f" [OK] {name} timeline validated: {len(df)} records, sorted") def add_timestamp_to_maxbex( maxbex: pl.DataFrame, master_timeline: pl.DataFrame ) -> pl.DataFrame: """Add mtu timestamp to MaxBEX via row alignment.""" print("\nAdding timestamp to MaxBEX...") # Verify same length if len(maxbex) != len(master_timeline): raise ValueError( f"MaxBEX ({len(maxbex)}) and timeline ({len(master_timeline)}) " "have different lengths" ) # Add mtu column via hstack maxbex_with_time = maxbex.hstack(master_timeline) print(f" [OK] MaxBEX timestamp added: {len(maxbex_with_time)} records") return maxbex_with_time def fill_lta_gaps( lta: pl.DataFrame, master_timeline: pl.DataFrame ) -> pl.DataFrame: """Fill LTA gaps using forward-fill strategy.""" print("\nFilling LTA gaps...") # Report initial state initial_records = len(lta) expected_records = len(master_timeline) missing_hours = expected_records - initial_records print(f" Initial LTA records: {initial_records:,}") print(f" Expected records: {expected_records:,}") print(f" Missing hours: {missing_hours:,} ({missing_hours/expected_records*100:.1f}%)") # Remove metadata columns lta_clean = lta.drop(['is_masked', 'masking_method'], strict=False) # Left join master timeline with LTA lta_complete = master_timeline.join( lta_clean, on='mtu', how='left' ) # Get border columns border_cols = [c for c in lta_complete.columns if c.startswith('border_')] # Forward-fill gaps (LTA changes rarely) lta_complete = lta_complete.with_columns([ pl.col(col).forward_fill().alias(col) for col in border_cols ]) # Fill any remaining nulls at start with 0 lta_complete = lta_complete.fill_null(0) # Verify no nulls remain null_count = lta_complete.null_count().sum_horizontal()[0] if null_count > 0: raise ValueError(f"LTA still has {null_count} nulls after filling") print(f" [OK] LTA complete: {len(lta_complete)} records, 0 nulls") return lta_complete def broadcast_cnec_to_hourly( cnec: pl.DataFrame, master_timeline: pl.DataFrame ) -> pl.DataFrame: """Broadcast daily CNEC snapshots to hourly timeline.""" print("\nBroadcasting CNEC from daily to hourly...") # Report initial state unique_days = cnec['collection_date'].dt.date().n_unique() print(f" CNEC unique days: {unique_days}") print(f" Target hours: {len(master_timeline):,}") # Extract date from master timeline master_with_date = master_timeline.with_columns([ pl.col('mtu').dt.date().alias('date') ]) # Extract date from CNEC collection_date cnec_with_date = cnec.with_columns([ pl.col('collection_date').dt.date().alias('date') ]) # Drop collection_date, keep date for join cnec_with_date = cnec_with_date.drop('collection_date') # Join: Each day's CNEC snapshot broadcasts to 24-26 hours # Use left join to keep all hours even if no CNEC data 


def broadcast_cnec_to_hourly(
    cnec: pl.DataFrame,
    master_timeline: pl.DataFrame
) -> pl.DataFrame:
    """Broadcast daily CNEC snapshots to the hourly timeline."""
    print("\nBroadcasting CNEC from daily to hourly...")

    # Report initial state
    unique_days = cnec['collection_date'].dt.date().n_unique()
    print(f"  CNEC unique days: {unique_days}")
    print(f"  Target hours: {len(master_timeline):,}")

    # Extract date from master timeline
    master_with_date = master_timeline.with_columns([
        pl.col('mtu').dt.date().alias('date')
    ])

    # Extract date from CNEC collection_date
    cnec_with_date = cnec.with_columns([
        pl.col('collection_date').dt.date().alias('date')
    ])

    # Drop collection_date, keep date for join
    cnec_with_date = cnec_with_date.drop('collection_date')

    # Join: each day's CNEC snapshot broadcasts to that day's 23-25 hours
    # (24 normally, 23/25 on DST transitions).
    # Use a left join to keep all hours even if no CNEC data exists.
    cnec_hourly = master_with_date.join(
        cnec_with_date,
        on='date',
        how='left'
    )

    # Drop the date column used for the join
    cnec_hourly = cnec_hourly.drop('date')

    print(f"  [OK] CNEC hourly: {len(cnec_hourly)} records")
    print("  [INFO] CNEC in long format - multiple rows per timestamp (one per CNEC)")
    return cnec_hourly
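

# Illustrative sketch of the daily -> hourly broadcast (not called by the
# pipeline; names are hypothetical). One daily snapshot row per CNEC joins
# to every hourly mtu of that day, so row counts multiply:
def _demo_broadcast_join() -> None:
    """Tiny worked example: 24 hours x 2 CNECs = 48 long-format rows."""
    from datetime import date, datetime

    hours = pl.DataFrame(
        {'mtu': [datetime(2024, 1, 1, h) for h in range(24)]}
    ).with_columns(pl.col('mtu').dt.date().alias('date'))

    daily = pl.DataFrame({
        'date': [date(2024, 1, 1)] * 2,
        'cnec_id': ['CNEC_A', 'CNEC_B'],
    })

    assert hours.join(daily, on='date', how='left').height == 48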


def join_datasets(
    master_timeline: pl.DataFrame,
    maxbex_with_time: pl.DataFrame,
    lta_complete: pl.DataFrame,
    netpos: pl.DataFrame,
    cnec_hourly: pl.DataFrame
) -> pl.DataFrame:
    """Join all datasets on the mtu timestamp."""
    print("\nJoining all datasets...")

    # Start with MaxBEX: it already has mtu via hstack and is aligned by
    # row, so it can be used directly
    unified = maxbex_with_time.clone()
    print(f"  Starting with MaxBEX: {unified.shape}")

    # Join LTA
    unified = unified.join(
        lta_complete,
        on='mtu',
        how='left',
        suffix='_lta'
    )
    # Drop duplicate mtu if created
    if 'mtu_lta' in unified.columns:
        unified = unified.drop('mtu_lta')
    print(f"  After LTA: {unified.shape}")

    # Join NetPos
    netpos_clean = netpos.drop(['collection_date'], strict=False)
    unified = unified.join(
        netpos_clean,
        on='mtu',
        how='left',
        suffix='_netpos'
    )
    # Drop duplicate mtu if created
    if 'mtu_netpos' in unified.columns:
        unified = unified.drop('mtu_netpos')
    print(f"  After NetPos: {unified.shape}")

    # Note: CNEC is in long format; joining it here would explode the
    # dataset, so CNEC is handled separately in feature engineering
    print("  [INFO] CNEC not joined (long format - handle in feature engineering)")

    # Sort by timestamp (joins may have shuffled rows)
    print("\nSorting by timestamp...")
    unified = unified.sort('mtu')

    print(f"  [OK] Unified dataset: {unified.shape}")
    print(f"  [OK] Timeline sorted: {unified['mtu'].is_sorted()}")
    return unified
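

# Illustrative sketch of polars join-key handling (not called by the
# pipeline; names are hypothetical). In an equi-join the right frame's key
# column is coalesced into the left one rather than suffixed, so the
# 'mtu_lta' / 'mtu_netpos' guards above are defensive only:
def _demo_join_key_handling() -> None:
    """Tiny worked example: the join key is never duplicated."""
    left = pl.DataFrame({'mtu': [1, 2], 'maxbex': [500, 600]})
    right = pl.DataFrame({'mtu': [1, 2], 'lta': [100, 150]})
    joined = left.join(right, on='mtu', how='left', suffix='_lta')
    assert joined.columns == ['mtu', 'maxbex', 'lta']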


def unify_jao_data(
    maxbex_path: Path,
    cnec_path: Path,
    lta_path: Path,
    netpos_path: Path,
    output_dir: Path
) -> Tuple[pl.DataFrame, pl.DataFrame]:
    """Unify all JAO datasets into a single timeline.

    Args:
        maxbex_path: Path to MaxBEX parquet file
        cnec_path: Path to CNEC/PTDF parquet file
        lta_path: Path to LTA parquet file
        netpos_path: Path to Net Positions parquet file
        output_dir: Directory to save unified data

    Returns:
        Tuple of (unified_wide, cnec_hourly) DataFrames
    """
    print("\n" + "=" * 80)
    print("JAO DATA UNIFICATION")
    print("=" * 80)

    # 1. Load datasets
    print("\nLoading datasets...")
    maxbex = pl.read_parquet(maxbex_path)
    cnec = pl.read_parquet(cnec_path)
    lta = pl.read_parquet(lta_path)
    netpos = pl.read_parquet(netpos_path)
    print(f"  MaxBEX: {maxbex.shape}")
    print(f"  CNEC: {cnec.shape}")
    print(f"  LTA: {lta.shape}")
    print(f"  NetPos (raw): {netpos.shape}")

    # 2. Deduplicate NetPos and align MaxBEX.
    # MaxBEX has no timestamp - it is row-aligned with NetPos, so both must
    # be deduplicated together to maintain alignment
    print("\nDeduplicating NetPos and aligning MaxBEX...")

    # Verify same length (must be row-aligned)
    if len(maxbex) != len(netpos):
        raise ValueError(
            f"MaxBEX ({len(maxbex)}) and NetPos ({len(netpos)}) "
            "have different lengths - cannot align"
        )

    # Add mtu column to MaxBEX via hstack (before deduplication)
    maxbex_with_time = maxbex.hstack(netpos.select(['mtu']))
    print(f"  MaxBEX + NetPos aligned: {maxbex_with_time.shape}")

    # Deduplicate MaxBEX based on mtu timestamp
    maxbex_before = len(maxbex_with_time)
    maxbex_with_time = maxbex_with_time.unique(subset=['mtu'], keep='first')
    maxbex_after = len(maxbex_with_time)
    maxbex_duplicates = maxbex_before - maxbex_after
    if maxbex_duplicates > 0:
        print(f"  MaxBEX deduplicated: {maxbex_with_time.shape} ({maxbex_duplicates:,} duplicates removed)")

    # Deduplicate NetPos
    netpos_before = len(netpos)
    netpos = netpos.unique(subset=['mtu'], keep='first')
    netpos_after = len(netpos)
    netpos_duplicates = netpos_before - netpos_after
    if netpos_duplicates > 0:
        print(f"  NetPos deduplicated: {netpos.shape} ({netpos_duplicates:,} duplicates removed)")

    # 3. Create master timeline from deduplicated NetPos
    print("\nCreating master timeline from Net Positions...")
    master_timeline = netpos.select(['mtu']).sort('mtu')
    validate_timeline(master_timeline, "Master")

    # 4. Fill LTA gaps
    lta_complete = fill_lta_gaps(lta, master_timeline)

    # 5. Broadcast CNEC to hourly
    cnec_hourly = broadcast_cnec_to_hourly(cnec, master_timeline)

    # 6. Join datasets (wide format: MaxBEX + LTA + NetPos)
    unified_wide = join_datasets(
        master_timeline,
        maxbex_with_time,
        lta_complete,
        netpos,
        cnec_hourly
    )

    # 7. Save outputs
    print("\nSaving unified data...")
    output_dir.mkdir(parents=True, exist_ok=True)

    unified_wide_path = output_dir / 'unified_jao_24month.parquet'
    cnec_hourly_path = output_dir / 'cnec_hourly_24month.parquet'

    unified_wide.write_parquet(unified_wide_path)
    cnec_hourly.write_parquet(cnec_hourly_path)

    print(f"  [OK] Unified wide: {unified_wide_path}")
    print(f"       Size: {unified_wide_path.stat().st_size / (1024**2):.2f} MB")
    print(f"  [OK] CNEC hourly: {cnec_hourly_path}")
    print(f"       Size: {cnec_hourly_path.stat().st_size / (1024**2):.2f} MB")

    # 8. Validation summary
    print("\n" + "=" * 80)
    print("UNIFICATION COMPLETE")
    print("=" * 80)
    print(f"Unified wide dataset: {unified_wide.shape}")
    print("  - mtu timestamp: 1 column")
    print("  - MaxBEX borders: 132 columns")
    print("  - LTA borders: 38 columns")
    print("  - Net Positions: 28 columns")
    print(f"  Total: {unified_wide.shape[1]} columns")
    print()
    print(f"CNEC hourly dataset: {cnec_hourly.shape}")
    print("  - Long format (one row per CNEC per hour)")
    print("  - Used in feature engineering phase")
    print("=" * 80)
    print()

    return unified_wide, cnec_hourly


def main():
    """Main execution."""
    # Paths
    base_dir = Path.cwd()
    data_dir = base_dir / 'data' / 'raw' / 'phase1_24month'
    output_dir = base_dir / 'data' / 'processed'

    maxbex_path = data_dir / 'jao_maxbex.parquet'
    cnec_path = data_dir / 'jao_cnec_ptdf.parquet'
    lta_path = data_dir / 'jao_lta.parquet'
    netpos_path = data_dir / 'jao_net_positions.parquet'

    # Verify files exist
    for path in [maxbex_path, cnec_path, lta_path, netpos_path]:
        if not path.exists():
            raise FileNotFoundError(f"Required file not found: {path}")

    # Unify
    unified_wide, cnec_hourly = unify_jao_data(
        maxbex_path,
        cnec_path,
        lta_path,
        netpos_path,
        output_dir
    )

    print("SUCCESS: JAO data unified and saved to data/processed/")


if __name__ == '__main__':
    main()
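# Usage sketch (the module filename below is hypothetical). Run from the
# project root, since main() resolves all paths relative to Path.cwd():
#
#   $ python unify_jao_data.py
#
# Inputs are read from data/raw/phase1_24month/ and the two outputs are
# written to data/processed/.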