← Back
"""Risk manager — enforces limits, kill switch, cooldowns"""
from datetime import datetime, timedelta, timezone
from loguru import logger
from config import (
    MAX_DAILY_LOSS, MAX_OPEN_POSITIONS, MAX_PER_MARKET,
    COOLDOWN_SAME_MARKET, BANKROLL, DRY_RUN,
)
import db


class RiskManager:
    def __init__(self):
        self.kill_switch_active = False
        self._load_state()

    def _load_state(self):
        """Load kill switch state from DB"""
        state = db.get_bot_state("kill_switch")
        if state == "true":
            self.kill_switch_active = True
            logger.warning("Kill switch is ACTIVE from previous session")

    def can_trade(self, signal: dict) -> tuple[bool, str]:
        """
        Check all risk rules. Returns (allowed, reason).
        """
        # Kill switch
        if self.kill_switch_active:
            return False, "Kill switch active"

        # Daily loss check (live_only in live mode — dry-run losses must not trigger kill switch)
        today = db.get_today_stats(live_only=not DRY_RUN)
        if today["pnl"] <= -MAX_DAILY_LOSS:
            self.activate_kill_switch(f"Daily loss ${today['pnl']:.2f} exceeds -${MAX_DAILY_LOSS}")
            return False, f"Daily loss limit hit: ${today['pnl']:.2f}"

        # Max open positions (live mode: ignore old dry-run trades)
        active = db.get_active_trades(live_only=not DRY_RUN)
        if len(active) >= MAX_OPEN_POSITIONS:
            return False, f"Max open positions ({MAX_OPEN_POSITIONS}) reached"

        # Cooldown on same market (check active AND recently resolved trades)
        market_id = signal.get("market_id")
        now_utc = datetime.now(timezone.utc)
        for trade in active:
            if trade["market_id"] == market_id:
                created = datetime.fromisoformat(trade["created_at"]).replace(tzinfo=timezone.utc)
                if now_utc - created < timedelta(seconds=COOLDOWN_SAME_MARKET):
                    return False, f"Cooldown on market {market_id[:8]} (1h)"

        # Also check recently resolved trades for same market
        recent = db.get_recent_trades(limit=100, live_only=not DRY_RUN)
        for trade in recent:
            if trade["market_id"] == market_id and trade["outcome"] in ("win", "loss"):
                created = datetime.fromisoformat(trade["created_at"]).replace(tzinfo=timezone.utc)
                if now_utc - created < timedelta(seconds=COOLDOWN_SAME_MARKET):
                    return False, f"Cooldown on resolved market {market_id[:8]} (1h)"

        # Max $ deployed on same market
        market_deployed = sum(
            t["size"] for t in active if t["market_id"] == market_id
        )
        if market_deployed + signal.get("size", 1) > MAX_PER_MARKET:
            return False, f"Max per market ${MAX_PER_MARKET} reached (deployed: ${market_deployed})"

        # Total deployed check — use MAX_OPEN_POSITIONS × MAX_PER_MARKET as ceiling
        # (BANKROLL=$10k is inflated for DRY_RUN stats, not a real limit)
        total_deployed = sum(t["size"] for t in active)
        max_deployed = MAX_OPEN_POSITIONS * MAX_PER_MARKET
        if total_deployed + signal.get("size", 1) > max_deployed:
            return False, f"Max total deployed ${max_deployed:.0f} reached"

        return True, "OK"

    def check_daily_loss(self) -> bool:
        """Trip the kill switch if the day's realized loss already breaches the
        limit, independent of any new signal. Call at the start of each scan cycle
        so an open-position loss streak stops trading even when no new signals arrive.
        Returns True if trading is halted (kill switch active)."""
        if self.kill_switch_active:
            return True
        today = db.get_today_stats(live_only=not DRY_RUN)
        if today["pnl"] <= -MAX_DAILY_LOSS:
            self.activate_kill_switch(f"Daily loss ${today['pnl']:.2f} exceeds -${MAX_DAILY_LOSS}")
            return True
        return False

    def activate_kill_switch(self, reason: str):
        """Activate kill switch — stops all new bets"""
        self.kill_switch_active = True
        db.set_bot_state("kill_switch", "true")
        db.set_bot_state("kill_switch_reason", reason)
        db.set_bot_state("kill_switch_at", datetime.now(timezone.utc).isoformat())
        logger.warning(f"🚨 KILL SWITCH ACTIVATED: {reason}")

    def deactivate_kill_switch(self):
        """Manually deactivate kill switch"""
        self.kill_switch_active = False
        db.set_bot_state("kill_switch", "false")
        logger.info("✅ Kill switch deactivated")

    def get_status(self) -> dict:
        """Get current risk status for dashboard"""
        today = db.get_today_stats(live_only=not DRY_RUN)
        active = db.get_active_trades(live_only=not DRY_RUN)
        total_deployed = sum(t["size"] for t in active)

        return {
            "kill_switch": self.kill_switch_active,
            "kill_switch_reason": db.get_bot_state("kill_switch_reason", ""),
            "daily_pnl": today["pnl"],
            "daily_trades": today["total_trades"],
            "daily_wins": today["wins"],
            "daily_losses": today["losses"],
            "daily_win_rate": today.get("win_rate", 0),
            "open_positions": len(active),
            "total_deployed": round(total_deployed, 2),
            "bankroll": BANKROLL,
            "max_daily_loss": MAX_DAILY_LOSS,
            "max_positions": MAX_OPEN_POSITIONS,
        }

📜 Git History

058de34fix(audit): chunk 4 - minor robustness, display, calibration5 weeks ago
ddaa0a2fix(audit): chunk 2 - kill switch, auth, config safety5 weeks ago
8fca132chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...