| """Unify JAO datasets into single timeline. | |
| Combines MaxBEX, CNEC/PTDF, LTA, and Net Positions data into a single | |
| unified dataset with proper timestamp alignment. | |
| Author: Claude | |
| Date: 2025-11-06 | |
| """ | |
| from pathlib import Path | |
| from typing import Tuple | |
| import polars as pl | |


def validate_timeline(df: pl.DataFrame, name: str) -> None:
    """Validate timeline is hourly with no gaps."""
    print(f"\nValidating {name} timeline...")

    # Check sorted
    if not df['mtu'].is_sorted():
        raise ValueError(f"{name}: Timeline not sorted")

    # Check for gaps (should be hourly)
    time_diffs = df['mtu'].diff().drop_nulls()
    most_common = time_diffs.mode()[0]

    # Most common should be 1 hour (allow for DST transitions)
    if most_common.total_seconds() != 3600:
        print(f" [WARNING] Most common time diff: {most_common} (expected 1 hour)")

    print(f" [OK] {name} timeline validated: {len(df)} records, sorted")


def add_timestamp_to_maxbex(
    maxbex: pl.DataFrame,
    master_timeline: pl.DataFrame,
) -> pl.DataFrame:
    """Add mtu timestamp to MaxBEX via row alignment."""
    print("\nAdding timestamp to MaxBEX...")

    # Verify same length
    if len(maxbex) != len(master_timeline):
        raise ValueError(
            f"MaxBEX ({len(maxbex)}) and timeline ({len(master_timeline)}) "
            "have different lengths"
        )

    # Add mtu column via hstack
    maxbex_with_time = maxbex.hstack(master_timeline)
    print(f" [OK] MaxBEX timestamp added: {len(maxbex_with_time)} records")
    return maxbex_with_time
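

# Illustrative sketch: hstack appends columns purely by row position, so the
# timestamps are only correct under the row-alignment assumption checked above.
# _demo_hstack_alignment is a hypothetical helper, not part of the pipeline.
def _demo_hstack_alignment() -> None:
    """Show that hstack pairs row i of one frame with row i of the other."""
    borders = pl.DataFrame({'border_a': [100, 200, 300]})
    times = pl.DataFrame({'mtu': [1, 2, 3]})
    print(borders.hstack(times))  # row i of 'mtu' lands beside row i of 'border_a'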


def fill_lta_gaps(
    lta: pl.DataFrame,
    master_timeline: pl.DataFrame,
) -> pl.DataFrame:
    """Fill LTA gaps using a forward-fill strategy."""
    print("\nFilling LTA gaps...")

    # Report initial state
    initial_records = len(lta)
    expected_records = len(master_timeline)
    missing_hours = expected_records - initial_records
    print(f" Initial LTA records: {initial_records:,}")
    print(f" Expected records: {expected_records:,}")
    print(f" Missing hours: {missing_hours:,} ({missing_hours/expected_records*100:.1f}%)")

    # Remove metadata columns
    lta_clean = lta.drop(['is_masked', 'masking_method'], strict=False)

    # Left join master timeline with LTA
    lta_complete = master_timeline.join(
        lta_clean,
        on='mtu',
        how='left'
    )

    # Get border columns
    border_cols = [c for c in lta_complete.columns if c.startswith('border_')]

    # Forward-fill gaps (LTA changes rarely)
    lta_complete = lta_complete.with_columns([
        pl.col(col).forward_fill().alias(col)
        for col in border_cols
    ])

    # Fill any remaining nulls at the start with 0
    lta_complete = lta_complete.fill_null(0)

    # Verify no nulls remain
    null_count = lta_complete.null_count().sum_horizontal()[0]
    if null_count > 0:
        raise ValueError(f"LTA still has {null_count} nulls after filling")

    print(f" [OK] LTA complete: {len(lta_complete)} records, 0 nulls")
    return lta_complete
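

# Illustrative sketch: forward_fill carries the last observed value across a
# gap, and fill_null(0) then covers any leading gap that has no prior value.
# This mirrors the two-step fill applied to the border_ columns above.
def _demo_forward_fill() -> None:
    """Forward-fill a series with a leading and an interior gap."""
    s = pl.Series('border_x', [None, 5, None, None, 7])
    print(s.forward_fill().fill_null(0).to_list())  # [0, 5, 5, 5, 7]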


def broadcast_cnec_to_hourly(
    cnec: pl.DataFrame,
    master_timeline: pl.DataFrame,
) -> pl.DataFrame:
    """Broadcast daily CNEC snapshots to the hourly timeline."""
    print("\nBroadcasting CNEC from daily to hourly...")

    # Report initial state
    unique_days = cnec['collection_date'].dt.date().n_unique()
    print(f" CNEC unique days: {unique_days}")
    print(f" Target hours: {len(master_timeline):,}")

    # Extract date from master timeline
    master_with_date = master_timeline.with_columns([
        pl.col('mtu').dt.date().alias('date')
    ])

    # Extract date from CNEC collection_date
    cnec_with_date = cnec.with_columns([
        pl.col('collection_date').dt.date().alias('date')
    ])

    # Drop collection_date, keep date for the join
    cnec_with_date = cnec_with_date.drop('collection_date')

    # Join: each day's CNEC snapshot broadcasts to 23-25 hours (DST-dependent).
    # Use a left join to keep all hours even if a day has no CNEC data.
    cnec_hourly = master_with_date.join(
        cnec_with_date,
        on='date',
        how='left'
    )

    # Drop the date column used for the join
    cnec_hourly = cnec_hourly.drop('date')

    print(f" [OK] CNEC hourly: {len(cnec_hourly)} records")
    print(f" [INFO] CNEC in long format - multiple rows per timestamp (one per CNEC)")
    return cnec_hourly
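

# Illustrative sketch: a left join on a date key fans each daily snapshot row
# out to every hour of that day, which is the broadcast mechanism used above.
# _demo_date_broadcast uses made-up values and is not part of the pipeline.
def _demo_date_broadcast() -> None:
    """Fan one daily row out to three hourly rows via a date-key join."""
    from datetime import date, datetime
    hours = pl.DataFrame({
        'mtu': [datetime(2024, 1, 1, h) for h in range(3)]
    }).with_columns(pl.col('mtu').dt.date().alias('date'))
    daily = pl.DataFrame({'date': [date(2024, 1, 1)], 'cnec_id': ['CNEC_1']})
    print(hours.join(daily, on='date', how='left').drop('date'))  # 3 rows, all CNEC_1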


def join_datasets(
    master_timeline: pl.DataFrame,
    maxbex_with_time: pl.DataFrame,
    lta_complete: pl.DataFrame,
    netpos: pl.DataFrame,
    cnec_hourly: pl.DataFrame,
) -> pl.DataFrame:
    """Join all datasets on the mtu timestamp.

    Note: master_timeline and cnec_hourly are accepted for interface
    symmetry but are not joined here; CNEC stays separate (see below).
    """
    print("\nJoining all datasets...")

    # Start with MaxBEX (already has mtu via hstack).
    # MaxBEX is already aligned by row, so we can use it directly.
    unified = maxbex_with_time.clone()
    print(f" Starting with MaxBEX: {unified.shape}")

    # Join LTA
    unified = unified.join(
        lta_complete,
        on='mtu',
        how='left',
        suffix='_lta'
    )
    # Drop duplicate mtu if created
    if 'mtu_lta' in unified.columns:
        unified = unified.drop('mtu_lta')
    print(f" After LTA: {unified.shape}")

    # Join NetPos
    netpos_clean = netpos.drop(['collection_date'], strict=False)
    unified = unified.join(
        netpos_clean,
        on='mtu',
        how='left',
        suffix='_netpos'
    )
    # Drop duplicate mtu if created
    if 'mtu_netpos' in unified.columns:
        unified = unified.drop('mtu_netpos')
    print(f" After NetPos: {unified.shape}")

    # Note: CNEC is in long format and would explode the dataset if joined here.
    # CNEC is handled separately in feature engineering.
    print(f" [INFO] CNEC not joined (long format - handled in feature engineering)")

    # Sort by timestamp (joins may have shuffled rows)
    print(f"\nSorting by timestamp...")
    unified = unified.sort('mtu')

    print(f" [OK] Unified dataset: {unified.shape}")
    print(f" [OK] Timeline sorted: {unified['mtu'].is_sorted()}")
    return unified
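

# Illustrative sketch: when joining on mtu, the key column appears once in the
# result and the suffix only renames colliding non-key columns, so the
# defensive 'mtu_lta' / 'mtu_netpos' drops above are normally no-ops.
def _demo_join_suffix() -> None:
    """Join two frames that share a non-key column name."""
    a = pl.DataFrame({'mtu': [1, 2], 'value': [10, 20]})
    b = pl.DataFrame({'mtu': [1, 2], 'value': [30, 40]})
    print(a.join(b, on='mtu', how='left', suffix='_lta').columns)
    # ['mtu', 'value', 'value_lta'] - no 'mtu_lta' is created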


def unify_jao_data(
    maxbex_path: Path,
    cnec_path: Path,
    lta_path: Path,
    netpos_path: Path,
    output_dir: Path,
) -> Tuple[pl.DataFrame, pl.DataFrame]:
    """Unify all JAO datasets into a single timeline.

    Args:
        maxbex_path: Path to MaxBEX parquet file
        cnec_path: Path to CNEC/PTDF parquet file
        lta_path: Path to LTA parquet file
        netpos_path: Path to Net Positions parquet file
        output_dir: Directory to save unified data

    Returns:
        Tuple of (unified_wide, cnec_hourly) DataFrames
    """
    print("\n" + "=" * 80)
    print("JAO DATA UNIFICATION")
    print("=" * 80)

    # 1. Load datasets
    print("\nLoading datasets...")
    maxbex = pl.read_parquet(maxbex_path)
    cnec = pl.read_parquet(cnec_path)
    lta = pl.read_parquet(lta_path)
    netpos = pl.read_parquet(netpos_path)
    print(f" MaxBEX: {maxbex.shape}")
    print(f" CNEC: {cnec.shape}")
    print(f" LTA: {lta.shape}")
    print(f" NetPos (raw): {netpos.shape}")

    # 2. Deduplicate NetPos and align MaxBEX.
    # MaxBEX has no timestamp - it is row-aligned with NetPos, so both must be
    # deduplicated together to maintain alignment.
    print("\nDeduplicating NetPos and aligning MaxBEX...")

    # Verify same length (must be row-aligned)
    if len(maxbex) != len(netpos):
        raise ValueError(
            f"MaxBEX ({len(maxbex)}) and NetPos ({len(netpos)}) "
            "have different lengths - cannot align"
        )

    # Add mtu column to MaxBEX via hstack (before deduplication)
    maxbex_with_time = maxbex.hstack(netpos.select(['mtu']))
    print(f" MaxBEX + NetPos aligned: {maxbex_with_time.shape}")

    # Deduplicate MaxBEX based on mtu timestamp
    maxbex_before = len(maxbex_with_time)
    maxbex_with_time = maxbex_with_time.unique(subset=['mtu'], keep='first')
    maxbex_after = len(maxbex_with_time)
    maxbex_duplicates = maxbex_before - maxbex_after
    if maxbex_duplicates > 0:
        print(f" MaxBEX deduplicated: {maxbex_with_time.shape} ({maxbex_duplicates:,} duplicates removed)")

    # Deduplicate NetPos
    netpos_before = len(netpos)
    netpos = netpos.unique(subset=['mtu'], keep='first')
    netpos_after = len(netpos)
    netpos_duplicates = netpos_before - netpos_after
    if netpos_duplicates > 0:
        print(f" NetPos deduplicated: {netpos.shape} ({netpos_duplicates:,} duplicates removed)")

    # 3. Create master timeline from deduplicated NetPos
    print("\nCreating master timeline from Net Positions...")
    master_timeline = netpos.select(['mtu']).sort('mtu')
    validate_timeline(master_timeline, "Master")

    # 4. Fill LTA gaps
    lta_complete = fill_lta_gaps(lta, master_timeline)

    # 5. Broadcast CNEC to hourly
    cnec_hourly = broadcast_cnec_to_hourly(cnec, master_timeline)

    # 6. Join datasets (wide format: MaxBEX + LTA + NetPos)
    unified_wide = join_datasets(
        master_timeline,
        maxbex_with_time,
        lta_complete,
        netpos,
        cnec_hourly
    )

    # 7. Save outputs
    print("\nSaving unified data...")
    output_dir.mkdir(parents=True, exist_ok=True)
    unified_wide_path = output_dir / 'unified_jao_24month.parquet'
    cnec_hourly_path = output_dir / 'cnec_hourly_24month.parquet'
    unified_wide.write_parquet(unified_wide_path)
    cnec_hourly.write_parquet(cnec_hourly_path)
    print(f" [OK] Unified wide: {unified_wide_path}")
    print(f" Size: {unified_wide_path.stat().st_size / (1024**2):.2f} MB")
    print(f" [OK] CNEC hourly: {cnec_hourly_path}")
    print(f" Size: {cnec_hourly_path.stat().st_size / (1024**2):.2f} MB")

    # 8. Validation summary
    print("\n" + "=" * 80)
    print("UNIFICATION COMPLETE")
    print("=" * 80)
    print(f"Unified wide dataset: {unified_wide.shape}")
    print(f" - mtu timestamp: 1 column")
    print(f" - MaxBEX borders: 132 columns")
    print(f" - LTA borders: 38 columns")
    print(f" - Net Positions: 28 columns")
    print(f" Total: {unified_wide.shape[1]} columns")
    print()
    print(f"CNEC hourly dataset: {cnec_hourly.shape}")
    print(f" - Long format (one row per CNEC per hour)")
    print(f" - Used in the feature engineering phase")
    print("=" * 80)
    print()

    return unified_wide, cnec_hourly


def main():
    """Main execution."""
    # Paths
    base_dir = Path.cwd()
    data_dir = base_dir / 'data' / 'raw' / 'phase1_24month'
    output_dir = base_dir / 'data' / 'processed'

    maxbex_path = data_dir / 'jao_maxbex.parquet'
    cnec_path = data_dir / 'jao_cnec_ptdf.parquet'
    lta_path = data_dir / 'jao_lta.parquet'
    netpos_path = data_dir / 'jao_net_positions.parquet'

    # Verify files exist
    for path in [maxbex_path, cnec_path, lta_path, netpos_path]:
        if not path.exists():
            raise FileNotFoundError(f"Required file not found: {path}")

    # Unify
    unified_wide, cnec_hourly = unify_jao_data(
        maxbex_path,
        cnec_path,
        lta_path,
        netpos_path,
        output_dir
    )

    print("SUCCESS: JAO data unified and saved to data/processed/")


if __name__ == '__main__':
    main()