Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions src/spotforecast2_safe/weather/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,44 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

import logging
import os
import threading
from pathlib import Path
from time import monotonic, sleep
from typing import Any

import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Process-wide proactive throttle for Open-Meteo requests. Per-zone and global
# population-weighted weather fetches query many cities, each through a fresh
# client and session, so a burst can trip Open-Meteo's archive rate limit
# (HTTP 429) faster than per-request retry/backoff can recover. Enforcing a
# minimum spacing between any two requests (process-wide) spreads the burst
# under the limit. The spacing is overridable via the
# SPOTFORECAST2_WEATHER_MIN_REQUEST_INTERVAL env var (seconds; "0" disables).
# Only request *timing* is affected, never the returned data — determinism of
# results is preserved.
def _parse_min_request_interval(raw: str, fallback: float = 0.5) -> float:
    """Convert the throttle-interval setting into a number of seconds.

    Args:
        raw: Text taken from the SPOTFORECAST2_WEATHER_MIN_REQUEST_INTERVAL
            environment variable.
        fallback: Interval returned when ``raw`` is not a finite number.

    Returns:
        The parsed interval in seconds. Zero and negative values are passed
        through unchanged (the throttle treats them as "disabled"). Empty,
        non-numeric, NaN and infinite values yield ``fallback``.
    """
    import math

    try:
        seconds = float(raw)
    except ValueError:
        seconds = math.nan
    if math.isfinite(seconds):
        return seconds
    # A malformed tuning knob must not make this module unimportable, and
    # NaN / infinity are not meaningful sleep durations.
    logging.getLogger(__name__).warning(
        "Ignoring invalid SPOTFORECAST2_WEATHER_MIN_REQUEST_INTERVAL=%r; "
        "using %s s.",
        raw,
        fallback,
    )
    return fallback


# Minimum spacing in seconds between two Open-Meteo requests; <= 0 disables.
_MIN_REQUEST_INTERVAL_S = _parse_min_request_interval(
    os.environ.get("SPOTFORECAST2_WEATHER_MIN_REQUEST_INTERVAL", "0.5")
)
# Serialises access to the timestamp below across threads.
_THROTTLE_LOCK = threading.Lock()
# time.monotonic() reading recorded by the most recent throttled request.
_LAST_REQUEST_MONOTONIC = 0.0


def _throttle_open_meteo() -> None:
    """Space out Open-Meteo requests across the whole process.

    Waits, when necessary, so that at least ``_MIN_REQUEST_INTERVAL_S`` seconds
    separate this call from the previously recorded one, then records the
    current monotonic time as the new reference point. Returns immediately,
    without taking the lock, when the interval is zero or negative.
    """
    global _LAST_REQUEST_MONOTONIC
    if _MIN_REQUEST_INTERVAL_S <= 0:
        return
    # The sleep happens while the lock is held, so concurrent callers wait
    # their turn and each one measures against the previous caller's stamp.
    with _THROTTLE_LOCK:
        elapsed = monotonic() - _LAST_REQUEST_MONOTONIC
        remaining = _MIN_REQUEST_INTERVAL_S - elapsed
        if remaining > 0:
            sleep(remaining)
        _LAST_REQUEST_MONOTONIC = monotonic()

Check notice

Code scanning / CodeQL

Unused global variable Note

The global variable '_LAST_REQUEST_MONOTONIC' is not used.


class WeatherFetchError(ValueError):
"""Raised when Open-Meteo cannot be reached or returns no usable data.
Expand Down Expand Up @@ -116,10 +146,15 @@
def _create_session(self) -> requests.Session:
"""Create a requests session with retry logic."""
session = requests.Session()
# Reactive backoff: on 429 / 5xx, retry up to 5 times with exponential
# backoff and honour the server's Retry-After header (Open-Meteo sends
# one). Pairs with the proactive _throttle_open_meteo() spacing below.
retry_strategy = Retry(
total=3,
backoff_factor=1,
total=5,
backoff_factor=2,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=frozenset({"GET"}),
respect_retry_after_header=True,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
Expand All @@ -128,6 +163,7 @@

def _fetch(self, url: str, params: dict[str, Any]) -> pd.DataFrame:
"""Execute API request and return parsed DataFrame."""
_throttle_open_meteo() # proactive burst spacing (see module top)
try:
response = self._session.get(url, params=params, timeout=30)
response.raise_for_status()
Expand Down
20 changes: 17 additions & 3 deletions src/spotforecast2_safe/weather/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,23 @@ def _fetch_one(lat: float, lon: float) -> pd.DataFrame:
f"({len(location_weights)}) must have equal length."
)
frames = [_fetch_one(lat, lon) for (lat, lon) in locations]
# population_weighted_average is fail-safe: it raises if the per-city
# frames disagree on index or columns rather than silently aligning.
weather_df = population_weighted_average(frames, list(location_weights))
# population_weighted_average is fail-safe: it raises a plain ValueError
# if the per-city frames disagree on index or columns rather than
# silently aligning. That disagreement is, in practice, a transient
# per-city fetch failure (e.g. a 429 sending one city to differently
# ranged fallback data). Re-raise it as WeatherFetchError so the
# `on_weather_failure` policy can govern it — a bare ValueError would
# otherwise escape "skip" and crash the whole run (per-zone weather).
try:
weather_df = population_weighted_average(frames, list(location_weights))
except WeatherFetchError:
raise
except ValueError as exc:
raise WeatherFetchError(
"Multi-city weather frames could not be combined "
f"({len(frames)} locations); this usually means a transient "
f"per-city fetch failure or fallback range mismatch: {exc}"
) from exc
else:
weather_df = _fetch_one(latitude, longitude)

Expand Down
24 changes: 24 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: 2026 bartzbeielstein
# SPDX-License-Identifier: AGPL-3.0-or-later

"""Shared pytest fixtures."""

import pytest

import spotforecast2_safe.weather.client as _weather_client


@pytest.fixture(autouse=True)
def _disable_open_meteo_throttle():
    """Run every test with the proactive Open-Meteo throttle switched off.

    The throttle only spaces real HTTP requests; tests mock the network, so
    the spacing would merely add wall-clock sleeps without changing behaviour.
    The interval is set to ``0.0`` for the duration of the test and the
    previous value is put back afterwards, even if the test fails. Tests that
    exercise the throttle itself set the interval explicitly.
    """
    attr_name = "_MIN_REQUEST_INTERVAL_S"
    previous_interval = getattr(_weather_client, attr_name)
    setattr(_weather_client, attr_name, 0.0)
    try:
        yield
    finally:
        # Restore whatever was configured before the test started.
        setattr(_weather_client, attr_name, previous_interval)
62 changes: 62 additions & 0 deletions tests/test_per_zone_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,68 @@ def failing_for_tennet(*, data, start, cov_end, locations=None, **kwargs):
assert task.zone_weather_aligned == {}
assert "temperature_2m" not in task.exog_feature_names

def test_multicity_combine_mismatch_raises_weatherfetcherror(self, monkeypatch):
    """Mismatched per-city frames must raise a CATCHABLE WeatherFetchError.

    One city returns a shorter index than the other, as a transient per-city
    fetch would. The combine step has to surface that as WeatherFetchError,
    not as a bare ValueError that escapes on_weather_failure='skip' (the
    22.4.0 per-zone crash, fixed in 22.4.1).
    """
    import sys

    from spotforecast2_safe.weather import WeatherFetchError, get_weather_features

    window_start = pd.Timestamp("2024-01-01", tz="UTC")
    window_end = pd.Timestamp("2024-01-06", tz="UTC")
    hourly_index = pd.date_range(window_start, window_end, freq="h")

    def fake_fetch(*, cov_start, cov_end, latitude, longitude, **kwargs):
        # The northern city (lat ~53) returns a SHORTER range, mimicking a
        # 429 fallback that doesn't cover the full window -> mismatched index.
        if latitude > 53.0:
            city_index = hourly_index[:-12]
        else:
            city_index = hourly_index
        values = np.arange(len(city_index), dtype="float64")
        return pd.DataFrame({"temperature_2m": values}, index=city_index)

    # get_weather_features imports fetch_weather_data lazily from this module.
    # The data package re-exports a `fetch_data` function that shadows the
    # submodule name, so take the real module object from sys.modules.
    fetch_data_module = sys.modules["spotforecast2_safe.data.fetch_data"]
    monkeypatch.setattr(fetch_data_module, "fetch_weather_data", fake_fetch)

    reference = pd.DataFrame(
        {"load": np.zeros(len(hourly_index))},
        index=hourly_index,
    )
    with pytest.raises(WeatherFetchError, match="Multi-city"):
        get_weather_features(
            data=reference,
            start=window_start,
            cov_end=window_end,
            forecast_horizon=24,
            locations=[(52.52, 13.40), (53.55, 9.99)],
            location_weights=[1.0, 1.0],
        )

def test_throttle_spaces_open_meteo_requests(self, monkeypatch):
    """_throttle_open_meteo sleeps to maintain the minimum request spacing.

    The clock is frozen and ``sleep`` is replaced by a recorder, so the test
    observes the requested wait without spending any wall-clock time.
    """
    import spotforecast2_safe.weather.client as wc

    frozen_now = 1000.0
    recorded_sleeps: list[float] = []
    monkeypatch.setattr(wc, "_MIN_REQUEST_INTERVAL_S", 0.5)
    monkeypatch.setattr(wc, "_LAST_REQUEST_MONOTONIC", 0.0)
    monkeypatch.setattr(wc, "monotonic", lambda: frozen_now)
    monkeypatch.setattr(wc, "sleep", recorded_sleeps.append)

    # First call after a long idle (last stamp 0.0 vs. now 1000.0) -> no sleep.
    wc._throttle_open_meteo()
    assert recorded_sleeps == []

    # Immediate second call at the same instant -> must wait ~0.5 s.
    wc._throttle_open_meteo()
    assert len(recorded_sleeps) == 1
    assert abs(recorded_sleeps[0] - 0.5) < 1e-9

def test_throttle_noop_when_disabled(self, monkeypatch):
    """With a zero interval, back-to-back throttle calls never sleep."""
    import spotforecast2_safe.weather.client as wc

    recorded_sleeps: list[float] = []
    monkeypatch.setattr(wc, "sleep", recorded_sleeps.append)
    monkeypatch.setattr(wc, "_MIN_REQUEST_INTERVAL_S", 0.0)

    for _ in range(2):
        wc._throttle_open_meteo()

    assert recorded_sleeps == []

def test_non_zone_target_raises_value_error(self, monkeypatch, tmp_path):
"""per_zone_weather=True with a target that is not a TSO zone must raise."""
import spotforecast2_safe.multitask.base as mt_base
Expand Down
Loading