"""
Resolution tracker — fetches actual weather data and resolves trades.
After a market's date passes:
1. Fetch actual temperature from Open-Meteo Historical API
2. Determine outcome (YES/NO) for each bracket
3. Calculate P&L for each trade
"""
import time
import requests
from datetime import datetime, timedelta, timezone
from loguru import logger
from config import CITIES_BY_NAME, RESOLVE_MIN_HOURS, CLOB_API_URL
import db
# Open-Meteo Historical Weather API (free, no key)
HISTORICAL_URL = "https://archive-api.open-meteo.com/v1/archive"
STALE_ORDER_MINUTES = 30 # Cancel unfilled orders older than this
def _check_pending_fills(conn) -> None:
    """
    Check pending orders and update status to 'filled' or 'cancelled'.

    Uses the CLOB API to verify fill status for every trade that is
    'pending'/'unverified', has no outcome yet, and is not a dry run.
    Orders still open after STALE_ORDER_MINUTES are cancelled to free capital.

    Args:
        conn: open DB connection (rows must be mapping-like, e.g. sqlite3.Row).
            Committed on success; ownership stays with the caller (not closed).
    """
    pending = conn.execute("""
        SELECT t.id, t.order_id, t.market_id, t.side, t.price, t.size, t.status, t.created_at
        FROM trades t
        WHERE t.status IN ('pending', 'unverified') AND t.outcome IS NULL AND t.dry_run = 0
    """).fetchall()
    if not pending:
        return
    logger.info(f"Checking {len(pending)} pending orders for fill status...")
    try:
        # Lazy import — presumably avoids a circular import at module load; verify.
        from executor import get_client, cancel_order
        client = get_client()
        if not client:
            logger.warning("CLOB client unavailable — cannot check fills")
            return
        now = datetime.now(timezone.utc)
        for trade in pending:
            try:
                order_id = trade["order_id"]
                if not order_id:
                    continue  # nothing to look up without an order id
                # Check age — cancel stale orders to free capital.
                # created_at is assumed to be SQLite datetime('now') format (UTC) — confirm.
                created_at = trade["created_at"]
                is_stale = False
                if created_at:
                    try:
                        created_dt = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
                        age_minutes = (now - created_dt).total_seconds() / 60
                        is_stale = age_minutes > STALE_ORDER_MINUTES
                    except (ValueError, TypeError):
                        pass  # unparseable timestamp → treat as not stale
                # Try to get order status from CLOB
                try:
                    order = client.get_order(order_id)
                    status = order.get("status", "").lower() if order else ""
                except Exception:
                    # If order not found, it might have been cancelled/expired
                    status = "not_found"
                if status in ("matched", "filled"):
                    conn.execute(
                        "UPDATE trades SET status = 'filled', filled_at = datetime('now') WHERE id = ?",
                        (trade["id"],)
                    )
                    logger.info(f"Order {order_id[:16]} FILLED → trade #{trade['id']}")
                elif status in ("cancelled", "expired", "not_found"):
                    conn.execute(
                        "UPDATE trades SET status = 'cancelled', outcome = 'cancelled', pnl = 0 WHERE id = ?",
                        (trade["id"],)
                    )
                    logger.info(f"Order {order_id[:16]} {status} → trade #{trade['id']} cancelled")
                elif is_stale:
                    # Order is still open but stale — cancel it to free capital.
                    # (is_stale=True guarantees age_minutes was computed above.)
                    logger.info(f"Order {order_id[:16]} stale ({age_minutes:.0f}min old) → cancelling...")
                    if cancel_order(order_id):
                        conn.execute(
                            "UPDATE trades SET status = 'cancelled', outcome = 'cancelled', pnl = 0 WHERE id = ?",
                            (trade["id"],)
                        )
                        logger.info(f"Stale order {order_id[:16]} cancelled → trade #{trade['id']} freed")
                    else:
                        logger.warning(f"Failed to cancel stale order {order_id[:16]}")
                # else: still open and not yet stale, leave as pending
            except Exception as e:
                logger.debug(f"Fill check error for trade #{trade['id']}: {e}")
        conn.commit()
    except Exception as e:
        logger.error(f"Fill check error: {e}")
def cancel_stale_orders(max_age_minutes: int | None = None) -> int:
    """
    Cancel all stale pending/unverified orders.

    Can be called on startup or manually to free locked capital.

    Args:
        max_age_minutes: minimum age in minutes before an order counts as
            stale; defaults to STALE_ORDER_MINUTES when None.

    Returns:
        Count of orders successfully cancelled.
    """
    if max_age_minutes is None:
        max_age_minutes = STALE_ORDER_MINUTES
    conn = db.get_conn()
    pending = conn.execute("""
        SELECT t.id, t.order_id, t.price, t.size, t.status, t.created_at
        FROM trades t
        WHERE t.status IN ('pending', 'unverified') AND t.outcome IS NULL AND t.dry_run = 0
    """).fetchall()
    if not pending:
        conn.close()
        return 0
    # Lazy import — presumably avoids a circular import at module load; verify.
    from executor import cancel_order
    now = datetime.now(timezone.utc)
    cancelled = 0
    for trade in pending:
        order_id = trade["order_id"]
        if not order_id:
            continue  # no order id → nothing to cancel
        # Check age; created_at assumed to be SQLite datetime('now') format (UTC).
        created_at = trade["created_at"]
        if created_at:
            try:
                created_dt = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
                age_minutes = (now - created_dt).total_seconds() / 60
                if age_minutes < max_age_minutes:
                    continue  # still young enough — leave it open
            except (ValueError, TypeError):
                pass  # If can't parse date, try to cancel anyway
        logger.info(f"Cancelling stale order {order_id[:16]} (${trade['size']:.2f} @ ${trade['price']:.2f})")
        if cancel_order(order_id):
            conn.execute(
                "UPDATE trades SET status = 'cancelled', outcome = 'cancelled', pnl = 0 WHERE id = ?",
                (trade["id"],)
            )
            cancelled += 1
            logger.info(f"Cancelled stale order → trade #{trade['id']}")
        else:
            logger.warning(f"Failed to cancel order {order_id[:16]}")
    conn.commit()
    conn.close()
    logger.info(f"Stale order cleanup: {cancelled}/{len(pending)} cancelled")
    return cancelled
def _fetch_clob_market(condition_id: str) -> dict | None:
    """
    Fetch a single market from the Polymarket CLOB API.

    Returns the parsed JSON dict (carries 'closed' plus 'tokens' whose
    entries gain a 'winner' field once resolved), or None on any
    non-200 response or network/parse error.
    """
    try:
        resp = requests.get(f"{CLOB_API_URL}/markets/{condition_id}", timeout=10)
        # Anything other than a clean 200 is treated as "no data".
        return resp.json() if resp.status_code == 200 else None
    except Exception as e:
        logger.debug(f"CLOB fetch error for {condition_id[:16]}: {e}")
        return None
def _resolve_trade_from_clob(trade: dict, clob_data: dict) -> tuple:
"""
Determine outcome and P&L from Polymarket CLOB resolution.
CLOB tokens have winner=True/False when market is resolved.
Our trade has side (YES/NO) and we check if our side won.
Returns: (outcome: 'win'|'loss'|None, pnl: float)
"""
tokens = clob_data.get("tokens", [])
if not tokens:
return None, 0
# Find winner from CLOB tokens
yes_winner = None
for token in tokens:
outcome_label = token.get("outcome", "").lower()
winner = token.get("winner")
if winner is None:
# Market not yet resolved on Polymarket
return None, 0
if outcome_label == "yes":
yes_winner = winner
if yes_winner is None:
return None, 0
side = trade["side"]
price = trade["price"]
size = trade["size"]
# P&L: size = dollar cost, shares = size / price
# WIN: payout = shares × $1.00 = size / price, profit = payout - size
# LOSS: lose entire cost basis = -size
if side == "YES":
our_side_won = yes_winner
else: # NO
our_side_won = not yes_winner
if our_side_won:
pnl = size * (1.0 - price) / price
outcome = "win"
else:
pnl = -size
outcome = "loss"
return outcome, round(pnl, 4)
def resolve_trades():
    """
    Main resolution cycle:
    1. Check pending orders for fill status
    2. Find all unresolved trades where market date has passed
    3. Check Polymarket CLOB API for market resolution (winner field)
    4. Fallback to Open-Meteo historical weather if CLOB not resolved yet

    Returns:
        Summary dict: 'resolved'/'wins'/'losses'/'pnl' always present;
        'win_rate' and 'redeemed' are included only when candidate trades
        were found (the early-return dict omits them).
    """
    conn = db.get_conn()
    # First: check pending orders for fill status
    _check_pending_fills(conn)
    # Find ALL unresolved trades where market date is in the past
    # (no more RESOLVE_MIN_HOURS cutoff for CLOB — only the Open-Meteo
    # fallback below applies that cutoff).
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    trades = conn.execute("""
        SELECT t.id, t.market_id, t.side, t.price, t.size, t.edge, t.edge_tier,
        t.model_prob, t.market_prob,
        m.city, m.date, m.metric, m.threshold, m.threshold_unit,
        m.operator, m.question,
        m.threshold_low, m.threshold_high
        FROM trades t
        JOIN markets m ON t.market_id = m.id
        WHERE m.date < ?
        AND t.outcome IS NULL
        AND t.status IN ('filled', 'simulated')
        ORDER BY m.city, m.date
    """, (today,)).fetchall()
    conn.close()
    if not trades:
        logger.debug(f"No trades to resolve (today: {today})")
        return {"resolved": 0, "wins": 0, "losses": 0, "pnl": 0}
    logger.info(f"Resolving {len(trades)} trades (checking Polymarket CLOB first, fallback Open-Meteo)...")
    total_resolved = 0
    total_wins = 0
    total_losses = 0
    total_pnl = 0.0
    # Phase 1: Try CLOB API resolution (one call per unique market_id)
    clob_cache = {}  # market_id → clob_data (None cached too, so failures aren't retried)
    clob_resolved_ids = set()  # bookkeeping only — written but never read below
    fallback_trades = []
    for idx, trade in enumerate(trades):
        market_id = trade["market_id"]
        # Fetch CLOB data (cached per market_id)
        if market_id not in clob_cache:
            if idx > 0:
                time.sleep(0.2)  # Throttle CLOB API
            clob_cache[market_id] = _fetch_clob_market(market_id)
        clob_data = clob_cache[market_id]
        if clob_data and clob_data.get("closed"):
            outcome, pnl = _resolve_trade_from_clob(dict(trade), clob_data)
            if outcome:
                # No weather value involved in CLOB resolution
                update_trade_outcome(trade["id"], outcome, pnl, actual_value=None)
                total_resolved += 1
                total_pnl += pnl
                clob_resolved_ids.add(trade["id"])
                if outcome == "win":
                    total_wins += 1
                else:
                    total_losses += 1
                logger.info(
                    f" CLOB resolved: {trade['city']} {trade['date']} | "
                    f"{trade['side']} @ {trade['price']} → {outcome.upper()} ${pnl:+.2f} | "
                    f"{trade['question'][:50]}"
                )
            else:
                fallback_trades.append(trade)
        else:
            fallback_trades.append(trade)
    # Phase 2: Fallback to Open-Meteo for trades not resolved via CLOB
    if fallback_trades:
        cutoff_date = (datetime.now(timezone.utc) - timedelta(hours=RESOLVE_MIN_HOURS)).strftime("%Y-%m-%d")
        # Only try Open-Meteo for trades old enough
        meteo_trades = [t for t in fallback_trades if t["date"] < cutoff_date]
        if meteo_trades:
            logger.info(f"Fallback: {len(meteo_trades)} trades via Open-Meteo (cutoff: {cutoff_date})...")
            # Group by city+date+metric → one API call per group
            groups = {}
            for t in meteo_trades:
                key = f"{t['city']}|{t['date']}|{t['metric']}"
                if key not in groups:
                    groups[key] = {
                        "city": t["city"],
                        "date": t["date"],
                        "metric": t["metric"],
                        "trades": [],
                    }
                groups[key]["trades"].append(dict(t))
            for idx, (key, group) in enumerate(groups.items()):
                city = group["city"]
                date = group["date"]
                metric = group["metric"]
                if idx > 0:
                    time.sleep(0.5)  # throttle the weather API between groups
                actual_value = fetch_actual_weather(city, date, metric)
                if actual_value is None:
                    logger.warning(f"Could not fetch actual weather for {city} on {date}")
                    continue
                logger.info(f"Actual {metric} for {city} on {date}: {actual_value:.1f}°F")
                # Record forecast-vs-actual calibration before resolving trades
                _save_calibration_for_group(group, actual_value)
                for trade in group["trades"]:
                    outcome, pnl = resolve_single_trade(trade, actual_value)
                    if outcome:
                        update_trade_outcome(trade["id"], outcome, pnl, actual_value)
                        total_resolved += 1
                        total_pnl += pnl
                        if outcome == "win":
                            total_wins += 1
                        else:
                            total_losses += 1
    # === Auto-redeem resolved positions (convert winning tokens → USDC) ===
    # NOTE(review): the full candidate list is passed here (not only trades
    # resolved this cycle); _auto_redeem_resolved dedupes by market_id.
    redeemed = 0
    if total_resolved > 0:
        redeemed = _auto_redeem_resolved(trades)
    result = {
        "resolved": total_resolved,
        "wins": total_wins,
        "losses": total_losses,
        "pnl": round(total_pnl, 2),
        "win_rate": round(total_wins / total_resolved * 100, 1) if total_resolved else 0,
        "redeemed": redeemed,
    }
    logger.info(
        f"Resolution complete: {total_resolved} trades | "
        f"W:{total_wins} L:{total_losses} ({result['win_rate']}% WR) | "
        f"P&L: ${total_pnl:+.2f} | Redeemed: {redeemed}"
    )
    return result
def _auto_redeem_resolved(trades: list) -> int:
    """
    Auto-redeem resolved positions on Polymarket.

    Groups trades by market_id (used as the condition_id) to avoid duplicate
    redeems. Only redeems non-dry-run filled trades.

    Args:
        trades: rows with a 'market_id' field; may include trades not
            resolved this cycle (the market_id dedup handles repeats).

    Returns:
        Number of markets successfully redeemed.
    """
    from executor import redeem_position
    # Collect unique condition_ids from resolved trades
    redeemed_ids = set()
    redeemed_count = 0
    for t in trades:
        market_id = t["market_id"]
        if market_id in redeemed_ids:
            continue
        # Skip dry-run trades (no real positions to redeem)
        # trades from the SQL query don't have dry_run, check DB.
        # NOTE(review): samples ONE resolved trade per market (LIMIT 1) and
        # assumes dry_run is uniform across that market's trades — verify.
        conn = db.get_conn()
        trade_row = conn.execute(
            "SELECT dry_run FROM trades WHERE market_id = ? AND outcome IS NOT NULL LIMIT 1",
            (market_id,)
        ).fetchone()
        conn.close()
        if trade_row and trade_row["dry_run"]:
            continue
        redeemed_ids.add(market_id)
        try:
            result = redeem_position(condition_id=market_id, neg_risk=True)
            if result["success"]:
                redeemed_count += 1
            else:
                # Try without neg_risk (some weather markets may not be neg_risk)
                result = redeem_position(condition_id=market_id, neg_risk=False)
                if result["success"]:
                    redeemed_count += 1
        except Exception as e:
            logger.error(f"Redeem error for {market_id[:16]}: {e}")
        # Small delay between redeems to avoid nonce issues
        # (runs after every attempt, successful or not)
        time.sleep(2)
    if redeemed_count > 0:
        logger.info(f"Auto-redeemed {redeemed_count}/{len(redeemed_ids)} positions")
    return redeemed_count
def _save_calibration_for_group(group: dict, actual_value: float):
    """
    Record forecast-vs-actual calibration for one city+date+metric group.

    Looks up the cached forecast value, pairs it with the observed actual,
    and persists a calibration row. The group's first trade supplies a
    representative model probability (flipped to the YES direction for NO
    trades) so a Brier score can be computed later. Best-effort: every
    failure is logged and swallowed.
    """
    try:
        group_trades = group["trades"]
        if not group_trades:
            return
        city = group["city"]
        date = group["date"]
        metric = group["metric"]
        # Forecasts are cached elsewhere; a 7-day TTL covers resolution lag.
        cached = db.get_cached_forecast(f"{city.lower()}|{date}|{metric}", ttl_seconds=86400 * 7)
        forecast_value = cached.get("forecast_value") if cached else None
        source = cached.get("source") if cached else None
        days_ahead = cached.get("days_ahead") if cached else None
        if forecast_value is None:
            logger.debug(f"No cached forecast for {city}/{date} — skipping calibration")
            return
        # First trade in the group acts as the representative for Brier scoring.
        rep = group_trades[0]
        model_prob = rep.get("model_prob")
        if model_prob is None:
            logger.debug(f"No model_prob for {city}/{date} — skipping calibration")
            return
        threshold = rep.get("threshold")
        if threshold is None:
            return
        # Normalize the threshold to °F so it compares against actual_value.
        unit = rep.get("threshold_unit", "F")
        threshold_f = threshold * 9 / 5 + 32 if unit == "C" else threshold
        operator = rep.get("operator")
        if operator == "gte":
            outcome_binary = 1 if actual_value >= threshold_f else 0
        elif operator == "lte":
            outcome_binary = 1 if actual_value <= threshold_f else 0
        else:
            outcome_binary = None  # between/eq brackets are excluded from Brier
        # Flip NO-side probabilities into the YES direction.
        model_prob_yes = 1 - model_prob if rep.get("side") == "NO" else model_prob
        db.save_calibration({
            "city": city,
            "date": date,
            "metric": metric,
            "forecast_value": forecast_value,
            "actual_value": actual_value,
            "model_prob": model_prob_yes,
            "outcome_binary": outcome_binary,
            "days_ahead": days_ahead,
            "source": source,
        })
        logger.debug(
            f"Calibration saved: {city} {date} forecast={forecast_value:.1f}°F "
            f"actual={actual_value:.1f}°F error={abs(forecast_value - actual_value):.1f}°F"
        )
    except Exception as e:
        logger.error(f"Calibration save error: {e}")
def fetch_actual_weather(city: str, date: str, metric: str = "high_temp") -> float | None:
    """
    Fetch the observed daily temperature (°F) from the Open-Meteo Archive API.

    Args:
        city: city name, looked up case-insensitively in CITIES_BY_NAME.
        date: ISO date string (YYYY-MM-DD) to query.
        metric: 'high_temp' → daily max, 'low_temp' → daily min;
            anything else falls back to the daily max.

    Returns:
        Temperature in °F, or None if the city is unknown, the API call
        fails, no value is available, or the value fails the sanity check.
    """
    city_data = CITIES_BY_NAME.get(city.lower())
    if not city_data:
        logger.warning(f"Unknown city for resolution: {city}")
        return None
    # Map our metric name onto the Open-Meteo daily variable.
    daily_var = "temperature_2m_min" if metric == "low_temp" else "temperature_2m_max"
    try:
        resp = requests.get(
            HISTORICAL_URL,
            params={
                "latitude": city_data["lat"],
                "longitude": city_data["lon"],
                "start_date": date,
                "end_date": date,
                "daily": daily_var,
                "temperature_unit": "fahrenheit",
                "timezone": city_data["tz"],
            },
            timeout=10,
        )
        resp.raise_for_status()
        readings = resp.json().get("daily", {}).get(daily_var, [])
        if not readings or readings[0] is None:
            return None
        temp = float(readings[0])
        # Sanity check: no city on Earth falls outside -60..140°F.
        if not (-60 <= temp <= 140):
            logger.warning(f"Suspicious temp {temp}°F for {city}/{date} — rejecting")
            return None
        return temp
    except Exception as e:
        logger.error(f"Historical weather fetch error for {city}/{date}: {e}")
        return None
def resolve_single_trade(trade: dict, actual_value: float) -> tuple:
    """
    Determine outcome and P&L for a single trade given the observed value.

    Args:
        trade: dict with 'threshold', 'operator', 'side', 'price', 'size',
            and optionally 'threshold_unit' ('F'/'C', default 'F'),
            'threshold_low', 'threshold_high'.
        actual_value: observed metric value in °F.

    Returns:
        (outcome, pnl) where outcome is 'win'|'loss', or (None, 0) for an
        unknown operator.

    P&L calculation (prediction markets):
    - size = dollar cost (not shares). shares = size / price.
    - WIN: payout = shares × $1.00, profit = payout - cost = size × (1 - price) / price
    - LOSS: lose entire cost basis = -size
    """
    threshold = trade["threshold"]
    operator = trade["operator"]
    unit = trade.get("threshold_unit", "F")
    side = trade["side"]
    price = trade["price"]
    size = trade["size"]
    low_raw = trade.get("threshold_low")
    high_raw = trade.get("threshold_high")
    # Convert thresholds to °F if stored in °C.
    # BUGFIX: the previous `(low_raw or threshold)` expression treated a
    # legitimate 0°C bound as falsy and silently substituted the main
    # threshold; use explicit `is not None` checks instead.
    if unit == "C":
        threshold_f = threshold * 9/5 + 32
        low_f = low_raw * 9/5 + 32 if low_raw is not None else None
        high_f = high_raw * 9/5 + 32 if high_raw is not None else None
    else:
        threshold_f = threshold
        low_f = low_raw
        high_f = high_raw
    # Determine if the bracket resolved YES (the event happened)
    if operator == "gte":
        event_yes = actual_value >= threshold_f
    elif operator == "lte":
        event_yes = actual_value <= threshold_f
    elif operator in ("between", "eq"):
        # Missing bounds default to a ±0.5°F band around the threshold
        low = low_f if low_f is not None else threshold_f - 0.5
        high = high_f if high_f is not None else threshold_f + 0.5
        # Polymarket rounds to nearest integer for resolution
        actual_rounded = round(actual_value)
        event_yes = low <= actual_rounded <= high
    else:
        # Unknown operator — skip
        return None, 0
    # Our side wins exactly when it matches the event outcome.
    won = event_yes if side == "YES" else not event_yes
    if won:
        pnl = size * (1.0 - price) / price  # payout minus cost basis
        outcome = "win"
    else:
        pnl = -size  # entire cost basis lost
        outcome = "loss"
    return outcome, round(pnl, 4)
def update_trade_outcome(trade_id: int, outcome: str, pnl: float, actual_value: float):
    """
    Persist a trade's resolution to the DB.

    Writes outcome and pnl, stamps resolved_at, and mirrors the outcome
    into the status column.

    NOTE(review): actual_value is accepted but not stored here — confirm
    whether it should be persisted.
    """
    connection = db.get_conn()
    params = (outcome, pnl, outcome, trade_id)
    connection.execute("""
        UPDATE trades
        SET outcome = ?, pnl = ?, resolved_at = datetime('now'),
        status = ?
        WHERE id = ?
    """, params)
    connection.commit()
    connection.close()
def get_resolution_stats() -> dict:
    """
    Get overall resolution statistics.

    Returns:
        dict with headline totals (total/wins/losses/win_rate/pnl/unresolved),
        plus per-edge-tier and per-city (top 15 by P&L) breakdowns.
    """
    conn = db.get_conn()
    # Headline counters over all resolved trades
    total = conn.execute("SELECT COUNT(*) FROM trades WHERE outcome IS NOT NULL").fetchone()[0]
    wins = conn.execute("SELECT COUNT(*) FROM trades WHERE outcome = 'win'").fetchone()[0]
    losses = conn.execute("SELECT COUNT(*) FROM trades WHERE outcome = 'loss'").fetchone()[0]
    total_pnl = conn.execute("SELECT COALESCE(SUM(pnl), 0) FROM trades WHERE outcome IS NOT NULL").fetchone()[0]
    # Trades whose market date has passed but that still lack an outcome
    unresolved = conn.execute("""
        SELECT COUNT(*) FROM trades t JOIN markets m ON t.market_id = m.id
        WHERE m.date < date('now') AND t.outcome IS NULL
    """).fetchone()[0]
    # By edge tier
    # NOTE(review): avg_edge is AVG(edge) — NULL if edge is never set for a
    # tier, which would make the round() below raise; assumes edge is always
    # populated on resolved trades — verify.
    by_tier = conn.execute("""
        SELECT edge_tier,
        COUNT(*) as total,
        SUM(CASE WHEN outcome='win' THEN 1 ELSE 0 END) as wins,
        COALESCE(SUM(pnl), 0) as pnl,
        AVG(edge) as avg_edge
        FROM trades WHERE outcome IS NOT NULL
        GROUP BY edge_tier
    """).fetchall()
    # By city (top 15 by P&L)
    by_city = conn.execute("""
        SELECT m.city,
        COUNT(*) as total,
        SUM(CASE WHEN t.outcome='win' THEN 1 ELSE 0 END) as wins,
        COALESCE(SUM(t.pnl), 0) as pnl
        FROM trades t JOIN markets m ON t.market_id = m.id
        WHERE t.outcome IS NOT NULL
        GROUP BY m.city
        ORDER BY pnl DESC
        LIMIT 15
    """).fetchall()
    # Don't close thread-local connection — it's reused
    return {
        "total": total,
        "wins": wins,
        "losses": losses,
        "win_rate": round(wins / total * 100, 1) if total else 0,
        "pnl": round(total_pnl, 2),
        "unresolved": unresolved,
        "by_tier": [
            {
                "tier": r["edge_tier"],
                "total": r["total"],
                "wins": r["wins"],
                "win_rate": round(r["wins"] / r["total"] * 100, 1) if r["total"] else 0,
                "pnl": round(r["pnl"], 2),
                "avg_edge": round(r["avg_edge"] * 100, 1),  # edge stored as fraction → %
            }
            for r in by_tier
        ],
        "by_city": [
            {
                "city": r["city"],
                "total": r["total"],
                "wins": r["wins"],
                "win_rate": round(r["wins"] / r["total"] * 100, 1) if r["total"] else 0,
                "pnl": round(r["pnl"], 2),
            }
            for r in by_city
        ],
    }