# fbmc-chronos2/src/data_collection/collect_openmeteo.py
"""OpenMeteo Weather Data Collection with Proper Rate Limiting
Collects historical weather data from OpenMeteo API for 52 strategic grid points.
Implements proper rate limiting based on actual OpenMeteo free tier limits.
OpenMeteo Free Tier Limits (ACTUAL):
- 600 calls/minute
- 5,000 calls/hour
- 10,000 calls/day
- 300,000 calls/month
Request Counting:
- Base request (≤10 variables, ≤2 weeks) = 1.0 API call
- >10 variables OR >2 weeks = Multiple calls (fractional)
- Example: 4 weeks = 3.0 API calls, 8 weeks = 7.0 API calls
Strategy:
- Request data in 2-week chunks (stays at 1.0 API call per request)
- 7 weather parameters (under 10 limit)
- 270 requests/minute (45% of 600 limit - safe but efficient)
- ~5 minutes total for 12 months × 51 locations
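
Usage (the argparse defaults, shown in the __main__ block below):
    python src/data_collection/collect_openmeteo.py \
        --start-date 2024-10-01 --end-date 2025-09-30 \
        --output data/raw/weather_2024_2025.parquet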
"""
import requests
import polars as pl
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os
import time
from typing import List, Dict, Tuple
from tqdm import tqdm
# Load environment variables
load_dotenv()
# 51 Strategic Grid Points (from project plan)
GRID_POINTS = {
# Germany (6 points)
"DE_North_Sea": {"lat": 54.5, "lon": 7.0, "name": "Offshore North Sea"},
"DE_Hamburg": {"lat": 53.5, "lon": 10.0, "name": "Hamburg/Schleswig-Holstein"},
"DE_Berlin": {"lat": 52.5, "lon": 13.5, "name": "Berlin/Brandenburg"},
"DE_Frankfurt": {"lat": 50.1, "lon": 8.7, "name": "Frankfurt"},
"DE_Munich": {"lat": 48.1, "lon": 11.6, "name": "Munich/Bavaria"},
"DE_Baltic": {"lat": 54.5, "lon": 13.0, "name": "Offshore Baltic"},
# France (5 points)
"FR_Dunkirk": {"lat": 51.0, "lon": 2.3, "name": "Dunkirk/Lille"},
"FR_Paris": {"lat": 48.9, "lon": 2.3, "name": "Paris"},
"FR_Lyon": {"lat": 45.8, "lon": 4.8, "name": "Lyon"},
"FR_Marseille": {"lat": 43.3, "lon": 5.4, "name": "Marseille"},
"FR_Strasbourg": {"lat": 48.6, "lon": 7.8, "name": "Strasbourg"},
# Netherlands (4 points)
"NL_Offshore": {"lat": 53.5, "lon": 4.5, "name": "Offshore North"},
"NL_Amsterdam": {"lat": 52.4, "lon": 4.9, "name": "Amsterdam"},
"NL_Rotterdam": {"lat": 51.9, "lon": 4.5, "name": "Rotterdam"},
"NL_Groningen": {"lat": 53.2, "lon": 6.6, "name": "Groningen"},
# Austria (3 points)
"AT_Kaprun": {"lat": 47.26, "lon": 12.74, "name": "Kaprun"},
"AT_St_Peter": {"lat": 48.26, "lon": 13.08, "name": "St. Peter"},
"AT_Vienna": {"lat": 48.15, "lon": 16.45, "name": "Vienna"},
# Belgium (3 points)
"BE_Offshore": {"lat": 51.5, "lon": 2.8, "name": "Belgian Offshore"},
"BE_Doel": {"lat": 51.32, "lon": 4.26, "name": "Doel"},
"BE_Avelgem": {"lat": 50.78, "lon": 3.45, "name": "Avelgem"},
# Czech Republic (3 points)
"CZ_Hradec": {"lat": 50.70, "lon": 13.80, "name": "Hradec-RPST"},
"CZ_Bohemia": {"lat": 50.50, "lon": 13.60, "name": "Northwest Bohemia"},
"CZ_Temelin": {"lat": 49.18, "lon": 14.37, "name": "Temelin"},
# Poland (4 points)
"PL_Baltic": {"lat": 54.8, "lon": 17.5, "name": "Baltic Offshore"},
"PL_SHVDC": {"lat": 54.5, "lon": 17.0, "name": "SwePol Link"},
"PL_Belchatow": {"lat": 51.27, "lon": 19.32, "name": "Belchatow"},
"PL_Mikulowa": {"lat": 51.5, "lon": 15.2, "name": "Mikulowa PST"},
# Hungary (3 points)
"HU_Paks": {"lat": 46.57, "lon": 18.86, "name": "Paks Nuclear"},
"HU_Bekescsaba": {"lat": 46.68, "lon": 21.09, "name": "Bekescsaba"},
"HU_Gyor": {"lat": 47.68, "lon": 17.63, "name": "Gyor"},
# Romania (3 points)
"RO_Fantanele": {"lat": 44.59, "lon": 28.57, "name": "Fantanele-Cogealac"},
"RO_Iron_Gates": {"lat": 44.67, "lon": 22.53, "name": "Iron Gates"},
"RO_Cernavoda": {"lat": 44.32, "lon": 28.03, "name": "Cernavoda"},
# Slovakia (3 points)
"SK_Bohunice": {"lat": 48.49, "lon": 17.68, "name": "Bohunice/Mochovce"},
"SK_Gabcikovo": {"lat": 47.88, "lon": 17.54, "name": "Gabcikovo"},
"SK_Rimavska": {"lat": 48.38, "lon": 20.00, "name": "Rimavska Sobota"},
# Slovenia (2 points)
"SI_Krsko": {"lat": 45.94, "lon": 15.52, "name": "Krsko Nuclear"},
"SI_Divaca": {"lat": 45.68, "lon": 13.97, "name": "Divaca"},
# Croatia (2 points)
"HR_Ernestinovo": {"lat": 45.47, "lon": 18.66, "name": "Ernestinovo"},
"HR_Zagreb": {"lat": 45.88, "lon": 16.12, "name": "Zagreb"},
# Luxembourg (2 points)
"LU_Trier": {"lat": 49.75, "lon": 6.63, "name": "Trier/Aach"},
"LU_Bauler": {"lat": 49.92, "lon": 6.20, "name": "Bauler"},
# External regions (8 points)
"CH_Central": {"lat": 46.85, "lon": 9.0, "name": "Switzerland Central"},
"UK_Southeast": {"lat": 51.5, "lon": 0.0, "name": "UK Southeast"},
"ES_North": {"lat": 43.3, "lon": -3.0, "name": "Spain North"},
"IT_North": {"lat": 45.5, "lon": 9.2, "name": "Italy North"},
"NO_South": {"lat": 59.0, "lon": 5.7, "name": "Norway South"},
"SE_South": {"lat": 56.0, "lon": 13.0, "name": "Sweden South"},
"DK_West": {"lat": 56.0, "lon": 9.0, "name": "Denmark West"},
"DK_East": {"lat": 55.7, "lon": 12.6, "name": "Denmark East"},
}
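# Sanity check: the table above defines 51 points; all runtime counts below
# are derived from len(GRID_POINTS), so edits to the dict propagate automatically.
assert len(GRID_POINTS) == 51, f"Expected 51 grid points, found {len(GRID_POINTS)}"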
# Weather parameters to collect (7 params - under 10 limit)
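# Units are the OpenMeteo API defaults (per its docs): temperature in °C, wind
# speed in km/h, shortwave radiation in W/m², cloud cover in %, pressure in hPa.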
WEATHER_PARAMS = [
'temperature_2m',
'windspeed_10m',
'windspeed_100m',
'winddirection_100m',
'shortwave_radiation',
'cloudcover',
'surface_pressure',
]
class OpenMeteoCollector:
"""Collect weather data from OpenMeteo API with proper rate limiting."""
def __init__(
self,
requests_per_minute: int = 270,
chunk_days: int = 14
):
"""Initialize collector with rate limiting.
Args:
requests_per_minute: Max HTTP requests per minute (default: 270 = 45% of 600 limit)
chunk_days: Days per request chunk (default: 14 = 1.0 API call)
"""
        # Forecast endpoint (configurable via env; not used by this script)
        self.base_url = os.getenv('OPENMETEO_BASE_URL', 'https://api.open-meteo.com/v1/forecast')
        # OpenMeteo historical archive endpoint (free tier); all requests below use this
        self.historical_url = 'https://archive-api.open-meteo.com/v1/archive'
self.requests_per_minute = requests_per_minute
self.chunk_days = chunk_days
self.delay_seconds = 60.0 / requests_per_minute # Delay between requests
self.session = requests.Session()
self.total_api_calls = 0 # Track actual API call count
def _generate_date_chunks(
self,
start_date: str,
end_date: str
) -> List[Tuple[str, str]]:
"""Generate date range chunks of specified size.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
List of (start, end) date tuples
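
        Example (chunk_days=14; chunk endpoints are inclusive):
            '2024-10-01' to '2024-10-31' ->
            [('2024-10-01', '2024-10-14'),
             ('2024-10-15', '2024-10-28'),
             ('2024-10-29', '2024-10-31')]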
"""
start_dt = datetime.fromisoformat(start_date)
end_dt = datetime.fromisoformat(end_date)
chunks = []
current = start_dt
while current < end_dt:
chunk_end = min(current + timedelta(days=self.chunk_days - 1), end_dt)
chunks.append((
current.strftime('%Y-%m-%d'),
chunk_end.strftime('%Y-%m-%d')
))
current = chunk_end + timedelta(days=1)
return chunks
def _calculate_api_calls(self, start_date: str, end_date: str) -> float:
"""Calculate how many API calls this request will consume.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Number of API calls (fractional)
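
        Example (linear days/14 approximation):
            14 days -> 1.0, 21 days -> 1.5, 28 days -> 2.0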
"""
start_dt = datetime.fromisoformat(start_date)
end_dt = datetime.fromisoformat(end_date)
days = (end_dt - start_dt).days + 1
        # Approximation of OpenMeteo's counting: ≤14 days = 1.0 call; longer
        # ranges scale linearly (days / 14). The documented examples in the
        # module docstring (4 weeks = 3.0 calls) suggest the real tariff may
        # count higher, but with the default 14-day chunks this path always
        # returns 1.0 per request.
if days <= 14:
return 1.0
else:
return days / 14.0
def fetch_location_chunk(
self,
location_id: str,
location_data: Dict,
start_date: str,
end_date: str
) -> pl.DataFrame:
"""Fetch weather data for a single location and date chunk.
Args:
location_id: Location identifier (e.g., 'DE_Hamburg')
location_data: Dict with 'lat', 'lon', 'name'
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Polars DataFrame with weather data
"""
params = {
'latitude': location_data['lat'],
'longitude': location_data['lon'],
'hourly': ','.join(WEATHER_PARAMS),
'start_date': start_date,
'end_date': end_date,
'timezone': 'UTC'
}
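        # The archive API responds with an 'hourly' object whose 'time' array
        # holds hourly timestamps in the requested timezone (UTC here); parsed below.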
# Calculate API call cost
api_calls = self._calculate_api_calls(start_date, end_date)
self.total_api_calls += api_calls
try:
response = self.session.get(
self.historical_url,
params=params,
timeout=30
)
response.raise_for_status()
data = response.json()
# Parse hourly data
hourly = data.get('hourly', {})
timestamps = hourly.get('time', [])
if not timestamps:
return pl.DataFrame()
# Build dataframe
df_data = {
'timestamp': timestamps,
'grid_point': [location_id] * len(timestamps),
'location_name': [location_data['name']] * len(timestamps),
'latitude': [location_data['lat']] * len(timestamps),
'longitude': [location_data['lon']] * len(timestamps),
}
# Add weather parameters
for param in WEATHER_PARAMS:
df_data[param] = hourly.get(param, [None] * len(timestamps))
df = pl.DataFrame(df_data)
# Convert timestamp to datetime
df = df.with_columns(
pl.col('timestamp').str.strptime(pl.Datetime, format='%Y-%m-%dT%H:%M')
)
return df
except requests.exceptions.RequestException as e:
print(f"[ERROR] Failed {location_id} ({start_date} to {end_date}): {e}")
return pl.DataFrame()
def collect_all(
self,
start_date: str,
end_date: str,
output_path: Path
) -> pl.DataFrame:
"""Collect weather data for all 52 grid points with rate limiting.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_path: Path to save Parquet file
Returns:
Combined Polars DataFrame
"""
# Generate date chunks
date_chunks = self._generate_date_chunks(start_date, end_date)
total_requests = len(GRID_POINTS) * len(date_chunks)
estimated_minutes = total_requests / self.requests_per_minute
print("=" * 70)
print("OpenMeteo Weather Data Collection")
print("=" * 70)
print(f"Date range: {start_date} to {end_date}")
print(f"Grid points: {len(GRID_POINTS)}")
print(f"Date chunks: {len(date_chunks)} ({self.chunk_days}-day periods)")
print(f"Total HTTP requests: {total_requests}")
print(f"Rate limit: {self.requests_per_minute} requests/minute (45% of 600 max)")
print(f"Estimated time: {estimated_minutes:.1f} minutes")
print(f"Delay between requests: {self.delay_seconds:.2f}s")
print()
all_data = []
request_count = 0
# Iterate through all locations and date chunks
with tqdm(total=total_requests, desc="Fetching weather data") as pbar:
for location_id, location_data in GRID_POINTS.items():
location_chunks = []
for start_chunk, end_chunk in date_chunks:
# Fetch this chunk
df = self.fetch_location_chunk(
location_id,
location_data,
start_chunk,
end_chunk
)
if not df.is_empty():
location_chunks.append(df)
request_count += 1
pbar.update(1)
# Rate limiting - wait before next request
time.sleep(self.delay_seconds)
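                    # Request latency is not subtracted from the delay, so the
                    # effective request rate stays strictly below the configured cap.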
# Combine all chunks for this location
if location_chunks:
location_df = pl.concat(location_chunks)
all_data.append(location_df)
print(f"[OK] {location_id}: {location_df.shape[0]} hours")
# Combine all dataframes
if all_data:
combined_df = pl.concat(all_data)
# Save to parquet
output_path.parent.mkdir(parents=True, exist_ok=True)
combined_df.write_parquet(output_path)
print()
print("=" * 70)
print("Collection Complete")
print("=" * 70)
print(f"Total HTTP requests: {request_count}")
print(f"Total API calls consumed: {self.total_api_calls:.1f}")
print(f"Total records: {combined_df.shape[0]:,}")
print(f"Date range: {combined_df['timestamp'].min()} to {combined_df['timestamp'].max()}")
print(f"Grid points: {combined_df['grid_point'].n_unique()}")
            # Completeness: share of non-null cells across the whole table
null_count_total = combined_df.null_count().sum_horizontal()[0]
completeness = (1 - null_count_total / (combined_df.shape[0] * combined_df.shape[1])) * 100
print(f"Completeness: {completeness:.2f}%")
print(f"Output: {output_path}")
print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB")
return combined_df
else:
print("[ERROR] No data collected")
return pl.DataFrame()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Collect OpenMeteo weather data with proper rate limiting")
parser.add_argument(
'--start-date',
default='2024-10-01',
help='Start date (YYYY-MM-DD)'
)
parser.add_argument(
'--end-date',
default='2025-09-30',
help='End date (YYYY-MM-DD)'
)
parser.add_argument(
'--output',
type=Path,
default=Path('data/raw/weather_2024_2025.parquet'),
help='Output Parquet file path'
)
parser.add_argument(
'--requests-per-minute',
type=int,
default=270,
help='HTTP requests per minute (default: 270 = 45%% of 600 limit)'
)
parser.add_argument(
'--chunk-days',
type=int,
default=14,
help='Days per request chunk (default: 14 = 1.0 API call)'
)
args = parser.parse_args()
# Initialize collector and run
collector = OpenMeteoCollector(
requests_per_minute=args.requests_per_minute,
chunk_days=args.chunk_days
)
collector.collect_all(
start_date=args.start_date,
end_date=args.end_date,
output_path=args.output
)