← Назад
""" Grid Bot β€” Risk Manager ========================= Manages risk limits, circuit breaker, daily tracking, and weekly profit checkpoint. Responsibilities: 1. Daily loss limit (10% of starting balance) 2. Circuit breaker (5 max_loss stops in 1h β†’ pause 30min) 3. Emergency drawdown (dep -20% from start β†’ full pause) 4. Weekly checkpoint: Sunday 21:00 Vancouver β€” 50% profit withdraw """ import json import time import logging from datetime import datetime, timedelta from pathlib import Path from zoneinfo import ZoneInfo from src.config import ( DAILY_LOSS_LIMIT_PCT, CIRCUIT_BREAKER_STOPS, CIRCUIT_BREAKER_PAUSE_MIN, EMERGENCY_DRAWDOWN_PCT, CHECKPOINT_DAY, CHECKPOINT_HOUR, WITHDRAW_RATIO, DAILY_PNL_FILE, DATA_DIR, ) logger = logging.getLogger("risk") VANCOUVER_TZ = ZoneInfo("America/Vancouver") class RiskManager: def __init__(self, exchange): self.exchange = exchange self.starting_balance = 0.0 self.daily_start_balance = 0.0 self.daily_pnl = 0.0 self.daily_sessions = 0 self.daily_round_trips = 0 self.stop_timestamps = [] # for circuit breaker self.paused_until = 0 # circuit breaker pause end self.emergency_paused = False self.last_checkpoint_week = None self._initialized = False def initialize(self): """Initialize with current balance.""" balance = self.exchange.get_balance() self.starting_balance = balance self.daily_start_balance = balance self._initialized = True self._load_state() logger.info(f"Risk Manager init: balance=${balance:.2f}") # ============================================================ # DAILY TRACKING # ============================================================ def on_session_closed(self, summary): """Called when a grid session closes.""" pnl = summary.get("net_pnl", 0) self.daily_pnl += pnl self.daily_sessions += 1 self.daily_round_trips += summary.get("round_trips", 0) if summary.get("close_reason") == "max_loss": self.stop_timestamps.append(time.time()) self._save_state() def check_daily_limit(self): """Check if daily loss limit exceeded.""" if not self._initialized: return False max_daily_loss = self.daily_start_balance * DAILY_LOSS_LIMIT_PCT / 100 if self.daily_pnl < -max_daily_loss: logger.warning( f"DAILY LIMIT: PnL=${self.daily_pnl:.2f} < -${max_daily_loss:.2f}" ) return True return False def check_emergency_drawdown(self): """Check if total drawdown exceeds emergency threshold.""" if not self._initialized: return False current_balance = self.exchange.get_balance() drawdown_pct = (self.starting_balance - current_balance) / self.starting_balance * 100 if drawdown_pct >= EMERGENCY_DRAWDOWN_PCT: logger.error( f"EMERGENCY: balance ${current_balance:.2f}, " f"start ${self.starting_balance:.2f}, " f"drawdown {drawdown_pct:.1f}%" ) self.emergency_paused = True return True return False # ============================================================ # CIRCUIT BREAKER # ============================================================ def check_circuit_breaker(self): """Check if too many stops in last hour.""" one_hour_ago = time.time() - 3600 recent = [t for t in self.stop_timestamps if t > one_hour_ago] if len(recent) >= CIRCUIT_BREAKER_STOPS: self.paused_until = time.time() + CIRCUIT_BREAKER_PAUSE_MIN * 60 logger.warning( f"CIRCUIT BREAKER: {len(recent)} stops in 1h, " f"pausing {CIRCUIT_BREAKER_PAUSE_MIN}min" ) return True return False def is_paused(self): """Check if bot is in any pause state.""" if self.emergency_paused: return True, "emergency_drawdown" if time.time() < self.paused_until: remaining = (self.paused_until - time.time()) / 60 return True, f"circuit_breaker ({remaining:.0f}min left)" return False, None # ============================================================ # WEEKLY CHECKPOINT # ============================================================ def check_weekly_checkpoint(self): """ Sunday 21:00 Vancouver β€” weekly profit checkpoint. Returns checkpoint data if it's time, None otherwise. """ now = datetime.now(VANCOUVER_TZ) # Check if it's Sunday (weekday 6) and hour 21 if now.weekday() != CHECKPOINT_DAY or now.hour != CHECKPOINT_HOUR: return None # Only once per week week_id = now.isocalendar()[1] if self.last_checkpoint_week == week_id: return None self.last_checkpoint_week = week_id current_balance = self.exchange.get_balance() week_pnl = current_balance - self.daily_start_balance # approx checkpoint = { "week": week_id, "balance": round(current_balance, 2), "starting_balance": round(self.starting_balance, 2), "week_pnl": round(week_pnl, 2), "daily_pnl_accumulated": round(self.daily_pnl, 2), "total_sessions": self.daily_sessions, "total_round_trips": self.daily_round_trips, "time": now.strftime("%Y-%m-%d %H:%M"), "withdraw_amount": 0, "new_balance": round(current_balance, 2), } if week_pnl > 0: withdraw = week_pnl * WITHDRAW_RATIO new_balance = current_balance - withdraw checkpoint["withdraw_amount"] = round(withdraw, 2) checkpoint["new_balance"] = round(new_balance, 2) logger.info( f"CHECKPOINT: PnL=${week_pnl:.2f}, " f"withdraw=${withdraw:.2f}, new_balance=${new_balance:.2f}" ) else: logger.info(f"CHECKPOINT: PnL=${week_pnl:.2f} (negative, no withdraw)") # Reset daily counters for new week self.daily_pnl = 0 self.daily_sessions = 0 self.daily_round_trips = 0 self.daily_start_balance = checkpoint["new_balance"] self.stop_timestamps = [] self._save_state() return checkpoint def reset_daily(self): """Reset daily counters at midnight Vancouver.""" self.daily_pnl = 0 self.daily_sessions = 0 self.daily_round_trips = 0 self.daily_start_balance = self.exchange.get_balance() self.stop_timestamps = [] self._save_state() logger.info(f"Daily reset: balance=${self.daily_start_balance:.2f}") # ============================================================ # ALLOWED TO TRADE CHECK # ============================================================ def can_trade(self): """Full check: can we open a new grid?""" paused, reason = self.is_paused() if paused: return False, reason if self.check_daily_limit(): return False, "daily_limit" if self.check_emergency_drawdown(): return False, "emergency_drawdown" if self.check_circuit_breaker(): return False, "circuit_breaker" return True, "ok" # ============================================================ # STATUS & FORMATTING # ============================================================ def get_daily_summary(self): """Daily summary for telegram.""" now = datetime.now(VANCOUVER_TZ).strftime("%H:%M") balance = self.exchange.get_balance() emoji = "🟒" if self.daily_pnl >= 0 else "πŸ”΄" paused, reason = self.is_paused() status = f"⏸️ {reason}" if paused else "βœ… Active" return ( f"πŸ“Š *Daily Report* ({now})\n\n" f"{emoji} PnL: ${self.daily_pnl:+.4f}\n" f"πŸ“ˆ Sessions: {self.daily_sessions}\n" f"πŸ”„ Round-trips: {self.daily_round_trips}\n" f"πŸ’° Balance: ${balance:.2f}\n" f"Status: {status}" ) def get_weekly_summary(self, checkpoint): """Format weekly checkpoint for telegram.""" emoji = "🟒" if checkpoint["week_pnl"] >= 0 else "πŸ”΄" withdraw_line = "" if checkpoint["withdraw_amount"] > 0: withdraw_line = f"πŸ’Έ Withdraw: ${checkpoint['withdraw_amount']:.2f}\n" return ( f"πŸ“… *Weekly Checkpoint* (Week {checkpoint['week']})\n\n" f"{emoji} Week PnL: ${checkpoint['week_pnl']:+.2f}\n" f"πŸ“ˆ Sessions: {checkpoint['total_sessions']}\n" f"πŸ”„ Round-trips: {checkpoint['total_round_trips']}\n" f"{withdraw_line}" f"πŸ’° New balance: ${checkpoint['new_balance']:.2f}\n" f"πŸ“Š Start balance: ${checkpoint['starting_balance']:.2f}" ) # ============================================================ # PERSISTENCE # ============================================================ def _save_state(self): """Save risk state to disk.""" Path(DATA_DIR).mkdir(parents=True, exist_ok=True) state = { "starting_balance": self.starting_balance, "daily_start_balance": self.daily_start_balance, "daily_pnl": self.daily_pnl, "daily_sessions": self.daily_sessions, "daily_round_trips": self.daily_round_trips, "stop_timestamps": self.stop_timestamps[-50:], "paused_until": self.paused_until, "emergency_paused": self.emergency_paused, "last_checkpoint_week": self.last_checkpoint_week, "saved_at": datetime.now(VANCOUVER_TZ).isoformat(), } try: with open(DAILY_PNL_FILE, 'w') as f: json.dump(state, f, indent=2) except Exception as e: logger.error(f"Save risk state error: {e}") def _load_state(self): """Load risk state from disk (for restart recovery).""" path = Path(DAILY_PNL_FILE) if not path.exists(): return try: state = json.loads(path.read_text()) self.starting_balance = state.get("starting_balance", self.starting_balance) self.daily_start_balance = state.get("daily_start_balance", self.daily_start_balance) self.daily_pnl = state.get("daily_pnl", 0) self.daily_sessions = state.get("daily_sessions", 0) self.daily_round_trips = state.get("daily_round_trips", 0) self.stop_timestamps = state.get("stop_timestamps", []) self.paused_until = state.get("paused_until", 0) self.emergency_paused = state.get("emergency_paused", False) self.last_checkpoint_week = state.get("last_checkpoint_week") logger.info(f"Risk state loaded: daily_pnl=${self.daily_pnl:.4f}") except Exception as e: logger.warning(f"Load risk state error: {e}")