← Назад
"""
Multi-provider weather forecaster + probability model.

Providers, in the order get_forecast() tries them:
  1. Open-Meteo Ensemble API — GFS + ECMWF ensemble members merged (primary)
  2. WeatherAPI.com — 1M calls/month free, reliable, needs API key (fallback)
  3. Open-Meteo — no key needed, but aggressive rate limits (last fallback)

All forecasts are cached in SQLite (TTL = FORECAST_CACHE_TTL) to minimize API calls.
"""
import time
from datetime import datetime, timedelta, timezone

import numpy as np
import requests
from loguru import logger
from scipy.stats import norm

from config import (
    WEATHER_API_KEY, WEATHER_API_URL,
    OPEN_METEO_URL, OPEN_METEO_ENSEMBLE_URL,
    ENSEMBLE_MODEL, ENSEMBLE_MEMBERS, ENSEMBLE_MODEL_ECMWF,
    METAR_API_URL,
    CITY_COORDS, CITIES_BY_NAME,
    SIGMA_BY_DAYS, DEFAULT_SIGMA, FORECAST_CACHE_TTL,
)
import db

# ICAO station codes for METAR observations (nearest airport per city)
METAR_STATIONS = {
    "new york city": "KJFK",
    "london": "EGLL",
    "seoul": "RKSI",
    "tokyo": "RJTT",
    "paris": "LFPG",
    "miami": "KMIA",
    "toronto": "CYYZ",
    "hong kong": "VHHH",
    "moscow": "UUEE",
    "houston": "KIAH",
    "denver": "KDEN",
    "shanghai": "ZSPD",
    "chicago": "KORD",
    "los angeles": "KLAX",
    "seattle": "KSEA",
    "san francisco": "KSFO",
    "sydney": "YSSY",
    "mumbai": "VABB",
    "berlin": "EDDB",
    "rome": "LIRF",
    "bangkok": "VTBS",
    "singapore": "WSSS",
    "dubai": "OMDB",
    "beijing": "ZBAA",
    "osaka": "RJBB",
    "atlanta": "KATL",
    "austin": "KAUS",
    "istanbul": "LTFM",
    "milan": "LIMC",
    "jakarta": "WIII",
    "taipei": "RCTP",
    "manila": "RPLL",
    "helsinki": "EFHK",
    "mexico city": "MMMX",
    "buenos aires": "SAEZ",
    "são paulo": "SBGR",
    "cape town": "FACT",
    "wellington": "NZWN",
    "cairo": "HECA",
    "amsterdam": "EHAM",
    "madrid": "LEMD",
    "munich": "EDDM",
    "warsaw": "EPWA",
    "ankara": "LTAC",
    "tel aviv": "LLBG",
}

# Track 429 state per provider: provider key -> epoch seconds until which the
# provider must not be called (0 = not rate limited).
_rate_limited_until = {"open-meteo": 0, "weatherapi": 0, "ensemble": 0, "ecmwf": 0, "metar": 0}

# Display-name -> CITY_COORDS slug for cities whose slug is not simply the
# name with spaces replaced by hyphens (or that have several spellings).
_CITY_ALIASES = {
    "new york city": "nyc",
    "new york": "nyc",
    "los angeles": "los-angeles",
    "san francisco": "san-francisco",
    "hong kong": "hong-kong",
    "tel aviv": "tel-aviv",
    "mexico city": "mexico-city",
    "buenos aires": "buenos-aires",
    "sao paulo": "sao-paulo",
    "são paulo": "sao-paulo",
    "cape town": "cape-town",
    "kuala lumpur": "kuala-lumpur",
    "panama city": "panama-city",
}


def get_forecast(city: str, target_date: str, metric: str = "high_temp") -> dict | None:
    """
    Fetch a weather forecast for city + date.

    Lookup order: SQLite cache, then the merged GFS + ECMWF ensemble, then
    WeatherAPI.com (only when WEATHER_API_KEY is set), then deterministic
    Open-Meteo. Every freshly fetched result is written back to the cache.

    Args:
        city: City name or slug; resolved via _resolve_city_coords().
        target_date: Forecast date as "YYYY-MM-DD".
        metric: Metric name, e.g. "high_temp" or "low_temp".

    Returns:
        Dict with keys city, date, metric, forecast_value, sigma, days_ahead,
        source, member_values, n_members, ensemble_std; or None when the city
        is empty/unknown, the date is invalid or more than 16 days ahead, or
        no provider returned a value.
    """
    # Normalize city name
    city_clean = city.strip() if city else None
    if not city_clean:
        return None

    cache_key = f"{city_clean.lower()}|{target_date}|{metric}"

    # 1. Check SQLite cache
    cached = db.get_cached_forecast(cache_key, FORECAST_CACHE_TTL)
    if cached:
        n_members = cached.get("n_members", 0) or 0
        logger.debug(f"Cache hit: {city_clean} {target_date} {metric}={cached['forecast_value']} (members={n_members})")
        return {
            "city": cached["city"],
            "date": cached["date"],
            "metric": cached["metric"],
            "forecast_value": cached["forecast_value"],
            "sigma": cached["sigma"],
            "days_ahead": cached["days_ahead"],
            "source": cached["source"],
            "member_values": cached.get("member_values"),
            "n_members": n_members,
            "ensemble_std": cached.get("ensemble_std"),
        }

    # Calculate days ahead (drives the fallback sigma and the request horizon).
    # TypeError covers a non-string target_date such as None.
    try:
        target = datetime.strptime(target_date, "%Y-%m-%d")
    except (TypeError, ValueError):
        logger.error(f"Invalid date format: {target_date}")
        return None
    now = datetime.now(timezone.utc)
    # Past dates are clamped to 0 rather than rejected.
    days_ahead = max(0, (target.date() - now.date()).days)

    if days_ahead > 16:
        logger.warning(f"Too far ahead: {days_ahead} days (max 16)")
        return None

    # Resolve city coordinates
    coords = _resolve_city_coords(city_clean)
    if not coords:
        logger.warning(f"Unknown city: {city_clean} — no coordinates found")
        return None

    # 2. Try Ensemble APIs (PRIMARY — best probability source)
    # Merge GFS (31 members) + ECMWF (51 members) = up to 82 members
    gfs_data = _fetch_ensemble(coords, target_date, metric, days_ahead)
    ecmwf_data = _fetch_ensemble_ecmwf(coords, target_date, metric, days_ahead)

    gfs_members = gfs_data["member_values"] if gfs_data and gfs_data.get("member_values") else []
    ecmwf_members = ecmwf_data["member_values"] if ecmwf_data and ecmwf_data.get("member_values") else []
    all_members = gfs_members + ecmwf_members

    if len(all_members) >= 10:
        mean_val = float(np.mean(all_members))
        std_val = float(np.std(all_members))

        # Source label records which models contributed and how many members each
        sources = []
        if gfs_members:
            sources.append(f"GFS({len(gfs_members)})")
        if ecmwf_members:
            sources.append(f"ECMWF({len(ecmwf_members)})")
        source_label = "ensemble-" + "+".join(sources)

        result = {
            "city": city_clean,
            "date": target_date,
            "metric": metric,
            "forecast_value": round(mean_val, 1),
            "days_ahead": days_ahead,
            # For ensemble results sigma is the empirical member spread.
            "sigma": round(std_val, 2),
            "source": source_label,
            "member_values": all_members,
            "n_members": len(all_members),
            "ensemble_std": round(std_val, 2),
        }
        db.save_cached_forecast(cache_key, result)
        logger.debug(
            f"Forecast [{source_label}]: {city_clean} {target_date} {metric}="
            f"{mean_val:.1f}°F (std={std_val:.1f}°F, {len(all_members)} members, {days_ahead}d)"
        )
        return result

    # 3. Fallback: WeatherAPI.com (single point forecast)
    forecast_value = None
    source = None

    if WEATHER_API_KEY:
        forecast_value = _fetch_weatherapi(coords, target_date, metric, days_ahead)
        if forecast_value is not None:
            source = "weatherapi"

    # 4. Fallback: Open-Meteo deterministic
    if forecast_value is None:
        forecast_value = _fetch_openmeteo(coords, target_date, metric, days_ahead)
        if forecast_value is not None:
            source = "open-meteo"

    if forecast_value is None:
        return None

    # No member spread available here, so use the configured per-horizon sigma.
    sigma = SIGMA_BY_DAYS.get(days_ahead, DEFAULT_SIGMA)

    result = {
        "city": city_clean,
        "date": target_date,
        "metric": metric,
        "forecast_value": forecast_value,
        "days_ahead": days_ahead,
        "sigma": sigma,
        "source": source,
        "member_values": None,  # no ensemble data — will use Gaussian fallback
        "n_members": 0,
        "ensemble_std": None,
    }

    # Save to SQLite cache
    db.save_cached_forecast(cache_key, result)

    logger.warning(
        f"Forecast [FALLBACK {source}]: {city_clean} {target_date} {metric}="
        f"{forecast_value}°F (hardcoded σ={sigma}°F, {days_ahead}d) — ensemble unavailable"
    )
    return result


def _resolve_city_coords(city: str) -> dict | None:
    """
    Resolve a city to its coordinates entry.

    Tries, in order: CITY_COORDS by slug (spaces -> hyphens), CITIES_BY_NAME
    by name (hyphens -> spaces), CITIES_BY_NAME by the raw lowercase string,
    and finally the _CITY_ALIASES table mapped into CITY_COORDS.

    Returns:
        The matching entry (callers read "lat", "lon" and optionally "tz" and
        "name" from it), or None when nothing matches.
    """
    city_lower = city.lower().strip()

    # Try CITY_COORDS (slug format: "new-york", "hong-kong")
    slug = city_lower.replace(" ", "-")
    if slug in CITY_COORDS:
        return CITY_COORDS[slug]

    # Try CITIES_BY_NAME (name format: "new york city", "hong kong")
    name = city_lower.replace("-", " ")
    if name in CITIES_BY_NAME:
        return CITIES_BY_NAME[name]

    # Try direct lowercase match
    if city_lower in CITIES_BY_NAME:
        return CITIES_BY_NAME[city_lower]

    # Try common aliases
    alias_slug = _CITY_ALIASES.get(name)
    if alias_slug and alias_slug in CITY_COORDS:
        return CITY_COORDS[alias_slug]

    return None
============================================================ # Provider: Open-Meteo Ensemble API (PRIMARY — 51-member GFS) # ============================================================ def _fetch_ensemble(coords: dict, target_date: str, metric: str, days_ahead: int) -> dict | None: """ Fetch 51-member GFS ensemble from Open-Meteo Ensemble API. Returns dict with: - forecast_value: ensemble mean - member_values: list of 51 values (max/min temp per member for target_date) - ensemble_std: standard deviation across members """ if time.time() < _rate_limited_until.get("ensemble", 0): remaining = int(_rate_limited_until["ensemble"] - time.time()) logger.debug(f"Ensemble API rate limited ({remaining}s left)") return None try: # Open-Meteo Ensemble API: request "temperature_2m" and it returns # temperature_2m (control) + temperature_2m_member01..member30 automatically params = { "latitude": coords["lat"], "longitude": coords["lon"], "hourly": "temperature_2m", "models": ENSEMBLE_MODEL, "temperature_unit": "fahrenheit", "timezone": coords.get("tz", "UTC"), "forecast_days": min(days_ahead + 2, 16), } resp = None for attempt in range(3): try: resp = requests.get(OPEN_METEO_ENSEMBLE_URL, params=params, timeout=15) except requests.exceptions.RequestException as e: if attempt < 2: logger.warning(f"Ensemble: request failed (attempt {attempt+1}): {e}") time.sleep(2 ** attempt) continue raise if resp.status_code == 429: if attempt < 2: time.sleep(3 * (attempt + 1)) continue else: _rate_limited_until["ensemble"] = time.time() + 600 logger.error("Ensemble API: 429 persistent, cooldown 10min") return None if resp.status_code >= 500: if attempt < 2: time.sleep(2 ** attempt) continue resp.raise_for_status() break if resp is None: return None data = resp.json() hourly = data.get("hourly", {}) times = hourly.get("time", []) if not times: return None # Find indices for target date target_indices = [i for i, t in enumerate(times) if t.startswith(target_date)] if not target_indices: return 
None # Extract per-member daily value (max for high_temp, min for low_temp, etc.) # API returns: temperature_2m (control), temperature_2m_member01..member30 member_keys = [k for k in hourly.keys() if k.startswith("temperature_2m")] member_values = [] for key in member_keys: temps = hourly.get(key, []) if not temps: continue values = [temps[i] for i in target_indices if i < len(temps) and temps[i] is not None] if not values: continue if metric == "high_temp": member_values.append(max(values)) elif metric == "low_temp": member_values.append(min(values)) else: member_values.append(max(values)) # default to high if len(member_values) < 10: logger.warning(f"Ensemble: only {len(member_values)} members returned, need ≥10") return None mean_val = float(np.mean(member_values)) std_val = float(np.std(member_values)) logger.debug( f"Ensemble [{ENSEMBLE_MODEL}]: {coords.get('name', '?')} {target_date} {metric} " f"mean={mean_val:.1f}°F std={std_val:.1f}°F ({len(member_values)} members)" ) # Throttle time.sleep(0.5) return { "forecast_value": round(mean_val, 1), "member_values": member_values, "ensemble_std": round(std_val, 2), "n_members": len(member_values), } except requests.exceptions.Timeout: logger.warning("Ensemble API: timeout") return None except Exception as e: logger.error(f"Ensemble API error: {e}") return None def calculate_probability_ensemble( member_values: list[float], threshold: float, operator: str, threshold_low: float = None, threshold_high: float = None, ) -> float: """ Calculate probability directly from ensemble members. No Gaussian assumption needed — just count members. 
P = count(members matching condition) / total_members """ n = len(member_values) if n == 0: return 0.5 if operator == "gte": count = sum(1 for v in member_values if v >= threshold) elif operator == "lte": count = sum(1 for v in member_values if v <= threshold) elif operator in ("between", "eq"): if threshold_low is not None and threshold_high is not None: # Include rounding: actual temp rounded to nearest integer low = threshold_low - 0.5 high = threshold_high + 0.5 else: low = threshold - 0.5 high = threshold + 0.5 count = sum(1 for v in member_values if low <= v <= high) else: count = sum(1 for v in member_values if v >= threshold) prob = count / n # Clamp to avoid 0.0 / 1.0 (Laplace smoothing: add 1 pseudo-count each side) prob_smoothed = (count + 1) / (n + 2) return round(max(0.01, min(0.99, prob_smoothed)), 4) # ============================================================ # Provider: ECMWF Ensemble (secondary — 51 members via Open-Meteo) # ============================================================ def _fetch_ensemble_ecmwf(coords: dict, target_date: str, metric: str, days_ahead: int) -> dict | None: """ Fetch 51-member ECMWF IFS ensemble from Open-Meteo Ensemble API. Same logic as GFS ensemble but different model. 
""" if time.time() < _rate_limited_until.get("ecmwf", 0): return None try: params = { "latitude": coords["lat"], "longitude": coords["lon"], "hourly": "temperature_2m", "models": ENSEMBLE_MODEL_ECMWF, "temperature_unit": "fahrenheit", "timezone": coords.get("tz", "UTC"), "forecast_days": min(days_ahead + 2, 16), } resp = None for attempt in range(2): # Fewer retries for secondary source try: resp = requests.get(OPEN_METEO_ENSEMBLE_URL, params=params, timeout=15) except requests.exceptions.RequestException: if attempt < 1: time.sleep(2) continue return None if resp.status_code == 429: _rate_limited_until["ecmwf"] = time.time() + 600 return None if resp.status_code >= 400: return None break if resp is None: return None data = resp.json() hourly = data.get("hourly", {}) times = hourly.get("time", []) if not times: return None target_indices = [i for i, t in enumerate(times) if t.startswith(target_date)] if not target_indices: return None member_keys = [k for k in hourly.keys() if k.startswith("temperature_2m")] member_values = [] for key in member_keys: temps = hourly.get(key, []) if not temps: continue values = [temps[i] for i in target_indices if i < len(temps) and temps[i] is not None] if not values: continue if metric == "high_temp": member_values.append(max(values)) elif metric == "low_temp": member_values.append(min(values)) else: member_values.append(max(values)) if len(member_values) < 10: return None mean_val = float(np.mean(member_values)) std_val = float(np.std(member_values)) logger.debug( f"Ensemble [ECMWF]: {coords.get('name', '?')} {target_date} {metric} " f"mean={mean_val:.1f}°F std={std_val:.1f}°F ({len(member_values)} members)" ) time.sleep(0.5) return { "forecast_value": round(mean_val, 1), "member_values": member_values, "ensemble_std": round(std_val, 2), "n_members": len(member_values), } except Exception as e: logger.error(f"ECMWF Ensemble error: {e}") return None # ============================================================ # Provider: METAR 
Observations (same-day reality check) # ============================================================ def fetch_metar_temp(city: str) -> float | None: """ Fetch current observed temperature from nearest METAR station. Returns temperature in °F or None if unavailable. Useful for same-day reality check — if current temp already exceeds high forecast, the model is likely underestimating. """ city_lower = city.lower().strip() station = METAR_STATIONS.get(city_lower) if not station: return None if time.time() < _rate_limited_until.get("metar", 0): return None try: resp = requests.get( METAR_API_URL, params={"ids": station, "format": "json", "hours": 2}, timeout=10 ) if resp.status_code == 429: _rate_limited_until["metar"] = time.time() + 300 return None if resp.status_code != 200: return None data = resp.json() if not data or not isinstance(data, list): return None # Take most recent observation obs = data[0] temp_c = obs.get("temp") if temp_c is None: return None temp_f = temp_c * 9/5 + 32 return round(temp_f, 1) except Exception as e: logger.debug(f"METAR error for {station}: {e}") return None # ============================================================ # Provider: WeatherAPI.com # ============================================================ def _fetch_weatherapi(coords: dict, target_date: str, metric: str, days_ahead: int) -> float | None: """ Fetch forecast from WeatherAPI.com. Free tier: 1M calls/month, 3-day forecast with hourly data. 
""" if time.time() < _rate_limited_until.get("weatherapi", 0): return None # WeatherAPI free tier supports up to 3 days if days_ahead > 2: logger.debug(f"WeatherAPI: {days_ahead}d ahead > 2d limit, skipping") return None try: lat, lon = coords["lat"], coords["lon"] params = { "key": WEATHER_API_KEY, "q": f"{lat},{lon}", "days": min(days_ahead + 1, 3), "aqi": "no", "alerts": "no", } resp = requests.get(WEATHER_API_URL, params=params, timeout=10) if resp.status_code == 403: logger.error("WeatherAPI: invalid API key (403)") _rate_limited_until["weatherapi"] = time.time() + 3600 # skip for 1h return None if resp.status_code == 429: _rate_limited_until["weatherapi"] = time.time() + 300 logger.warning("WeatherAPI: rate limited, cooldown 5min") return None resp.raise_for_status() data = resp.json() forecast_days = data.get("forecast", {}).get("forecastday", []) if not forecast_days: return None # Find matching day for day_data in forecast_days: if day_data.get("date") == target_date: day_info = day_data.get("day", {}) return _extract_weatherapi_value(day_info, day_data, metric) # If target_date not in response, use last available day if forecast_days: last_day = forecast_days[-1] if last_day.get("date") == target_date: return _extract_weatherapi_value(last_day.get("day", {}), last_day, metric) return None except requests.exceptions.Timeout: logger.warning("WeatherAPI: timeout") return None except Exception as e: logger.error(f"WeatherAPI error: {e}") return None def _extract_weatherapi_value(day_info: dict, day_data: dict, metric: str) -> float | None: """Extract value from WeatherAPI.com day response""" if metric == "high_temp": return day_info.get("maxtemp_f") elif metric == "low_temp": return day_info.get("mintemp_f") elif metric == "precipitation": return day_info.get("totalprecip_in") elif metric == "wind": return day_info.get("maxwind_mph") elif metric == "snow": # WeatherAPI doesn't have direct snow field, use hourly hours = day_data.get("hour", []) snow_cm = 
sum(h.get("snow_cm", 0) for h in hours if h.get("snow_cm")) return snow_cm / 2.54 if snow_cm else 0.0 # cm → inches return None # ============================================================ # Provider: Open-Meteo (fallback) # ============================================================ def _fetch_openmeteo(coords: dict, target_date: str, metric: str, days_ahead: int) -> float | None: """ Fetch forecast from Open-Meteo. No API key needed. Free tier has aggressive rate limits — use as fallback only. """ if time.time() < _rate_limited_until.get("open-meteo", 0): remaining = int(_rate_limited_until["open-meteo"] - time.time()) logger.debug(f"Open-Meteo rate limited ({remaining}s left)") return None try: hourly_vars = _get_hourly_vars(metric) params = { "latitude": coords["lat"], "longitude": coords["lon"], "hourly": ",".join(hourly_vars), "temperature_unit": "fahrenheit", "wind_speed_unit": "mph", "precipitation_unit": "inch", "timezone": coords.get("tz", "UTC"), "forecast_days": min(days_ahead + 2, 16), } # Retry for transient errors (429, 5xx, timeouts) resp = None for attempt in range(3): try: resp = requests.get(OPEN_METEO_URL, params=params, timeout=10) except requests.exceptions.RequestException as e: if attempt < 2: logger.warning(f"Open-Meteo: request failed (attempt {attempt+1}): {e}, retry in {2 ** attempt}s") time.sleep(2 ** attempt) continue raise if resp.status_code == 429: if attempt < 2: logger.warning(f"Open-Meteo: 429 (attempt {attempt+1}), retry in {3 * (attempt+1)}s") time.sleep(3 * (attempt + 1)) continue else: _rate_limited_until["open-meteo"] = time.time() + 600 # 10 min cooldown logger.error("Open-Meteo: 429 persistent, cooldown 10min") return None if resp.status_code >= 500: if attempt < 2: logger.warning(f"Open-Meteo: {resp.status_code} (attempt {attempt+1}), retry in {2 ** attempt}s") time.sleep(2 ** attempt) continue resp.raise_for_status() break if resp is None: return None data = resp.json() value = _extract_openmeteo_value(data, 
target_date, metric) if value is not None: # Throttle: 0.5s between calls time.sleep(0.5) return value except requests.exceptions.Timeout: logger.warning("Open-Meteo: timeout") return None except Exception as e: logger.error(f"Open-Meteo error: {e}") return None def _get_hourly_vars(metric: str) -> list[str]: """Map metric to Open-Meteo hourly variable names""" mapping = { "high_temp": ["temperature_2m"], "low_temp": ["temperature_2m"], "precipitation": ["precipitation"], "wind": ["wind_speed_10m", "wind_gusts_10m"], "snow": ["snowfall"], } return mapping.get(metric, ["temperature_2m"]) def _extract_openmeteo_value(data: dict, target_date: str, metric: str) -> float | None: """Extract the relevant value from Open-Meteo hourly data for target date""" hourly = data.get("hourly", {}) times = hourly.get("time", []) if not times: return None target_indices = [i for i, t in enumerate(times) if t.startswith(target_date)] if not target_indices: return None if metric in ("high_temp", "low_temp"): temps = hourly.get("temperature_2m", []) values = [temps[i] for i in target_indices if i < len(temps) and temps[i] is not None] if not values: return None return max(values) if metric == "high_temp" else min(values) elif metric == "precipitation": precip = hourly.get("precipitation", []) values = [precip[i] for i in target_indices if i < len(precip) and precip[i] is not None] return sum(values) if values else None elif metric == "wind": wind = hourly.get("wind_speed_10m", []) values = [wind[i] for i in target_indices if i < len(wind) and wind[i] is not None] return max(values) if values else None elif metric == "snow": snow = hourly.get("snowfall", []) values = [snow[i] for i in target_indices if i < len(snow) and snow[i] is not None] return sum(values) if values else None return None # ============================================================ # Probability Model # ============================================================ def calculate_probability(forecast_value: float, 
threshold: float, operator: str, sigma: float, threshold_low: float = None, threshold_high: float = None) -> float: """ Calculate P(event) using normal distribution. forecast_value: predicted value (e.g., max temp 78°F) threshold: market threshold (e.g., 75°F) — mid for 'between' operator: 'gte' (≥), 'lte' (≤), 'between', 'eq' sigma: uncertainty in same units threshold_low/high: for 'between' brackets """ if sigma <= 0: sigma = 2.0 if operator == "gte": prob = 1 - norm.cdf(threshold, loc=forecast_value, scale=sigma) elif operator == "lte": prob = norm.cdf(threshold, loc=forecast_value, scale=sigma) elif operator in ("between", "eq"): if threshold_low is not None and threshold_high is not None: # "between 68-69°F" → exact boundaries, add 0.5 only to include rounding # Polymarket rounds actual temp to nearest integer for resolution low = threshold_low - 0.5 high = threshold_high + 0.5 else: # "eq 21°C" → exact value, ±0.5 for rounding window low = threshold - 0.5 high = threshold + 0.5 prob = norm.cdf(high, loc=forecast_value, scale=sigma) - \ norm.cdf(low, loc=forecast_value, scale=sigma) else: prob = 1 - norm.cdf(threshold, loc=forecast_value, scale=sigma) return round(max(0.01, min(0.99, prob)), 4) def get_forecast_and_probability(market: dict) -> dict | None: """ Full pipeline: fetch forecast → calculate probability → save to DB. Returns dict with forecast_value, model_probability, sigma, etc. 
""" if not market.get("city") or not market.get("date"): return None if market.get("threshold") is None: return None forecast = get_forecast( city=market["city"], target_date=market["date"], metric=market.get("metric", "high_temp") ) if not forecast: return None threshold = market["threshold"] if market.get("threshold_unit") == "C": threshold = threshold * 9/5 + 32 threshold_low = market.get("threshold_low") threshold_high = market.get("threshold_high") if market.get("threshold_unit") == "C": if threshold_low is not None: threshold_low = threshold_low * 9/5 + 32 if threshold_high is not None: threshold_high = threshold_high * 9/5 + 32 # Use ensemble probability if available, otherwise fall back to Gaussian member_values = forecast.get("member_values") if member_values and len(member_values) >= 10: model_prob = calculate_probability_ensemble( member_values=member_values, threshold=threshold, operator=market.get("operator", "gte"), threshold_low=threshold_low, threshold_high=threshold_high, ) prob_method = "ensemble" else: model_prob = calculate_probability( forecast_value=forecast["forecast_value"], threshold=threshold, operator=market.get("operator", "gte"), sigma=forecast["sigma"], threshold_low=threshold_low, threshold_high=threshold_high, ) prob_method = "gaussian" result = { "market_id": market["id"], "forecast_value": forecast["forecast_value"], "model_probability": model_prob, "sigma": forecast["sigma"], "days_ahead": forecast["days_ahead"], "source": forecast["source"], "prob_method": prob_method, "n_members": forecast.get("n_members", 0), "ensemble_std": forecast.get("ensemble_std"), "member_values": member_values, } db.save_forecast(result) return result