| """OpenMeteo Weather Data Collection with Proper Rate Limiting | |
| Collects historical weather data from OpenMeteo API for 52 strategic grid points. | |
| Implements proper rate limiting based on actual OpenMeteo free tier limits. | |
| OpenMeteo Free Tier Limits (ACTUAL): | |
| - 600 calls/minute | |
| - 5,000 calls/hour | |
| - 10,000 calls/day | |
| - 300,000 calls/month | |
| Request Counting: | |
| - Base request (≤10 variables, ≤2 weeks) = 1.0 API call | |
| - >10 variables OR >2 weeks = Multiple calls (fractional) | |
| - Example: 4 weeks = 3.0 API calls, 8 weeks = 7.0 API calls | |
| Strategy: | |
| - Request data in 2-week chunks (stays at 1.0 API call per request) | |
| - 7 weather parameters (under 10 limit) | |
| - 270 requests/minute (45% of 600 limit - safe but efficient) | |
| - ~5 minutes total for 12 months × 52 locations | |
| """ | |

import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Tuple

import polars as pl
import requests
from dotenv import load_dotenv
from tqdm import tqdm

# Load environment variables
load_dotenv()

# 52 Strategic Grid Points (from project plan)
GRID_POINTS = {
    # Germany (6 points)
    "DE_North_Sea": {"lat": 54.5, "lon": 7.0, "name": "Offshore North Sea"},
    "DE_Hamburg": {"lat": 53.5, "lon": 10.0, "name": "Hamburg/Schleswig-Holstein"},
    "DE_Berlin": {"lat": 52.5, "lon": 13.5, "name": "Berlin/Brandenburg"},
    "DE_Frankfurt": {"lat": 50.1, "lon": 8.7, "name": "Frankfurt"},
    "DE_Munich": {"lat": 48.1, "lon": 11.6, "name": "Munich/Bavaria"},
    "DE_Baltic": {"lat": 54.5, "lon": 13.0, "name": "Offshore Baltic"},
    # France (5 points)
    "FR_Dunkirk": {"lat": 51.0, "lon": 2.3, "name": "Dunkirk/Lille"},
    "FR_Paris": {"lat": 48.9, "lon": 2.3, "name": "Paris"},
    "FR_Lyon": {"lat": 45.8, "lon": 4.8, "name": "Lyon"},
    "FR_Marseille": {"lat": 43.3, "lon": 5.4, "name": "Marseille"},
    "FR_Strasbourg": {"lat": 48.6, "lon": 7.8, "name": "Strasbourg"},
    # Netherlands (4 points)
    "NL_Offshore": {"lat": 53.5, "lon": 4.5, "name": "Offshore North"},
    "NL_Amsterdam": {"lat": 52.4, "lon": 4.9, "name": "Amsterdam"},
    "NL_Rotterdam": {"lat": 51.9, "lon": 4.5, "name": "Rotterdam"},
    "NL_Groningen": {"lat": 53.2, "lon": 6.6, "name": "Groningen"},
    # Austria (3 points)
    "AT_Kaprun": {"lat": 47.26, "lon": 12.74, "name": "Kaprun"},
    "AT_St_Peter": {"lat": 48.26, "lon": 13.08, "name": "St. Peter"},
    "AT_Vienna": {"lat": 48.15, "lon": 16.45, "name": "Vienna"},
    # Belgium (3 points)
    "BE_Offshore": {"lat": 51.5, "lon": 2.8, "name": "Belgian Offshore"},
    "BE_Doel": {"lat": 51.32, "lon": 4.26, "name": "Doel"},
    "BE_Avelgem": {"lat": 50.78, "lon": 3.45, "name": "Avelgem"},
    # Czech Republic (3 points)
    "CZ_Hradec": {"lat": 50.70, "lon": 13.80, "name": "Hradec-RPST"},
    "CZ_Bohemia": {"lat": 50.50, "lon": 13.60, "name": "Northwest Bohemia"},
    "CZ_Temelin": {"lat": 49.18, "lon": 14.37, "name": "Temelin"},
    # Poland (4 points)
    "PL_Baltic": {"lat": 54.8, "lon": 17.5, "name": "Baltic Offshore"},
    "PL_SHVDC": {"lat": 54.5, "lon": 17.0, "name": "SwePol Link"},
    "PL_Belchatow": {"lat": 51.27, "lon": 19.32, "name": "Belchatow"},
    "PL_Mikulowa": {"lat": 51.5, "lon": 15.2, "name": "Mikulowa PST"},
    # Hungary (3 points)
    "HU_Paks": {"lat": 46.57, "lon": 18.86, "name": "Paks Nuclear"},
    "HU_Bekescsaba": {"lat": 46.68, "lon": 21.09, "name": "Bekescsaba"},
    "HU_Gyor": {"lat": 47.68, "lon": 17.63, "name": "Gyor"},
    # Romania (3 points)
    "RO_Fantanele": {"lat": 44.59, "lon": 28.57, "name": "Fantanele-Cogealac"},
    "RO_Iron_Gates": {"lat": 44.67, "lon": 22.53, "name": "Iron Gates"},
    "RO_Cernavoda": {"lat": 44.32, "lon": 28.03, "name": "Cernavoda"},
    # Slovakia (3 points)
    "SK_Bohunice": {"lat": 48.49, "lon": 17.68, "name": "Bohunice/Mochovce"},
    "SK_Gabcikovo": {"lat": 47.88, "lon": 17.54, "name": "Gabcikovo"},
    "SK_Rimavska": {"lat": 48.38, "lon": 20.00, "name": "Rimavska Sobota"},
    # Slovenia (2 points)
    "SI_Krsko": {"lat": 45.94, "lon": 15.52, "name": "Krsko Nuclear"},
    "SI_Divaca": {"lat": 45.68, "lon": 13.97, "name": "Divaca"},
    # Croatia (2 points)
    "HR_Ernestinovo": {"lat": 45.47, "lon": 18.66, "name": "Ernestinovo"},
    "HR_Zagreb": {"lat": 45.88, "lon": 16.12, "name": "Zagreb"},
    # Luxembourg (2 points)
    "LU_Trier": {"lat": 49.75, "lon": 6.63, "name": "Trier/Aach"},
    "LU_Bauler": {"lat": 49.92, "lon": 6.20, "name": "Bauler"},
    # External regions (8 points)
    "CH_Central": {"lat": 46.85, "lon": 9.0, "name": "Switzerland Central"},
    "UK_Southeast": {"lat": 51.5, "lon": 0.0, "name": "UK Southeast"},
    "ES_North": {"lat": 43.3, "lon": -3.0, "name": "Spain North"},
    "IT_North": {"lat": 45.5, "lon": 9.2, "name": "Italy North"},
    "NO_South": {"lat": 59.0, "lon": 5.7, "name": "Norway South"},
    "SE_South": {"lat": 56.0, "lon": 13.0, "name": "Sweden South"},
    "DK_West": {"lat": 56.0, "lon": 9.0, "name": "Denmark West"},
    "DK_East": {"lat": 55.7, "lon": 12.6, "name": "Denmark East"},
}

# Weather parameters to collect (7 params - under the 10-variable limit)
WEATHER_PARAMS = [
    'temperature_2m',
    'windspeed_10m',
    'windspeed_100m',
    'winddirection_100m',
    'shortwave_radiation',
    'cloudcover',
    'surface_pressure',
]
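
# Units, per Open-Meteo's documented defaults (an assumption - the request
# params below do not override them): temperature_2m in °C, windspeed_* in
# km/h, winddirection_100m in degrees, shortwave_radiation in W/m²,
# cloudcover in %, surface_pressure in hPa.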


class OpenMeteoCollector:
    """Collect weather data from OpenMeteo API with proper rate limiting."""

    def __init__(
        self,
        requests_per_minute: int = 270,
        chunk_days: int = 14
    ):
        """Initialize collector with rate limiting.

        Args:
            requests_per_minute: Max HTTP requests per minute
                (default: 270 = 45% of the 600/minute limit)
            chunk_days: Days per request chunk (default: 14 = 1.0 API call)
        """
        # Forecast endpoint, configurable via env; not used by the historical
        # collection below, which goes through the archive endpoint
        self.base_url = os.getenv('OPENMETEO_BASE_URL', 'https://api.open-meteo.com/v1/forecast')
        # OpenMeteo historical data endpoint (free tier)
        self.historical_url = 'https://archive-api.open-meteo.com/v1/archive'
        self.requests_per_minute = requests_per_minute
        self.chunk_days = chunk_days
        # Delay between requests (~0.22 s at the default 270/minute)
        self.delay_seconds = 60.0 / requests_per_minute
        self.session = requests.Session()
        self.total_api_calls = 0.0  # Running total of (fractional) API calls consumed

    def _generate_date_chunks(
        self,
        start_date: str,
        end_date: str
    ) -> List[Tuple[str, str]]:
        """Generate date range chunks of the configured size.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            List of (start, end) date tuples, both ends inclusive
        """
        start_dt = datetime.fromisoformat(start_date)
        end_dt = datetime.fromisoformat(end_date)

        chunks = []
        current = start_dt
        # <= so the final day is not dropped when a chunk boundary lands
        # exactly one day before end_date
        while current <= end_dt:
            chunk_end = min(current + timedelta(days=self.chunk_days - 1), end_dt)
            chunks.append((
                current.strftime('%Y-%m-%d'),
                chunk_end.strftime('%Y-%m-%d')
            ))
            current = chunk_end + timedelta(days=1)
        return chunks
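
    # Example (illustrative range, default chunk_days=14):
    #   _generate_date_chunks('2024-10-01', '2024-10-31') ->
    #   [('2024-10-01', '2024-10-14'),
    #    ('2024-10-15', '2024-10-28'),
    #    ('2024-10-29', '2024-10-31')]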

    def _calculate_api_calls(self, start_date: str, end_date: str) -> float:
        """Calculate how many API calls this request will consume.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Number of API calls (fractional)
        """
        start_dt = datetime.fromisoformat(start_date)
        end_dt = datetime.fromisoformat(end_date)
        days = (end_dt - start_dt).days + 1  # inclusive range

        # OpenMeteo counting: ≤2 weeks = 1.0 call; longer ranges cost more.
        # days / 7 - 1 reproduces the examples in the module docstring
        # (2 weeks = 1.0, 4 weeks = 3.0, 8 weeks = 7.0).
        if days <= 14:
            return 1.0
        return days / 7.0 - 1.0
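
    # Spot checks of the cost model above (derived from the docstring
    # examples, not an official Open-Meteo formula):
    #   _calculate_api_calls('2024-10-01', '2024-10-14') -> 1.0  (2 weeks)
    #   _calculate_api_calls('2024-10-01', '2024-10-28') -> 3.0  (4 weeks)
    #   _calculate_api_calls('2024-10-01', '2024-11-25') -> 7.0  (8 weeks)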

    def fetch_location_chunk(
        self,
        location_id: str,
        location_data: Dict,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Fetch weather data for a single location and date chunk.

        Args:
            location_id: Location identifier (e.g., 'DE_Hamburg')
            location_data: Dict with 'lat', 'lon', 'name'
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with weather data (empty on failure)
        """
        params = {
            'latitude': location_data['lat'],
            'longitude': location_data['lon'],
            'hourly': ','.join(WEATHER_PARAMS),
            'start_date': start_date,
            'end_date': end_date,
            'timezone': 'UTC'
        }

        # Track API call cost before issuing the request
        api_calls = self._calculate_api_calls(start_date, end_date)
        self.total_api_calls += api_calls

        try:
            response = self.session.get(
                self.historical_url,
                params=params,
                timeout=30
            )
            response.raise_for_status()
            data = response.json()

            # Parse hourly data
            hourly = data.get('hourly', {})
            timestamps = hourly.get('time', [])
            if not timestamps:
                return pl.DataFrame()

            # Build dataframe: metadata columns repeated per hourly row
            df_data = {
                'timestamp': timestamps,
                'grid_point': [location_id] * len(timestamps),
                'location_name': [location_data['name']] * len(timestamps),
                'latitude': [location_data['lat']] * len(timestamps),
                'longitude': [location_data['lon']] * len(timestamps),
            }
            # Add weather parameters (a missing series becomes an all-null column)
            for param in WEATHER_PARAMS:
                df_data[param] = hourly.get(param, [None] * len(timestamps))
            df = pl.DataFrame(df_data)

            # Convert ISO-8601 timestamp strings to datetime
            df = df.with_columns(
                pl.col('timestamp').str.strptime(pl.Datetime, format='%Y-%m-%dT%H:%M')
            )
            return df

        except requests.exceptions.RequestException as e:
            # tqdm.write keeps the progress bar intact when called from collect_all
            tqdm.write(f"[ERROR] Failed {location_id} ({start_date} to {end_date}): {e}")
            return pl.DataFrame()
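
    # For reference, each chunk resolves to a single GET such as (values
    # illustrative, matching the params built above):
    #   https://archive-api.open-meteo.com/v1/archive?latitude=53.5&longitude=10.0
    #       &hourly=temperature_2m,windspeed_10m,...&start_date=2024-10-01
    #       &end_date=2024-10-14&timezone=UTC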

    def collect_all(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> pl.DataFrame:
        """Collect weather data for all configured grid points with rate limiting.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Combined Polars DataFrame
        """
        # Generate date chunks
        date_chunks = self._generate_date_chunks(start_date, end_date)
        total_requests = len(GRID_POINTS) * len(date_chunks)
        estimated_minutes = total_requests / self.requests_per_minute

        print("=" * 70)
        print("OpenMeteo Weather Data Collection")
        print("=" * 70)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Grid points: {len(GRID_POINTS)}")
        print(f"Date chunks: {len(date_chunks)} ({self.chunk_days}-day periods)")
        print(f"Total HTTP requests: {total_requests}")
        print(f"Rate limit: {self.requests_per_minute} requests/minute "
              f"({self.requests_per_minute / 600 * 100:.0f}% of 600 max)")
        print(f"Estimated time: {estimated_minutes:.1f} minutes")
        print(f"Delay between requests: {self.delay_seconds:.2f}s")
        print()

        all_data = []
        request_count = 0

        # Iterate through all locations and date chunks
        with tqdm(total=total_requests, desc="Fetching weather data") as pbar:
            for location_id, location_data in GRID_POINTS.items():
                location_chunks = []
                for start_chunk, end_chunk in date_chunks:
                    # Fetch this chunk
                    df = self.fetch_location_chunk(
                        location_id,
                        location_data,
                        start_chunk,
                        end_chunk
                    )
                    if not df.is_empty():
                        location_chunks.append(df)

                    request_count += 1
                    pbar.update(1)

                    # Rate limiting - wait before the next request
                    time.sleep(self.delay_seconds)

                # Combine all chunks for this location
                if location_chunks:
                    location_df = pl.concat(location_chunks)
                    all_data.append(location_df)
                    # tqdm.write avoids mangling the progress bar
                    tqdm.write(f"[OK] {location_id}: {location_df.shape[0]} hours")

        # Combine all dataframes
        if all_data:
            combined_df = pl.concat(all_data)

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            combined_df.write_parquet(output_path)

            print()
            print("=" * 70)
            print("Collection Complete")
            print("=" * 70)
            print(f"Total HTTP requests: {request_count}")
            print(f"Total API calls consumed: {self.total_api_calls:.1f}")
            print(f"Total records: {combined_df.shape[0]:,}")
            print(f"Date range: {combined_df['timestamp'].min()} to {combined_df['timestamp'].max()}")
            print(f"Grid points: {combined_df['grid_point'].n_unique()}")

            # Completeness: share of non-null cells across the whole frame
            # (sum_horizontal over null_count() yields the total as a scalar)
            null_count_total = combined_df.null_count().sum_horizontal()[0]
            completeness = (1 - null_count_total / (combined_df.shape[0] * combined_df.shape[1])) * 100
            print(f"Completeness: {completeness:.2f}%")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB")
            return combined_df
        else:
            print("[ERROR] No data collected")
            return pl.DataFrame()
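

# Illustrative helper (an addition for clarity, not part of the collection
# pipeline) showing how downstream code might read the Parquet written by
# collect_all() and filter to a single grid point.
def preview_output(path: Path, grid_point: str = 'DE_Hamburg') -> pl.DataFrame:
    """Load collected weather data and return rows for one grid point."""
    df = pl.read_parquet(path)
    return df.filter(pl.col('grid_point') == grid_point)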


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Collect OpenMeteo weather data with proper rate limiting")
    parser.add_argument(
        '--start-date',
        default='2024-10-01',
        help='Start date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--end-date',
        default='2025-09-30',
        help='End date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--output',
        type=Path,
        default=Path('data/raw/weather_2024_2025.parquet'),
        help='Output Parquet file path'
    )
    parser.add_argument(
        '--requests-per-minute',
        type=int,
        default=270,
        help='HTTP requests per minute (default: 270 = 45%% of 600 limit)'
    )
    parser.add_argument(
        '--chunk-days',
        type=int,
        default=14,
        help='Days per request chunk (default: 14 = 1.0 API call)'
    )
    args = parser.parse_args()

    # Initialize collector and run
    collector = OpenMeteoCollector(
        requests_per_minute=args.requests_per_minute,
        chunk_days=args.chunk_days
    )
    collector.collect_all(
        start_date=args.start_date,
        end_date=args.end_date,
        output_path=args.output
    )
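
# Example invocations (script filename is illustrative):
#   python collect_openmeteo.py
#   python collect_openmeteo.py --start-date 2024-10-01 --end-date 2024-12-31 \
#       --requests-per-minute 200 --output data/raw/weather_q4_2024.parquet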