โ† ะะฐะทะฐะด
""" Polymarket Weather Bot โ€” Main entry point FastAPI server + APScheduler background jobs """ import asyncio from contextlib import asynccontextmanager from datetime import datetime, timezone from fastapi import FastAPI, Request from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse, JSONResponse, FileResponse from apscheduler.schedulers.asyncio import AsyncIOScheduler from loguru import logger from config import PORT, HOST, DRY_RUN, BET_SIZE, SCAN_INTERVAL_SEC, RESOLVE_CHECK_SEC, LOG_LEVEL import db def _safe_create_task(coro, name: str = None): """Create asyncio task with error logging (prevents silent failures)""" task = asyncio.create_task(coro, name=name) def _on_done(t): if t.cancelled(): return exc = t.exception() if exc: logger.error(f"Background task '{name or 'unnamed'}' failed: {exc}", exc_info=exc) task.add_done_callback(_on_done) return task from scanner import scan_weather_markets from config import CITY_COORDS from forecaster import get_forecast_and_probability, calculate_probability, calculate_probability_ensemble from analyzer import analyze_opportunity from executor import place_bet, get_balance from risk import RiskManager from resolver import resolve_trades, get_resolution_stats, cancel_stale_orders # === Setup logging === logger.remove() # Remove default stderr handler (prevents duplicate lines in PM2) logger.add("logs/bot.log", rotation="10 MB", retention="30 days", level=LOG_LEVEL) logger.add(lambda msg: print(msg, end=""), level=LOG_LEVEL, colorize=True) # Single stdout handler for PM2 # === Init DB early (before RiskManager reads state) === db.init_db() # === Global state === risk_mgr = RiskManager() scheduler = AsyncIOScheduler() bot_running = False last_scan_result = {"markets": 0, "opportunities": 0, "trades": 0, "at": None} _scan_lock = asyncio.Lock() # === Helpers === def _is_past_peak_time(event: dict, bracket: dict) -> bool: """ Check if same-day market's peak temperature time has already passed. High temp peaks ~14:00 local, low temp bottoms ~06:00 local. After peak, market is essentially resolved โ†’ skip. """ try: from zoneinfo import ZoneInfo market_date = bracket.get("date") if not market_date: return False today_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d") if market_date != today_utc: return False # Not same-day โ€” OK to trade # Get local timezone for this city city_slug = event.get("city_slug", "") city_data = CITY_COORDS.get(city_slug, {}) tz_name = city_data.get("tz") if not tz_name: return False tz = ZoneInfo(tz_name) local_now = datetime.now(tz) metric = bracket.get("metric", "high_temp") # Cutoff hours: high_temp after 14:00, low_temp after 08:00 cutoff_hour = 14 if metric == "high_temp" else 8 if local_now.hour >= cutoff_hour: logger.debug( f"Skip same-day {bracket.get('city')} {metric}: " f"local {local_now.strftime('%H:%M')} past cutoff {cutoff_hour}:00" ) return True return False except Exception: return False # === Core bot logic === async def run_scan_cycle(): """Main scan cycle: scan โ†’ forecast โ†’ analyze โ†’ execute""" if _scan_lock.locked(): logger.debug("Scan already running, skipping") return async with _scan_lock: await _run_scan_inner() async def _run_scan_inner(): global last_scan_result logger.info("--- Scan cycle started ---") scan_result = {"markets": 0, "opportunities": 0, "trades": 0, "at": datetime.now(timezone.utc).isoformat()} try: # 1. Scan weather markets (run in thread to avoid blocking event loop) markets = await asyncio.to_thread(scan_weather_markets) scan_result["markets"] = len(markets) if not markets: logger.info("No weather markets found") last_scan_result = scan_result return # 2. Check balance before attempting any trades try: balance = await asyncio.to_thread(get_balance) except Exception: balance = None bet_size = BET_SIZE if balance is not None and balance < bet_size: logger.info(f"Balance too low: ${balance:.2f} < ${bet_size:.2f} โ€” skipping trades") last_scan_result = scan_result return if balance is not None: logger.info(f"Balance: ${balance:.2f}") # 3. For each event โ†’ for each bracket market: forecast + analyze + maybe execute # Cache forecasts per city+date+metric to avoid duplicate Open-Meteo calls forecast_cache = {} balance_exhausted = False # Stop trying after first "not enough balance" for event in markets: brackets = event.get("brackets", []) if not brackets: continue for bracket in brackets: try: # Skip if no city/date/threshold parsed if not bracket.get("city") or not bracket.get("date") or bracket.get("threshold") is None: continue # === SAME-DAY CUTOFF: skip if temperature peak already passed === if _is_past_peak_time(event, bracket): continue # Get forecast (cached per city+date+metric โ€” ONE API call per city+date) cache_key = f"{bracket['city']}|{bracket['date']}|{bracket.get('metric', 'high_temp')}" if cache_key not in forecast_cache: forecast = await asyncio.to_thread(get_forecast_and_probability, bracket) forecast_cache[cache_key] = forecast else: # Reuse cached forecast, only recalculate probability for this bracket's threshold cached = forecast_cache[cache_key] if cached is None: forecast = None else: threshold = bracket["threshold"] if bracket.get("threshold_unit") == "C": threshold = threshold * 9/5 + 32 threshold_low = bracket.get("threshold_low") threshold_high = bracket.get("threshold_high") if bracket.get("threshold_unit") == "C": if threshold_low is not None: threshold_low = threshold_low * 9/5 + 32 if threshold_high is not None: threshold_high = threshold_high * 9/5 + 32 # Use ensemble if available, else Gaussian member_values = cached.get("member_values") if member_values and len(member_values) >= 10: model_prob = calculate_probability_ensemble( member_values=member_values, threshold=threshold, operator=bracket.get("operator", "gte"), threshold_low=threshold_low, threshold_high=threshold_high, ) prob_method = "ensemble" else: model_prob = calculate_probability( forecast_value=cached["forecast_value"], threshold=threshold, operator=bracket.get("operator", "gte"), sigma=cached["sigma"], threshold_low=threshold_low, threshold_high=threshold_high, ) prob_method = "gaussian" forecast = { **cached, "market_id": bracket["id"], "model_probability": model_prob, "prob_method": prob_method, } if not forecast: continue # Analyze edge signal = analyze_opportunity(bracket, forecast) if not signal: continue scan_result["opportunities"] += 1 # Skip all trades if balance already exhausted if balance_exhausted: continue # Risk check allowed, reason = risk_mgr.can_trade(signal) if not allowed: logger.info(f"Risk blocked: {reason}") continue # Execute (in thread โ€” CLOB client is blocking) result = await asyncio.to_thread(place_bet, signal, DRY_RUN) if result["success"]: scan_result["trades"] += 1 elif "not enough balance" in result.get("message", ""): logger.info("Balance exhausted โ€” stopping trades for this scan") balance_exhausted = True except Exception as e: logger.error(f"Error processing bracket {bracket.get('id', '?')[:8]}: {e}") except Exception as e: logger.error(f"Scan cycle error: {e}") last_scan_result = scan_result logger.info( f"--- Scan done: {scan_result['markets']} markets, " f"{scan_result['opportunities']} opps, {scan_result['trades']} trades ---" ) async def run_resolve_cycle(): """Check and resolve trades where market date has passed""" try: result = await asyncio.to_thread(resolve_trades) if result["resolved"] > 0: logger.info(f"--- Resolved: {result['resolved']} trades, W:{result['wins']} L:{result['losses']}, P&L: ${result['pnl']:+.2f} ---") except Exception as e: logger.error(f"Resolve cycle error: {e}") # === FastAPI App === @asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown""" logger.info(f"Weather Bot starting (DRY_RUN={DRY_RUN})") # Start scheduler scheduler.add_job(run_scan_cycle, "interval", seconds=SCAN_INTERVAL_SEC, id="scan") scheduler.add_job(run_resolve_cycle, "interval", seconds=RESOLVE_CHECK_SEC, id="resolve") scheduler.start() logger.info(f"Scheduler started: scan every {SCAN_INTERVAL_SEC}s, resolve every {RESOLVE_CHECK_SEC}s") # Startup cleanup: cancel stale unverified orders to free locked capital try: cancelled = await asyncio.to_thread(cancel_stale_orders) if cancelled > 0: logger.info(f"Startup cleanup: cancelled {cancelled} stale orders") except Exception as e: logger.error(f"Startup cleanup error: {e}") # Run initial scan + resolve _safe_create_task(run_scan_cycle(), name="initial-scan") _safe_create_task(run_resolve_cycle(), name="initial-resolve") yield scheduler.shutdown() logger.info("Weather Bot stopped") app = FastAPI(title="Polymarket Weather Bot", lifespan=lifespan) # Static files app.mount("/static", StaticFiles(directory="static"), name="static") # === Web Dashboard === @app.get("/", response_class=HTMLResponse) async def dashboard(): return FileResponse("static/index.html") # === API Endpoints === @app.get("/api/status") async def api_status(): """Bot status + risk info""" risk_status = risk_mgr.get_status() # Add overall win rate (not just today's) try: conn = db.get_conn() row = conn.execute( "SELECT COUNT(*) as total, " "SUM(CASE WHEN outcome='win' THEN 1 ELSE 0 END) as wins, " "COALESCE(SUM(pnl),0) as pnl " "FROM trades WHERE outcome IS NOT NULL" ).fetchone() if row and row["total"] > 0: risk_status["overall_win_rate"] = round(row["wins"] / row["total"] * 100, 1) risk_status["overall_pnl"] = round(row["pnl"], 2) risk_status["overall_resolved"] = row["total"] else: risk_status["overall_win_rate"] = 0 risk_status["overall_pnl"] = 0 risk_status["overall_resolved"] = 0 except Exception: pass return { "success": True, "data": { "bot_running": bot_running or scheduler.running, "dry_run": DRY_RUN, "last_scan": last_scan_result, "risk": risk_status, } } @app.get("/api/trades") async def api_trades(limit: int = 50): """Recent trades""" trades = db.get_recent_trades(limit) return {"success": True, "data": trades} @app.get("/api/trades/active") async def api_active_trades(): """Active (open) trades""" trades = db.get_active_trades() return {"success": True, "data": trades} @app.get("/api/markets") async def api_markets(): """Scanned weather markets""" try: conn = db.get_conn() rows = conn.execute( "SELECT * FROM markets WHERE status = 'active' ORDER BY updated_at DESC LIMIT 100" ).fetchall() return {"success": True, "data": [dict(r) for r in rows]} except Exception as e: logger.error(f"api_markets error: {e}") return {"success": False, "error": str(e)} @app.get("/api/stats/today") async def api_today_stats(): """Today's stats""" stats = db.get_today_stats() return {"success": True, "data": stats} @app.get("/api/stats/history") async def api_stats_history(days: int = 30): """Historical daily stats""" try: conn = db.get_conn() rows = conn.execute( "SELECT * FROM daily_stats ORDER BY date DESC LIMIT ?", (days,) ).fetchall() return {"success": True, "data": [dict(r) for r in rows]} except Exception as e: logger.error(f"api_stats_history error: {e}") return {"success": False, "error": str(e)} @app.post("/api/scan") async def api_trigger_scan(): """Manually trigger a scan""" _safe_create_task(run_scan_cycle(), name="manual-scan") return {"success": True, "message": "Scan triggered"} @app.post("/api/resolve") async def api_trigger_resolve(): """Manually trigger resolution of past trades""" _safe_create_task(run_resolve_cycle(), name="manual-resolve") return {"success": True, "message": "Resolution triggered"} @app.get("/api/stats/resolution") async def api_resolution_stats(): """Resolution stats: win rate, P&L by tier and city""" stats = get_resolution_stats() return {"success": True, "data": stats} @app.get("/api/calibration") async def api_calibration(): """City calibration: Brier Score, MAE, confidence multiplier""" cals = db.get_all_city_calibrations(min_samples=3) return {"success": True, "data": cals} @app.post("/api/kill-switch/activate") async def api_kill_switch_on(): risk_mgr.activate_kill_switch("Manual activation from dashboard") return {"success": True, "message": "Kill switch activated"} @app.post("/api/kill-switch/deactivate") async def api_kill_switch_off(): risk_mgr.deactivate_kill_switch() return {"success": True, "message": "Kill switch deactivated"} # === Run === if __name__ == "__main__": import uvicorn uvicorn.run("bot:app", host=HOST, port=PORT, reload=False)