"""
Polymarket Weather Bot — Main entry point
FastAPI server + APScheduler background jobs
"""
import asyncio
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from loguru import logger
from config import PORT, HOST, DRY_RUN, BET_SIZE, SCAN_INTERVAL_SEC, RESOLVE_CHECK_SEC, LOG_LEVEL
import db
def _safe_create_task(coro, name: str = None):
    """Create an asyncio task whose failure is logged instead of silently dropped.

    A fire-and-forget ``asyncio.create_task()`` whose result is never awaited
    keeps its exception hidden until the task is garbage-collected; the
    done-callback attached here surfaces failures immediately in the log.

    Args:
        coro: Coroutine to schedule on the running event loop.
        name: Optional task name, used in the failure log message.

    Returns:
        The created asyncio.Task.
    """
    task = asyncio.create_task(coro, name=name)

    def _on_done(t):
        if t.cancelled():
            return  # cancellation is an expected shutdown path, not an error
        exc = t.exception()
        if exc:
            # loguru does not honor stdlib-style `exc_info=`; opt(exception=...)
            # is the loguru way to attach the traceback to the record.
            logger.opt(exception=exc).error(f"Background task '{name or 'unnamed'}' failed: {exc}")

    task.add_done_callback(_on_done)
    return task
from scanner import scan_weather_markets
from config import CITY_COORDS
from forecaster import get_forecast_and_probability, calculate_probability, calculate_probability_ensemble
from analyzer import analyze_opportunity
from executor import place_bet, get_balance
from risk import RiskManager
from resolver import resolve_trades, get_resolution_stats, cancel_stale_orders
# === Setup logging ===
logger.remove()  # Remove default stderr handler (prevents duplicate lines in PM2)
logger.add("logs/bot.log", rotation="10 MB", retention="30 days", level=LOG_LEVEL)
logger.add(lambda msg: print(msg, end=""), level=LOG_LEVEL, colorize=True)  # Single stdout handler for PM2
# === Init DB early (before RiskManager reads state) ===
db.init_db()
# === Global state ===
risk_mgr = RiskManager()  # trade gatekeeper: can_trade() checks + kill switch
scheduler = AsyncIOScheduler()  # drives the periodic scan/resolve jobs (started in lifespan)
bot_running = False  # NOTE(review): never set True anywhere in this file; /api/status falls back to scheduler.running
last_scan_result = {"markets": 0, "opportunities": 0, "trades": 0, "at": None}  # snapshot served by /api/status
_scan_lock = asyncio.Lock()  # prevents overlapping scan cycles (manual trigger vs scheduled)
# === Helpers ===
def _is_past_peak_time(event: dict, bracket: dict) -> bool:
"""
Check if same-day market's peak temperature time has already passed.
High temp peaks ~14:00 local, low temp bottoms ~06:00 local.
After peak, market is essentially resolved โ skip.
"""
try:
from zoneinfo import ZoneInfo
market_date = bracket.get("date")
if not market_date:
return False
today_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d")
if market_date != today_utc:
return False # Not same-day โ OK to trade
# Get local timezone for this city
city_slug = event.get("city_slug", "")
city_data = CITY_COORDS.get(city_slug, {})
tz_name = city_data.get("tz")
if not tz_name:
return False
tz = ZoneInfo(tz_name)
local_now = datetime.now(tz)
metric = bracket.get("metric", "high_temp")
# Cutoff hours: high_temp after 14:00, low_temp after 08:00
cutoff_hour = 14 if metric == "high_temp" else 8
if local_now.hour >= cutoff_hour:
logger.debug(
f"Skip same-day {bracket.get('city')} {metric}: "
f"local {local_now.strftime('%H:%M')} past cutoff {cutoff_hour}:00"
)
return True
return False
except Exception:
return False
# === Core bot logic ===
async def run_scan_cycle():
    """Run one scan cycle (scan → forecast → analyze → execute).

    Guarded by ``_scan_lock`` so a manual trigger cannot overlap a scheduled
    run; if a cycle is already in flight this call is a no-op.
    """
    if not _scan_lock.locked():
        async with _scan_lock:
            await _run_scan_inner()
    else:
        logger.debug("Scan already running, skipping")
def _forecast_from_cache(cached, bracket):
    """Build a forecast dict for *bracket* from an already-fetched city forecast.

    ``cached`` is the dict previously returned by get_forecast_and_probability
    for the same city+date+metric (or None when that fetch failed). Only the
    probability depends on this bracket's threshold(s), so it is recomputed
    here without another API call.

    Returns:
        A forecast dict keyed to this bracket, or None when no cached
        forecast is available.
    """
    if cached is None:
        return None

    def _to_fahrenheit(value):
        # Thresholds quoted in Celsius are converted; forecast values are °F
        # (same conversion the fresh-fetch path applies internally).
        if value is None or bracket.get("threshold_unit") != "C":
            return value
        return value * 9 / 5 + 32

    threshold = _to_fahrenheit(bracket["threshold"])
    threshold_low = _to_fahrenheit(bracket.get("threshold_low"))
    threshold_high = _to_fahrenheit(bracket.get("threshold_high"))
    # Prefer the ensemble estimate when enough members are available, else Gaussian.
    member_values = cached.get("member_values")
    if member_values and len(member_values) >= 10:
        model_prob = calculate_probability_ensemble(
            member_values=member_values,
            threshold=threshold,
            operator=bracket.get("operator", "gte"),
            threshold_low=threshold_low,
            threshold_high=threshold_high,
        )
        prob_method = "ensemble"
    else:
        model_prob = calculate_probability(
            forecast_value=cached["forecast_value"],
            threshold=threshold,
            operator=bracket.get("operator", "gte"),
            sigma=cached["sigma"],
            threshold_low=threshold_low,
            threshold_high=threshold_high,
        )
        prob_method = "gaussian"
    return {
        **cached,
        "market_id": bracket["id"],
        "model_probability": model_prob,
        "prob_method": prob_method,
    }
async def _run_scan_inner():
    """One full scan pass: fetch markets, check balance, evaluate every bracket.

    Always updates the module-level ``last_scan_result`` snapshot (served by
    /api/status), even when the cycle aborts early or errors out.
    """
    global last_scan_result
    logger.info("--- Scan cycle started ---")
    scan_result = {"markets": 0, "opportunities": 0, "trades": 0, "at": datetime.now(timezone.utc).isoformat()}
    try:
        # 1. Scan weather markets (run in thread to avoid blocking event loop)
        markets = await asyncio.to_thread(scan_weather_markets)
        scan_result["markets"] = len(markets)
        if not markets:
            logger.info("No weather markets found")
            last_scan_result = scan_result
            return
        # 2. Check balance before attempting any trades
        try:
            balance = await asyncio.to_thread(get_balance)
        except Exception:
            # Balance lookup is best-effort; None means "unknown, proceed anyway".
            balance = None
        bet_size = BET_SIZE
        if balance is not None and balance < bet_size:
            logger.info(f"Balance too low: ${balance:.2f} < ${bet_size:.2f} → skipping trades")
            last_scan_result = scan_result
            return
        if balance is not None:
            logger.info(f"Balance: ${balance:.2f}")
        # 3. For each event → for each bracket market: forecast + analyze + maybe execute.
        # Forecasts are cached per city+date+metric to avoid duplicate Open-Meteo calls.
        forecast_cache = {}
        balance_exhausted = False  # Stop trying after first "not enough balance"
        for event in markets:
            for bracket in event.get("brackets", []):
                try:
                    # Skip brackets whose question could not be fully parsed.
                    if not bracket.get("city") or not bracket.get("date") or bracket.get("threshold") is None:
                        continue
                    # === SAME-DAY CUTOFF: skip if temperature peak already passed ===
                    if _is_past_peak_time(event, bracket):
                        continue
                    # Get forecast (cached per city+date+metric — ONE API call per city+date)
                    cache_key = f"{bracket['city']}|{bracket['date']}|{bracket.get('metric', 'high_temp')}"
                    if cache_key not in forecast_cache:
                        forecast = await asyncio.to_thread(get_forecast_and_probability, bracket)
                        forecast_cache[cache_key] = forecast
                    else:
                        # Cache hit: recompute only the probability for this bracket's threshold.
                        forecast = _forecast_from_cache(forecast_cache[cache_key], bracket)
                    if not forecast:
                        continue
                    # Analyze edge
                    signal = analyze_opportunity(bracket, forecast)
                    if not signal:
                        continue
                    scan_result["opportunities"] += 1
                    # Skip all trades if balance already exhausted
                    if balance_exhausted:
                        continue
                    # Risk check
                    allowed, reason = risk_mgr.can_trade(signal)
                    if not allowed:
                        logger.info(f"Risk blocked: {reason}")
                        continue
                    # Execute (in thread — CLOB client is blocking)
                    result = await asyncio.to_thread(place_bet, signal, DRY_RUN)
                    if result["success"]:
                        scan_result["trades"] += 1
                    elif "not enough balance" in result.get("message", ""):
                        logger.info("Balance exhausted → stopping trades for this scan")
                        balance_exhausted = True
                except Exception as e:
                    # str(... or '?') guards against a bracket carrying a literal None id,
                    # which would make the original slice itself raise inside the handler.
                    logger.error(f"Error processing bracket {str(bracket.get('id') or '?')[:8]}: {e}")
    except Exception as e:
        logger.error(f"Scan cycle error: {e}")
    last_scan_result = scan_result
    logger.info(
        f"--- Scan done: {scan_result['markets']} markets, "
        f"{scan_result['opportunities']} opps, {scan_result['trades']} trades ---"
    )
async def run_resolve_cycle():
    """Settle trades whose market date has passed; log a summary when any resolved."""
    try:
        summary = await asyncio.to_thread(resolve_trades)
        if summary["resolved"] > 0:
            logger.info(
                f"--- Resolved: {summary['resolved']} trades, "
                f"W:{summary['wins']} L:{summary['losses']}, "
                f"P&L: ${summary['pnl']:+.2f} ---"
            )
    except Exception as e:
        logger.error(f"Resolve cycle error: {e}")
# === FastAPI App ===
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: start scheduler + initial jobs on startup, stop on shutdown.

    Startup order matters: scheduler first, then a best-effort stale-order
    cleanup, then one immediate scan/resolve so the dashboard has fresh data.
    """
    logger.info(f"Weather Bot starting (DRY_RUN={DRY_RUN})")
    # Start scheduler
    scheduler.add_job(run_scan_cycle, "interval", seconds=SCAN_INTERVAL_SEC, id="scan")
    scheduler.add_job(run_resolve_cycle, "interval", seconds=RESOLVE_CHECK_SEC, id="resolve")
    scheduler.start()
    logger.info(f"Scheduler started: scan every {SCAN_INTERVAL_SEC}s, resolve every {RESOLVE_CHECK_SEC}s")
    # Startup cleanup: cancel stale unverified orders to free locked capital
    try:
        cancelled = await asyncio.to_thread(cancel_stale_orders)
        if cancelled > 0:
            logger.info(f"Startup cleanup: cancelled {cancelled} stale orders")
    except Exception as e:
        logger.error(f"Startup cleanup error: {e}")
    # Run initial scan + resolve (fire-and-forget; failures are logged by the task wrapper)
    _safe_create_task(run_scan_cycle(), name="initial-scan")
    _safe_create_task(run_resolve_cycle(), name="initial-resolve")
    # Hand control to the application; everything below runs on shutdown.
    yield
    scheduler.shutdown()
    logger.info("Weather Bot stopped")
# ASGI application; uvicorn loads this as "bot:app" (see __main__ below).
app = FastAPI(title="Polymarket Weather Bot", lifespan=lifespan)
# Static dashboard assets served under /static
app.mount("/static", StaticFiles(directory="static"), name="static")
# === Web Dashboard ===
@app.get("/", response_class=HTMLResponse)
async def dashboard():
    """Serve the dashboard single-page app (static/index.html)."""
    return FileResponse("static/index.html")
# === API Endpoints ===
@app.get("/api/status")
async def api_status():
    """Bot status + risk info.

    Returns run state, dry-run flag, the last scan snapshot, and the risk
    manager's status enriched with all-time win rate / P&L / resolved count.
    """
    risk_status = risk_mgr.get_status()
    # Set defaults up front so the overall_* keys are always present,
    # even if the stats query below fails.
    risk_status["overall_win_rate"] = 0
    risk_status["overall_pnl"] = 0
    risk_status["overall_resolved"] = 0
    try:
        conn = db.get_conn()
        row = conn.execute(
            "SELECT COUNT(*) as total, "
            "SUM(CASE WHEN outcome='win' THEN 1 ELSE 0 END) as wins, "
            "COALESCE(SUM(pnl),0) as pnl "
            "FROM trades WHERE outcome IS NOT NULL"
        ).fetchone()
        if row and row["total"] > 0:
            risk_status["overall_win_rate"] = round(row["wins"] / row["total"] * 100, 1)
            risk_status["overall_pnl"] = round(row["pnl"], 2)
            risk_status["overall_resolved"] = row["total"]
    except Exception as e:
        # Don't fail the whole status endpoint on a stats error — but do log it
        # instead of swallowing it silently.
        logger.error(f"api_status overall-stats query failed: {e}")
    return {
        "success": True,
        "data": {
            "bot_running": bot_running or scheduler.running,
            "dry_run": DRY_RUN,
            "last_scan": last_scan_result,
            "risk": risk_status,
        }
    }
@app.get("/api/trades")
async def api_trades(limit: int = 50):
    """Most recent trades, up to *limit* rows."""
    return {"success": True, "data": db.get_recent_trades(limit)}
@app.get("/api/trades/active")
async def api_active_trades():
    """Currently active (open) trades."""
    return {"success": True, "data": db.get_active_trades()}
@app.get("/api/markets")
async def api_markets():
    """Up to 100 active scanned weather markets, most recently updated first."""
    try:
        cursor = db.get_conn().execute(
            "SELECT * FROM markets WHERE status = 'active' ORDER BY updated_at DESC LIMIT 100"
        )
        markets = [dict(row) for row in cursor.fetchall()]
    except Exception as e:
        logger.error(f"api_markets error: {e}")
        return {"success": False, "error": str(e)}
    return {"success": True, "data": markets}
@app.get("/api/stats/today")
async def api_today_stats():
    """Aggregate stats for the current day."""
    return {"success": True, "data": db.get_today_stats()}
@app.get("/api/stats/history")
async def api_stats_history(days: int = 30):
    """Daily aggregate stats for the last *days* days, newest first."""
    try:
        cursor = db.get_conn().execute(
            "SELECT * FROM daily_stats ORDER BY date DESC LIMIT ?", (days,)
        )
        history = [dict(row) for row in cursor.fetchall()]
    except Exception as e:
        logger.error(f"api_stats_history error: {e}")
        return {"success": False, "error": str(e)}
    return {"success": True, "data": history}
@app.post("/api/scan")
async def api_trigger_scan():
    """Manually trigger a scan cycle (runs in the background; returns immediately)."""
    _safe_create_task(run_scan_cycle(), name="manual-scan")
    return {"success": True, "message": "Scan triggered"}
@app.post("/api/resolve")
async def api_trigger_resolve():
    """Manually trigger resolution of past trades (background task; returns immediately)."""
    _safe_create_task(run_resolve_cycle(), name="manual-resolve")
    return {"success": True, "message": "Resolution triggered"}
@app.get("/api/stats/resolution")
async def api_resolution_stats():
    """Resolution stats: win rate and P&L broken down by tier and city."""
    return {"success": True, "data": get_resolution_stats()}
@app.get("/api/calibration")
async def api_calibration():
    """Per-city calibration metrics (Brier Score, MAE, confidence multiplier)."""
    return {"success": True, "data": db.get_all_city_calibrations(min_samples=3)}
@app.post("/api/kill-switch/activate")
async def api_kill_switch_on():
    """Engage the risk manager's kill switch (dashboard button)."""
    risk_mgr.activate_kill_switch("Manual activation from dashboard")
    return {"success": True, "message": "Kill switch activated"}
@app.post("/api/kill-switch/deactivate")
async def api_kill_switch_off():
    """Release the risk manager's kill switch, allowing trading to resume."""
    risk_mgr.deactivate_kill_switch()
    return {"success": True, "message": "Kill switch deactivated"}
# === Run ===
# === Run ===
if __name__ == "__main__":
    import uvicorn
    # "bot:app" implies this file is bot.py; reload=False — presumably to avoid
    # the reloader duplicating APScheduler jobs (confirm before enabling reload).
    uvicorn.run("bot:app", host=HOST, port=PORT, reload=False)