fbmc-chronos2 / src /feature_engineering /engineer_weather_features.py
Evgueni Poloukarov
feat: complete weather feature engineering with simplified approach (375 features)
7aa0336
raw
history blame
9 kB
"""Engineer 375 Weather features for FBMC forecasting.
Transforms OpenMeteo weather data into model-ready features:
1. Grid-level features (51 points × 7 vars = 357 features)
2. Temporal lags (3 vars × 4 time periods = 12 features)
3. Derived features (rate-of-change + stability = 6 features)
Total: 375 weather features
Weather Variables (7):
- temperature_2m (C)
- windspeed_10m (m/s)
- windspeed_100m (m/s) - for wind generation
- winddirection_100m (degrees)
- shortwave_radiation (W/m2) - for solar generation
- cloudcover (%)
- surface_pressure (hPa)
Author: Claude
Date: 2025-11-10
"""
from pathlib import Path
import polars as pl
def engineer_grid_level_features(weather_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer grid-level weather features (51 points × 7 vars = 357 features).

    Pivots each of the 7 weather variables from long format (one row per
    timestamp × grid point) to wide format (one column per grid point),
    then joins the per-variable pivots on timestamp. Resulting columns:
    - temp_<grid_point>
    - wind10m_<grid_point>
    - wind100m_<grid_point>
    - winddir_<grid_point>
    - solar_<grid_point>
    - cloud_<grid_point>
    - pressure_<grid_point>

    Args:
        weather_df: Long-format weather data with at least 'timestamp',
            'grid_point' and the 7 weather variable columns.

    Returns:
        Wide DataFrame with one 'timestamp' column plus one column per
        (variable, grid point) pair.
    """
    # NOTE: step label fixed from "[1/5]" — the pipeline has 3 steps
    # (siblings print "[2/3]" and "[3/3]").
    print("\n[1/3] Engineering grid-level features (51 points × 7 vars)...")
    # Pivot each weather variable separately, then join on timestamp.
    features = None
    weather_vars = [
        ('temperature_2m', 'temp'),
        ('windspeed_10m', 'wind10m'),
        ('windspeed_100m', 'wind100m'),
        ('winddirection_100m', 'winddir'),
        ('shortwave_radiation', 'solar'),
        ('cloudcover', 'cloud'),
        ('surface_pressure', 'pressure')
    ]
    for orig_col, short_name in weather_vars:
        print(f" Pivoting {orig_col}...")
        # 'first' is safe: one reading per (timestamp, grid_point) expected.
        pivoted = weather_df.select(['timestamp', 'grid_point', orig_col]).pivot(
            values=orig_col,
            index='timestamp',
            on='grid_point',
            aggregate_function='first'
        )
        # Rename grid-point columns to <short_name>_<grid_point>.
        rename_map = {
            col: f'{short_name}_{col}'
            for col in pivoted.columns
            if col != 'timestamp'
        }
        pivoted = pivoted.rename(rename_map)
        # Accumulate: first pivot seeds the frame, later ones join on timestamp.
        if features is None:
            features = pivoted
        else:
            features = features.join(pivoted, on='timestamp', how='left', coalesce=True)
    print(f" [OK] {len(features.columns) - 1} grid-level features")
    return features
def engineer_temporal_lags(features: pl.DataFrame) -> pl.DataFrame:
    """Add temporal lags for key system-wide weather averages.

    Computes system-wide (row-wise across all grid points) averages for
    temperature, 100m wind speed and solar radiation, then lags each by
    1h, 6h, 12h and 24h. The intermediate averages are dropped so only
    the 12 lag columns (3 vars × 4 lags) are added.

    Args:
        features: Wide feature frame with temp_*/wind100m_*/solar_* columns
            and a 'timestamp' column. Assumes hourly rows (lag N = shift N).

    Returns:
        Input frame plus 12 '<var>_lag<N>h' columns.
    """
    print("\n[2/3] Engineering temporal lags (1h, 6h, 12h, 24h)...")
    # shift() is purely positional: rows must be chronological before lagging.
    features = features.sort('timestamp')
    # System-wide averages (row-wise mean across grid-point columns);
    # mean_horizontal skips nulls, matching the previous list.mean behaviour.
    avg_prefixes = {
        'temp_avg': 'temp_',        # average temperature
        'wind_avg': 'wind100m_',    # 100m wind — relevant for wind generation
        'solar_avg': 'solar_',      # shortwave radiation — for solar generation
    }
    features = features.with_columns([
        pl.mean_horizontal(
            [c for c in features.columns if c.startswith(prefix)]
        ).alias(name)
        for name, prefix in avg_prefixes.items()
    ])
    # All 12 lag expressions in a single pass.
    lag_vars = ['temp_avg', 'wind_avg', 'solar_avg']
    lag_hours = [1, 6, 12, 24]
    features = features.with_columns([
        pl.col(var).shift(lag_h).alias(f'{var}_lag{lag_h}h')
        for var in lag_vars
        for lag_h in lag_hours
    ])
    # Drop intermediate averages (keep only lagged versions).
    features = features.drop(lag_vars)
    lag_features = len(lag_vars) * len(lag_hours)
    print(f" [OK] {lag_features} temporal lag features")
    return features
def engineer_derived_features(features: pl.DataFrame) -> pl.DataFrame:
    """Engineer derived weather features (6 features).

    Simple features that need no calibration data:
    - Rate of change (hour-over-hour diff) of system-wide wind/solar/temp
      averages — captures sudden spikes/drops that correlate with grid
      constraints.
    - Weather stability (6-hour rolling std) of the same averages —
      detects volatility periods (useful for forecasting uncertainty).

    Args:
        features: Wide feature frame with wind100m_*/solar_*/temp_* columns
            and a 'timestamp' column. Assumes hourly rows.

    Returns:
        Input frame plus 3 '*_rate_change' and 3 '*_stability_6h' columns.
    """
    print("\n[3/3] Engineering derived features (rate-of-change + stability)...")
    # diff() and rolling_std() are order-dependent: ensure chronological rows.
    features = features.sort('timestamp')
    # System averages (row-wise mean across grid points; nulls skipped,
    # matching the previous concat_list().list.mean() behaviour).
    avg_prefixes = {
        'wind_system_avg': 'wind100m_',
        'solar_system_avg': 'solar_',
        'temp_system_avg': 'temp_',
    }
    features = features.with_columns([
        pl.mean_horizontal(
            [c for c in features.columns if c.startswith(prefix)]
        ).alias(name)
        for name, prefix in avg_prefixes.items()
    ])
    # Rate of change (hour-over-hour deltas).
    features = features.with_columns([
        pl.col('wind_system_avg').diff().alias('wind_rate_change'),
        pl.col('solar_system_avg').diff().alias('solar_rate_change'),
        pl.col('temp_system_avg').diff().alias('temp_rate_change')
    ])
    # Weather stability: 6-hour rolling std.
    features = features.with_columns([
        pl.col('wind_system_avg').rolling_std(window_size=6).alias('wind_stability_6h'),
        pl.col('solar_system_avg').rolling_std(window_size=6).alias('solar_stability_6h'),
        pl.col('temp_system_avg').rolling_std(window_size=6).alias('temp_stability_6h')
    ])
    # Drop intermediate averages.
    features = features.drop(list(avg_prefixes))
    derived_cols = ['wind_rate_change', 'solar_rate_change', 'temp_rate_change',
                    'wind_stability_6h', 'solar_stability_6h', 'temp_stability_6h']
    print(f" [OK] {len(derived_cols)} derived features")
    return features
def engineer_weather_features(
    weather_path: Path,
    output_dir: Path
) -> pl.DataFrame:
    """Main feature engineering pipeline for weather data.

    Loads raw OpenMeteo weather data, builds grid-level, lagged and
    derived features, and writes the result to
    <output_dir>/features_weather_24month.parquet.

    Args:
        weather_path: Path to raw weather data (weather_24month.parquet)
        output_dir: Directory to save engineered features (must exist)

    Returns:
        DataFrame with 375 weather features plus a 'timestamp' column
        (357 grid-level + 12 lag + 6 derived).
    """
    print("=" * 80)
    print("WEATHER FEATURE ENGINEERING")
    print("=" * 80)
    print()
    print(f"Input: {weather_path}")
    print(f"Output: {output_dir}")
    print()
    # Load raw weather data
    print("Loading weather data...")
    weather_df = pl.read_parquet(weather_path)
    print(f" [OK] {weather_df.shape[0]:,} rows × {weather_df.shape[1]} columns")
    print(f" Date range: {weather_df['timestamp'].min()} to {weather_df['timestamp'].max()}")
    print()
    # 1. Grid-level features (51 × 7 = 357 features)
    all_features = engineer_grid_level_features(weather_df)
    # BUGFIX: sort BEFORE the lag/diff/rolling steps — shift(), diff() and
    # rolling_std() are order-dependent, so computing them on unsorted rows
    # (and only sorting afterwards) would silently produce wrong lags.
    all_features = all_features.sort('timestamp')
    # 2. Temporal lags (12 features)
    all_features = engineer_temporal_lags(all_features)
    # 3. Derived features (6 features: rate-of-change + stability)
    all_features = engineer_derived_features(all_features)
    # Final validation
    print("\n" + "=" * 80)
    print("FEATURE ENGINEERING COMPLETE")
    print("=" * 80)
    print(f"Total features: {all_features.shape[1] - 1} (excluding timestamp)")
    print(f"Total rows: {len(all_features):,}")
    # Check completeness (share of non-null cells across the whole frame;
    # lag/rolling warm-up rows contribute the expected nulls)
    null_count_total = all_features.null_count().sum_horizontal()[0]
    completeness = (1 - null_count_total / (all_features.shape[0] * all_features.shape[1])) * 100
    print(f"Completeness: {completeness:.2f}%")
    print()
    # Save features
    output_path = output_dir / 'features_weather_24month.parquet'
    all_features.write_parquet(output_path)
    file_size_mb = output_path.stat().st_size / (1024 ** 2)
    print(f"Features saved: {output_path}")
    print(f"File size: {file_size_mb:.2f} MB")
    print("=" * 80)
    print()
    return all_features
def main():
    """Script entry point: engineer weather features from the default paths."""
    # Expected repository layout relative to the working directory:
    # data/raw for inputs, data/processed for outputs.
    project_root = Path.cwd()
    weather_path = project_root / 'data' / 'raw' / 'weather_24month.parquet'
    processed_dir = project_root / 'data' / 'processed'
    # Fail fast with a clear message if the input is missing.
    if not weather_path.exists():
        raise FileNotFoundError(f"Weather data not found: {weather_path}")
    # Run the full pipeline (loads, engineers, saves).
    engineer_weather_features(weather_path, processed_dir)
    print("SUCCESS: Weather features engineered and saved to data/processed/")
if __name__ == '__main__':
    main()