"""
Resolution tracker — fetches actual weather data and resolves trades.

After a market's date passes:
1. Fetch actual temperature from Open-Meteo Historical API
2. Determine outcome (YES/NO) for each bracket
3. Calculate P&L for each trade
"""

import time
import requests
from datetime import datetime, timedelta, timezone
from loguru import logger

from config import CITIES_BY_NAME, RESOLVE_MIN_HOURS, CLOB_API_URL
import db

# Open-Meteo Historical Weather API (free, no key)
HISTORICAL_URL = "https://archive-api.open-meteo.com/v1/archive"

STALE_ORDER_MINUTES = 30  # Cancel unfilled orders older than this

# Format of trades.created_at as parsed by this module (interpreted as UTC).
_CREATED_AT_FORMAT = "%Y-%m-%d %H:%M:%S"

# Marks a trade whose order never filled as cancelled with zero P&L.
_CANCEL_TRADE_SQL = (
    "UPDATE trades SET status = 'cancelled', outcome = 'cancelled', pnl = 0 WHERE id = ?"
)

# Selects live (non-dry-run) orders that have not been confirmed filled yet.
_PENDING_ORDERS_SQL = """
    SELECT t.id, t.order_id, t.market_id, t.side, t.price, t.size,
           t.status, t.created_at
    FROM trades t
    WHERE t.status IN ('pending', 'unverified')
      AND t.outcome IS NULL
      AND t.dry_run = 0
"""


def _order_age_minutes(created_at, now: datetime) -> float | None:
    """Return the age of an order in minutes, or None if created_at is missing/unparsable."""
    try:
        created_dt = datetime.strptime(created_at, _CREATED_AT_FORMAT).replace(tzinfo=timezone.utc)
    except (ValueError, TypeError):
        return None
    return (now - created_dt).total_seconds() / 60


def _to_fahrenheit(value, unit):
    """Convert value to °F when unit is "C"; pass it through otherwise. None stays None."""
    if value is None:
        return None
    if unit == "C":
        return value * 9 / 5 + 32
    return value


def _settle_trade(our_side_won: bool, price: float, size: float) -> tuple:
    """
    Turn a won/lost flag into (outcome, pnl).

    size is the dollar cost, so shares = size / price.
    WIN:  payout = shares × $1.00, profit = size × (1 - price) / price
    LOSS: the entire cost basis is lost = -size
    """
    if our_side_won:
        return "win", round(size * (1.0 - price) / price, 4)
    return "loss", round(-size, 4)


def _check_pending_fills(conn):
    """
    Check pending orders and update status to 'filled' or 'cancelled'.

    Uses the CLOB client from executor to read each order's status, and
    cancels still-open orders older than STALE_ORDER_MINUTES to free capital.
    Changes are committed on conn once all orders have been processed; any
    error is logged rather than raised.
    """
    pending = conn.execute(_PENDING_ORDERS_SQL).fetchall()
    if not pending:
        return

    logger.info(f"Checking {len(pending)} pending orders for fill status...")

    try:
        from executor import get_client, cancel_order

        client = get_client()
        if not client:
            logger.warning("CLOB client unavailable — cannot check fills")
            return

        now = datetime.now(timezone.utc)

        for trade in pending:
            try:
                order_id = trade["order_id"]
                if not order_id:
                    continue

                # Orders whose age cannot be determined are never treated as stale here.
                age_minutes = _order_age_minutes(trade["created_at"], now)
                is_stale = age_minutes is not None and age_minutes > STALE_ORDER_MINUTES

                try:
                    order = client.get_order(order_id)
                    status = order.get("status", "").lower() if order else ""
                except Exception as e:
                    # NOTE(review): any lookup failure (including a transient
                    # network error) is treated as "order not found" and the
                    # trade is marked cancelled below — confirm this is intended.
                    logger.debug(f"Order lookup failed for {order_id[:16]}: {e}")
                    status = "not_found"

                if status in ("matched", "filled"):
                    conn.execute(
                        "UPDATE trades SET status = 'filled', filled_at = datetime('now') WHERE id = ?",
                        (trade["id"],),
                    )
                    logger.info(f"Order {order_id[:16]} FILLED → trade #{trade['id']}")
                elif status in ("cancelled", "expired", "not_found"):
                    conn.execute(_CANCEL_TRADE_SQL, (trade["id"],))
                    logger.info(f"Order {order_id[:16]} {status} → trade #{trade['id']} cancelled")
                elif is_stale:
                    # Order is still open but stale — cancel it to free capital
                    logger.info(f"Order {order_id[:16]} stale ({age_minutes:.0f}min old) → cancelling...")
                    if cancel_order(order_id):
                        conn.execute(_CANCEL_TRADE_SQL, (trade["id"],))
                        logger.info(f"Stale order {order_id[:16]} cancelled → trade #{trade['id']} freed")
                    else:
                        logger.warning(f"Failed to cancel stale order {order_id[:16]}")
                # else: still open and not yet stale, leave as pending
            except Exception as e:
                logger.debug(f"Fill check error for trade #{trade['id']}: {e}")

        conn.commit()
    except Exception as e:
        logger.error(f"Fill check error: {e}")


def cancel_stale_orders(max_age_minutes: int | None = None) -> int:
    """
    Cancel all stale pending/unverified orders.

    Can be called on startup or manually to free locked capital.

    Args:
        max_age_minutes: Orders younger than this are left alone. Defaults to
            STALE_ORDER_MINUTES. Orders whose creation time is missing or
            cannot be parsed are cancelled regardless of age.

    Returns:
        Count of orders that were successfully cancelled.
    """
    if max_age_minutes is None:
        max_age_minutes = STALE_ORDER_MINUTES

    conn = db.get_conn()
    try:
        pending = conn.execute("""
            SELECT t.id, t.order_id, t.price, t.size, t.status, t.created_at
            FROM trades t
            WHERE t.status IN ('pending', 'unverified')
              AND t.outcome IS NULL
              AND t.dry_run = 0
        """).fetchall()
        if not pending:
            return 0

        from executor import cancel_order

        now = datetime.now(timezone.utc)
        cancelled = 0

        for trade in pending:
            order_id = trade["order_id"]
            if not order_id:
                continue

            # Skip orders that are provably too young; if the date can't be
            # parsed, try to cancel anyway.
            age_minutes = _order_age_minutes(trade["created_at"], now)
            if age_minutes is not None and age_minutes < max_age_minutes:
                continue

            logger.info(
                f"Cancelling stale order {order_id[:16]} (${trade['size']:.2f} @ ${trade['price']:.2f})"
            )
            if cancel_order(order_id):
                conn.execute(_CANCEL_TRADE_SQL, (trade["id"],))
                cancelled += 1
                logger.info(f"Cancelled stale order → trade #{trade['id']}")
            else:
                logger.warning(f"Failed to cancel order {order_id[:16]}")

        conn.commit()
    finally:
        # Close even when cancel_order or a query raises, so the connection
        # is not leaked (uncommitted updates are discarded in that case).
        conn.close()

    logger.info(f"Stale order cleanup: {cancelled}/{len(pending)} cancelled")
    return cancelled


def _fetch_clob_market(condition_id: str) -> dict | None:
    """
    Fetch market data from Polymarket CLOB API.

    Returns the decoded JSON body (expected to carry 'closed' and 'tokens',
    each token with a 'winner' field), or None on a non-200 response or any
    request/decoding error.
    """
    try:
        resp = requests.get(f"{CLOB_API_URL}/markets/{condition_id}", timeout=10)
        if resp.status_code != 200:
            return None
        return resp.json()
    except Exception as e:
        logger.debug(f"CLOB fetch error for {condition_id[:16]}: {e}")
        return None


def _resolve_trade_from_clob(trade: dict, clob_data: dict) -> tuple:
    """
    Determine outcome and P&L from Polymarket CLOB resolution.

    CLOB tokens have winner=True/False when the market is resolved. The trade
    has a side (YES/NO) and wins when that side's token is the winner.

    Returns:
        (outcome, pnl) where outcome is 'win' or 'loss', or (None, 0) when the
        market data does not yet identify a winner.
    """
    tokens = clob_data.get("tokens", [])
    if not tokens:
        return None, 0

    yes_winner = None
    any_winner = False
    for token in tokens:
        winner = token.get("winner")
        if winner is None:
            # Market not yet resolved on Polymarket
            return None, 0
        any_winner = any_winner or bool(winner)
        if (token.get("outcome") or "").lower() == "yes":
            yes_winner = winner

    # Without a YES token, or with no token flagged as the winner, the payload
    # does not say who won — treating that as "NO won" would book false results.
    if yes_winner is None or not any_winner:
        return None, 0

    our_side_won = yes_winner if trade["side"] == "YES" else not yes_winner
    return _settle_trade(bool(our_side_won), trade["price"], trade["size"])


def resolve_trades():
    """
    Main resolution cycle:
    1. Check pending orders for fill status
    2. Find all unresolved trades where market date has passed
    3. Check Polymarket CLOB API for market resolution (winner field)
    4. Fallback to Open-Meteo historical weather if CLOB not resolved yet

    Returns:
        dict with keys 'resolved', 'wins', 'losses', 'pnl', 'win_rate' and
        'redeemed' summarising this cycle.
    """
    conn = db.get_conn()

    # First: check pending orders for fill status
    _check_pending_fills(conn)

    # Find ALL unresolved trades where market date is in the past
    # (the RESOLVE_MIN_HOURS cutoff only applies to the Open-Meteo fallback)
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    trades = conn.execute("""
        SELECT t.id, t.market_id, t.side, t.price, t.size, t.edge, t.edge_tier,
               t.model_prob, t.market_prob,
               m.city, m.date, m.metric, m.threshold, m.threshold_unit,
               m.operator, m.question, m.threshold_low, m.threshold_high
        FROM trades t
        JOIN markets m ON t.market_id = m.id
        WHERE m.date < ?
          AND t.outcome IS NULL
          AND t.status IN ('filled', 'simulated')
        ORDER BY m.city, m.date
    """, (today,)).fetchall()
    conn.close()

    if not trades:
        logger.debug(f"No trades to resolve (today: {today})")
        # Same shape as the normal return value so callers can index freely.
        return {"resolved": 0, "wins": 0, "losses": 0, "pnl": 0, "win_rate": 0, "redeemed": 0}

    logger.info(f"Resolving {len(trades)} trades (checking Polymarket CLOB first, fallback Open-Meteo)...")

    total_resolved = 0
    total_wins = 0
    total_losses = 0
    total_pnl = 0.0

    # Phase 1: Try CLOB API resolution (one call per unique market_id)
    clob_cache = {}  # market_id → clob_data
    fallback_trades = []

    for idx, trade in enumerate(trades):
        market_id = trade["market_id"]

        # Fetch CLOB data (cached per market_id)
        if market_id not in clob_cache:
            if idx > 0:
                time.sleep(0.2)  # Throttle CLOB API
            clob_cache[market_id] = _fetch_clob_market(market_id)

        clob_data = clob_cache[market_id]
        outcome, pnl = None, 0
        if clob_data and clob_data.get("closed"):
            outcome, pnl = _resolve_trade_from_clob(dict(trade), clob_data)

        if not outcome:
            fallback_trades.append(trade)
            continue

        update_trade_outcome(trade["id"], outcome, pnl, actual_value=None)
        total_resolved += 1
        total_pnl += pnl
        if outcome == "win":
            total_wins += 1
        else:
            total_losses += 1
        logger.info(
            f" CLOB resolved: {trade['city']} {trade['date']} | "
            f"{trade['side']} @ {trade['price']} → {outcome.upper()} ${pnl:+.2f} | "
            f"{(trade['question'] or '')[:50]}"
        )

    # Phase 2: Fallback to Open-Meteo for trades not resolved via CLOB
    if fallback_trades:
        cutoff_date = (
            datetime.now(timezone.utc) - timedelta(hours=RESOLVE_MIN_HOURS)
        ).strftime("%Y-%m-%d")
        # Only try Open-Meteo for trades old enough
        meteo_trades = [t for t in fallback_trades if t["date"] < cutoff_date]

        if meteo_trades:
            logger.info(f"Fallback: {len(meteo_trades)} trades via Open-Meteo (cutoff: {cutoff_date})...")

            # Group by city+date+metric → one API call per group
            groups = {}
            for t in meteo_trades:
                key = f"{t['city']}|{t['date']}|{t['metric']}"
                if key not in groups:
                    groups[key] = {
                        "city": t["city"],
                        "date": t["date"],
                        "metric": t["metric"],
                        "trades": [],
                    }
                groups[key]["trades"].append(dict(t))

            for idx, group in enumerate(groups.values()):
                city = group["city"]
                date = group["date"]
                metric = group["metric"]

                if idx > 0:
                    time.sleep(0.5)  # Throttle Open-Meteo

                actual_value = fetch_actual_weather(city, date, metric)
                if actual_value is None:
                    logger.warning(f"Could not fetch actual weather for {city} on {date}")
                    continue

                logger.info(f"Actual {metric} for {city} on {date}: {actual_value:.1f}°F")
                _save_calibration_for_group(group, actual_value)

                for trade in group["trades"]:
                    outcome, pnl = resolve_single_trade(trade, actual_value)
                    if not outcome:
                        continue
                    update_trade_outcome(trade["id"], outcome, pnl, actual_value)
                    total_resolved += 1
                    total_pnl += pnl
                    if outcome == "win":
                        total_wins += 1
                    else:
                        total_losses += 1

    # === Auto-redeem resolved positions (convert winning tokens → USDC) ===
    redeemed = 0
    if total_resolved > 0:
        redeemed = _auto_redeem_resolved(trades)

    result = {
        "resolved": total_resolved,
        "wins": total_wins,
        "losses": total_losses,
        "pnl": round(total_pnl, 2),
        "win_rate": round(total_wins / total_resolved * 100, 1) if total_resolved else 0,
        "redeemed": redeemed,
    }

    logger.info(
        f"Resolution complete: {total_resolved} trades | "
        f"W:{total_wins} L:{total_losses} ({result['win_rate']}% WR) | "
        f"P&L: ${total_pnl:+.2f} | Redeemed: {redeemed}"
    )
    return result


def _has_live_resolved_trade(market_id) -> bool:
    """Return True if the market has a non-dry-run trade already resolved as win or loss."""
    conn = db.get_conn()
    try:
        row = conn.execute(
            """
            SELECT 1 FROM trades
            WHERE market_id = ?
              AND outcome IN ('win', 'loss')
              AND COALESCE(dry_run, 0) = 0
            LIMIT 1
            """,
            (market_id,),
        ).fetchone()
    finally:
        conn.close()
    return row is not None


def _auto_redeem_resolved(trades: list) -> int:
    """
    Auto-redeem resolved positions on Polymarket.

    Each distinct market_id in trades is considered once. A redeem is only
    attempted when the DB holds a live (non-dry-run) trade for that market
    resolved as win or loss — markets that are still unresolved, dry-run only,
    or whose orders were merely cancelled have no position to redeem.

    Returns:
        Number of markets for which a redeem call reported success.
    """
    from executor import redeem_position

    seen_ids = set()
    attempted = 0
    redeemed_count = 0

    for t in trades:
        market_id = t["market_id"]
        if market_id in seen_ids:
            continue
        seen_ids.add(market_id)

        # The trades passed in don't carry dry_run or the fresh outcome, so
        # consult the DB.
        if not _has_live_resolved_trade(market_id):
            continue

        attempted += 1
        try:
            result = redeem_position(condition_id=market_id, neg_risk=True)
            if result["success"]:
                redeemed_count += 1
            else:
                # Try without neg_risk (some weather markets may not be neg_risk)
                result = redeem_position(condition_id=market_id, neg_risk=False)
                if result["success"]:
                    redeemed_count += 1
        except Exception as e:
            logger.error(f"Redeem error for {market_id[:16]}: {e}")

        # Small delay between redeems to avoid nonce issues
        time.sleep(2)

    if redeemed_count > 0:
        logger.info(f"Auto-redeemed {redeemed_count}/{attempted} positions")
    return redeemed_count


def _save_calibration_for_group(group: dict, actual_value: float):
    """
    Save calibration record: forecast_value vs actual for this city+date.

    Uses the first trade's model_prob (converted to the YES direction) as
    representative. Nothing is saved when there is no cached forecast, no
    model_prob or no threshold. Errors are logged, never raised.
    """
    try:
        trades = group["trades"]
        if not trades:
            return

        city = group["city"]
        date = group["date"]
        metric = group["metric"]

        # Get forecast_value from forecast cache (7 day TTL for calibration)
        cache_key = f"{city.lower()}|{date}|{metric}"
        cached = db.get_cached_forecast(cache_key, ttl_seconds=86400 * 7)

        forecast_value = None
        source = None
        days_ahead = None
        if cached:
            forecast_value = cached.get("forecast_value")
            source = cached.get("source")
            days_ahead = cached.get("days_ahead")

        if forecast_value is None:
            logger.debug(f"No cached forecast for {city}/{date} — skipping calibration")
            return

        # Use first trade's model_prob for Brier score
        first_trade = trades[0]
        model_prob = first_trade.get("model_prob")
        if model_prob is None:
            logger.debug(f"No model_prob for {city}/{date} — skipping calibration")
            return

        # Determine binary outcome for the first trade's bracket
        threshold = first_trade.get("threshold")
        if threshold is None:
            return
        threshold_f = _to_fahrenheit(threshold, first_trade.get("threshold_unit", "F"))

        operator = first_trade.get("operator")
        if operator == "gte":
            outcome_binary = 1 if actual_value >= threshold_f else 0
        elif operator == "lte":
            outcome_binary = 1 if actual_value <= threshold_f else 0
        else:
            outcome_binary = None  # Skip Brier for between/eq brackets

        # Adjust model_prob to YES direction for Brier
        model_prob_yes = 1 - model_prob if first_trade.get("side") == "NO" else model_prob

        db.save_calibration({
            "city": city,
            "date": date,
            "metric": metric,
            "forecast_value": forecast_value,
            "actual_value": actual_value,
            "model_prob": model_prob_yes,
            "outcome_binary": outcome_binary,
            "days_ahead": days_ahead,
            "source": source,
        })
        logger.debug(
            f"Calibration saved: {city} {date} forecast={forecast_value:.1f}°F "
            f"actual={actual_value:.1f}°F error={abs(forecast_value - actual_value):.1f}°F"
        )
    except Exception as e:
        logger.error(f"Calibration save error: {e}")


def fetch_actual_weather(city: str, date: str, metric: str = "high_temp") -> float | None:
    """
    Fetch actual historical weather from Open-Meteo Archive API.

    Args:
        city: City name; looked up case-insensitively in CITIES_BY_NAME.
        date: Day to fetch, used as both start_date and end_date.
        metric: "low_temp" selects the daily minimum; anything else selects
            the daily maximum.

    Returns:
        The temperature in Fahrenheit, or None when the city is unknown, the
        API has no value, the value fails the sanity range, or the request fails.
    """
    city_data = CITIES_BY_NAME.get(city.lower())
    if not city_data:
        logger.warning(f"Unknown city for resolution: {city}")
        return None

    try:
        # daily variable based on metric (daily max is the default)
        daily_var = "temperature_2m_min" if metric == "low_temp" else "temperature_2m_max"

        params = {
            "latitude": city_data["lat"],
            "longitude": city_data["lon"],
            "start_date": date,
            "end_date": date,
            "daily": daily_var,
            "temperature_unit": "fahrenheit",
            "timezone": city_data["tz"],
        }
        resp = requests.get(HISTORICAL_URL, params=params, timeout=10)
        resp.raise_for_status()
        data = resp.json()

        values = data.get("daily", {}).get(daily_var, [])
        if not values or values[0] is None:
            return None

        temp = float(values[0])
        # Sanity check: reject obviously wrong temperatures
        if temp < -60 or temp > 140:  # °F — no city on Earth outside this range
            logger.warning(f"Suspicious temp {temp}°F for {city}/{date} — rejecting")
            return None
        return temp
    except Exception as e:
        logger.error(f"Historical weather fetch error for {city}/{date}: {e}")
        return None


def resolve_single_trade(trade: dict, actual_value: float) -> tuple:
    """
    Determine outcome and P&L for a single trade.

    Args:
        trade: Mapping with 'threshold', 'operator', 'side', 'price', 'size'
            and optionally 'threshold_unit', 'threshold_low', 'threshold_high'.
        actual_value: Observed temperature in °F.

    Returns:
        (outcome, pnl) where outcome is 'win' or 'loss'; (None, 0) when the
        operator is unknown or the bracket bounds needed to evaluate it are
        missing.

    P&L calculation (prediction markets):
    - size = dollar cost (not shares). shares = size / price.
    - WIN: payout = shares × $1.00, profit = payout - cost = size × (1 - price) / price
    - LOSS: lose entire cost basis = -size
    """
    operator = trade["operator"]
    unit = trade.get("threshold_unit", "F")

    # Convert every bound on its own: a bound of 0 (e.g. 0 °C) is a real value
    # and must not be mistaken for "missing".
    threshold_f = _to_fahrenheit(trade["threshold"], unit)
    low_f = _to_fahrenheit(trade.get("threshold_low"), unit)
    high_f = _to_fahrenheit(trade.get("threshold_high"), unit)

    # Determine if bracket is YES (event happened)
    if operator in ("gte", "lte"):
        if threshold_f is None:
            return None, 0
        if operator == "gte":
            event_yes = actual_value >= threshold_f
        else:
            event_yes = actual_value <= threshold_f
    elif operator in ("between", "eq"):
        if threshold_f is None and (low_f is None or high_f is None):
            return None, 0
        low = low_f if low_f is not None else threshold_f - 0.5
        high = high_f if high_f is not None else threshold_f + 0.5
        # Polymarket rounds to nearest integer for resolution
        actual_rounded = round(actual_value)
        event_yes = low <= actual_rounded <= high
    else:
        # Unknown operator — skip
        return None, 0

    # YES wins when the event happened, NO wins when it did not.
    our_side_won = event_yes if trade["side"] == "YES" else not event_yes
    return _settle_trade(our_side_won, trade["price"], trade["size"])


def update_trade_outcome(trade_id: int, outcome: str, pnl: float, actual_value: float | None):
    """
    Update trade in DB with resolution.

    Writes outcome and pnl, stamps resolved_at, and sets status to the outcome
    string. actual_value is accepted for caller symmetry but is not stored.
    """
    conn = db.get_conn()
    try:
        conn.execute("""
            UPDATE trades
            SET outcome = ?, pnl = ?, resolved_at = datetime('now'), status = ?
            WHERE id = ?
        """, (outcome, pnl, outcome, trade_id))
        conn.commit()
    finally:
        conn.close()


def get_resolution_stats() -> dict:
    """
    Get overall resolution statistics.

    Returns:
        dict with overall totals ('total', 'wins', 'losses', 'win_rate', 'pnl',
        'unresolved') plus per-edge-tier ('by_tier') and per-city ('by_city')
        breakdowns.
    """
    conn = db.get_conn()

    total = conn.execute("SELECT COUNT(*) FROM trades WHERE outcome IS NOT NULL").fetchone()[0]
    wins = conn.execute("SELECT COUNT(*) FROM trades WHERE outcome = 'win'").fetchone()[0]
    losses = conn.execute("SELECT COUNT(*) FROM trades WHERE outcome = 'loss'").fetchone()[0]
    total_pnl = conn.execute(
        "SELECT COALESCE(SUM(pnl), 0) FROM trades WHERE outcome IS NOT NULL"
    ).fetchone()[0]

    unresolved = conn.execute("""
        SELECT COUNT(*) FROM trades t
        JOIN markets m ON t.market_id = m.id
        WHERE m.date < date('now') AND t.outcome IS NULL
    """).fetchone()[0]

    # By edge tier
    by_tier = conn.execute("""
        SELECT edge_tier,
               COUNT(*) as total,
               SUM(CASE WHEN outcome='win' THEN 1 ELSE 0 END) as wins,
               COALESCE(SUM(pnl), 0) as pnl,
               AVG(edge) as avg_edge
        FROM trades
        WHERE outcome IS NOT NULL
        GROUP BY edge_tier
    """).fetchall()

    # By city (top 15 by P&L)
    by_city = conn.execute("""
        SELECT m.city,
               COUNT(*) as total,
               SUM(CASE WHEN t.outcome='win' THEN 1 ELSE 0 END) as wins,
               COALESCE(SUM(t.pnl), 0) as pnl
        FROM trades t
        JOIN markets m ON t.market_id = m.id
        WHERE t.outcome IS NOT NULL
        GROUP BY m.city
        ORDER BY pnl DESC
        LIMIT 15
    """).fetchall()

    # Don't close thread-local connection — it's reused

    def _win_rate(row):
        return round(row["wins"] / row["total"] * 100, 1) if row["total"] else 0

    return {
        "total": total,
        "wins": wins,
        "losses": losses,
        "win_rate": round(wins / total * 100, 1) if total else 0,
        "pnl": round(total_pnl, 2),
        "unresolved": unresolved,
        "by_tier": [
            {
                "tier": r["edge_tier"],
                "total": r["total"],
                "wins": r["wins"],
                "win_rate": _win_rate(r),
                "pnl": round(r["pnl"], 2),
                # AVG(edge) is NULL when every edge in the tier is NULL.
                "avg_edge": round((r["avg_edge"] or 0) * 100, 1),
            }
            for r in by_tier
        ],
        "by_city": [
            {
                "city": r["city"],
                "total": r["total"],
                "wins": r["wins"],
                "win_rate": _win_rate(r),
                "pnl": round(r["pnl"], 2),
            }
            for r in by_city
        ],
    }