"""
Multi-provider weather forecaster + probability model.
Providers (in priority order):
1. WeatherAPI.com — 1M calls/month free, reliable, needs API key
2. Open-Meteo — no key needed, but aggressive rate limits (fallback)
All forecasts are cached in SQLite (2h TTL) to minimize API calls.
"""
import time
import requests
import numpy as np
from scipy.stats import norm
from datetime import datetime, timedelta, timezone
from loguru import logger
from config import (
WEATHER_API_KEY, WEATHER_API_URL, OPEN_METEO_URL,
OPEN_METEO_ENSEMBLE_URL, ENSEMBLE_MODEL, ENSEMBLE_MEMBERS,
ENSEMBLE_MODEL_ECMWF, METAR_API_URL,
CITY_COORDS, CITIES_BY_NAME, SIGMA_BY_DAYS, DEFAULT_SIGMA, FORECAST_CACHE_TTL,
)
import db
# ICAO station codes for METAR observations (nearest airport per city).
# Keys are lowercase city names (matching the normalized form used by
# fetch_metar_temp); values are ICAO airport identifiers passed to the
# METAR API's "ids" parameter.
METAR_STATIONS = {
    "new york city": "KJFK", "london": "EGLL", "seoul": "RKSI", "tokyo": "RJTT",
    "paris": "LFPG", "miami": "KMIA", "toronto": "CYYZ", "hong kong": "VHHH",
    "moscow": "UUEE", "houston": "KIAH", "denver": "KDEN", "shanghai": "ZSPD",
    "chicago": "KORD", "los angeles": "KLAX", "seattle": "KSEA",
    "san francisco": "KSFO", "sydney": "YSSY", "mumbai": "VABB",
    "berlin": "EDDB", "rome": "LIRF", "bangkok": "VTBS", "singapore": "WSSS",
    "dubai": "OMDB", "beijing": "ZBAA", "osaka": "RJBB", "atlanta": "KATL",
    "austin": "KAUS", "istanbul": "LTFM", "milan": "LIMC", "jakarta": "WIII",
    "taipei": "RCTP", "manila": "RPLL", "helsinki": "EFHK",
    "mexico city": "MMMX", "buenos aires": "SAEZ", "são paulo": "SBGR",
    "cape town": "FACT", "wellington": "NZWN", "cairo": "HECA",
    "amsterdam": "EHAM", "madrid": "LEMD", "munich": "EDDM",
    "warsaw": "EPWA", "ankara": "LTAC", "tel aviv": "LLBG",
}

# Track 429 state per provider: provider name -> unix timestamp until which
# calls to that provider are skipped (0 = not currently rate limited).
_rate_limited_until = {"open-meteo": 0, "weatherapi": 0, "ensemble": 0, "ecmwf": 0, "metar": 0}
def get_forecast(city: str, target_date: str, metric: str = "high_temp") -> dict | None:
    """
    Fetch a weather forecast for one city + date + metric.

    Lookup order:
      1. SQLite cache (TTL = FORECAST_CACHE_TTL)
      2. Merged GFS + ECMWF ensembles (preferred: member-based uncertainty)
      3. WeatherAPI.com point forecast (requires WEATHER_API_KEY)
      4. Open-Meteo deterministic point forecast (keyless fallback)

    Args:
        city: city name, e.g. "New York City" or "hong-kong".
        target_date: ISO date string "YYYY-MM-DD".
        metric: "high_temp", "low_temp", "precipitation", "wind", or "snow".

    Returns:
        dict with forecast_value, sigma, days_ahead, source, plus
        member_values / n_members / ensemble_std (None/0 for non-ensemble
        sources), or None when input is invalid or all providers fail.
    """
    # Normalize city name
    city_clean = city.strip() if city else None
    if not city_clean:
        return None
    cache_key = f"{city_clean.lower()}|{target_date}|{metric}"
    # 1. Check SQLite cache
    cached = db.get_cached_forecast(cache_key, FORECAST_CACHE_TTL)
    if cached:
        n_members = cached.get("n_members", 0) or 0
        logger.debug(f"Cache hit: {city_clean} {target_date} {metric}={cached['forecast_value']} (members={n_members})")
        return {
            "city": cached["city"],
            "date": cached["date"],
            "metric": cached["metric"],
            "forecast_value": cached["forecast_value"],
            "sigma": cached["sigma"],
            "days_ahead": cached["days_ahead"],
            "source": cached["source"],
            "member_values": cached.get("member_values"),
            "n_members": n_members,
            "ensemble_std": cached.get("ensemble_std"),
        }
    # Calculate days ahead for sigma (relative to UTC "today")
    try:
        target = datetime.strptime(target_date, "%Y-%m-%d")
        now = datetime.now(timezone.utc)
        days_ahead = max(0, (target.date() - now.date()).days)
    except ValueError:
        logger.error(f"Invalid date format: {target_date}")
        return None
    # Providers only forecast up to 16 days out
    if days_ahead > 16:
        logger.warning(f"Too far ahead: {days_ahead} days (max 16)")
        return None
    # Resolve city coordinates
    coords = _resolve_city_coords(city_clean)
    if not coords:
        logger.warning(f"Unknown city: {city_clean} — no coordinates found")
        return None
    # 2. Try Ensemble APIs (PRIMARY — best probability source)
    # Merge GFS (31 members) + ECMWF (51 members) = up to 82 members
    gfs_data = _fetch_ensemble(coords, target_date, metric, days_ahead)
    ecmwf_data = _fetch_ensemble_ecmwf(coords, target_date, metric, days_ahead)
    gfs_members = gfs_data["member_values"] if gfs_data and gfs_data.get("member_values") else []
    ecmwf_members = ecmwf_data["member_values"] if ecmwf_data and ecmwf_data.get("member_values") else []
    all_members = gfs_members + ecmwf_members
    # Require a minimum ensemble size before trusting member statistics
    if len(all_members) >= 10:
        mean_val = float(np.mean(all_members))
        std_val = float(np.std(all_members))
        # Source label, e.g. "ensemble-GFS(31)+ECMWF(51)"
        sources = []
        if gfs_members:
            sources.append(f"GFS({len(gfs_members)})")
        if ecmwf_members:
            sources.append(f"ECMWF({len(ecmwf_members)})")
        source_label = "ensemble-" + "+".join(sources)
        result = {
            "city": city_clean,
            "date": target_date,
            "metric": metric,
            "forecast_value": round(mean_val, 1),
            "days_ahead": days_ahead,
            # sigma comes from ensemble spread, not the hardcoded table
            "sigma": round(std_val, 2),
            "source": source_label,
            "member_values": all_members,
            "n_members": len(all_members),
            "ensemble_std": round(std_val, 2),
        }
        db.save_cached_forecast(cache_key, result)
        logger.debug(
            f"Forecast [{source_label}]: {city_clean} {target_date} {metric}="
            f"{mean_val:.1f}°F (std={std_val:.1f}°F, {len(all_members)} members, {days_ahead}d)"
        )
        return result
    # 3. Fallback: WeatherAPI.com (single point forecast)
    forecast_value = None
    source = None
    if WEATHER_API_KEY:
        forecast_value = _fetch_weatherapi(coords, target_date, metric, days_ahead)
        if forecast_value is not None:
            source = "weatherapi"
    # 4. Fallback: Open-Meteo deterministic
    if forecast_value is None:
        forecast_value = _fetch_openmeteo(coords, target_date, metric, days_ahead)
        if forecast_value is not None:
            source = "open-meteo"
    if forecast_value is None:
        return None
    # No ensemble spread available — fall back to lead-time sigma table
    sigma = SIGMA_BY_DAYS.get(days_ahead, DEFAULT_SIGMA)
    result = {
        "city": city_clean,
        "date": target_date,
        "metric": metric,
        "forecast_value": forecast_value,
        "days_ahead": days_ahead,
        "sigma": sigma,
        "source": source,
        "member_values": None,  # no ensemble data — will use Gaussian fallback
        "n_members": 0,
        "ensemble_std": None,
    }
    # Save to SQLite cache
    db.save_cached_forecast(cache_key, result)
    logger.warning(
        f"Forecast [FALLBACK {source}]: {city_clean} {target_date} {metric}="
        f"{forecast_value}°F (hardcoded σ={sigma}°F, {days_ahead}d) — ensemble unavailable"
    )
    return result
def _resolve_city_coords(city: str) -> dict | None:
    """
    Resolve a city name to its coordinate record (lat/lon/tz).

    Lookup order: CITY_COORDS by slug ("new-york"), then CITIES_BY_NAME by
    spaced name ("new york city"), then CITIES_BY_NAME by the raw lowercase
    input, and finally a small alias table mapping common names back to
    CITY_COORDS slugs. Returns None if nothing matches.
    """
    normalized = city.lower().strip()
    hyphenated = normalized.replace(" ", "-")
    spaced = normalized.replace("-", " ")

    # 1) Slug-keyed table ("new-york", "hong-kong")
    if hyphenated in CITY_COORDS:
        return CITY_COORDS[hyphenated]

    # 2) Name-keyed table, spaced then raw lowercase form
    for candidate in (spaced, normalized):
        if candidate in CITIES_BY_NAME:
            return CITIES_BY_NAME[candidate]

    # 3) Common aliases → CITY_COORDS slugs
    alias_map = {
        "new york city": "nyc",
        "new york": "nyc",
        "los angeles": "los-angeles",
        "san francisco": "san-francisco",
        "hong kong": "hong-kong",
        "tel aviv": "tel-aviv",
        "mexico city": "mexico-city",
        "buenos aires": "buenos-aires",
        "sao paulo": "sao-paulo",
        "são paulo": "sao-paulo",
        "cape town": "cape-town",
        "kuala lumpur": "kuala-lumpur",
        "panama city": "panama-city",
    }
    slug = alias_map.get(spaced)
    if slug is not None and slug in CITY_COORDS:
        return CITY_COORDS[slug]
    return None
# ============================================================
# Provider: Open-Meteo Ensemble API (PRIMARY — 51-member GFS)
# ============================================================
def _fetch_ensemble(coords: dict, target_date: str, metric: str, days_ahead: int) -> dict | None:
    """
    Fetch the GFS ensemble from the Open-Meteo Ensemble API.

    Requesting hourly "temperature_2m" against an ensemble model returns the
    control run plus one series per member. Each member's hourly series is
    reduced to a daily value for target_date (max for high_temp, min for
    low_temp, max otherwise).

    Returns:
        dict with forecast_value (ensemble mean), member_values (per-member
        daily values), ensemble_std and n_members, or None on failure,
        active rate-limit cooldown, or fewer than 10 members returned.
    """
    # Respect an active 429 cooldown for this provider
    if time.time() < _rate_limited_until.get("ensemble", 0):
        remaining = int(_rate_limited_until["ensemble"] - time.time())
        logger.debug(f"Ensemble API rate limited ({remaining}s left)")
        return None
    try:
        # Open-Meteo Ensemble API: request "temperature_2m" and it returns
        # temperature_2m (control) + temperature_2m_member01..member30 automatically
        params = {
            "latitude": coords["lat"],
            "longitude": coords["lon"],
            "hourly": "temperature_2m",
            "models": ENSEMBLE_MODEL,
            "temperature_unit": "fahrenheit",
            "timezone": coords.get("tz", "UTC"),
            # +2 days of slack around the target; API maximum is 16 days
            "forecast_days": min(days_ahead + 2, 16),
        }
        resp = None
        # Up to 3 attempts: backoff on connection errors, 429s, and 5xx
        for attempt in range(3):
            try:
                resp = requests.get(OPEN_METEO_ENSEMBLE_URL, params=params, timeout=15)
            except requests.exceptions.RequestException as e:
                if attempt < 2:
                    logger.warning(f"Ensemble: request failed (attempt {attempt+1}): {e}")
                    time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s
                    continue
                raise
            if resp.status_code == 429:
                if attempt < 2:
                    time.sleep(3 * (attempt + 1))  # linear backoff: 3s, 6s
                    continue
                else:
                    # Persistent rate limiting — skip this provider for 10 minutes
                    _rate_limited_until["ensemble"] = time.time() + 600
                    logger.error("Ensemble API: 429 persistent, cooldown 10min")
                    return None
            if resp.status_code >= 500:
                if attempt < 2:
                    time.sleep(2 ** attempt)
                    continue
            # Raises on remaining 4xx/5xx after retries are exhausted
            resp.raise_for_status()
            break
        if resp is None:
            return None
        data = resp.json()
        hourly = data.get("hourly", {})
        times = hourly.get("time", [])
        if not times:
            return None
        # Find hourly indices belonging to the target date
        target_indices = [i for i, t in enumerate(times) if t.startswith(target_date)]
        if not target_indices:
            return None
        # Extract per-member daily value (max for high_temp, min for low_temp, etc.)
        # API returns: temperature_2m (control), temperature_2m_member01..member30
        member_keys = [k for k in hourly.keys() if k.startswith("temperature_2m")]
        member_values = []
        for key in member_keys:
            temps = hourly.get(key, [])
            if not temps:
                continue
            values = [temps[i] for i in target_indices if i < len(temps) and temps[i] is not None]
            if not values:
                continue
            if metric == "high_temp":
                member_values.append(max(values))
            elif metric == "low_temp":
                member_values.append(min(values))
            else:
                member_values.append(max(values))  # default to high
        # Too few members → spread statistics would be unreliable
        if len(member_values) < 10:
            logger.warning(f"Ensemble: only {len(member_values)} members returned, need ≥10")
            return None
        mean_val = float(np.mean(member_values))
        std_val = float(np.std(member_values))
        logger.debug(
            f"Ensemble [{ENSEMBLE_MODEL}]: {coords.get('name', '?')} {target_date} {metric} "
            f"mean={mean_val:.1f}°F std={std_val:.1f}°F ({len(member_values)} members)"
        )
        # Throttle between successful calls to stay under rate limits
        time.sleep(0.5)
        return {
            "forecast_value": round(mean_val, 1),
            "member_values": member_values,
            "ensemble_std": round(std_val, 2),
            "n_members": len(member_values),
        }
    except requests.exceptions.Timeout:
        logger.warning("Ensemble API: timeout")
        return None
    except Exception as e:
        logger.error(f"Ensemble API error: {e}")
        return None
def calculate_probability_ensemble(
member_values: list[float],
threshold: float,
operator: str,
threshold_low: float = None,
threshold_high: float = None,
) -> float:
"""
Calculate probability directly from ensemble members.
No Gaussian assumption needed — just count members.
P = count(members matching condition) / total_members
"""
n = len(member_values)
if n == 0:
return 0.5
if operator == "gte":
count = sum(1 for v in member_values if v >= threshold)
elif operator == "lte":
count = sum(1 for v in member_values if v <= threshold)
elif operator in ("between", "eq"):
if threshold_low is not None and threshold_high is not None:
# Include rounding: actual temp rounded to nearest integer
low = threshold_low - 0.5
high = threshold_high + 0.5
else:
low = threshold - 0.5
high = threshold + 0.5
count = sum(1 for v in member_values if low <= v <= high)
else:
count = sum(1 for v in member_values if v >= threshold)
prob = count / n
# Clamp to avoid 0.0 / 1.0 (Laplace smoothing: add 1 pseudo-count each side)
prob_smoothed = (count + 1) / (n + 2)
return round(max(0.01, min(0.99, prob_smoothed)), 4)
# ============================================================
# Provider: ECMWF Ensemble (secondary — 51 members via Open-Meteo)
# ============================================================
def _fetch_ensemble_ecmwf(coords: dict, target_date: str, metric: str, days_ahead: int) -> dict | None:
    """
    Fetch the ECMWF IFS ensemble from the Open-Meteo Ensemble API.

    Same extraction logic as _fetch_ensemble but with the ECMWF model and a
    lighter retry policy (secondary source: 2 attempts, failures are silent
    except for the final catch-all log).

    Returns the same dict shape as _fetch_ensemble, or None on any failure,
    active cooldown, or fewer than 10 members.
    """
    # Respect an active 429 cooldown for this provider
    if time.time() < _rate_limited_until.get("ecmwf", 0):
        return None
    try:
        params = {
            "latitude": coords["lat"],
            "longitude": coords["lon"],
            "hourly": "temperature_2m",
            "models": ENSEMBLE_MODEL_ECMWF,
            "temperature_unit": "fahrenheit",
            "timezone": coords.get("tz", "UTC"),
            # +2 days of slack around the target; API maximum is 16 days
            "forecast_days": min(days_ahead + 2, 16),
        }
        resp = None
        for attempt in range(2):  # Fewer retries for secondary source
            try:
                resp = requests.get(OPEN_METEO_ENSEMBLE_URL, params=params, timeout=15)
            except requests.exceptions.RequestException:
                if attempt < 1:
                    time.sleep(2)
                    continue
                return None
            if resp.status_code == 429:
                # No retry on rate limit — straight to a 10-minute cooldown
                _rate_limited_until["ecmwf"] = time.time() + 600
                return None
            if resp.status_code >= 400:
                return None
            break
        if resp is None:
            return None
        data = resp.json()
        hourly = data.get("hourly", {})
        times = hourly.get("time", [])
        if not times:
            return None
        # Hourly indices belonging to the target date
        target_indices = [i for i, t in enumerate(times) if t.startswith(target_date)]
        if not target_indices:
            return None
        # One series per member: temperature_2m (control) + member variants
        member_keys = [k for k in hourly.keys() if k.startswith("temperature_2m")]
        member_values = []
        for key in member_keys:
            temps = hourly.get(key, [])
            if not temps:
                continue
            values = [temps[i] for i in target_indices if i < len(temps) and temps[i] is not None]
            if not values:
                continue
            if metric == "high_temp":
                member_values.append(max(values))
            elif metric == "low_temp":
                member_values.append(min(values))
            else:
                member_values.append(max(values))  # default: treat like high_temp
        # Too few members → spread statistics would be unreliable
        if len(member_values) < 10:
            return None
        mean_val = float(np.mean(member_values))
        std_val = float(np.std(member_values))
        logger.debug(
            f"Ensemble [ECMWF]: {coords.get('name', '?')} {target_date} {metric} "
            f"mean={mean_val:.1f}°F std={std_val:.1f}°F ({len(member_values)} members)"
        )
        time.sleep(0.5)  # throttle between API calls
        return {
            "forecast_value": round(mean_val, 1),
            "member_values": member_values,
            "ensemble_std": round(std_val, 2),
            "n_members": len(member_values),
        }
    except Exception as e:
        logger.error(f"ECMWF Ensemble error: {e}")
        return None
# ============================================================
# Provider: METAR Observations (same-day reality check)
# ============================================================
def fetch_metar_temp(city: str) -> float | None:
    """
    Fetch the current observed temperature from the city's METAR station.

    Returns temperature in °F, or None when the city has no mapped station,
    the provider is in a 429 cooldown, or the request/parse fails.

    Useful for a same-day reality check — if the current temp already exceeds
    the forecast high, the model is likely underestimating.
    """
    city_lower = city.lower().strip()
    station = METAR_STATIONS.get(city_lower)
    if not station:
        return None
    # Respect an active 429 cooldown
    if time.time() < _rate_limited_until.get("metar", 0):
        return None
    try:
        resp = requests.get(
            METAR_API_URL,
            # last 2 hours of observations for this station, JSON format
            params={"ids": station, "format": "json", "hours": 2},
            timeout=10
        )
        if resp.status_code == 429:
            _rate_limited_until["metar"] = time.time() + 300  # 5 min cooldown
            return None
        if resp.status_code != 200:
            return None
        data = resp.json()
        if not data or not isinstance(data, list):
            return None
        # Take the first entry — assumed to be the most recent observation;
        # NOTE(review): confirm the API's sort order for multi-hour queries.
        obs = data[0]
        temp_c = obs.get("temp")  # METAR temperature is reported in °C
        if temp_c is None:
            return None
        temp_f = temp_c * 9/5 + 32  # °C → °F
        return round(temp_f, 1)
    except Exception as e:
        logger.debug(f"METAR error for {station}: {e}")
        return None
# ============================================================
# Provider: WeatherAPI.com
# ============================================================
def _fetch_weatherapi(coords: dict, target_date: str, metric: str, days_ahead: int) -> float | None:
    """
    Fetch a point forecast from WeatherAPI.com.

    Free tier: 1M calls/month but only a ~3-day forecast window, so requests
    more than 2 days ahead are skipped. A 403 (bad key) disables the provider
    for 1 hour; a 429 cools it down for 5 minutes.

    Args:
        coords: city record with "lat" / "lon".
        target_date: ISO date "YYYY-MM-DD" to extract from the response.
        metric: metric name passed through to _extract_weatherapi_value.
        days_ahead: lead time in days (used for the free-tier window check).

    Returns:
        The metric value for target_date, or None when unavailable.
    """
    if time.time() < _rate_limited_until.get("weatherapi", 0):
        return None
    # WeatherAPI free tier supports up to 3 days
    if days_ahead > 2:
        logger.debug(f"WeatherAPI: {days_ahead}d ahead > 2d limit, skipping")
        return None
    try:
        lat, lon = coords["lat"], coords["lon"]
        params = {
            "key": WEATHER_API_KEY,
            "q": f"{lat},{lon}",
            "days": min(days_ahead + 1, 3),
            "aqi": "no",
            "alerts": "no",
        }
        resp = requests.get(WEATHER_API_URL, params=params, timeout=10)
        if resp.status_code == 403:
            logger.error("WeatherAPI: invalid API key (403)")
            _rate_limited_until["weatherapi"] = time.time() + 3600  # skip for 1h
            return None
        if resp.status_code == 429:
            _rate_limited_until["weatherapi"] = time.time() + 300
            logger.warning("WeatherAPI: rate limited, cooldown 5min")
            return None
        resp.raise_for_status()
        data = resp.json()
        forecast_days = data.get("forecast", {}).get("forecastday", [])
        if not forecast_days:
            return None
        # Find the matching day. (A former "use last available day" fallback
        # here re-checked the same date equality the loop already exhausted,
        # so it could never trigger — removed as dead code.)
        for day_data in forecast_days:
            if day_data.get("date") == target_date:
                day_info = day_data.get("day", {})
                return _extract_weatherapi_value(day_info, day_data, metric)
        return None
    except requests.exceptions.Timeout:
        logger.warning("WeatherAPI: timeout")
        return None
    except Exception as e:
        logger.error(f"WeatherAPI error: {e}")
        return None
def _extract_weatherapi_value(day_info: dict, day_data: dict, metric: str) -> float | None:
"""Extract value from WeatherAPI.com day response"""
if metric == "high_temp":
return day_info.get("maxtemp_f")
elif metric == "low_temp":
return day_info.get("mintemp_f")
elif metric == "precipitation":
return day_info.get("totalprecip_in")
elif metric == "wind":
return day_info.get("maxwind_mph")
elif metric == "snow":
# WeatherAPI doesn't have direct snow field, use hourly
hours = day_data.get("hour", [])
snow_cm = sum(h.get("snow_cm", 0) for h in hours if h.get("snow_cm"))
return snow_cm / 2.54 if snow_cm else 0.0 # cm → inches
return None
# ============================================================
# Provider: Open-Meteo (fallback)
# ============================================================
def _fetch_openmeteo(coords: dict, target_date: str, metric: str, days_ahead: int) -> float | None:
    """
    Fetch a deterministic forecast from Open-Meteo. No API key needed.

    The free tier has aggressive rate limits — used as fallback only, with
    retries for transient errors (connection failures, 429, 5xx) and a
    10-minute cooldown after persistent 429s.

    Returns the metric value for target_date, or None on failure or cooldown.
    """
    # Respect an active 429 cooldown for this provider
    if time.time() < _rate_limited_until.get("open-meteo", 0):
        remaining = int(_rate_limited_until["open-meteo"] - time.time())
        logger.debug(f"Open-Meteo rate limited ({remaining}s left)")
        return None
    try:
        hourly_vars = _get_hourly_vars(metric)
        params = {
            "latitude": coords["lat"],
            "longitude": coords["lon"],
            "hourly": ",".join(hourly_vars),
            "temperature_unit": "fahrenheit",
            "wind_speed_unit": "mph",
            "precipitation_unit": "inch",
            "timezone": coords.get("tz", "UTC"),
            # +2 days of slack around the target; API maximum is 16 days
            "forecast_days": min(days_ahead + 2, 16),
        }
        # Retry for transient errors (429, 5xx, timeouts)
        resp = None
        for attempt in range(3):
            try:
                resp = requests.get(OPEN_METEO_URL, params=params, timeout=10)
            except requests.exceptions.RequestException as e:
                if attempt < 2:
                    logger.warning(f"Open-Meteo: request failed (attempt {attempt+1}): {e}, retry in {2 ** attempt}s")
                    time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s
                    continue
                raise
            if resp.status_code == 429:
                if attempt < 2:
                    logger.warning(f"Open-Meteo: 429 (attempt {attempt+1}), retry in {3 * (attempt+1)}s")
                    time.sleep(3 * (attempt + 1))  # linear backoff: 3s, 6s
                    continue
                else:
                    _rate_limited_until["open-meteo"] = time.time() + 600  # 10 min cooldown
                    logger.error("Open-Meteo: 429 persistent, cooldown 10min")
                    return None
            if resp.status_code >= 500:
                if attempt < 2:
                    logger.warning(f"Open-Meteo: {resp.status_code} (attempt {attempt+1}), retry in {2 ** attempt}s")
                    time.sleep(2 ** attempt)
                    continue
            # Raises on remaining 4xx/5xx after retries are exhausted
            resp.raise_for_status()
            break
        if resp is None:
            return None
        data = resp.json()
        value = _extract_openmeteo_value(data, target_date, metric)
        if value is not None:
            # Throttle: 0.5s between calls
            time.sleep(0.5)
        return value
    except requests.exceptions.Timeout:
        logger.warning("Open-Meteo: timeout")
        return None
    except Exception as e:
        logger.error(f"Open-Meteo error: {e}")
        return None
def _get_hourly_vars(metric: str) -> list[str]:
"""Map metric to Open-Meteo hourly variable names"""
mapping = {
"high_temp": ["temperature_2m"],
"low_temp": ["temperature_2m"],
"precipitation": ["precipitation"],
"wind": ["wind_speed_10m", "wind_gusts_10m"],
"snow": ["snowfall"],
}
return mapping.get(metric, ["temperature_2m"])
def _extract_openmeteo_value(data: dict, target_date: str, metric: str) -> float | None:
"""Extract the relevant value from Open-Meteo hourly data for target date"""
hourly = data.get("hourly", {})
times = hourly.get("time", [])
if not times:
return None
target_indices = [i for i, t in enumerate(times) if t.startswith(target_date)]
if not target_indices:
return None
if metric in ("high_temp", "low_temp"):
temps = hourly.get("temperature_2m", [])
values = [temps[i] for i in target_indices if i < len(temps) and temps[i] is not None]
if not values:
return None
return max(values) if metric == "high_temp" else min(values)
elif metric == "precipitation":
precip = hourly.get("precipitation", [])
values = [precip[i] for i in target_indices if i < len(precip) and precip[i] is not None]
return sum(values) if values else None
elif metric == "wind":
wind = hourly.get("wind_speed_10m", [])
values = [wind[i] for i in target_indices if i < len(wind) and wind[i] is not None]
return max(values) if values else None
elif metric == "snow":
snow = hourly.get("snowfall", [])
values = [snow[i] for i in target_indices if i < len(snow) and snow[i] is not None]
return sum(values) if values else None
return None
# ============================================================
# Probability Model
# ============================================================
def calculate_probability(forecast_value: float, threshold: float,
operator: str, sigma: float,
threshold_low: float = None,
threshold_high: float = None) -> float:
"""
Calculate P(event) using normal distribution.
forecast_value: predicted value (e.g., max temp 78°F)
threshold: market threshold (e.g., 75°F) — mid for 'between'
operator: 'gte' (≥), 'lte' (≤), 'between', 'eq'
sigma: uncertainty in same units
threshold_low/high: for 'between' brackets
"""
if sigma <= 0:
sigma = 2.0
if operator == "gte":
prob = 1 - norm.cdf(threshold, loc=forecast_value, scale=sigma)
elif operator == "lte":
prob = norm.cdf(threshold, loc=forecast_value, scale=sigma)
elif operator in ("between", "eq"):
if threshold_low is not None and threshold_high is not None:
# "between 68-69°F" → exact boundaries, add 0.5 only to include rounding
# Polymarket rounds actual temp to nearest integer for resolution
low = threshold_low - 0.5
high = threshold_high + 0.5
else:
# "eq 21°C" → exact value, ±0.5 for rounding window
low = threshold - 0.5
high = threshold + 0.5
prob = norm.cdf(high, loc=forecast_value, scale=sigma) - \
norm.cdf(low, loc=forecast_value, scale=sigma)
else:
prob = 1 - norm.cdf(threshold, loc=forecast_value, scale=sigma)
return round(max(0.01, min(0.99, prob)), 4)
def get_forecast_and_probability(market: dict) -> dict | None:
    """
    Full pipeline for one market: fetch forecast → compute probability → save.

    Celsius thresholds are converted to °F (forecast values are in °F).
    Uses the member-counting estimator when >= 10 ensemble members are
    available, otherwise the Gaussian model with the forecast's sigma.
    Returns the saved record, or None on missing inputs / forecast failure.
    """
    if not market.get("city") or not market.get("date"):
        return None
    if market.get("threshold") is None:
        return None

    forecast = get_forecast(
        city=market["city"],
        target_date=market["date"],
        metric=market.get("metric", "high_temp"),
    )
    if not forecast:
        return None

    def _maybe_to_f(value):
        # °C → °F, passing None through untouched
        return value * 9 / 5 + 32 if value is not None else None

    threshold = market["threshold"]
    threshold_low = market.get("threshold_low")
    threshold_high = market.get("threshold_high")
    if market.get("threshold_unit") == "C":
        threshold = _maybe_to_f(threshold)
        threshold_low = _maybe_to_f(threshold_low)
        threshold_high = _maybe_to_f(threshold_high)

    # Prefer the ensemble estimator; fall back to the Gaussian model
    member_values = forecast.get("member_values")
    use_ensemble = bool(member_values) and len(member_values) >= 10
    if use_ensemble:
        model_prob = calculate_probability_ensemble(
            member_values=member_values,
            threshold=threshold,
            operator=market.get("operator", "gte"),
            threshold_low=threshold_low,
            threshold_high=threshold_high,
        )
    else:
        model_prob = calculate_probability(
            forecast_value=forecast["forecast_value"],
            threshold=threshold,
            operator=market.get("operator", "gte"),
            sigma=forecast["sigma"],
            threshold_low=threshold_low,
            threshold_high=threshold_high,
        )

    result = {
        "market_id": market["id"],
        "forecast_value": forecast["forecast_value"],
        "model_probability": model_prob,
        "sigma": forecast["sigma"],
        "days_ahead": forecast["days_ahead"],
        "source": forecast["source"],
        "prob_method": "ensemble" if use_ensemble else "gaussian",
        "n_members": forecast.get("n_members", 0),
        "ensemble_std": forecast.get("ensemble_std"),
        "member_values": member_values,
    }
    db.save_forecast(result)
    return result