"""
Grid Bot — Risk Manager
=========================
Manages risk limits, circuit breaker, daily tracking,
and weekly profit checkpoint.
Responsibilities:
1. Daily loss limit (10% of starting balance)
2. Circuit breaker (5 max_loss stops in 1h → pause 30min)
3. Emergency drawdown (dep -20% from start → full pause)
4. Weekly checkpoint: Sunday 21:00 Vancouver → 50% profit withdraw
"""
import json
import time
import logging
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
from src.config import (
DAILY_LOSS_LIMIT_PCT, CIRCUIT_BREAKER_STOPS, CIRCUIT_BREAKER_PAUSE_MIN,
EMERGENCY_DRAWDOWN_PCT, CHECKPOINT_DAY, CHECKPOINT_HOUR,
WITHDRAW_RATIO, DAILY_PNL_FILE, DATA_DIR,
)
# Module-level logger, registered under the fixed name "risk".
logger = logging.getLogger("risk")
# Time zone passed to datetime.now() for the wall-clock checks and timestamps below.
VANCOUVER_TZ = ZoneInfo("America/Vancouver")
class RiskManager:
    """Track risk limits for the grid bot and decide whether it may trade.

    The manager keeps daily and weekly counters, a circuit breaker driven by
    ``max_loss`` session stops, a latched emergency-drawdown flag and the
    weekly profit checkpoint. State is persisted to ``DAILY_PNL_FILE`` so
    that it survives a restart.

    ``exchange`` only needs to provide ``get_balance()`` returning a number.
    """

    # Telegram icons, written as escapes so the source stays ASCII-safe.
    _ICON_PROFIT = "\U0001F7E2"      # green circle
    _ICON_LOSS = "\U0001F534"        # red circle
    _ICON_PAUSED = "\u23F8\uFE0F"    # pause button
    _ICON_ACTIVE = "\u2705"          # check mark
    _ICON_MONEY_BAG = "\U0001F4B0"   # money bag
    _ICON_WITHDRAW = "\U0001F4B8"    # money with wings
    _ICON_CALENDAR = "\U0001F4C5"    # calendar
    # NOTE(review): the icons still written as "π" in the summaries below
    # lost every distinguishing byte to an encoding error and could not be
    # recovered; restore them from the upstream copy of this file.

    def __init__(self, exchange):
        self.exchange = exchange
        self.starting_balance = 0.0
        self.daily_start_balance = 0.0
        self.daily_pnl = 0.0
        self.daily_sessions = 0
        self.daily_round_trips = 0
        # Weekly accumulators: unlike the daily ones they are NOT cleared by
        # reset_daily(), only by the weekly checkpoint.
        self.week_start_balance = 0.0
        self.weekly_sessions = 0
        self.weekly_round_trips = 0
        self.stop_timestamps = []   # time.time() of max_loss stops (circuit breaker)
        self.paused_until = 0       # epoch seconds at which the breaker pause ends
        self.emergency_paused = False
        self.last_checkpoint_week = None
        self._initialized = False

    def initialize(self):
        """Seed every baseline from the live balance, then overlay saved state."""
        balance = self.exchange.get_balance()
        self.starting_balance = balance
        self.daily_start_balance = balance
        self.week_start_balance = balance
        self._initialized = True
        self._load_state()
        if self.starting_balance <= 0:
            logger.warning(
                "Risk Manager: starting balance is not positive; "
                "emergency drawdown check is disabled"
            )
        logger.info(f"Risk Manager init: balance=${balance:.2f}")

    # ============================================================
    # DAILY TRACKING
    # ============================================================
    def on_session_closed(self, summary):
        """Fold a closed grid session into the daily and weekly counters.

        Reads ``net_pnl``, ``round_trips`` and ``close_reason`` from
        ``summary`` (missing keys count as 0 / no stop). A ``max_loss``
        close is recorded for the circuit breaker. State is saved afterwards.
        """
        round_trips = summary.get("round_trips", 0)
        self.daily_pnl += summary.get("net_pnl", 0)
        self.daily_sessions += 1
        self.daily_round_trips += round_trips
        self.weekly_sessions += 1
        self.weekly_round_trips += round_trips
        if summary.get("close_reason") == "max_loss":
            self.stop_timestamps.append(time.time())
        self._save_state()

    def check_daily_limit(self) -> bool:
        """Return True when today's realised PnL is below the daily loss limit."""
        if not self._initialized:
            return False
        max_daily_loss = self.daily_start_balance * DAILY_LOSS_LIMIT_PCT / 100
        if self.daily_pnl < -max_daily_loss:
            logger.warning(
                f"DAILY LIMIT: PnL=${self.daily_pnl:.2f} < -${max_daily_loss:.2f}"
            )
            return True
        return False

    def check_emergency_drawdown(self) -> bool:
        """Return True (and latch the emergency pause) on excessive drawdown.

        Drawdown is measured from ``starting_balance`` against the live
        balance. With a non-positive baseline no percentage can be computed,
        so the check reports False instead of dividing by zero.
        """
        if not self._initialized:
            return False
        if self.starting_balance <= 0:
            return False
        current_balance = self.exchange.get_balance()
        drawdown_pct = (
            (self.starting_balance - current_balance) / self.starting_balance * 100
        )
        if drawdown_pct >= EMERGENCY_DRAWDOWN_PCT:
            logger.error(
                f"EMERGENCY: balance ${current_balance:.2f}, "
                f"start ${self.starting_balance:.2f}, "
                f"drawdown {drawdown_pct:.1f}%"
            )
            self.emergency_paused = True
            # Persist right away so the latch survives a restart.
            self._save_state()
            return True
        return False

    # ============================================================
    # CIRCUIT BREAKER
    # ============================================================
    def check_circuit_breaker(self) -> bool:
        """Return True while the breaker is open, tripping it when needed.

        The breaker trips when at least ``CIRCUIT_BREAKER_STOPS`` max_loss
        stops fall inside the last hour. The stops that tripped it are then
        discarded; otherwise they would still be inside the window when the
        pause ends and would immediately start another pause.
        """
        now = time.time()
        if now < self.paused_until:
            # Already paused: report it, but do not push the deadline out.
            return True
        window_start = now - 3600
        recent = [t for t in self.stop_timestamps if t > window_start]
        self.stop_timestamps = recent  # drop entries that can never count again
        if len(recent) < CIRCUIT_BREAKER_STOPS:
            return False
        self.paused_until = now + CIRCUIT_BREAKER_PAUSE_MIN * 60
        self.stop_timestamps = []
        self._save_state()
        logger.warning(
            f"CIRCUIT BREAKER: {len(recent)} stops in 1h, "
            f"pausing {CIRCUIT_BREAKER_PAUSE_MIN}min"
        )
        return True

    def is_paused(self):
        """Return ``(paused, reason)``; ``reason`` is None when not paused."""
        if self.emergency_paused:
            return True, "emergency_drawdown"
        now = time.time()
        if now < self.paused_until:
            remaining = (self.paused_until - now) / 60
            return True, f"circuit_breaker ({remaining:.0f}min left)"
        return False, None

    # ============================================================
    # WEEKLY CHECKPOINT
    # ============================================================
    def check_weekly_checkpoint(self):
        """Produce the weekly profit checkpoint when its time slot is reached.

        Fires at most once per ISO week number, during the hour where the
        Vancouver weekday/hour equal ``CHECKPOINT_DAY`` / ``CHECKPOINT_HOUR``.
        Returns the checkpoint dict, or None when it is not time (or the
        week was already handled). Week PnL is measured from the balance
        recorded at the previous checkpoint, not from the daily baseline.
        Only the suggested withdrawal is computed; no funds are moved here.
        """
        now = datetime.now(VANCOUVER_TZ)
        if now.weekday() != CHECKPOINT_DAY or now.hour != CHECKPOINT_HOUR:
            return None
        week_id = now.isocalendar()[1]
        if self.last_checkpoint_week == week_id:
            return None
        # Fetch the balance BEFORE marking the week as done, so a failing
        # exchange call leaves the checkpoint eligible for a retry.
        current_balance = self.exchange.get_balance()
        self.last_checkpoint_week = week_id
        week_pnl = current_balance - self.week_start_balance
        checkpoint = {
            "week": week_id,
            "balance": round(current_balance, 2),
            "starting_balance": round(self.starting_balance, 2),
            "week_pnl": round(week_pnl, 2),
            "daily_pnl_accumulated": round(self.daily_pnl, 2),
            "total_sessions": self.weekly_sessions,
            "total_round_trips": self.weekly_round_trips,
            "time": now.strftime("%Y-%m-%d %H:%M"),
            "withdraw_amount": 0,
            "new_balance": round(current_balance, 2),
        }
        if week_pnl > 0:
            withdraw = week_pnl * WITHDRAW_RATIO
            new_balance = current_balance - withdraw
            checkpoint["withdraw_amount"] = round(withdraw, 2)
            checkpoint["new_balance"] = round(new_balance, 2)
            logger.info(
                f"CHECKPOINT: PnL=${week_pnl:.2f}, "
                f"withdraw=${withdraw:.2f}, new_balance=${new_balance:.2f}"
            )
        else:
            logger.info(f"CHECKPOINT: PnL=${week_pnl:.2f} (negative, no withdraw)")
        # Start the new week: clear daily and weekly counters and re-base
        # both baselines on the post-withdrawal balance.
        self.daily_pnl = 0
        self.daily_sessions = 0
        self.daily_round_trips = 0
        self.weekly_sessions = 0
        self.weekly_round_trips = 0
        self.daily_start_balance = checkpoint["new_balance"]
        self.week_start_balance = checkpoint["new_balance"]
        self.stop_timestamps = []
        self._save_state()
        return checkpoint

    def reset_daily(self):
        """Clear the daily counters and re-base the daily baseline.

        Intended to be called by the scheduler at midnight Vancouver time.
        Weekly accumulators are left untouched.
        """
        self.daily_pnl = 0
        self.daily_sessions = 0
        self.daily_round_trips = 0
        self.daily_start_balance = self.exchange.get_balance()
        self.stop_timestamps = []
        self._save_state()
        logger.info(f"Daily reset: balance=${self.daily_start_balance:.2f}")

    # ============================================================
    # ALLOWED TO TRADE CHECK
    # ============================================================
    def can_trade(self):
        """Run every gate in order and return ``(allowed, reason)``.

        ``reason`` is ``"ok"`` when trading is allowed, otherwise the name
        of the first gate that blocked it.
        """
        paused, reason = self.is_paused()
        if paused:
            return False, reason
        if self.check_daily_limit():
            return False, "daily_limit"
        if self.check_emergency_drawdown():
            return False, "emergency_drawdown"
        if self.check_circuit_breaker():
            return False, "circuit_breaker"
        return True, "ok"

    # ============================================================
    # STATUS & FORMATTING
    # ============================================================
    def get_daily_summary(self) -> str:
        """Format the daily report as a Telegram message."""
        now = datetime.now(VANCOUVER_TZ).strftime("%H:%M")
        balance = self.exchange.get_balance()
        emoji = self._ICON_PROFIT if self.daily_pnl >= 0 else self._ICON_LOSS
        paused, reason = self.is_paused()
        if paused:
            status = f"{self._ICON_PAUSED} {reason}"
        else:
            status = f"{self._ICON_ACTIVE} Active"
        return (
            f"π *Daily Report* ({now})\n\n"
            f"{emoji} PnL: ${self.daily_pnl:+.4f}\n"
            f"π Sessions: {self.daily_sessions}\n"
            f"π Round-trips: {self.daily_round_trips}\n"
            f"{self._ICON_MONEY_BAG} Balance: ${balance:.2f}\n"
            f"Status: {status}"
        )

    def get_weekly_summary(self, checkpoint) -> str:
        """Format a dict from check_weekly_checkpoint() as a Telegram message.

        The withdraw line is only included when ``withdraw_amount`` is
        positive.
        """
        emoji = self._ICON_PROFIT if checkpoint["week_pnl"] >= 0 else self._ICON_LOSS
        withdraw_line = ""
        if checkpoint["withdraw_amount"] > 0:
            withdraw_line = (
                f"{self._ICON_WITHDRAW} Withdraw: "
                f"${checkpoint['withdraw_amount']:.2f}\n"
            )
        return (
            f"{self._ICON_CALENDAR} *Weekly Checkpoint* (Week {checkpoint['week']})\n\n"
            f"{emoji} Week PnL: ${checkpoint['week_pnl']:+.2f}\n"
            f"π Sessions: {checkpoint['total_sessions']}\n"
            f"π Round-trips: {checkpoint['total_round_trips']}\n"
            f"{withdraw_line}"
            f"{self._ICON_MONEY_BAG} New balance: ${checkpoint['new_balance']:.2f}\n"
            f"π Start balance: ${checkpoint['starting_balance']:.2f}"
        )

    # ============================================================
    # PERSISTENCE
    # ============================================================
    def _save_state(self):
        """Write the risk state to ``DAILY_PNL_FILE`` (best effort).

        The JSON is written to a sibling temp file and then moved over the
        target, so an interrupted write cannot leave a truncated state file.
        Failures are logged, never raised.
        """
        state = {
            "starting_balance": self.starting_balance,
            "daily_start_balance": self.daily_start_balance,
            "daily_pnl": self.daily_pnl,
            "daily_sessions": self.daily_sessions,
            "daily_round_trips": self.daily_round_trips,
            "week_start_balance": self.week_start_balance,
            "weekly_sessions": self.weekly_sessions,
            "weekly_round_trips": self.weekly_round_trips,
            "stop_timestamps": self.stop_timestamps[-50:],
            "paused_until": self.paused_until,
            "emergency_paused": self.emergency_paused,
            "last_checkpoint_week": self.last_checkpoint_week,
            "saved_at": datetime.now(VANCOUVER_TZ).isoformat(),
        }
        try:
            Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
            target = Path(DAILY_PNL_FILE)
            tmp_path = target.with_name(target.name + ".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(state, f, indent=2)
            tmp_path.replace(target)
        except Exception as e:
            logger.error(f"Save risk state error: {e}")

    def _load_state(self):
        """Overlay state saved by _save_state() onto this instance.

        Daily counters are only restored when the file was saved on the
        current Vancouver date; otherwise the fresh values set by
        initialize() are kept. State files without the weekly fields fall
        back to the daily values they contain. Failures are logged, never
        raised.
        """
        path = Path(DAILY_PNL_FILE)
        if not path.exists():
            return
        try:
            state = json.loads(path.read_text(encoding="utf-8"))
            self.starting_balance = state.get("starting_balance", self.starting_balance)
            self.week_start_balance = state.get(
                "week_start_balance",
                state.get("daily_start_balance", self.week_start_balance),
            )
            self.weekly_sessions = state.get(
                "weekly_sessions", state.get("daily_sessions", 0)
            )
            self.weekly_round_trips = state.get(
                "weekly_round_trips", state.get("daily_round_trips", 0)
            )
            if self._saved_today(state.get("saved_at")):
                self.daily_start_balance = state.get(
                    "daily_start_balance", self.daily_start_balance
                )
                self.daily_pnl = state.get("daily_pnl", 0)
                self.daily_sessions = state.get("daily_sessions", 0)
                self.daily_round_trips = state.get("daily_round_trips", 0)
            else:
                logger.info("Risk state is from a previous day; daily counters start fresh")
            self.stop_timestamps = state.get("stop_timestamps", [])
            self.paused_until = state.get("paused_until", 0)
            self.emergency_paused = state.get("emergency_paused", False)
            self.last_checkpoint_week = state.get("last_checkpoint_week")
            logger.info(f"Risk state loaded: daily_pnl=${self.daily_pnl:.4f}")
        except Exception as e:
            logger.warning(f"Load risk state error: {e}")

    @staticmethod
    def _saved_today(saved_at) -> bool:
        """Return True when ``saved_at`` falls on today's Vancouver date.

        A missing or unparseable timestamp counts as "today", which keeps
        the previous behaviour of always restoring the daily counters.
        """
        if not saved_at:
            return True
        try:
            saved = datetime.fromisoformat(saved_at)
        except (TypeError, ValueError):
            return True
        if saved.tzinfo is not None:
            saved = saved.astimezone(VANCOUVER_TZ)
        return saved.date() == datetime.now(VANCOUVER_TZ).date()