← Back
"""
Grid Bot — Main Entry Point v2
=================================
Session grid with auto-rotation + inventory management.

Changes from v1:
- ATR-adaptive spacing from screener
- Inventory management (partial close, unstuck)
- EMA trailing center
- Choppiness Index in screener
- 5x leverage, 5 levels per side
"""

import asyncio
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from zoneinfo import ZoneInfo

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("main")

from src.exchange import Exchange
from src.grid_manager import GridManager
from src.screener import Screener
from src.risk_manager import RiskManager
from src.tmm_client import TMMClient
from src.tg_bot import TelegramBot
from src.config import (
    SCREENER_INTERVAL_SEC, MAX_CONCURRENT_GRIDS,
    TMM_STRATEGY_TAG, DATA_DIR, GRID_STATE_FILE,
    GRID_LEVELS, GRID_SPACING_PCT, LEVERAGE, POSITION_SIZE_USD,
    ATR_SPACING_ENABLED, SCREENER_CHOP_MIN_ENTRY, SCREENER_ADX_MAX,
    SCREENER_NATR_MAX, EMA_SLOPE_MAX_PCT, FUNDING_MAX_ABS,
    COIN_LOCKOUT_FAILS, COIN_LOCKOUT_HOURS, BREAKOUT_COOLDOWN_MIN,
    MAX_EXPOSURE_PCT, DEPOSIT_USD, DAILY_LOSS_LIMIT_USD,
)

VANCOUVER_TZ = ZoneInfo("America/Vancouver")

TICK_INTERVAL = 5
BREAKOUT_CHECK_INTERVAL = 60
DAILY_RESET_HOUR = 0


class GridBot:
    def __init__(self):
        self.exchange = Exchange()
        self.grid_manager = GridManager(self.exchange)
        self.screener = Screener(self.exchange)
        self.risk = RiskManager(self.exchange)
        self.tmm = TMMClient()
        self.telegram = TelegramBot(self)
        self.running = False
        self._last_breakout_check = 0
        self._last_daily_reset_day = None
        self._last_daily_report_hour = None
        self._breakout_cooldown = {}  # symbol → timestamp of last breakout close
        self._coin_fails = {}         # v3: symbol → list of loss timestamps (for lockout)
        self._daily_loss_usd = 0.0    # v3: running daily loss

    def _load_previous_session(self):
        state_path = Path(GRID_STATE_FILE)
        if not state_path.exists():
            return None
        try:
            import json
            return json.loads(state_path.read_text())
        except Exception as e:
            logger.warning(f"Failed to load previous state: {e}")
            return None

    async def start(self):
        logger.info("=" * 50)
        logger.info("  GRID BOT v3 (NEUTRAL) starting...")
        logger.info("=" * 50)

        Path(DATA_DIR).mkdir(parents=True, exist_ok=True)

        prev_session = self._load_previous_session()

        cancelled = self.exchange.nuclear_cleanup()
        if cancelled:
            logger.info(f"Cleaned up {cancelled} symbols on startup")

        self.risk.initialize()
        await self.telegram.start()

        self._resume_symbol = None
        if prev_session and prev_session.get("active"):
            symbol = prev_session.get("symbol", "?")
            rts = prev_session.get("round_trips", 0)
            pnl = prev_session.get("session_pnl", 0) - prev_session.get("session_fees", 0)
            await self.telegram.send_message(
                f"⚠️ *GRID OFF* (restart): {symbol}\n"
                f"RTs: {rts} | PnL: ${pnl:+.4f}\n"
                f"Positions closed, resuming same coin"
            )
            self._resume_symbol = symbol
            try:
                Path(GRID_STATE_FILE).unlink(missing_ok=True)
            except Exception:
                pass

        logger.info("Running initial screener scan...")
        self.screener.scan()

        balance = self.exchange.get_balance()
        best_sym, best_data = self.screener.get_best_coin()
        resume_note = f"\n🔄 Resuming: {self._resume_symbol}" if self._resume_symbol else ""

        best_info = ""
        if best_sym and best_data:
            best_info = (
                f"🎯 Best: {best_sym} (score={best_data['score']:.0f}, "
                f"CHOP={best_data.get('chop', 0):.0f}, "
                f"sp={best_data.get('atr_spacing', GRID_SPACING_PCT):.2f}%)"
            )

        await self.telegram.send_message(
            f"🤖 *Grid Bot v3.1 (NEUTRAL) Started*\n\n"
            f"💰 Balance: ${balance:.2f} | Deposit: ${DEPOSIT_USD:.0f}\n"
            f"{best_info}\n"
            f"⚙️ Levels {GRID_LEVELS}+{GRID_LEVELS} | ${POSITION_SIZE_USD}/lvl × {LEVERAGE}x\n"
            f"📉 Filters: NATR≤{SCREENER_NATR_MAX}% CHOP≥{SCREENER_CHOP_MIN_ENTRY} slope<{EMA_SLOPE_MAX_PCT}%\n"
            f"🎯 Exit: breakout OR soft SL -$4.50 | Daily: ${DAILY_LOSS_LIMIT_USD:.0f}{resume_note}"
        )

        self.running = True
        await self._main_loop()

    async def _main_loop(self):
        while self.running:
            try:
                loop_start = time.time()

                self._check_daily_reset()

                can_trade, reason = self.risk.can_trade()
                if not can_trade:
                    if self.grid_manager.sessions:
                        for sym in list(self.grid_manager.sessions.keys()):
                            summary = self.grid_manager.stop_grid(sym, reason=reason)
                            if summary:
                                self.risk.on_session_closed(summary)
                                await self._notify_grid_closed(sym, summary)
                    await asyncio.sleep(30)
                    continue

                if self.screener.should_scan():
                    self.screener.scan()

                # v3: Daily loss check
                if self._daily_loss_usd >= DAILY_LOSS_LIMIT_USD:
                    if self.grid_manager.sessions:
                        logger.warning(f"DAILY LOSS LIMIT hit (${self._daily_loss_usd:.2f} ≥ ${DAILY_LOSS_LIMIT_USD}), closing all")
                        for sym in list(self.grid_manager.sessions.keys()):
                            summary = self.grid_manager.stop_grid(sym, reason="daily_loss_limit")
                            if summary:
                                self.risk.on_session_closed(summary)
                                await self._notify_grid_closed(sym, summary)
                    await asyncio.sleep(60)
                    continue

                # v3: Exposure check (не более 70% депо в notional)
                current_exposure_pct = self._calc_exposure_pct()
                exposure_ok = current_exposure_pct < MAX_EXPOSURE_PCT

                # Start new grid(s) if slots available
                active_count = len(self.grid_manager.sessions)
                while active_count < MAX_CONCURRENT_GRIDS and exposure_ok:
                    target_sym = None
                    spacing = None
                    active_symbols = set(self.grid_manager.sessions.keys())
                    locked_symbols = self._get_locked_symbols()

                    if self._resume_symbol and self._resume_symbol not in active_symbols:
                        resume_sym = self._resume_symbol
                        self._resume_symbol = None
                        resume_data = self.screener.scores.get(resume_sym)
                        if (resume_data
                            and resume_data.get("chop", 0) >= SCREENER_CHOP_MIN_ENTRY
                            and resume_data.get("natr", 999) <= SCREENER_NATR_MAX
                            and abs(resume_data.get("ema_slope", 999)) <= EMA_SLOPE_MAX_PCT
                            and resume_sym not in locked_symbols):
                            target_sym = resume_sym
                            logger.info(f"Resuming grid on {target_sym} (CHOP={resume_data['chop']:.1f} NATR={resume_data['natr']:.2f}%)")
                            spacing = self.screener.get_atr_spacing_for_symbol(target_sym)
                        else:
                            rd = resume_data or {}
                            logger.info(f"Skip resume {resume_sym}: CHOP={rd.get('chop',0):.1f} NATR={rd.get('natr',0):.2f}% slope={rd.get('ema_slope',0):+.2f}% — doesn't pass v3 filters")
                            self._resume_symbol = None
                    else:
                        self._resume_symbol = None
                        # Find best coin not in active/cooldown/locked
                        cooldown_symbols = {
                            sym for sym, ts in self._breakout_cooldown.items()
                            if time.time() - ts < BREAKOUT_COOLDOWN_MIN * 60
                        }
                        excluded = active_symbols | cooldown_symbols | locked_symbols
                        best_sym, best_data = self.screener.get_best_coin(exclude=excluded)
                        if best_sym and best_data and best_data["score"] > 20:
                            target_sym = best_sym
                            spacing = best_data.get("atr_spacing") if ATR_SPACING_ENABLED else None

                    if target_sym:
                        session = self.grid_manager.start_grid(target_sym, spacing_pct=spacing)
                        if session:
                            score_data = self.screener.scores.get(target_sym, {})
                            await self._notify_grid_started(target_sym, score_data, session)
                            self.tmm.on_grid_started(target_sym, score_data)
                            active_count += 1
                        else:
                            break  # start_grid failed, don't loop
                    else:
                        break  # no more candidates

                # Tick
                events = self.grid_manager.tick()
                await self._process_events(events)

                # Breakout check
                if time.time() - self._last_breakout_check > BREAKOUT_CHECK_INTERVAL:
                    self._last_breakout_check = time.time()
                    await self._check_breakouts()

                checkpoint = self.risk.check_weekly_checkpoint()
                if checkpoint:
                    msg = self.risk.get_weekly_summary(checkpoint)
                    await self.telegram.send_message(msg)

                await self._check_daily_report()

                self.tmm.retry_pending_tags()

                elapsed = time.time() - loop_start
                sleep_time = max(TICK_INTERVAL - elapsed, 0.5)
                await asyncio.sleep(sleep_time)

            except KeyboardInterrupt:
                logger.info("Shutting down...")
                await self._shutdown()
                break
            except Exception as e:
                logger.error(f"Main loop error: {e}", exc_info=True)
                await asyncio.sleep(10)

    async def _check_breakouts(self):
        BREAKOUT_COOLDOWN_SEC = BREAKOUT_COOLDOWN_MIN * 60

        for symbol in list(self.grid_manager.sessions.keys()):
            if self.screener.is_breakout(symbol):
                summary = self.grid_manager.stop_grid(symbol, reason="breakout")
                if summary:
                    self.risk.on_session_closed(summary)
                    self._breakout_cooldown[symbol] = time.time()
                    # v3: убыток при breakout тоже копит для daily loss + lockout
                    net = summary.get("net_pnl", 0)
                    if net < 0:
                        self._daily_loss_usd += abs(net)
                        self._register_coin_fail(symbol)

                active_symbols = set(self.grid_manager.sessions.keys())
                cooldown_symbols = {
                    sym for sym, ts in self._breakout_cooldown.items()
                    if time.time() - ts < BREAKOUT_COOLDOWN_SEC
                }
                locked = self._get_locked_symbols()
                exclude = active_symbols | cooldown_symbols | locked
                best_sym, best_data = self.screener.get_best_coin(exclude=exclude)

                await self.telegram.send_message(
                    f"🔄 *SWITCH*: {symbol} → {best_sym or 'searching...'}\n"
                    f"Reason: breakout (BB+ADX/CHOP)\n"
                    f"Session: {summary['round_trips']} RTs | ${summary['net_pnl']:+.4f} | "
                    f"Inv: peak={summary['peak_inventory']}\n"
                    f"⏸ {symbol} cooldown {BREAKOUT_COOLDOWN_MIN}min"
                )

                if best_sym and best_data and best_data["score"] > 20:
                    spacing = best_data.get("atr_spacing") if ATR_SPACING_ENABLED else None
                    session = self.grid_manager.start_grid(best_sym, spacing_pct=spacing)
                    if session:
                        await self._notify_grid_started(best_sym, best_data, session)
                        self.tmm.on_grid_started(best_sym, best_data)

    def _calc_exposure_pct(self):
        """Текущая экспозиция (% от депо) по открытым notional."""
        try:
            positions = self.exchange.get_positions()
            total_notional = 0.0
            for p in positions:
                amt = abs(float(p.get("positionAmt", 0)))
                entry = float(p.get("entryPrice", 0))
                total_notional += amt * entry
            return (total_notional / DEPOSIT_USD) * 100 if DEPOSIT_USD > 0 else 0
        except Exception:
            return 0.0

    def _get_locked_symbols(self):
        """Монеты в lockout (COIN_LOCKOUT_FAILS убытков за COIN_LOCKOUT_HOURS часов)."""
        now = time.time()
        window = COIN_LOCKOUT_HOURS * 3600
        locked = set()
        for sym, fail_times in list(self._coin_fails.items()):
            recent = [t for t in fail_times if now - t < window]
            self._coin_fails[sym] = recent
            if len(recent) >= COIN_LOCKOUT_FAILS:
                locked.add(sym)
        return locked

    def _register_coin_fail(self, symbol):
        """Записать убыточную сессию для lockout tracking."""
        if symbol not in self._coin_fails:
            self._coin_fails[symbol] = []
        self._coin_fails[symbol].append(time.time())
        fails_count = len(self._coin_fails[symbol])
        logger.info(f"Coin fail registered: {symbol} ({fails_count}/{COIN_LOCKOUT_FAILS})")

    async def _handle_grid_close_event(self, event_type, event):
        """v3: единый обработчик закрытия (profit_target / soft_sl / max_loss / daily_loss)."""
        summary = event.get("summary") or {}
        symbol = event["symbol"]
        net_pnl = summary.get("net_pnl", 0)

        self.risk.on_session_closed(summary)

        # Daily loss tracking
        if net_pnl < 0:
            self._daily_loss_usd += abs(net_pnl)

        # Coin lockout tracking (только для soft_sl / max_loss)
        if event_type in ("soft_sl", "max_loss") and net_pnl < 0:
            self._register_coin_fail(symbol)

        emoji_map = {
            "profit_target": "🟢",
            "soft_sl": "🟠",
            "max_loss": "🔴",
            "daily_loss_limit": "⛔",
        }
        label_map = {
            "profit_target": "PROFIT TARGET",
            "soft_sl": "SOFT SL",
            "max_loss": "MAX LOSS",
            "daily_loss_limit": "DAILY LIMIT",
        }
        emoji = emoji_map.get(event_type, "⚪")
        label = label_map.get(event_type, event_type.upper())

        await self.telegram.send_message(
            f"{emoji} *{label}*: {symbol}\n"
            f"RTs: {summary.get('round_trips', 0)} | PnL: ${net_pnl:+.4f}\n"
            f"Inv peak={summary.get('peak_inventory', 0)} | "
            f"partial={summary.get('partial_closes', 0)} unstuck={summary.get('unstuck_closes', 0)}\n"
            f"Duration: {summary.get('duration_min', 0):.0f}min | "
            f"Daily loss: ${self._daily_loss_usd:.2f}/${DAILY_LOSS_LIMIT_USD:.0f}"
        )

    async def _process_events(self, events):
        rt_count = 0
        rt_pnl = 0

        for event in events:
            if event["type"] == "round_trip":
                rt_count += 1
                rt_pnl += event["pnl"]
                self.tmm.on_round_trip(event["symbol"], event)

            elif event["type"] in ("soft_sl", "max_loss", "daily_loss_limit"):
                await self._handle_grid_close_event(event["type"], event)

            elif event["type"] == "partial_close":
                logger.info(
                    f"Partial close {event['symbol']}: {event['levels_closed']} lvls, "
                    f"pnl=${event['pnl']:.4f}, imb={event['remaining_imbalance']}"
                )

            elif event["type"] == "unstuck":
                logger.info(
                    f"Unstuck {event['symbol']}: {event['levels_closed']} lvls near EMA"
                )

            elif event["type"] == "recenter":
                logger.info(
                    f"Recenter {event['symbol']}: "
                    f"{event['old_center']:.6f} → {event['new_center']:.6f}"
                )

        if rt_count > 0:
            logger.info(f"Round-trips: {rt_count} | PnL: ${rt_pnl:+.6f}")

    async def _notify_grid_started(self, symbol, score_data, session):
        score = score_data.get('score', 0)
        bb = score_data.get('bb_width', 0)
        adx = score_data.get('adx', 0)
        chop = score_data.get('chop', 0)
        mv = score_data.get('micro_vol', 0)
        natr = score_data.get('natr', 0)
        price = score_data.get('price', 0)
        sp = session.spacing_pct

        await self.telegram.send_message(
            f"🟢 *GRID ON*: {symbol}\n"
            f"Score: {score:.0f} | CHOP={chop:.0f} ADX={adx:.0f}\n"
            f"BB={bb:.2f}% | MV={mv:.0f} | NATR={natr:.2f}%\n"
            f"Price: ${price:.4f} | {GRID_LEVELS}+{GRID_LEVELS} lvls | "
            f"sp={sp:.2f}% | ${POSITION_SIZE_USD}×{LEVERAGE}x"
        )

    async def _notify_grid_closed(self, symbol, summary):
        emoji = "🟢" if summary["net_pnl"] >= 0 else "🔴"
        await self.telegram.send_message(
            f"{emoji} *GRID OFF*: {symbol}\n"
            f"Reason: {summary['close_reason']}\n"
            f"RTs: {summary['round_trips']} | PnL: ${summary['net_pnl']:+.4f}\n"
            f"Inv: peak={summary['peak_inventory']} | "
            f"partial={summary['partial_closes']} unstuck={summary['unstuck_closes']}\n"
            f"Duration: {summary['duration_min']:.0f}min"
        )

    def _check_daily_reset(self):
        now = datetime.now(VANCOUVER_TZ)
        today = now.date()
        if self._last_daily_reset_day != today and now.hour == DAILY_RESET_HOUR:
            self._last_daily_reset_day = today
            self.risk.reset_daily()
            self._daily_loss_usd = 0.0
            self._coin_fails = {}            # v3: сброс lockout
            self._breakout_cooldown = {}
            logger.info("Daily counters reset (loss, lockout, cooldown)")

    async def _check_daily_report(self):
        now = datetime.now(VANCOUVER_TZ)
        if now.hour == 21 and self._last_daily_report_hour != now.date():
            self._last_daily_report_hour = now.date()
            msg = self.risk.get_daily_summary()
            await self.telegram.send_message(msg)

    async def _shutdown(self):
        logger.info("Shutting down Grid Bot v2...")
        for symbol in list(self.grid_manager.sessions.keys()):
            summary = self.grid_manager.stop_grid(symbol, reason="shutdown")
            if summary:
                self.risk.on_session_closed(summary)
        await self.telegram.send_message("🛑 *Grid Bot v2 stopped*")
        await self.telegram.stop()
        logger.info("Grid Bot v2 stopped")


def main():
    bot = GridBot()
    try:
        asyncio.run(bot.start())
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...