← Back
☆
"""
Grid Bot β€” Risk Manager
=========================
Manages risk limits, circuit breaker, daily tracking,
and weekly profit checkpoint.

Responsibilities:
1. Daily loss limit (10% of starting balance)
2. Circuit breaker (5 max_loss stops in 1h β†’ pause 30min)
3. Emergency drawdown (dep -20% from start β†’ full pause)
4. Weekly checkpoint: Sunday 21:00 Vancouver β€” 50% profit withdraw
"""

import json
import time
import logging
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

from src.config import (
    DAILY_LOSS_LIMIT_PCT, CIRCUIT_BREAKER_STOPS, CIRCUIT_BREAKER_PAUSE_MIN,
    EMERGENCY_DRAWDOWN_PCT, CHECKPOINT_DAY, CHECKPOINT_HOUR,
    WITHDRAW_RATIO, DAILY_PNL_FILE, DATA_DIR,
)

# Module-level logger; every risk event in this file is reported under "risk".
logger = logging.getLogger("risk")
# Time zone used for the weekly checkpoint schedule and for the timestamps
# written into reports and the persisted state.
VANCOUVER_TZ = ZoneInfo("America/Vancouver")


class RiskManager:
    """Enforce the bot's risk limits and decide whether a new grid may open.

    The manager tracks per-day PnL/session counters, a circuit breaker driven
    by ``max_loss`` stops, an emergency drawdown pause and a weekly profit
    checkpoint. Its state is persisted to ``DAILY_PNL_FILE`` so that a restart
    can continue from where the previous process stopped.
    """

    def __init__(self, exchange):
        """Create an uninitialized manager.

        Args:
            exchange: Object exposing ``get_balance()``; it is queried whenever
                a limit needs the live account balance.
        """
        self.exchange = exchange
        self.starting_balance = 0.0
        self.daily_start_balance = 0.0
        self.daily_pnl = 0.0
        self.daily_sessions = 0
        self.daily_round_trips = 0
        self.stop_timestamps = []       # epoch seconds of max_loss stops (circuit breaker)
        self.paused_until = 0           # epoch seconds at which the circuit-breaker pause ends
        self.emergency_paused = False
        self.last_checkpoint_week = None
        self._initialized = False

    def initialize(self):
        """Take the current balance as baseline, then overlay any saved state."""
        balance = self.exchange.get_balance()
        self.starting_balance = balance
        self.daily_start_balance = balance
        self._initialized = True
        # Persisted values (if any) win over the fresh baseline set above.
        self._load_state()
        logger.info(f"Risk Manager init: balance=${balance:.2f}")

    # ============================================================
    # DAILY TRACKING
    # ============================================================

    def on_session_closed(self, summary):
        """Record the outcome of a closed grid session and persist the state.

        Args:
            summary: Mapping describing the session. The keys read here are
                ``net_pnl``, ``round_trips`` and ``close_reason``; missing or
                ``None`` numeric values count as 0.
        """
        # `or 0` also covers keys that are present but explicitly None, which
        # would otherwise raise TypeError on the additions below.
        self.daily_pnl += summary.get("net_pnl") or 0
        self.daily_sessions += 1
        self.daily_round_trips += summary.get("round_trips") or 0

        # Only max_loss closes feed the circuit breaker.
        if summary.get("close_reason") == "max_loss":
            self.stop_timestamps.append(time.time())

        self._save_state()

    def check_daily_limit(self):
        """Return True when today's loss exceeds the daily loss limit.

        The limit is ``DAILY_LOSS_LIMIT_PCT`` percent of
        ``daily_start_balance``. Always False before ``initialize()``.
        """
        if not self._initialized:
            return False

        max_daily_loss = self.daily_start_balance * DAILY_LOSS_LIMIT_PCT / 100
        if self.daily_pnl < -max_daily_loss:
            logger.warning(
                f"DAILY LIMIT: PnL=${self.daily_pnl:.2f} < -${max_daily_loss:.2f}"
            )
            return True
        return False

    def check_emergency_drawdown(self):
        """Return True when the balance has fallen past the emergency threshold.

        Drawdown is measured against ``starting_balance``. When it reaches
        ``EMERGENCY_DRAWDOWN_PCT`` the manager enters the emergency pause and
        persists it. Always False before ``initialize()`` or when no positive
        baseline balance is known.
        """
        if not self._initialized:
            return False

        # A zero/negative baseline makes the percentage undefined (it used to
        # raise ZeroDivisionError). No log here: this runs on every can_trade()
        # poll and initialize() has already logged the balance.
        if self.starting_balance <= 0:
            return False

        current_balance = self.exchange.get_balance()
        drawdown_pct = (self.starting_balance - current_balance) / self.starting_balance * 100

        if drawdown_pct >= EMERGENCY_DRAWDOWN_PCT:
            logger.error(
                f"EMERGENCY: balance ${current_balance:.2f}, "
                f"start ${self.starting_balance:.2f}, "
                f"drawdown {drawdown_pct:.1f}%"
            )
            newly_tripped = not self.emergency_paused
            self.emergency_paused = True
            if newly_tripped:
                # Persist right away so a restart cannot silently lift the pause.
                self._save_state()
            return True
        return False

    # ============================================================
    # CIRCUIT BREAKER
    # ============================================================

    def check_circuit_breaker(self):
        """Return True (and start a pause) after too many stops in the last hour.

        Counts ``max_loss`` stops newer than one hour. On reaching
        ``CIRCUIT_BREAKER_STOPS`` the pause is set to
        ``CIRCUIT_BREAKER_PAUSE_MIN`` minutes from now and persisted.
        """
        now = time.time()
        one_hour_ago = now - 3600
        recent = [t for t in self.stop_timestamps if t > one_hour_ago]
        # Older entries can never count again, so drop them.
        self.stop_timestamps = recent

        if len(recent) >= CIRCUIT_BREAKER_STOPS:
            # NOTE(review): the triggering stops stay in the window, so a check
            # made after the pause ends but within the same hour re-arms the
            # pause. Confirm whether that extension is intended.
            self.paused_until = now + CIRCUIT_BREAKER_PAUSE_MIN * 60
            logger.warning(
                f"CIRCUIT BREAKER: {len(recent)} stops in 1h, "
                f"pausing {CIRCUIT_BREAKER_PAUSE_MIN}min"
            )
            # Persist so that the pause survives a restart.
            self._save_state()
            return True
        return False

    def is_paused(self):
        """Report whether any pause is active.

        Returns:
            ``(True, reason)`` while paused, where ``reason`` is
            ``"emergency_drawdown"`` or a ``"circuit_breaker (...)"`` string
            with the remaining minutes; ``(False, None)`` otherwise.
        """
        if self.emergency_paused:
            return True, "emergency_drawdown"

        # Read the clock once so the comparison and the remaining time agree.
        now = time.time()
        if now < self.paused_until:
            remaining = (self.paused_until - now) / 60
            return True, f"circuit_breaker ({remaining:.0f}min left)"

        return False, None

    # ============================================================
    # WEEKLY CHECKPOINT
    # ============================================================

    def check_weekly_checkpoint(self):
        """Run the weekly profit checkpoint when its scheduled hour arrives.

        The checkpoint fires when the Vancouver weekday and hour equal
        ``CHECKPOINT_DAY`` and ``CHECKPOINT_HOUR``, at most once per ISO week
        number.

        Returns:
            A dict describing the checkpoint (balance, PnL, counters, suggested
            withdraw amount and resulting balance), or ``None`` when it is not
            time or this week's checkpoint already ran.
        """
        now = datetime.now(VANCOUVER_TZ)

        # Outside the configured weekday/hour there is nothing to do.
        if now.weekday() != CHECKPOINT_DAY or now.hour != CHECKPOINT_HOUR:
            return None

        # Only once per week.
        week_id = now.isocalendar()[1]
        if self.last_checkpoint_week == week_id:
            return None

        # Fetch the balance BEFORE marking the week as done: if the exchange
        # call raises, the checkpoint can still be retried on the next poll.
        current_balance = self.exchange.get_balance()
        self.last_checkpoint_week = week_id

        # NOTE(review): approximate. daily_start_balance is rebased by
        # reset_daily(), so this is the PnL since the last daily reset or
        # checkpoint rather than the whole week.
        week_pnl = current_balance - self.daily_start_balance

        checkpoint = {
            "week": week_id,
            "balance": round(current_balance, 2),
            "starting_balance": round(self.starting_balance, 2),
            "week_pnl": round(week_pnl, 2),
            "daily_pnl_accumulated": round(self.daily_pnl, 2),
            "total_sessions": self.daily_sessions,
            "total_round_trips": self.daily_round_trips,
            "time": now.strftime("%Y-%m-%d %H:%M"),
            "withdraw_amount": 0,
            "new_balance": round(current_balance, 2),
        }

        if week_pnl > 0:
            # Only the amount is computed here; no exchange withdrawal call is made.
            withdraw = week_pnl * WITHDRAW_RATIO
            new_balance = current_balance - withdraw
            checkpoint["withdraw_amount"] = round(withdraw, 2)
            checkpoint["new_balance"] = round(new_balance, 2)
            logger.info(
                f"CHECKPOINT: PnL=${week_pnl:.2f}, "
                f"withdraw=${withdraw:.2f}, new_balance=${new_balance:.2f}"
            )
        else:
            logger.info(f"CHECKPOINT: PnL=${week_pnl:.2f} (negative, no withdraw)")

        # Reset the counters for the new week, rebased on the post-withdraw balance.
        self.daily_pnl = 0
        self.daily_sessions = 0
        self.daily_round_trips = 0
        self.daily_start_balance = checkpoint["new_balance"]
        self.stop_timestamps = []

        self._save_state()
        return checkpoint

    def reset_daily(self):
        """Zero the daily counters and rebase them on the live balance.

        Meant to be called at midnight Vancouver; the scheduling itself is the
        caller's job.
        """
        self.daily_pnl = 0
        self.daily_sessions = 0
        self.daily_round_trips = 0
        self.daily_start_balance = self.exchange.get_balance()
        self.stop_timestamps = []
        self._save_state()
        logger.info(f"Daily reset: balance=${self.daily_start_balance:.2f}")

    # ============================================================
    # ALLOWED TO TRADE CHECK
    # ============================================================

    def can_trade(self):
        """Run every risk check and say whether a new grid may be opened.

        Checks run in this order: active pause, daily loss limit, emergency
        drawdown, circuit breaker.

        Returns:
            ``(True, "ok")`` when trading is allowed, otherwise
            ``(False, reason)`` naming the first check that blocked it.
        """
        paused, reason = self.is_paused()
        if paused:
            return False, reason

        if self.check_daily_limit():
            return False, "daily_limit"

        if self.check_emergency_drawdown():
            return False, "emergency_drawdown"

        if self.check_circuit_breaker():
            return False, "circuit_breaker"

        return True, "ok"

    # ============================================================
    # STATUS & FORMATTING
    # ============================================================

    def get_daily_summary(self):
        """Build the daily report text (Telegram markdown) from current state."""
        now = datetime.now(VANCOUVER_TZ).strftime("%H:%M")
        balance = self.exchange.get_balance()
        emoji = "🟢" if self.daily_pnl >= 0 else "🔴"

        paused, reason = self.is_paused()
        status = f"⏸️ {reason}" if paused else "✅ Active"

        return (
            f"📊 *Daily Report* ({now})\n\n"
            f"{emoji} PnL: ${self.daily_pnl:+.4f}\n"
            f"📈 Sessions: {self.daily_sessions}\n"
            f"🔄 Round-trips: {self.daily_round_trips}\n"
            f"💰 Balance: ${balance:.2f}\n"
            f"Status: {status}"
        )

    def get_weekly_summary(self, checkpoint):
        """Format a checkpoint dict as the weekly report text (Telegram markdown).

        Args:
            checkpoint: Dict as returned by ``check_weekly_checkpoint()``.
        """
        emoji = "🟢" if checkpoint["week_pnl"] >= 0 else "🔴"
        # The withdraw line is shown only when there is something to withdraw.
        withdraw_line = ""
        if checkpoint["withdraw_amount"] > 0:
            withdraw_line = f"💸 Withdraw: ${checkpoint['withdraw_amount']:.2f}\n"

        return (
            f"📅 *Weekly Checkpoint* (Week {checkpoint['week']})\n\n"
            f"{emoji} Week PnL: ${checkpoint['week_pnl']:+.2f}\n"
            f"📈 Sessions: {checkpoint['total_sessions']}\n"
            f"🔄 Round-trips: {checkpoint['total_round_trips']}\n"
            f"{withdraw_line}"
            f"💰 New balance: ${checkpoint['new_balance']:.2f}\n"
            f"📊 Start balance: ${checkpoint['starting_balance']:.2f}"
        )

    # ============================================================
    # PERSISTENCE
    # ============================================================

    def _save_state(self):
        """Write the risk state to ``DAILY_PNL_FILE`` (best effort).

        Failures are logged and never raised. The file is written to a
        temporary sibling and then renamed over the target, so an interrupted
        write cannot leave a truncated state file behind.
        """
        state = {
            "starting_balance": self.starting_balance,
            "daily_start_balance": self.daily_start_balance,
            "daily_pnl": self.daily_pnl,
            "daily_sessions": self.daily_sessions,
            "daily_round_trips": self.daily_round_trips,
            "stop_timestamps": self.stop_timestamps[-50:],
            "paused_until": self.paused_until,
            "emergency_paused": self.emergency_paused,
            "last_checkpoint_week": self.last_checkpoint_week,
            "saved_at": datetime.now(VANCOUVER_TZ).isoformat(),
        }
        try:
            Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
            target = Path(DAILY_PNL_FILE)
            tmp_path = target.with_name(target.name + ".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(state, f, indent=2)
            tmp_path.replace(target)
        except Exception as e:
            logger.error(f"Save risk state error: {e}")

    @staticmethod
    def _saved_on_current_day(saved_at):
        """Return True unless ``saved_at`` provably falls on an earlier/later day.

        Args:
            saved_at: ISO-8601 timestamp string from the state file, or None.

        A missing or unparsable timestamp returns True so that such state
        files keep being restored in full.
        """
        if not saved_at:
            return True
        try:
            saved = datetime.fromisoformat(saved_at)
        except (TypeError, ValueError):
            return True
        if saved.tzinfo is None:
            # _save_state always writes Vancouver-aware timestamps; treat a
            # naive value as Vancouver local time.
            saved = saved.replace(tzinfo=VANCOUVER_TZ)
        today = datetime.now(VANCOUVER_TZ).date()
        return saved.astimezone(VANCOUVER_TZ).date() == today

    def _load_state(self):
        """Restore the risk state from ``DAILY_PNL_FILE`` after a restart.

        Long-lived fields (baseline balance, pauses, stop history, checkpoint
        week) are always restored. Daily counters are restored only when the
        file was saved on the current Vancouver day; otherwise the fresh values
        set by ``initialize()`` stay in place. Errors are logged, not raised.
        """
        path = Path(DAILY_PNL_FILE)
        if not path.exists():
            return

        try:
            state = json.loads(path.read_text())
            if not isinstance(state, dict):
                raise ValueError("state file does not contain a JSON object")

            self.starting_balance = state.get("starting_balance", self.starting_balance)
            self.stop_timestamps = list(state.get("stop_timestamps") or [])
            self.paused_until = state.get("paused_until", 0)
            self.emergency_paused = state.get("emergency_paused", False)
            self.last_checkpoint_week = state.get("last_checkpoint_week")

            if self._saved_on_current_day(state.get("saved_at")):
                self.daily_start_balance = state.get("daily_start_balance", self.daily_start_balance)
                self.daily_pnl = state.get("daily_pnl", 0)
                self.daily_sessions = state.get("daily_sessions", 0)
                self.daily_round_trips = state.get("daily_round_trips", 0)
                logger.info(f"Risk state loaded: daily_pnl=${self.daily_pnl:.4f}")
            else:
                # Yesterday's PnL must not count against today's loss limit.
                logger.info("Risk state loaded: daily counters are stale, starting a fresh day")
        except Exception as e:
            logger.warning(f"Load risk state error: {e}")

📜 Git History

120af73 — chore: archive grid-v3, scaffold of-trader from grid infra — 4 weeks ago
Show last diff
Loading...