← Назад
"""Risk manager — enforces limits, kill switch, cooldowns""" from datetime import datetime, timedelta, timezone from loguru import logger from config import ( MAX_DAILY_LOSS, MAX_OPEN_POSITIONS, MAX_PER_MARKET, COOLDOWN_SAME_MARKET, BANKROLL, ) import db class RiskManager: def __init__(self): self.kill_switch_active = False self._load_state() def _load_state(self): """Load kill switch state from DB""" state = db.get_bot_state("kill_switch") if state == "true": self.kill_switch_active = True logger.warning("Kill switch is ACTIVE from previous session") def can_trade(self, signal: dict) -> tuple[bool, str]: """ Check all risk rules. Returns (allowed, reason). """ # Kill switch if self.kill_switch_active: return False, "Kill switch active" # Daily loss check today = db.get_today_stats() if today["pnl"] <= -MAX_DAILY_LOSS: self.activate_kill_switch(f"Daily loss ${today['pnl']:.2f} exceeds -${MAX_DAILY_LOSS}") return False, f"Daily loss limit hit: ${today['pnl']:.2f}" # Max open positions active = db.get_active_trades() if len(active) >= MAX_OPEN_POSITIONS: return False, f"Max open positions ({MAX_OPEN_POSITIONS}) reached" # Cooldown on same market (check active AND recently resolved trades) market_id = signal.get("market_id") now_utc = datetime.now(timezone.utc) for trade in active: if trade["market_id"] == market_id: created = datetime.fromisoformat(trade["created_at"]).replace(tzinfo=timezone.utc) if now_utc - created < timedelta(seconds=COOLDOWN_SAME_MARKET): return False, f"Cooldown on market {market_id[:8]} (1h)" # Also check recently resolved trades for same market recent = db.get_recent_trades(limit=100) for trade in recent: if trade["market_id"] == market_id and trade["outcome"] in ("win", "loss"): created = datetime.fromisoformat(trade["created_at"]).replace(tzinfo=timezone.utc) if now_utc - created < timedelta(seconds=COOLDOWN_SAME_MARKET): return False, f"Cooldown on resolved market {market_id[:8]} (1h)" # Max $ deployed on same market market_deployed = sum( t["size"] for t in active if t["market_id"] == market_id ) if market_deployed + signal.get("size", 1) > MAX_PER_MARKET: return False, f"Max per market ${MAX_PER_MARKET} reached (deployed: ${market_deployed})" # Total deployed check total_deployed = sum(t["size"] for t in active) max_deployed = BANKROLL * 0.8 # max 80% of bankroll deployed if total_deployed + signal.get("size", 1) > max_deployed: return False, f"Max total deployed ${max_deployed:.0f} reached" return True, "OK" def activate_kill_switch(self, reason: str): """Activate kill switch — stops all new bets""" self.kill_switch_active = True db.set_bot_state("kill_switch", "true") db.set_bot_state("kill_switch_reason", reason) db.set_bot_state("kill_switch_at", datetime.now(timezone.utc).isoformat()) logger.warning(f"🚨 KILL SWITCH ACTIVATED: {reason}") def deactivate_kill_switch(self): """Manually deactivate kill switch""" self.kill_switch_active = False db.set_bot_state("kill_switch", "false") logger.info("✅ Kill switch deactivated") def get_status(self) -> dict: """Get current risk status for dashboard""" today = db.get_today_stats() active = db.get_active_trades() total_deployed = sum(t["size"] for t in active) return { "kill_switch": self.kill_switch_active, "kill_switch_reason": db.get_bot_state("kill_switch_reason", ""), "daily_pnl": today["pnl"], "daily_trades": today["total_trades"], "daily_wins": today["wins"], "daily_losses": today["losses"], "daily_win_rate": today.get("win_rate", 0), "open_positions": len(active), "total_deployed": round(total_deployed, 2), "bankroll": BANKROLL, "max_daily_loss": MAX_DAILY_LOSS, "max_positions": MAX_OPEN_POSITIONS, }