"""Process October 2025 raw data into features for dataset extension. This script processes the October 2025 raw data (downloaded Nov 13) and generates feature files matching the 24-month dataset schema: - Weather features: 375 features - ENTSO-E features: ~1,863 features - JAO features: 276 features (if October data exists) Output files will be saved to data/processed/ with "_october" suffix. Author: Claude Date: 2025-11-14 """ from pathlib import Path import polars as pl import sys # Add src to path for imports sys.path.append(str(Path(__file__).parent / "src")) from feature_engineering.engineer_weather_features import ( engineer_grid_level_features, engineer_temporal_lags, engineer_derived_features ) from feature_engineering.engineer_entsoe_features import ( engineer_generation_features, engineer_demand_features, engineer_price_features, engineer_hydro_storage_features, engineer_pumped_storage_features, engineer_load_forecast_features, engineer_transmission_outage_features ) def process_october_weather() -> pl.DataFrame: """Process October weather data into 375 features.""" print("\n" + "=" * 80) print("PROCESSING OCTOBER WEATHER DATA") print("=" * 80) raw_file = Path("data/raw/weather_october_2025.parquet") if not raw_file.exists(): raise FileNotFoundError(f"Missing: {raw_file}") # Load October weather data weather_df = pl.read_parquet(raw_file) print(f"\nLoaded weather data: {weather_df.shape}") print(f"Date range: {weather_df['timestamp'].min()} to {weather_df['timestamp'].max()}") # Engineer features using existing modules features = engineer_grid_level_features(weather_df) features = engineer_temporal_lags(features) features = engineer_derived_features(features) # Save to processed directory output_file = Path("data/processed/features_weather_october.parquet") features.write_parquet(output_file) print(f"\n[OK] Weather features saved: {output_file}") print(f" Shape: {features.shape}") print(f" Features: {len(features.columns) - 1} (+ timestamp)") return features def process_october_entsoe() -> pl.DataFrame: """Process October ENTSO-E data into ~1,863 features.""" print("\n" + "=" * 80) print("PROCESSING OCTOBER ENTSO-E DATA") print("=" * 80) # Check which ENTSO-E files exist raw_dir = Path("data/raw") processed_dir = Path("data/processed") required_files = { 'generation': raw_dir / "entsoe_generation_october_2025.parquet", 'demand': raw_dir / "entsoe_demand_october_2025.parquet", 'prices': raw_dir / "entsoe_prices_october_2025.parquet", 'hydro_storage': raw_dir / "entsoe_hydro_storage_october_2025.parquet", 'pumped_storage': raw_dir / "entsoe_pumped_storage_october_2025.parquet", 'load_forecast': raw_dir / "entsoe_load_forecast_october_2025.parquet", 'transmission_outages': raw_dir / "entsoe_transmission_outages_october_2025.parquet" } # Load CNEC master list (required for transmission outage features) cnec_master_path = processed_dir / "cnecs_master_176.csv" if not cnec_master_path.exists(): raise FileNotFoundError(f"Missing CNEC master list: {cnec_master_path}") cnec_master_df = pl.read_csv(cnec_master_path) print(f"\nLoaded CNEC master list: {cnec_master_df.shape}") # Verify all files exist for name, file_path in required_files.items(): if not file_path.exists(): print(f"WARNING: Missing {name} file: {file_path}") # Load all datasets print("\nLoading ENTSO-E datasets...") generation_df = pl.read_parquet(required_files['generation']) demand_df = pl.read_parquet(required_files['demand']) prices_df = pl.read_parquet(required_files['prices']) hydro_storage_df = pl.read_parquet(required_files['hydro_storage']) pumped_storage_df = pl.read_parquet(required_files['pumped_storage']) load_forecast_df = pl.read_parquet(required_files['load_forecast']) transmission_outages_df = pl.read_parquet(required_files['transmission_outages']) print(f" Generation: {generation_df.shape}") print(f" Demand: {demand_df.shape}") print(f" Prices: {prices_df.shape}") print(f" Hydro storage: {hydro_storage_df.shape}") print(f" Pumped storage: {pumped_storage_df.shape}") print(f" Load forecast: {load_forecast_df.shape}") print(f" Transmission outages: {transmission_outages_df.shape}") # Engineer features for each category print("\nEngineering ENTSO-E features...") # Generation features (~228 features) gen_features = engineer_generation_features(generation_df) # Demand features (24 features) demand_features = engineer_demand_features(demand_df) # Price features (24 features) price_features = engineer_price_features(prices_df) # Hydro storage features (12 features) hydro_features = engineer_hydro_storage_features(hydro_storage_df) # Pumped storage features (10 features) pumped_features = engineer_pumped_storage_features(pumped_storage_df) # Load forecast features (12 features) load_forecast_features = engineer_load_forecast_features(load_forecast_df) # Transmission outage features (176 features - ALL CNECs) # Create hourly range for October (Oct 1-14, 2025) import datetime october_start = datetime.datetime(2025, 10, 1, 0, 0) october_end = datetime.datetime(2025, 10, 14, 23, 0) hourly_range = pl.DataFrame({ 'timestamp': pl.datetime_range( october_start, october_end, interval='1h', eager=True ) }) transmission_features = engineer_transmission_outage_features( transmission_outages_df, cnec_master_df, hourly_range ) # Merge all features print("\nMerging all ENTSO-E features...") features = gen_features # Fix timezone and precision issues - ensure all timestamps are timezone-naive and nanosecond precision features = features.with_columns([ pl.col('timestamp').dt.replace_time_zone(None).dt.cast_time_unit('ns').alias('timestamp') ]) for feat_df, name in [ (demand_features, "demand"), (price_features, "prices"), (hydro_features, "hydro_storage"), (pumped_features, "pumped_storage"), (load_forecast_features, "load_forecast"), (transmission_features, "transmission_outages") ]: # Ensure timezone and precision consistency if 'timestamp' in feat_df.columns: feat_df = feat_df.with_columns([ pl.col('timestamp').dt.replace_time_zone(None).dt.cast_time_unit('ns').alias('timestamp') ]) features = features.join(feat_df, on='timestamp', how='left', coalesce=True) print(f" Added {name}: {len(feat_df.columns) - 1} features") # Resample to hourly (some datasets have sub-hourly data) print("\nResampling to hourly...") features = features.with_columns([ pl.col('timestamp').dt.truncate('1h').alias('timestamp') ]) # Group by hour and take mean (for any sub-hourly values) agg_exprs = [pl.col(c).mean().alias(c) for c in features.columns if c != 'timestamp'] features = features.group_by('timestamp').agg(agg_exprs).sort('timestamp') print(f" Resampled to {len(features)} hourly rows") # Ensure complete 336-hour range (Oct 1-14) - fill missing hours with forward-fill october_start = datetime.datetime(2025, 10, 1, 0, 0) october_end = datetime.datetime(2025, 10, 14, 23, 0) complete_range = pl.DataFrame({ 'timestamp': pl.datetime_range( october_start, october_end, interval='1h', eager=True ) }) # Cast complete_range timestamp to match features precision complete_range = complete_range.with_columns([ pl.col('timestamp').dt.cast_time_unit('ns').alias('timestamp') ]) # Join to complete range and forward-fill missing values features = complete_range.join(features, on='timestamp', how='left') # Forward-fill missing values fill_exprs = [] for col in features.columns: if col != 'timestamp': fill_exprs.append(pl.col(col).forward_fill().alias(col)) if fill_exprs: features = features.with_columns(fill_exprs) missing_count = 336 - len(features.filter(pl.all_horizontal(pl.all().is_not_null()))) if missing_count > 0: print(f" Forward-filled {missing_count} missing hours") print(f" Final shape: {len(features)} hourly rows (Oct 1-14)") # Save to processed directory output_file = Path("data/processed/features_entsoe_october.parquet") features.write_parquet(output_file) print(f"\n[OK] ENTSO-E features saved: {output_file}") print(f" Shape: {features.shape}") print(f" Features: {len(features.columns) - 1} (+ timestamp)") return features def process_october_jao() -> pl.DataFrame | None: """Process October JAO data into 276 features (if data exists).""" print("\n" + "=" * 80) print("PROCESSING OCTOBER JAO DATA") print("=" * 80) # Check if October JAO data exists raw_file = Path("data/raw/jao_october_2025.parquet") if not raw_file.exists(): print(f"\nINFO: No October JAO data found at {raw_file}") print("This is expected - JAO features may be historical only.") print("Skipping JAO feature engineering for October.") return None # If data exists, process it from feature_engineering.engineer_jao_features import ( engineer_jao_features_all ) jao_df = pl.read_parquet(raw_file) print(f"\nLoaded JAO data: {jao_df.shape}") features = engineer_jao_features_all(jao_df) # Save to processed directory output_file = Path("data/processed/features_jao_october.parquet") features.write_parquet(output_file) print(f"\n[OK] JAO features saved: {output_file}") print(f" Shape: {features.shape}") return features def validate_october_features(): """Validate October feature files match expected schema.""" print("\n" + "=" * 80) print("VALIDATING OCTOBER FEATURES") print("=" * 80) # Load October feature files weather_file = Path("data/processed/features_weather_october.parquet") entsoe_file = Path("data/processed/features_entsoe_october.parquet") jao_file = Path("data/processed/features_jao_october.parquet") weather_df = pl.read_parquet(weather_file) entsoe_df = pl.read_parquet(entsoe_file) print(f"\nWeather features: {weather_df.shape}") print(f" Rows (expected 336): {len(weather_df)}") print(f" Features (expected 375): {len(weather_df.columns) - 1}") print(f"\nENTSO-E features: {entsoe_df.shape}") print(f" Rows (expected 336): {len(entsoe_df)}") print(f" Features (expected ~1,863): {len(entsoe_df.columns) - 1}") if jao_file.exists(): jao_df = pl.read_parquet(jao_file) print(f"\nJAO features: {jao_df.shape}") print(f" Rows (expected 336): {len(jao_df)}") print(f" Features (expected 276): {len(jao_df.columns) - 1}") else: print("\nJAO features: Not generated (no October JAO data)") # Validate row count (14 days × 24 hours = 336) expected_rows = 336 issues = [] if len(weather_df) != expected_rows: issues.append(f"Weather rows: {len(weather_df)} (expected {expected_rows})") if len(entsoe_df) != expected_rows: issues.append(f"ENTSO-E rows: {len(entsoe_df)} (expected {expected_rows})") # Validate date range (Oct 1-14, 2025) weather_start = weather_df['timestamp'].min() weather_end = weather_df['timestamp'].max() entsoe_start = entsoe_df['timestamp'].min() entsoe_end = entsoe_df['timestamp'].max() print(f"\nDate ranges:") print(f" Weather: {weather_start} to {weather_end}") print(f" ENTSO-E: {entsoe_start} to {entsoe_end}") # Check for null values weather_nulls = weather_df.null_count().sum_horizontal().to_list()[0] entsoe_nulls = entsoe_df.null_count().sum_horizontal().to_list()[0] print(f"\nNull value counts:") print(f" Weather: {weather_nulls} nulls") print(f" ENTSO-E: {entsoe_nulls} nulls") # Report validation results if issues: print("\n[WARNING] Validation issues found:") for issue in issues: print(f" - {issue}") else: print("\n[OK] All validation checks passed!") return len(issues) == 0 def main(): """Main execution: Process all October data.""" print("\n" + "=" * 80) print("OCTOBER 2025 FEATURE ENGINEERING") print("Processing raw data into features for dataset extension") print("=" * 80) try: # Process each feature category weather_features = process_october_weather() entsoe_features = process_october_entsoe() jao_features = process_october_jao() # May return None # Validate features validation_passed = validate_october_features() if validation_passed: print("\n" + "=" * 80) print("SUCCESS: October feature engineering complete!") print("=" * 80) print("\nGenerated files:") print(" - data/processed/features_weather_october.parquet") print(" - data/processed/features_entsoe_october.parquet") if jao_features is not None: print(" - data/processed/features_jao_october.parquet") print("\nNext steps:") print(" 1. Merge October features into unified dataset") print(" 2. Append to 24-month dataset (17,544 -> 17,880 rows)") print(" 3. Upload extended dataset to HuggingFace") else: print("\n[ERROR] Validation failed - please review issues above") sys.exit(1) except Exception as e: # Avoid Unicode errors on Windows console error_msg = str(e).encode('ascii', 'replace').decode('ascii') print(f"\n[ERROR] Feature engineering failed: {error_msg}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()