"""Risk manager — enforces limits, kill switch, cooldowns"""
from datetime import datetime, timedelta, timezone
from loguru import logger
from config import (
MAX_DAILY_LOSS, MAX_OPEN_POSITIONS, MAX_PER_MARKET,
COOLDOWN_SAME_MARKET, BANKROLL,
)
import db
class RiskManager:
    """Gatekeeper that decides whether a new trade signal may be acted on.

    The checks cover a persistent kill switch, a daily loss limit, a cap on
    the number of open positions, a per-market cooldown, a per-market
    exposure cap and a total exposure cap (80% of ``BANKROLL``).

    The kill switch flag is mirrored to the ``db`` bot-state store so that it
    is picked up again when a new instance is constructed.
    """

    def __init__(self) -> None:
        # In-memory copy of the flag; _load_state() may flip it to True
        # based on what a previous session persisted.
        self.kill_switch_active = False
        self._load_state()

    def _load_state(self) -> None:
        """Load the kill switch state from the DB."""
        if db.get_bot_state("kill_switch") == "true":
            self.kill_switch_active = True
            logger.warning("Kill switch is ACTIVE from previous session")

    @staticmethod
    def _parse_utc(timestamp: str) -> datetime:
        """Parse an ISO-8601 timestamp into a timezone-aware UTC datetime.

        Naive timestamps are taken to already be in UTC. Timestamps that
        carry an offset are converted, rather than having the offset
        overwritten, so the instant they denote is preserved.
        """
        parsed = datetime.fromisoformat(timestamp)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @classmethod
    def _within_cooldown(cls, trade: dict, now_utc: datetime, window: timedelta) -> bool:
        """Return True if ``trade`` was created less than ``window`` ago."""
        return now_utc - cls._parse_utc(trade["created_at"]) < window

    def can_trade(self, signal: dict) -> tuple[bool, str]:
        """Check all risk rules for ``signal``.

        Args:
            signal: Trade signal. Reads ``"market_id"`` (optional) and
                ``"size"`` (optional, defaults to 1).

        Returns:
            ``(allowed, reason)``. ``reason`` is ``"OK"`` when allowed,
            otherwise a human-readable description of the rule that blocked
            the trade.

        Side effects:
            Activates (and persists) the kill switch when the daily loss
            limit has been reached.
        """
        # Kill switch: checked first so no DB reads happen while it is on.
        if self.kill_switch_active:
            return False, "Kill switch active"

        # Daily loss check: hitting the limit trips the kill switch, which
        # blocks every later call until it is deactivated manually.
        today = db.get_today_stats()
        if today["pnl"] <= -MAX_DAILY_LOSS:
            self.activate_kill_switch(f"Daily loss ${today['pnl']:.2f} exceeds -${MAX_DAILY_LOSS}")
            return False, f"Daily loss limit hit: ${today['pnl']:.2f}"

        # Max open positions
        active = db.get_active_trades()
        if len(active) >= MAX_OPEN_POSITIONS:
            return False, f"Max open positions ({MAX_OPEN_POSITIONS}) reached"

        market_id = signal.get("market_id")
        # str() keeps the message formatting from raising when the id is
        # missing (None) or not a string; for string ids it is a no-op.
        market_label = str(market_id)[:8]
        now_utc = datetime.now(timezone.utc)
        cooldown = timedelta(seconds=COOLDOWN_SAME_MARKET)

        # Cooldown on same market, open trades first.
        # NOTE(review): the "(1h)" in the messages below is hard-coded; it is
        # only accurate if COOLDOWN_SAME_MARKET is 3600 seconds.
        if any(
            trade["market_id"] == market_id
            and self._within_cooldown(trade, now_utc, cooldown)
            for trade in active
        ):
            return False, f"Cooldown on market {market_label} (1h)"

        # Also check recently resolved trades for the same market. The age is
        # measured from the trade's creation time.
        recent = db.get_recent_trades(limit=100)
        if any(
            trade["market_id"] == market_id
            and trade["outcome"] in ("win", "loss")
            and self._within_cooldown(trade, now_utc, cooldown)
            for trade in recent
        ):
            return False, f"Cooldown on resolved market {market_label} (1h)"

        requested_size = signal.get("size", 1)

        # Max $ deployed on same market
        market_deployed = sum(
            t["size"] for t in active if t["market_id"] == market_id
        )
        if market_deployed + requested_size > MAX_PER_MARKET:
            return False, f"Max per market ${MAX_PER_MARKET} reached (deployed: ${market_deployed})"

        # Total deployed check
        total_deployed = sum(t["size"] for t in active)
        max_deployed = BANKROLL * 0.8  # max 80% of bankroll deployed
        if total_deployed + requested_size > max_deployed:
            return False, f"Max total deployed ${max_deployed:.0f} reached"

        return True, "OK"

    def activate_kill_switch(self, reason: str) -> None:
        """Activate the kill switch — stops all new bets.

        Persists the flag, the reason and the UTC activation time to the DB.
        """
        self.kill_switch_active = True
        db.set_bot_state("kill_switch", "true")
        db.set_bot_state("kill_switch_reason", reason)
        db.set_bot_state("kill_switch_at", datetime.now(timezone.utc).isoformat())
        logger.warning(f"🚨 KILL SWITCH ACTIVATED: {reason}")

    def deactivate_kill_switch(self) -> None:
        """Manually deactivate the kill switch.

        Only the flag is reset; the stored reason and activation time are
        left in the DB as they were.
        """
        self.kill_switch_active = False
        db.set_bot_state("kill_switch", "false")
        logger.info("✅ Kill switch deactivated")

    def get_status(self) -> dict:
        """Get the current risk status for the dashboard.

        Returns:
            A dict with the kill switch flag and stored reason, today's
            PnL / trade counters, the open position count, total deployed
            size (rounded to 2 decimals) and the configured limits.
        """
        today = db.get_today_stats()
        active = db.get_active_trades()
        total_deployed = sum(t["size"] for t in active)
        return {
            "kill_switch": self.kill_switch_active,
            "kill_switch_reason": db.get_bot_state("kill_switch_reason", ""),
            "daily_pnl": today["pnl"],
            "daily_trades": today["total_trades"],
            "daily_wins": today["wins"],
            "daily_losses": today["losses"],
            "daily_win_rate": today.get("win_rate", 0),
            "open_positions": len(active),
            "total_deployed": round(total_deployed, 2),
            "bankroll": BANKROLL,
            "max_daily_loss": MAX_DAILY_LOSS,
            "max_positions": MAX_OPEN_POSITIONS,
        }