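# "← Назад" ("Back") — appears to be navigation residue from wherever this file was copied; not part of the program.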
"""
Grid Bot — Main Entry Point v2
=================================
Session grid with auto-rotation + inventory management.

Changes from v1:
- ATR-adaptive spacing from screener
- Inventory management (partial close, unstuck)
- EMA trailing center
- Choppiness Index in screener
- 5x leverage, 5 levels per side

Additions marked "v3" in this module: running daily loss limit, per-coin
lockout after repeated losing sessions, and a notional exposure cap.
"""
import asyncio
import json
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

# Put the parent of this file's directory on sys.path so that the `src.*`
# imports below resolve. Must run before those imports.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Logging is configured before the project imports so that anything they log
# at import time already uses this format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("main")

from src.exchange import Exchange
from src.grid_manager import GridManager
from src.screener import Screener
from src.risk_manager import RiskManager
from src.tmm_client import TMMClient
from src.tg_bot import TelegramBot
from src.config import (
    SCREENER_INTERVAL_SEC,
    MAX_CONCURRENT_GRIDS,
    TMM_STRATEGY_TAG,
    DATA_DIR,
    GRID_STATE_FILE,
    GRID_LEVELS,
    GRID_SPACING_PCT,
    LEVERAGE,
    POSITION_SIZE_USD,
    ATR_SPACING_ENABLED,
    SCREENER_CHOP_MIN_ENTRY,
    SCREENER_ADX_MAX,
    SCREENER_NATR_MAX,
    EMA_SLOPE_MAX_PCT,
    FUNDING_MAX_ABS,
    COIN_LOCKOUT_FAILS,
    COIN_LOCKOUT_HOURS,
    BREAKOUT_COOLDOWN_MIN,
    MAX_EXPOSURE_PCT,
    DEPOSIT_USD,
    DAILY_LOSS_LIMIT_USD,
)

VANCOUVER_TZ = ZoneInfo("America/Vancouver")
TICK_INTERVAL = 5              # seconds: target duration of one main-loop pass
BREAKOUT_CHECK_INTERVAL = 60   # seconds between breakout scans
DAILY_RESET_HOUR = 0           # local (Vancouver) hour at which daily counters reset
DAILY_REPORT_HOUR = 21         # local (Vancouver) hour at which the daily report is sent
MIN_ENTRY_SCORE = 20           # a screener candidate must score strictly above this

# Event types from GridManager.tick() that mean "a grid session was closed".
CLOSE_EVENT_TYPES = ("profit_target", "soft_sl", "max_loss", "daily_loss_limit")

# event type -> (emoji, label) used in the close notification.
CLOSE_EVENT_STYLE = {
    "profit_target": ("🟢", "PROFIT TARGET"),
    "soft_sl": ("🟠", "SOFT SL"),
    "max_loss": ("🔴", "MAX LOSS"),
    "daily_loss_limit": ("⛔", "DAILY LIMIT"),
}


class GridBot:
    """Orchestrates screener, grid manager, risk manager, TMM and Telegram.

    The bot runs a single asyncio loop (`_main_loop`) that, on every tick,
    enforces risk limits, fills free grid slots with the best screener
    candidate, processes grid events and periodically checks for breakouts.
    """

    def __init__(self):
        self.exchange = Exchange()
        self.grid_manager = GridManager(self.exchange)
        self.screener = Screener(self.exchange)
        self.risk = RiskManager(self.exchange)
        self.tmm = TMMClient()
        self.telegram = TelegramBot(self)
        self.running = False
        self._last_breakout_check = 0
        self._last_daily_reset_day = None
        # NOTE: despite the name this holds the *date* of the last daily report.
        self._last_daily_report_hour = None
        self._breakout_cooldown = {}  # symbol -> timestamp of last breakout close
        self._coin_fails = {}  # v3: symbol -> list of loss timestamps (for lockout)
        self._daily_loss_usd = 0.0  # v3: running daily loss
        # Symbol to try first after a restart; set by start().
        self._resume_symbol = None

    def _load_previous_session(self):
        """Return the parsed JSON from GRID_STATE_FILE, or None.

        None is returned when the file does not exist or cannot be
        read/parsed (the failure is logged as a warning).
        """
        state_path = Path(GRID_STATE_FILE)
        if not state_path.exists():
            return None
        try:
            return json.loads(state_path.read_text())
        except Exception as e:
            logger.warning(f"Failed to load previous state: {e}")
            return None

    async def start(self):
        """Initialise all components, announce startup and run the main loop.

        Reads the previous session state (if any) before the exchange cleanup,
        remembers its symbol as the resume candidate, removes the stale state
        file, runs an initial screener scan and then blocks in `_main_loop`.
        """
        logger.info("=" * 50)
        logger.info(" GRID BOT v3 (NEUTRAL) starting...")
        logger.info("=" * 50)

        Path(DATA_DIR).mkdir(parents=True, exist_ok=True)

        prev_session = self._load_previous_session()

        cancelled = self.exchange.nuclear_cleanup()
        if cancelled:
            logger.info(f"Cleaned up {cancelled} symbols on startup")

        self.risk.initialize()
        await self.telegram.start()

        self._resume_symbol = None
        if prev_session and prev_session.get("active"):
            symbol = prev_session.get("symbol", "?")
            rts = prev_session.get("round_trips", 0)
            pnl = prev_session.get("session_pnl", 0) - prev_session.get("session_fees", 0)
            await self.telegram.send_message(
                f"⚠️ *GRID OFF* (restart): {symbol}\n"
                f"RTs: {rts} | PnL: ${pnl:+.4f}\n"
                f"Positions closed, resuming same coin"
            )
            self._resume_symbol = symbol

        # Best effort: a leftover state file must not abort startup, but a
        # failure to remove it is worth seeing in the log.
        try:
            Path(GRID_STATE_FILE).unlink(missing_ok=True)
        except OSError as e:
            logger.warning(f"Could not remove previous state file: {e}")

        logger.info("Running initial screener scan...")
        self.screener.scan()

        balance = self.exchange.get_balance()
        best_sym, best_data = self.screener.get_best_coin()

        resume_note = f"\n🔄 Resuming: {self._resume_symbol}" if self._resume_symbol else ""
        best_info = ""
        if best_sym and best_data:
            best_info = (
                f"🎯 Best: {best_sym} (score={best_data['score']:.0f}, "
                f"CHOP={best_data.get('chop', 0):.0f}, "
                f"sp={best_data.get('atr_spacing', GRID_SPACING_PCT):.2f}%)"
            )

        await self.telegram.send_message(
            f"🤖 *Grid Bot v3.1 (NEUTRAL) Started*\n\n"
            f"💰 Balance: ${balance:.2f} | Deposit: ${DEPOSIT_USD:.0f}\n"
            f"{best_info}\n"
            f"⚙️ Levels {GRID_LEVELS}+{GRID_LEVELS} | ${POSITION_SIZE_USD}/lvl × {LEVERAGE}x\n"
            f"📉 Filters: NATR≤{SCREENER_NATR_MAX}% CHOP≥{SCREENER_CHOP_MIN_ENTRY} slope<{EMA_SLOPE_MAX_PCT}%\n"
            f"🎯 Exit: breakout OR soft SL -$4.50 | Daily: ${DAILY_LOSS_LIMIT_USD:.0f}{resume_note}"
        )

        self.running = True
        await self._main_loop()

    async def _main_loop(self):
        """Run trading ticks until `self.running` is cleared or we are interrupted.

        Any ordinary exception inside a tick is logged and the loop continues
        after a 10 s pause. On interruption the open grids are stopped via
        `_shutdown()`.
        """
        try:
            while self.running:
                try:
                    loop_start = time.time()

                    self._check_daily_reset()

                    can_trade, reason = self.risk.can_trade()
                    if not can_trade:
                        await self._stop_all_grids(reason)
                        await asyncio.sleep(30)
                        continue

                    if self.screener.should_scan():
                        self.screener.scan()

                    # v3: Daily loss check
                    if self._daily_loss_usd >= DAILY_LOSS_LIMIT_USD:
                        if self.grid_manager.sessions:
                            logger.warning(
                                f"DAILY LOSS LIMIT hit (${self._daily_loss_usd:.2f} ≥ ${DAILY_LOSS_LIMIT_USD}), closing all"
                            )
                            await self._stop_all_grids("daily_loss_limit")
                        await asyncio.sleep(60)
                        continue

                    # Start new grid(s) if slots are available
                    await self._fill_free_slots()

                    # Tick
                    events = self.grid_manager.tick()
                    await self._process_events(events)

                    # Breakout check
                    if time.time() - self._last_breakout_check > BREAKOUT_CHECK_INTERVAL:
                        self._last_breakout_check = time.time()
                        await self._check_breakouts()

                    checkpoint = self.risk.check_weekly_checkpoint()
                    if checkpoint:
                        msg = self.risk.get_weekly_summary(checkpoint)
                        await self.telegram.send_message(msg)

                    await self._check_daily_report()
                    self.tmm.retry_pending_tags()

                    # Keep the tick cadence, but always yield for at least 0.5 s.
                    elapsed = time.time() - loop_start
                    await asyncio.sleep(max(TICK_INTERVAL - elapsed, 0.5))

                except KeyboardInterrupt:
                    logger.info("Shutting down...")
                    await self._shutdown()
                    break
                except Exception as e:
                    logger.error(f"Main loop error: {e}", exc_info=True)
                    await asyncio.sleep(10)
        except asyncio.CancelledError:
            # asyncio.run() delivers Ctrl-C by cancelling the main task, so the
            # interruption normally arrives here as CancelledError (which is
            # not an Exception subclass). Close the grids, then re-raise so the
            # runner can finish the cancellation.
            logger.info("Shutting down...")
            await self._shutdown()
            raise

    async def _stop_all_grids(self, reason):
        """Stop every active grid with `reason`, updating risk stats and notifying."""
        for sym in list(self.grid_manager.sessions.keys()):
            summary = self.grid_manager.stop_grid(sym, reason=reason)
            if summary:
                self.risk.on_session_closed(summary)
                await self._notify_grid_closed(sym, summary)

    def _get_cooldown_symbols(self):
        """Return symbols closed on breakout less than BREAKOUT_COOLDOWN_MIN minutes ago."""
        now = time.time()
        cooldown_sec = BREAKOUT_COOLDOWN_MIN * 60
        return {
            sym
            for sym, ts in self._breakout_cooldown.items()
            if now - ts < cooldown_sec
        }

    def _select_next_symbol(self, active_symbols, locked_symbols):
        """Pick the symbol for the next grid.

        A pending resume symbol (from a restart) is considered first and is
        consumed whether or not it passes the entry filters; if it fails, no
        symbol is returned for this call. Otherwise the best screener coin not
        active, cooling down or locked out is used, provided its score exceeds
        MIN_ENTRY_SCORE.

        Returns:
            (symbol, spacing_pct) or (None, None) when there is no candidate.
        """
        resume_sym = self._resume_symbol
        self._resume_symbol = None  # the resume hint is single-use

        if resume_sym and resume_sym not in active_symbols:
            resume_data = self.screener.scores.get(resume_sym)
            passes_filters = (
                resume_data
                and resume_data.get("chop", 0) >= SCREENER_CHOP_MIN_ENTRY
                and resume_data.get("natr", 999) <= SCREENER_NATR_MAX
                and abs(resume_data.get("ema_slope", 999)) <= EMA_SLOPE_MAX_PCT
                and resume_sym not in locked_symbols
            )
            if passes_filters:
                logger.info(
                    f"Resuming grid on {resume_sym} (CHOP={resume_data['chop']:.1f} NATR={resume_data['natr']:.2f}%)"
                )
                return resume_sym, self.screener.get_atr_spacing_for_symbol(resume_sym)

            rd = resume_data or {}
            logger.info(
                f"Skip resume {resume_sym}: CHOP={rd.get('chop',0):.1f} NATR={rd.get('natr',0):.2f}% slope={rd.get('ema_slope',0):+.2f}% — doesn't pass v3 filters"
            )
            return None, None

        # Find best coin not in active/cooldown/locked
        excluded = active_symbols | self._get_cooldown_symbols() | locked_symbols
        best_sym, best_data = self.screener.get_best_coin(exclude=excluded)
        if best_sym and best_data and best_data["score"] > MIN_ENTRY_SCORE:
            spacing = best_data.get("atr_spacing") if ATR_SPACING_ENABLED else None
            return best_sym, spacing
        return None, None

    async def _fill_free_slots(self):
        """Start grids until MAX_CONCURRENT_GRIDS is reached or candidates run out.

        Nothing is started when current exposure is at or above
        MAX_EXPOSURE_PCT (v3 exposure cap). Exposure is sampled once per call.
        """
        exposure_ok = self._calc_exposure_pct() < MAX_EXPOSURE_PCT
        active_count = len(self.grid_manager.sessions)

        while active_count < MAX_CONCURRENT_GRIDS and exposure_ok:
            active_symbols = set(self.grid_manager.sessions.keys())
            locked_symbols = self._get_locked_symbols()

            target_sym, spacing = self._select_next_symbol(active_symbols, locked_symbols)
            if not target_sym:
                break  # no more candidates

            session = self.grid_manager.start_grid(target_sym, spacing_pct=spacing)
            if not session:
                break  # start_grid failed, don't loop

            score_data = self.screener.scores.get(target_sym, {})
            await self._notify_grid_started(target_sym, score_data, session)
            self.tmm.on_grid_started(target_sym, score_data)
            active_count += 1

    async def _check_breakouts(self):
        """Close grids whose symbol is breaking out and rotate to the next coin.

        A closed symbol goes on cooldown; a losing close also counts towards
        the daily loss and the coin's lockout counter. No replacement grid is
        opened once the daily loss limit has been reached.
        """
        for symbol in list(self.grid_manager.sessions.keys()):
            if not self.screener.is_breakout(symbol):
                continue

            summary = self.grid_manager.stop_grid(symbol, reason="breakout")
            if not summary:
                continue

            self.risk.on_session_closed(summary)
            self._breakout_cooldown[symbol] = time.time()

            # v3: a loss on breakout also counts towards daily loss + lockout
            net = summary.get("net_pnl", 0)
            if net < 0:
                self._daily_loss_usd += abs(net)
                self._register_coin_fail(symbol)

            best_sym, best_data = None, None
            if self._daily_loss_usd >= DAILY_LOSS_LIMIT_USD:
                # The main loop would close a new grid on its next pass anyway;
                # opening one here would only cost fees.
                logger.warning(
                    "Daily loss limit reached after breakout on %s; not opening a replacement grid",
                    symbol,
                )
            else:
                active_symbols = set(self.grid_manager.sessions.keys())
                exclude = (
                    active_symbols
                    | self._get_cooldown_symbols()
                    | self._get_locked_symbols()
                )
                best_sym, best_data = self.screener.get_best_coin(exclude=exclude)

            await self.telegram.send_message(
                f"🔄 *SWITCH*: {symbol} → {best_sym or 'searching...'}\n"
                f"Reason: breakout (BB+ADX/CHOP)\n"
                f"Session: {summary.get('round_trips', 0)} RTs | ${net:+.4f} | "
                f"Inv: peak={summary.get('peak_inventory', 0)}\n"
                f"⏸ {symbol} cooldown {BREAKOUT_COOLDOWN_MIN}min"
            )

            if best_sym and best_data and best_data["score"] > MIN_ENTRY_SCORE:
                spacing = best_data.get("atr_spacing") if ATR_SPACING_ENABLED else None
                session = self.grid_manager.start_grid(best_sym, spacing_pct=spacing)
                if session:
                    await self._notify_grid_started(best_sym, best_data, session)
                    self.tmm.on_grid_started(best_sym, best_data)

    def _calc_exposure_pct(self):
        """Return current exposure as a percentage of DEPOSIT_USD.

        Exposure is the sum of |positionAmt| * entryPrice over the positions
        reported by the exchange. Returns 0.0 when DEPOSIT_USD is not positive
        or when the positions cannot be fetched/parsed (logged as a warning).
        """
        try:
            positions = self.exchange.get_positions()
            total_notional = sum(
                abs(float(p.get("positionAmt", 0))) * float(p.get("entryPrice", 0))
                for p in positions
            )
        except Exception as e:
            # Best effort, as before: treat unknown exposure as zero, but say so.
            logger.warning(f"Exposure calculation failed, assuming 0%: {e}")
            return 0.0
        if DEPOSIT_USD <= 0:
            return 0.0
        return (total_notional / DEPOSIT_USD) * 100

    def _get_locked_symbols(self):
        """Return coins in lockout.

        A coin is locked when it has at least COIN_LOCKOUT_FAILS recorded
        losses within the last COIN_LOCKOUT_HOURS hours. Older loss timestamps
        are pruned from `_coin_fails` as a side effect.
        """
        now = time.time()
        window = COIN_LOCKOUT_HOURS * 3600
        locked = set()
        for sym, fail_times in list(self._coin_fails.items()):
            recent = [t for t in fail_times if now - t < window]
            self._coin_fails[sym] = recent
            if len(recent) >= COIN_LOCKOUT_FAILS:
                locked.add(sym)
        return locked

    def _register_coin_fail(self, symbol):
        """Record a losing session for `symbol` for lockout tracking."""
        fails = self._coin_fails.setdefault(symbol, [])
        fails.append(time.time())
        fails_count = len(fails)
        logger.info(f"Coin fail registered: {symbol} ({fails_count}/{COIN_LOCKOUT_FAILS})")

    async def _handle_grid_close_event(self, event_type, event):
        """v3: single handler for grid-close events.

        Handles profit_target / soft_sl / max_loss / daily_loss_limit: updates
        risk stats, accumulates the daily loss, registers a coin fail for
        losing soft_sl / max_loss closes, and sends a Telegram notification.

        Args:
            event_type: the event's "type" value.
            event: event dict; must contain "symbol", may contain "summary".
        """
        summary = event.get("summary") or {}
        symbol = event["symbol"]
        net_pnl = summary.get("net_pnl", 0)

        self.risk.on_session_closed(summary)

        # Daily loss tracking
        if net_pnl < 0:
            self._daily_loss_usd += abs(net_pnl)

        # Coin lockout tracking (only for soft_sl / max_loss)
        if event_type in ("soft_sl", "max_loss") and net_pnl < 0:
            self._register_coin_fail(symbol)

        emoji, label = CLOSE_EVENT_STYLE.get(event_type, ("⚪", event_type.upper()))

        await self.telegram.send_message(
            f"{emoji} *{label}*: {symbol}\n"
            f"RTs: {summary.get('round_trips', 0)} | PnL: ${net_pnl:+.4f}\n"
            f"Inv peak={summary.get('peak_inventory', 0)} | "
            f"partial={summary.get('partial_closes', 0)} unstuck={summary.get('unstuck_closes', 0)}\n"
            f"Duration: {summary.get('duration_min', 0):.0f}min | "
            f"Daily loss: ${self._daily_loss_usd:.2f}/${DAILY_LOSS_LIMIT_USD:.0f}"
        )

    async def _process_events(self, events):
        """Dispatch the events returned by `GridManager.tick()`.

        Round trips are forwarded to TMM and summarised in one log line;
        close events go to `_handle_grid_close_event`; partial_close, unstuck
        and recenter events are only logged. Unknown types are ignored.
        """
        rt_count = 0
        rt_pnl = 0
        for event in events:
            etype = event["type"]
            if etype == "round_trip":
                rt_count += 1
                rt_pnl += event["pnl"]
                self.tmm.on_round_trip(event["symbol"], event)
            elif etype in CLOSE_EVENT_TYPES:
                # "profit_target" is included because the close handler
                # explicitly supports it; previously such events were dropped.
                await self._handle_grid_close_event(etype, event)
            elif etype == "partial_close":
                logger.info(
                    f"Partial close {event['symbol']}: {event['levels_closed']} lvls, "
                    f"pnl=${event['pnl']:.4f}, imb={event['remaining_imbalance']}"
                )
            elif etype == "unstuck":
                logger.info(
                    f"Unstuck {event['symbol']}: {event['levels_closed']} lvls near EMA"
                )
            elif etype == "recenter":
                logger.info(
                    f"Recenter {event['symbol']}: "
                    f"{event['old_center']:.6f} → {event['new_center']:.6f}"
                )

        if rt_count > 0:
            logger.info(f"Round-trips: {rt_count} | PnL: ${rt_pnl:+.6f}")

    async def _notify_grid_started(self, symbol, score_data, session):
        """Send the "GRID ON" Telegram message for a newly started grid.

        Args:
            symbol: traded symbol.
            score_data: screener metrics dict; missing keys are shown as 0.
            session: object returned by `start_grid`; its `spacing_pct` is shown.
        """
        score = score_data.get('score', 0)
        bb = score_data.get('bb_width', 0)
        adx = score_data.get('adx', 0)
        chop = score_data.get('chop', 0)
        mv = score_data.get('micro_vol', 0)
        natr = score_data.get('natr', 0)
        price = score_data.get('price', 0)
        sp = session.spacing_pct

        await self.telegram.send_message(
            f"🟢 *GRID ON*: {symbol}\n"
            f"Score: {score:.0f} | CHOP={chop:.0f} ADX={adx:.0f}\n"
            f"BB={bb:.2f}% | MV={mv:.0f} | NATR={natr:.2f}%\n"
            f"Price: ${price:.4f} | {GRID_LEVELS}+{GRID_LEVELS} lvls | "
            f"sp={sp:.2f}% | ${POSITION_SIZE_USD}×{LEVERAGE}x"
        )

    async def _notify_grid_closed(self, symbol, summary):
        """Send the "GRID OFF" Telegram message for a closed session.

        Missing summary fields are rendered with defaults instead of raising,
        matching `_handle_grid_close_event`.
        """
        net_pnl = summary.get("net_pnl", 0)
        emoji = "🟢" if net_pnl >= 0 else "🔴"
        await self.telegram.send_message(
            f"{emoji} *GRID OFF*: {symbol}\n"
            f"Reason: {summary.get('close_reason', '?')}\n"
            f"RTs: {summary.get('round_trips', 0)} | PnL: ${net_pnl:+.4f}\n"
            f"Inv: peak={summary.get('peak_inventory', 0)} | "
            f"partial={summary.get('partial_closes', 0)} unstuck={summary.get('unstuck_closes', 0)}\n"
            f"Duration: {summary.get('duration_min', 0):.0f}min"
        )

    def _check_daily_reset(self):
        """Reset daily counters once per day during hour DAILY_RESET_HOUR (Vancouver time)."""
        now = datetime.now(VANCOUVER_TZ)
        today = now.date()
        if self._last_daily_reset_day != today and now.hour == DAILY_RESET_HOUR:
            self._last_daily_reset_day = today
            self.risk.reset_daily()
            self._daily_loss_usd = 0.0
            self._coin_fails = {}  # v3: lockout reset
            self._breakout_cooldown = {}
            logger.info("Daily counters reset (loss, lockout, cooldown)")

    async def _check_daily_report(self):
        """Send the daily summary once per day during hour DAILY_REPORT_HOUR (Vancouver time)."""
        now = datetime.now(VANCOUVER_TZ)
        if now.hour == DAILY_REPORT_HOUR and self._last_daily_report_hour != now.date():
            self._last_daily_report_hour = now.date()
            msg = self.risk.get_daily_summary()
            await self.telegram.send_message(msg)

    async def _shutdown(self):
        """Stop all grids, notify via Telegram and stop the Telegram bot."""
        logger.info("Shutting down Grid Bot v2...")
        self.running = False
        for symbol in list(self.grid_manager.sessions.keys()):
            summary = self.grid_manager.stop_grid(symbol, reason="shutdown")
            if summary:
                self.risk.on_session_closed(summary)
        await self.telegram.send_message("🛑 *Grid Bot v2 stopped*")
        await self.telegram.stop()
        logger.info("Grid Bot v2 stopped")


def main():
    """Create the bot and run it until it stops or the user interrupts."""
    bot = GridBot()
    try:
        asyncio.run(bot.start())
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()