"""
Polymarket Weather Bot — Main entry point
FastAPI server + APScheduler background jobs
"""
import asyncio
import base64
import os
import secrets
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, Response
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from loguru import logger
from config import PORT, HOST, DRY_RUN, BET_SIZE, SCAN_INTERVAL_SEC, RESOLVE_CHECK_SEC, LOG_LEVEL
import db
# Dashboard HTTP Basic Auth credentials, read once at import time.
# An unset variable yields "" so the values are always strings.
DASH_USER = os.environ.get("DASH_USER", "")
DASH_PASS = os.environ.get("DASH_PASS", "")
# Strong references to in-flight background tasks. The event loop itself only
# holds weak references, so a task nobody else references could be
# garbage-collected before it finishes.
_background_tasks: set = set()


def _safe_create_task(coro, name: str = None):
    """Schedule `coro` as an asyncio task whose failure is always logged.

    Args:
        coro: Coroutine object to run on the currently running event loop.
        name: Optional task name; also used in the failure log line.

    Returns:
        The created `asyncio.Task`.

    Raises:
        RuntimeError: If called without a running event loop (raised by
            `asyncio.create_task`).
    """
    task = asyncio.create_task(coro, name=name)
    _background_tasks.add(task)

    def _on_done(t):
        _background_tasks.discard(t)
        if t.cancelled():
            return
        exc = t.exception()
        if exc is not None:
            # Loguru has no `exc_info` keyword: extra kwargs trigger
            # str.format() on the message, which blows up when the exception
            # text contains braces. Pass values as format arguments and
            # attach the traceback through opt(exception=...).
            logger.opt(exception=exc).error(
                "Background task '{}' failed: {}", name or "unnamed", exc
            )

    task.add_done_callback(_on_done)
    return task
from scanner import scan_weather_markets
from config import CITY_COORDS
from forecaster import get_forecast_and_probability, calculate_probability, calculate_probability_ensemble
from analyzer import analyze_opportunity
from executor import place_bet, get_balance, get_wallet_balance
from risk import RiskManager
from resolver import resolve_trades, get_resolution_stats, cancel_stale_orders
# === Setup logging ===
# Two sinks at LOG_LEVEL: a rotating file and a single stdout sink.
logger.remove()  # Remove default stderr handler (prevents duplicate lines in PM2)
logger.add("logs/bot.log", rotation="10 MB", retention="30 days", level=LOG_LEVEL)
logger.add(lambda msg: print(msg, end=""), level=LOG_LEVEL, colorize=True)  # Single stdout handler for PM2
# === Init DB early (before RiskManager reads state) ===
# Order matters: init_db() must run before RiskManager() is constructed below.
db.init_db()
# === Global state ===
risk_mgr = RiskManager()
scheduler = AsyncIOScheduler()
# Never reassigned in this file; /api/status reports `bot_running or scheduler.running`.
bot_running = False
# Summary of the most recent scan; "at" is an ISO-8601 UTC timestamp, None until the first scan.
last_scan_result = {"markets": 0, "opportunities": 0, "trades": 0, "at": None}
# Locks are only probed with .locked() to skip overlapping cycles, then held for the cycle.
_scan_lock = asyncio.Lock()
_resolve_lock = asyncio.Lock()
# Last fetched balances (rounded to 2 dp); ts == 0 means "never fetched".
_balance_cache = {"exchange": None, "wallet": None, "ts": 0}
# === Helpers ===
def _get_cached_balances_sync(ttl: int = 60) -> dict:
    """Return exchange and wallet balances, refreshing them at most every `ttl` seconds.

    Performs blocking I/O when the cache is stale. A fetch that returns None
    leaves the previously cached value in place, but the cache timestamp is
    refreshed regardless.

    Args:
        ttl: Maximum cache age in seconds before both balances are re-fetched.

    Returns:
        The shared module-level cache dict (keys: "exchange", "wallet", "ts").
    """
    import time as _time

    checked_at = _time.time()
    last_fetch = _balance_cache["ts"]
    still_fresh = last_fetch > 0 and checked_at - last_fetch < ttl
    if not still_fresh:
        # Fetch both values before touching the cache, exchange first.
        fetched = {"exchange": get_balance(), "wallet": get_wallet_balance()}
        for key, amount in fetched.items():
            if amount is not None:
                _balance_cache[key] = round(amount, 2)
        _balance_cache["ts"] = checked_at
    return _balance_cache
async def _get_cached_balances(ttl: int = 60) -> dict:
    """Async front-end for the balance cache.

    Serves the cached dict directly while it is younger than `ttl` seconds;
    otherwise runs the blocking refresh in a worker thread so the event loop
    is not stalled.
    """
    import time as _time

    last_fetch = _balance_cache["ts"]
    is_stale = last_fetch <= 0 or _time.time() - last_fetch >= ttl
    if is_stale:
        return await asyncio.to_thread(_get_cached_balances_sync, ttl)
    return _balance_cache
def _is_past_peak_time(event: dict, bracket: dict) -> bool:
    """Tell whether a bracket's market should be skipped because its day is effectively over.

    The decision is made in the city's LOCAL time zone (looked up in
    CITY_COORDS via the event's `city_slug`), not UTC, so cities ahead of
    UTC are handled correctly:

    * market date before local today  -> True (skip)
    * market date after local today   -> False (tradeable)
    * same day -> True once the local hour reaches the cutoff:
      14:00 for the "high_temp" metric, 08:00 for any other metric.

    Returns False when the bracket has no date, the city has no time zone
    configured, or the check itself raises (the failure is logged).
    Dates are compared as "YYYY-MM-DD" strings.
    """
    try:
        from zoneinfo import ZoneInfo

        market_date = bracket.get("date")
        if not market_date:
            return False

        # Resolve the city's IANA time zone name
        tz_name = CITY_COORDS.get(event.get("city_slug", ""), {}).get("tz")
        if not tz_name:
            return False

        local_now = datetime.now(ZoneInfo(tz_name))
        local_today = local_now.strftime("%Y-%m-%d")

        # Market day already over locally
        if market_date < local_today:
            logger.debug(
                f"Skip past-date {bracket.get('city')} {bracket.get('metric', 'high_temp')}: "
                f"market {market_date} < local today {local_today}"
            )
            return True

        # Market day has not started locally yet
        if market_date > local_today:
            return False

        # Same local day: compare against the per-metric cutoff hour
        metric = bracket.get("metric", "high_temp")
        cutoff_hour = 8 if metric != "high_temp" else 14
        if local_now.hour < cutoff_hour:
            return False
        logger.debug(
            f"Skip same-day {bracket.get('city')} {metric}: "
            f"local {local_now.strftime('%H:%M')} past cutoff {cutoff_hour}:00"
        )
        return True
    except Exception as e:
        logger.warning(f"Peak time check failed for {bracket.get('city', '?')}: {e}")
        return False
# === Core bot logic ===
async def run_scan_cycle():
    """Run one scan cycle unless another one is already in progress.

    When a scan is already running, this call only refreshes the "at"
    timestamp of `last_scan_result` and returns without scanning.
    """
    global last_scan_result
    if not _scan_lock.locked():
        async with _scan_lock:
            await _run_scan_inner()
        return

    logger.debug("Scan already running, skipping")
    # Refresh the timestamp so the dashboard doesn't show a stale "Never"
    skipped_at = datetime.now(timezone.utc).isoformat()
    last_scan_result = {**last_scan_result, "at": skipped_at}
def _reprice_cached_forecast(cached: dict, bracket: dict) -> dict:
    """Build a per-bracket forecast from an already-fetched city/date/metric forecast.

    Only the probability is recomputed for this bracket's threshold(s); all
    other fields are copied from `cached`.
    """
    threshold = bracket["threshold"]
    threshold_low = bracket.get("threshold_low")
    threshold_high = bracket.get("threshold_high")
    if bracket.get("threshold_unit") == "C":
        # Celsius thresholds are converted to Fahrenheit before comparison.
        # NOTE(review): presumably the cached forecast values are in °F — confirm in forecaster.
        threshold = threshold * 9 / 5 + 32
        if threshold_low is not None:
            threshold_low = threshold_low * 9 / 5 + 32
        if threshold_high is not None:
            threshold_high = threshold_high * 9 / 5 + 32

    operator = bracket.get("operator", "gte")
    # Use the ensemble members when at least 10 are available, else Gaussian
    member_values = cached.get("member_values")
    if member_values and len(member_values) >= 10:
        model_prob = calculate_probability_ensemble(
            member_values=member_values,
            threshold=threshold,
            operator=operator,
            threshold_low=threshold_low,
            threshold_high=threshold_high,
        )
        prob_method = "ensemble"
    else:
        model_prob = calculate_probability(
            forecast_value=cached["forecast_value"],
            threshold=threshold,
            operator=operator,
            sigma=cached["sigma"],
            threshold_low=threshold_low,
            threshold_high=threshold_high,
        )
        prob_method = "gaussian"

    return {
        **cached,
        "market_id": bracket["id"],
        "model_probability": model_prob,
        "prob_method": prob_method,
    }


async def _run_scan_inner():
    """Execute one full scan: cleanup → scan markets → forecast → analyze → execute.

    Publishes its summary (market / opportunity / trade counts and start
    timestamp) to the module-level `last_scan_result`. Errors are logged,
    never raised: a failure on one bracket skips that bracket only, and a
    failure elsewhere ends the scan with whatever was counted so far.
    """
    global last_scan_result
    logger.info("--- Scan cycle started ---")
    scan_result = {"markets": 0, "opportunities": 0, "trades": 0, "at": datetime.now(timezone.utc).isoformat()}

    # Cleanup old forecasts + expired cache to prevent unbounded DB growth
    try:
        await asyncio.to_thread(db.cleanup_old_forecasts, 7)
        await asyncio.to_thread(db.cleanup_forecast_cache)
    except Exception as e:
        logger.warning(f"Forecast cleanup failed: {e}")

    try:
        # 1. Scan weather markets (run in thread to avoid blocking event loop)
        markets = await asyncio.to_thread(scan_weather_markets)
        scan_result["markets"] = len(markets)
        if not markets:
            logger.info("No weather markets found")
            last_scan_result = scan_result
            return

        # 2. Check balance before attempting any trades. An unknown balance
        #    (None) does not stop the scan; per-trade checks are skipped then.
        try:
            balance = await asyncio.to_thread(get_balance)
        except Exception as e:
            logger.warning(f"Balance check failed: {e}")
            balance = None

        # Polymarket min 5 shares × ~$0.72 = $3.60 minimum order
        min_order = 3.50
        if balance is not None and balance < min_order:
            logger.info(f"Balance too low: ${balance:.2f} < ${min_order:.2f} (min order ~$3.50) — skipping trades")
            last_scan_result = scan_result
            return
        if balance is not None:
            logger.info(f"Balance: ${balance:.2f}")

        # 3. For each event → for each bracket market: forecast + analyze + maybe execute
        # Cache forecasts per city+date+metric to avoid duplicate Open-Meteo calls
        forecast_cache = {}
        balance_exhausted = False  # Stop trying after first "not enough balance"
        running_balance = balance  # Track remaining balance to avoid over-ordering

        # Proactively trip the kill switch on accumulated daily loss before placing
        # any new orders (covers losses from already-open positions, not just new signals).
        # The loop below still runs; trades are gated per signal by risk_mgr.can_trade().
        if risk_mgr.check_daily_loss():
            logger.warning("Daily loss limit reached — kill switch active, skipping new trades this scan")

        for event in markets:
            brackets = event.get("brackets", [])
            if not brackets:
                continue
            for bracket in brackets:
                try:
                    # Skip if no city/date/threshold parsed
                    if not bracket.get("city") or not bracket.get("date") or bracket.get("threshold") is None:
                        continue

                    # === SAME-DAY CUTOFF: skip if temperature peak already passed ===
                    if _is_past_peak_time(event, bracket):
                        continue

                    # Get forecast (cached per city+date+metric — ONE API call per city+date)
                    cache_key = f"{bracket['city']}|{bracket['date']}|{bracket.get('metric', 'high_temp')}"
                    if cache_key not in forecast_cache:
                        forecast = await asyncio.to_thread(get_forecast_and_probability, bracket)
                        forecast_cache[cache_key] = forecast
                    else:
                        # Reuse cached forecast, only recalculate probability for this bracket's threshold
                        cached = forecast_cache[cache_key]
                        if cached is None:
                            forecast = None
                        else:
                            forecast = _reprice_cached_forecast(cached, bracket)
                            # Persist the per-bracket forecast for calibration. The
                            # non-cache path saves via forecaster; the cache-reuse path
                            # must too, else only the first bracket per city+date is recorded.
                            try:
                                db.save_forecast(forecast)
                            except Exception as e:
                                logger.debug(f"save_forecast (cache-reuse) failed: {e}")

                    if not forecast:
                        continue

                    # Analyze edge
                    signal = analyze_opportunity(bracket, forecast)
                    if not signal:
                        continue
                    scan_result["opportunities"] += 1

                    # Skip all trades if balance already exhausted
                    if balance_exhausted:
                        continue

                    # Pre-check: enough running balance for this trade?
                    if running_balance is not None:
                        # Actual cost = MIN_SHARES × price (5 shares minimum)
                        est_cost = max(signal["size"], 5 * signal["price"])
                        if running_balance < est_cost:
                            logger.info("Balance exhausted — stopping trades for this scan")
                            balance_exhausted = True
                            continue

                    # Risk check
                    allowed, reason = risk_mgr.can_trade(signal)
                    if not allowed:
                        logger.info(f"Risk blocked: {reason}")
                        continue

                    # Execute (in thread — CLOB client is blocking)
                    result = await asyncio.to_thread(place_bet, signal, DRY_RUN)
                    if result["success"]:
                        scan_result["trades"] += 1
                        # Subtract the executor's reported cost (shares × price) from
                        # running balance — single source of truth, not a re-estimate.
                        if running_balance is not None:
                            actual_cost = result.get("actual_cost") or max(signal["size"], 5 * signal["price"])
                            running_balance -= actual_cost
                    elif "not enough balance" in result.get("message", ""):
                        logger.info("Balance exhausted — stopping trades for this scan")
                        balance_exhausted = True
                except Exception as e:
                    # str() first: a non-string id (int / None) must not raise
                    # inside this handler, which would abort the whole scan.
                    bracket_id = str(bracket.get("id", "?"))[:8]
                    logger.error(f"Error processing bracket {bracket_id}: {e}")
    except Exception as e:
        logger.error(f"Scan cycle error: {e}")

    last_scan_result = scan_result
    logger.info(
        f"--- Scan done: {scan_result['markets']} markets, "
        f"{scan_result['opportunities']} opps, {scan_result['trades']} trades ---"
    )
async def run_resolve_cycle():
    """Resolve trades via `resolve_trades`, skipping if a previous cycle is still running.

    The blocking resolver runs in a worker thread. A summary line is logged
    only when at least one trade was resolved; any error is logged and
    swallowed.
    """
    if _resolve_lock.locked():
        logger.debug("Resolve cycle skipped — previous still running")
        return

    async with _resolve_lock:
        try:
            summary = await asyncio.to_thread(resolve_trades)
            if summary["resolved"] <= 0:
                return
            logger.info(f"--- Resolved: {summary['resolved']} trades, W:{summary['wins']} L:{summary['losses']}, P&L: ${summary['pnl']:+.2f} ---")
        except Exception as e:
            logger.error(f"Resolve cycle error: {e}")
# === FastAPI App ===
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start the scheduler and initial jobs, stop the scheduler on exit.

    Startup: logs the resolved safety flags, registers the periodic scan and
    resolve jobs, starts the scheduler, cancels stale orders (best effort),
    then kicks off an immediate scan and resolve in the background.
    Shutdown: stops the scheduler, even if the app exits with an error.
    """
    from config import BLOCK_YES_SIDE
    # Log the resolved safety flags so a mis-parsed env var is visible immediately.
    logger.info(f"Weather Bot starting (DRY_RUN={DRY_RUN}, BLOCK_YES_SIDE={BLOCK_YES_SIDE})")

    # Start scheduler. max_instances=1 + coalesce so a scan that overruns its
    # interval doesn't pile up or get silently dropped (default misfire_grace_time=1
    # would drop a late run, leaving the dashboard showing a stale result).
    job_defaults = {"max_instances": 1, "coalesce": True, "misfire_grace_time": 300}
    # replace_existing: the scheduler is a module-level singleton, so running
    # the lifespan again in the same process must not conflict on job ids.
    scheduler.add_job(run_scan_cycle, "interval", seconds=SCAN_INTERVAL_SEC, id="scan",
                      replace_existing=True, **job_defaults)
    scheduler.add_job(run_resolve_cycle, "interval", seconds=RESOLVE_CHECK_SEC, id="resolve",
                      replace_existing=True, **job_defaults)
    scheduler.start()
    logger.info(f"Scheduler started: scan every {SCAN_INTERVAL_SEC}s, resolve every {RESOLVE_CHECK_SEC}s")

    try:
        # Startup cleanup: cancel stale unverified orders to free locked capital
        try:
            cancelled = await asyncio.to_thread(cancel_stale_orders)
            if cancelled > 0:
                logger.info(f"Startup cleanup: cancelled {cancelled} stale orders")
        except Exception as e:
            logger.error(f"Startup cleanup error: {e}")

        # Run initial scan + resolve
        _safe_create_task(run_scan_cycle(), name="initial-scan")
        _safe_create_task(run_resolve_cycle(), name="initial-resolve")

        yield
    finally:
        # Runs even when an exception or cancellation is thrown in at the
        # yield, so the scheduler is never left running after the app stops.
        scheduler.shutdown()
        logger.info("Weather Bot stopped")
# ASGI application object; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(title="Polymarket Weather Bot", lifespan=lifespan)
def _basic_credentials_ok(header: str) -> bool:
    """Return True when `header` is a Basic Authorization value matching DASH_USER / DASH_PASS."""
    scheme, _, payload = header.partition(" ")
    # The auth scheme token is case-insensitive ("Basic", "basic", ...).
    if scheme.lower() != "basic" or not payload:
        return False
    try:
        decoded = base64.b64decode(payload.strip()).decode()
        user, sep, passwd = decoded.partition(":")
        if not sep:
            return False
        # Compare as bytes: compare_digest() raises TypeError for non-ASCII str.
        # Evaluate both comparisons so a wrong username does not skip the
        # password comparison.
        user_ok = secrets.compare_digest(user.encode(), DASH_USER.encode())
        pass_ok = secrets.compare_digest(passwd.encode(), DASH_PASS.encode())
    except ValueError:
        # binascii.Error and the Unicode encode/decode errors are ValueError subclasses.
        return False
    return user_ok and pass_ok


@app.middleware("http")
async def basic_auth(request: Request, call_next):
    """Simple Basic Auth for dashboard. If DASH_USER is unset (e.g. PM2 env not
    propagated), fail CLOSED for mutating requests rather than exposing
    /api/scan, /api/resolve and /api/kill-switch to anyone.

    Returns the downstream response for authorized requests, otherwise a 401
    carrying a `WWW-Authenticate: Basic` challenge.
    """
    challenge = {"WWW-Authenticate": 'Basic realm="Weather Bot"'}

    if not DASH_USER:
        # Read-only requests stay open (expected to sit behind nginx/localhost);
        # state-changing methods are denied when no credentials are configured.
        if request.method in ("GET", "HEAD", "OPTIONS"):
            return await call_next(request)
        return Response("Unauthorized (auth not configured)", status_code=401,
                        headers=challenge)

    if _basic_credentials_ok(request.headers.get("Authorization", "")):
        # Deliberately outside any try/except: an error raised by the
        # downstream handler must propagate as itself, not become a 401.
        return await call_next(request)

    return Response("Unauthorized", status_code=401, headers=challenge)
# Static files: serve the contents of the "static" directory under the /static URL prefix
app.mount("/static", StaticFiles(directory="static"), name="static")
# === Web Dashboard ===
@app.get("/", response_class=HTMLResponse)
async def dashboard():
    """Serve the dashboard HTML page from the static directory."""
    index_page = "static/index.html"
    return FileResponse(index_page)
# === API Endpoints ===
@app.get("/api/status")
async def api_status():
    """Bot status + risk info for the dashboard.

    Starts from `risk_mgr.get_status()` and overlays live-only (non dry-run)
    statistics, open-position totals and exchange/wallet balances.

    Returns:
        {"success": True, "data": {"bot_running", "dry_run", "last_scan", "risk"}}
    """
    risk_status = risk_mgr.get_status()

    # Add overall win rate (live trades only — dry-run excluded from dashboard)
    try:
        from config import STATS_START_DATE
        conn = db.get_conn()
        row = conn.execute(
            "SELECT COUNT(*) as total, "
            "SUM(CASE WHEN outcome='win' THEN 1 ELSE 0 END) as wins, "
            "COALESCE(SUM(pnl),0) as pnl "
            "FROM trades WHERE outcome IN ('win','loss') AND created_at >= ? AND dry_run = 0",
            (STATS_START_DATE,)
        ).fetchone()
        if row and row["total"] > 0:
            risk_status["overall_win_rate"] = round(row["wins"] / row["total"] * 100, 1)
            risk_status["overall_pnl"] = round(row["pnl"], 2)
            risk_status["overall_resolved"] = row["total"]
        else:
            risk_status["overall_win_rate"] = 0
            risk_status["overall_pnl"] = 0
            risk_status["overall_resolved"] = 0
    except Exception as e:
        logger.warning(f"Overall stats query failed: {e}")
        # Keep the response shape stable on failure; values already supplied
        # by the risk manager, if any, are left untouched.
        for key in ("overall_win_rate", "overall_pnl", "overall_resolved"):
            risk_status.setdefault(key, 0)

    # Total real trades (open + resolved, excluding cancelled/unverified/failed)
    try:
        conn2 = db.get_conn()
        total_row = conn2.execute(
            "SELECT COUNT(*) as cnt FROM trades WHERE status IN ('filled','win','loss') AND created_at >= ? AND dry_run = 0",
            (STATS_START_DATE,)
        ).fetchone()
        risk_status["overall_total_trades"] = total_row["cnt"] if total_row else 0
    except Exception as e:
        # Also reached when the config import above failed and
        # STATS_START_DATE was never bound.
        logger.warning(f"Total trades query failed: {e}")
        risk_status["overall_total_trades"] = 0

    # Override dashboard-facing fields with live-only counts
    live_active = db.get_active_trades(live_only=True)
    risk_status["open_positions"] = len(live_active)
    # Count all active orders as deployed (pending orders lock exchange balance too)
    risk_status["total_deployed"] = round(sum(t["size"] for t in live_active if t["status"] in ("filled", "pending")), 2)

    live_today = db.get_today_stats(live_only=True)
    risk_status["daily_pnl"] = live_today["pnl"]
    risk_status["daily_trades"] = live_today["total_trades"]
    risk_status["daily_wins"] = live_today["wins"]
    risk_status["daily_losses"] = live_today["losses"]
    risk_status["daily_win_rate"] = live_today.get("win_rate", 0)

    # Real balances from Polymarket (non-blocking). A failed fetch must not
    # take the whole status endpoint down; fall back to "unknown" balances.
    try:
        balances = await _get_cached_balances()
    except Exception as e:
        logger.warning(f"Balance fetch failed: {e}")
        balances = {"exchange": None, "wallet": None}
    if balances["exchange"] is not None:
        risk_status["bankroll"] = balances["exchange"]
    if balances["wallet"] is not None:
        risk_status["wallet_balance"] = balances["wallet"]

    # Total capital = exchange cash + FILLED positions only. The exchange balance
    # already excludes funds locked by pending orders, so adding pending deploy back
    # in (as total_deployed does) would double-count it.
    # Trailing `or 0` guards against a bankroll that is present but None.
    exchange = balances["exchange"] or risk_status.get("bankroll") or 0
    deployed_filled = round(sum(t["size"] for t in live_active if t["status"] == "filled"), 2)
    risk_status["total_capital"] = round(exchange + deployed_filled, 2)

    return {
        "success": True,
        "data": {
            "bot_running": bot_running or scheduler.running,
            "dry_run": DRY_RUN,
            "last_scan": last_scan_result,
            "risk": risk_status,
        }
    }
@app.get("/api/trades")
async def api_trades(limit: int = 50):
    """Return up to `limit` recent trades (live only — dry-run hidden from dashboard)."""
    recent = db.get_recent_trades(limit, live_only=True)
    return dict(success=True, data=recent)
@app.get("/api/trades/active")
async def api_active_trades():
    """Return the currently active (open) trades, live only."""
    open_trades = db.get_active_trades(live_only=True)
    return dict(success=True, data=open_trades)
@app.get("/api/markets")
async def api_markets():
    """Return up to 100 active scanned markets, most recently updated first.

    On a database error the failure is logged and reported as
    {"success": False, "error": <message>} instead of raising.
    """
    query = "SELECT * FROM markets WHERE status = 'active' ORDER BY updated_at DESC LIMIT 100"
    try:
        cursor = db.get_conn().execute(query)
        markets = [dict(record) for record in cursor.fetchall()]
    except Exception as e:
        logger.error(f"api_markets error: {e}")
        return {"success": False, "error": str(e)}
    return {"success": True, "data": markets}
@app.get("/api/stats/today")
async def api_today_stats():
    """Return today's statistics for live trades only."""
    today = db.get_today_stats(live_only=True)
    return dict(success=True, data=today)
@app.get("/api/stats/history")
async def api_stats_history(days: int = 30):
    """Return up to `days` rows of daily stats dated on/after STATS_START_DATE, newest first.

    On a database error the failure is logged and reported as
    {"success": False, "error": <message>} instead of raising.
    """
    from config import STATS_START_DATE

    query = "SELECT * FROM daily_stats WHERE date >= date(?) ORDER BY date DESC LIMIT ?"
    try:
        cursor = db.get_conn().execute(query, (STATS_START_DATE, days))
        history = [dict(record) for record in cursor.fetchall()]
    except Exception as e:
        logger.error(f"api_stats_history error: {e}")
        return {"success": False, "error": str(e)}
    return {"success": True, "data": history}
@app.post("/api/scan")
async def api_trigger_scan():
    """Start a scan cycle in the background and acknowledge immediately."""
    _safe_create_task(run_scan_cycle(), name="manual-scan")
    acknowledgement = dict(success=True, message="Scan triggered")
    return acknowledgement
@app.post("/api/resolve")
async def api_trigger_resolve():
    """Start a resolve cycle in the background and acknowledge immediately."""
    _safe_create_task(run_resolve_cycle(), name="manual-resolve")
    acknowledgement = dict(success=True, message="Resolution triggered")
    return acknowledgement
@app.get("/api/stats/resolution")
async def api_resolution_stats():
    """Return the resolver's statistics (win rate, P&L by tier and city)."""
    resolution = get_resolution_stats()
    return dict(success=True, data=resolution)
@app.get("/api/calibration")
async def api_calibration():
    """Return per-city calibration records that have at least 3 samples."""
    calibrations = db.get_all_city_calibrations(min_samples=3)
    return dict(success=True, data=calibrations)
@app.post("/api/kill-switch/activate")
async def api_kill_switch_on():
    """Activate the risk manager's kill switch on request from the dashboard."""
    risk_mgr.activate_kill_switch("Manual activation from dashboard")
    return dict(success=True, message="Kill switch activated")
@app.post("/api/kill-switch/deactivate")
async def api_kill_switch_off():
    """Deactivate the risk manager's kill switch."""
    risk_mgr.deactivate_kill_switch()
    return dict(success=True, message="Kill switch deactivated")
# === Run ===
if __name__ == "__main__":
    # Script entry point: serve the app by import string "bot:app" on HOST:PORT,
    # with auto-reload disabled.
    import uvicorn

    uvicorn.run("bot:app", port=PORT, host=HOST, reload=False)
# 📜 Git History
# 058de34  fix(audit): chunk 4 - minor robustness, display, calibration (5 weeks ago)
# 3de9313  fix(audit): chunk 3 - executor robustness, scheduler, redeem, edge logic (5 weeks ago)
# ddaa0a2  fix(audit): chunk 2 - kill switch, auth, config safety (5 weeks ago)
# 8fca132  chore: initial commit — version control setup (5 weeks ago)
# Show last diff
# Loading...