"""
Grid Bot — Main Entry Point v2
=================================
Session grid with auto-rotation + inventory management.
Changes from v1:
- ATR-adaptive spacing from screener
- Inventory management (partial close, unstuck)
- EMA trailing center
- Choppiness Index in screener
- 5x leverage, 5 levels per side
"""
import asyncio
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path
# Put the directory two levels above this file at the front of sys.path.
# Presumably this is the project root, so that the `src.*` imports further
# down resolve when the file is run directly as a script — TODO confirm layout.
# This must stay ahead of those imports.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from zoneinfo import ZoneInfo
# Configure the root logger once, at INFO level, with a timestamped
# "[logger-name] LEVEL: message" line format.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Logger used by everything defined in this file.
logger = logging.getLogger("main")
from src.exchange import Exchange
from src.grid_manager import GridManager
from src.screener import Screener
from src.risk_manager import RiskManager
from src.tmm_client import TMMClient
from src.tg_bot import TelegramBot
from src.config import (
SCREENER_INTERVAL_SEC, MAX_CONCURRENT_GRIDS,
TMM_STRATEGY_TAG, DATA_DIR, GRID_STATE_FILE,
GRID_LEVELS, GRID_SPACING_PCT, LEVERAGE, POSITION_SIZE_USD,
ATR_SPACING_ENABLED, SCREENER_CHOP_MIN_ENTRY, SCREENER_ADX_MAX,
SCREENER_NATR_MAX, EMA_SLOPE_MAX_PCT, FUNDING_MAX_ABS,
COIN_LOCKOUT_FAILS, COIN_LOCKOUT_HOURS, BREAKOUT_COOLDOWN_MIN,
MAX_EXPOSURE_PCT, DEPOSIT_USD, DAILY_LOSS_LIMIT_USD,
)
# Time zone in which the daily-reset and daily-report wall-clock checks are made.
VANCOUVER_TZ = ZoneInfo("America/Vancouver")
# Target duration of one main-loop iteration, in seconds (the loop sleeps for
# whatever is left of this budget, with a 0.5 s floor).
TICK_INTERVAL = 5
# Minimum number of seconds between two breakout scans of the active grids.
BREAKOUT_CHECK_INTERVAL = 60
# Local hour (in VANCOUVER_TZ) during which the daily counters are reset.
DAILY_RESET_HOUR = 0
class GridBot:
    """Top-level orchestrator of the grid bot.

    Wires together the exchange wrapper, grid manager, screener, risk manager,
    TMM client and Telegram bot, and runs the main async loop that:

    * stops every grid while the risk manager or the daily loss limit forbids
      trading,
    * opens grids on the screener's best candidates while slots are free,
    * ticks the grid manager and processes the events it returns,
    * periodically checks active grids for breakouts and rotates away from them,
    * sends weekly and daily reports.
    """

    # A screener candidate must score strictly above this to get a grid.
    _MIN_ENTRY_SCORE = 20

    def __init__(self):
        """Create all collaborators and reset the in-memory tracking state."""
        self.exchange = Exchange()
        self.grid_manager = GridManager(self.exchange)
        self.screener = Screener(self.exchange)
        self.risk = RiskManager(self.exchange)
        self.tmm = TMMClient()
        self.telegram = TelegramBot(self)
        self.running = False
        self._last_breakout_check = 0
        self._last_daily_reset_day = None
        # NOTE: despite the name this stores the *date* of the last daily report.
        self._last_daily_report_hour = None
        self._breakout_cooldown = {}  # symbol -> timestamp of last breakout close
        self._coin_fails = {}  # v3: symbol -> list of loss timestamps (for lockout)
        self._daily_loss_usd = 0.0  # v3: running daily loss
        # Symbol to resume after a restart. Defined here (not only in start())
        # so the main loop never hits an AttributeError.
        self._resume_symbol = None

    def _load_previous_session(self):
        """Return the persisted grid state as a dict, or None.

        None is returned when the state file does not exist, cannot be read or
        parsed, or does not contain a JSON object.
        """
        state_path = Path(GRID_STATE_FILE)
        if not state_path.exists():
            return None
        try:
            import json
            state = json.loads(state_path.read_text())
        except Exception as e:
            logger.warning(f"Failed to load previous state: {e}")
            return None
        if not isinstance(state, dict):
            # start() calls .get() on the result; anything but a dict is unusable.
            logger.warning(
                f"Ignoring previous state: expected a JSON object, got {type(state).__name__}"
            )
            return None
        return state

    async def start(self):
        """Initialise everything, announce the start and enter the main loop.

        Reads the previous session state (if any), cleans up the exchange,
        initialises the risk manager and Telegram, remembers the previously
        active symbol for a resume attempt, runs the first screener scan and
        then blocks inside the main loop.
        """
        logger.info("=" * 50)
        logger.info(" GRID BOT v3 (NEUTRAL) starting...")
        logger.info("=" * 50)
        Path(DATA_DIR).mkdir(parents=True, exist_ok=True)

        # Read the old state *before* the cleanup so the restart can be reported.
        prev_session = self._load_previous_session()
        cancelled = self.exchange.nuclear_cleanup()
        if cancelled:
            logger.info(f"Cleaned up {cancelled} symbols on startup")
        self.risk.initialize()
        await self.telegram.start()

        self._resume_symbol = None
        if prev_session and prev_session.get("active"):
            symbol = prev_session.get("symbol", "?")
            rts = prev_session.get("round_trips", 0)
            pnl = prev_session.get("session_pnl", 0) - prev_session.get("session_fees", 0)
            await self.telegram.send_message(
                f"⚠️ *GRID OFF* (restart): {symbol}\n"
                f"RTs: {rts} | PnL: ${pnl:+.4f}\n"
                f"Positions closed, resuming same coin"
            )
            self._resume_symbol = symbol

        # The old state has been consumed; drop the file (best effort).
        try:
            Path(GRID_STATE_FILE).unlink(missing_ok=True)
        except OSError as e:
            logger.warning(f"Failed to remove previous state file: {e}")

        logger.info("Running initial screener scan...")
        self.screener.scan()
        balance = self.exchange.get_balance()
        best_sym, best_data = self.screener.get_best_coin()
        resume_note = f"\n🔄 Resuming: {self._resume_symbol}" if self._resume_symbol else ""
        best_info = ""
        if best_sym and best_data:
            best_info = (
                f"🎯 Best: {best_sym} (score={best_data['score']:.0f}, "
                f"CHOP={best_data.get('chop', 0):.0f}, "
                f"sp={best_data.get('atr_spacing', GRID_SPACING_PCT):.2f}%)"
            )
        await self.telegram.send_message(
            f"🤖 *Grid Bot v3.1 (NEUTRAL) Started*\n\n"
            f"💰 Balance: ${balance:.2f} | Deposit: ${DEPOSIT_USD:.0f}\n"
            f"{best_info}\n"
            f"⚙️ Levels {GRID_LEVELS}+{GRID_LEVELS} | ${POSITION_SIZE_USD}/lvl × {LEVERAGE}x\n"
            f"📉 Filters: NATR≤{SCREENER_NATR_MAX}% CHOP≥{SCREENER_CHOP_MIN_ENTRY} slope<{EMA_SLOPE_MAX_PCT}%\n"
            f"🎯 Exit: breakout OR soft SL -$4.50 | Daily: ${DAILY_LOSS_LIMIT_USD:.0f}{resume_note}"
        )
        self.running = True
        await self._main_loop()

    async def _main_loop(self):
        """Run trading iterations until ``self.running`` is cleared or the task is stopped.

        Any ordinary exception inside an iteration is logged and followed by a
        10 s pause. Cancellation of the task (which is how ``asyncio.run``
        delivers Ctrl+C on Python 3.11+) and a direct KeyboardInterrupt both
        trigger the graceful shutdown.
        """
        while self.running:
            try:
                loop_start = time.time()
                self._check_daily_reset()

                can_trade, reason = self.risk.can_trade()
                if not can_trade:
                    # Risk manager vetoes trading: flatten everything and wait.
                    await self._stop_all_grids(reason)
                    await asyncio.sleep(30)
                    continue

                if self.screener.should_scan():
                    self.screener.scan()

                # v3: daily loss check
                if self._daily_loss_usd >= DAILY_LOSS_LIMIT_USD:
                    if self.grid_manager.sessions:
                        logger.warning(f"DAILY LOSS LIMIT hit (${self._daily_loss_usd:.2f} ≥ ${DAILY_LOSS_LIMIT_USD}), closing all")
                        await self._stop_all_grids("daily_loss_limit")
                    await asyncio.sleep(60)
                    continue

                # v3: exposure check (open notional as % of deposit, capped by
                # MAX_EXPOSURE_PCT). Evaluated once per iteration.
                exposure_ok = self._calc_exposure_pct() < MAX_EXPOSURE_PCT
                if exposure_ok:
                    await self._fill_free_slots()

                # Tick
                events = self.grid_manager.tick()
                await self._process_events(events)

                # Breakout check
                if time.time() - self._last_breakout_check > BREAKOUT_CHECK_INTERVAL:
                    self._last_breakout_check = time.time()
                    await self._check_breakouts()

                checkpoint = self.risk.check_weekly_checkpoint()
                if checkpoint:
                    msg = self.risk.get_weekly_summary(checkpoint)
                    await self.telegram.send_message(msg)

                await self._check_daily_report()
                self.tmm.retry_pending_tags()

                elapsed = time.time() - loop_start
                sleep_time = max(TICK_INTERVAL - elapsed, 0.5)
                await asyncio.sleep(sleep_time)
            except asyncio.CancelledError:
                # CancelledError is not an Exception subclass, so without this
                # branch a cancelled task would skip the shutdown entirely.
                logger.info("Shutting down...")
                await self._shutdown()
                raise  # keep cancellation semantics for the caller
            except KeyboardInterrupt:
                logger.info("Shutting down...")
                await self._shutdown()
                break
            except Exception as e:
                logger.error(f"Main loop error: {e}", exc_info=True)
                await asyncio.sleep(10)

    async def _stop_all_grids(self, reason, notify=True):
        """Stop every active grid and report each returned summary to the risk manager.

        Args:
            reason: Close reason forwarded to ``grid_manager.stop_grid``.
            notify: When true, also send a "GRID OFF" Telegram message per grid.
        """
        for sym in list(self.grid_manager.sessions.keys()):
            summary = self.grid_manager.stop_grid(sym, reason=reason)
            if not summary:
                continue
            self.risk.on_session_closed(summary)
            if notify:
                await self._notify_grid_closed(sym, summary)

    async def _fill_free_slots(self):
        """Start new grids until MAX_CONCURRENT_GRIDS is reached or candidates run out."""
        active_count = len(self.grid_manager.sessions)
        while active_count < MAX_CONCURRENT_GRIDS:
            target_sym, spacing = self._pick_next_symbol()
            if not target_sym:
                break  # no more candidates
            session = self.grid_manager.start_grid(target_sym, spacing_pct=spacing)
            if not session:
                break  # start_grid failed, don't loop
            score_data = self.screener.scores.get(target_sym, {})
            await self._notify_grid_started(target_sym, score_data, session)
            self.tmm.on_grid_started(target_sym, score_data)
            active_count += 1

    def _pick_next_symbol(self):
        """Choose the next symbol to trade.

        A pending resume symbol gets exactly one attempt and must pass the
        CHOP / NATR / EMA-slope filters and not be locked out. If it is
        rejected, no candidate is returned for this pass; a later pass falls
        through to the screener's best coin.

        Returns:
            ``(symbol, spacing_pct)``, or ``(None, None)`` when nothing qualifies.
        """
        active_symbols = set(self.grid_manager.sessions.keys())
        locked_symbols = self._get_locked_symbols()

        if self._resume_symbol and self._resume_symbol not in active_symbols:
            resume_sym = self._resume_symbol
            self._resume_symbol = None  # single attempt, pass or fail
            resume_data = self.screener.scores.get(resume_sym)
            passes_filters = bool(
                resume_data
                and resume_data.get("chop", 0) >= SCREENER_CHOP_MIN_ENTRY
                and resume_data.get("natr", 999) <= SCREENER_NATR_MAX
                and abs(resume_data.get("ema_slope", 999)) <= EMA_SLOPE_MAX_PCT
                and resume_sym not in locked_symbols
            )
            if passes_filters:
                logger.info(f"Resuming grid on {resume_sym} (CHOP={resume_data['chop']:.1f} NATR={resume_data['natr']:.2f}%)")
                return resume_sym, self.screener.get_atr_spacing_for_symbol(resume_sym)
            rd = resume_data or {}
            logger.info(f"Skip resume {resume_sym}: CHOP={rd.get('chop',0):.1f} NATR={rd.get('natr',0):.2f}% slope={rd.get('ema_slope',0):+.2f}% — doesn't pass v3 filters")
            return None, None

        self._resume_symbol = None
        # Find best coin not in active/cooldown/locked
        excluded = active_symbols | self._get_cooldown_symbols() | locked_symbols
        best_sym, best_data = self.screener.get_best_coin(exclude=excluded)
        if best_sym and best_data and best_data["score"] > self._MIN_ENTRY_SCORE:
            spacing = best_data.get("atr_spacing") if ATR_SPACING_ENABLED else None
            return best_sym, spacing
        return None, None

    def _get_cooldown_symbols(self):
        """Return symbols closed on a breakout less than BREAKOUT_COOLDOWN_MIN minutes ago."""
        now = time.time()
        cooldown_sec = BREAKOUT_COOLDOWN_MIN * 60
        return {
            sym for sym, ts in self._breakout_cooldown.items()
            if now - ts < cooldown_sec
        }

    def _can_open_new_grid(self):
        """Return True when the same gates the main loop applies allow a new grid."""
        can_trade, _reason = self.risk.can_trade()
        if not can_trade:
            return False
        if self._daily_loss_usd >= DAILY_LOSS_LIMIT_USD:
            return False
        return self._calc_exposure_pct() < MAX_EXPOSURE_PCT

    async def _check_breakouts(self):
        """Close grids whose symbol is breaking out and rotate to the next best coin.

        A closed symbol is put on cooldown; a losing close also counts toward
        the daily loss and the per-coin lockout. The replacement grid is only
        opened here when trading is still allowed (risk manager, daily loss
        limit, exposure cap); otherwise the main loop fills the slot on a later
        tick once the gates permit.
        """
        for symbol in list(self.grid_manager.sessions.keys()):
            if not self.screener.is_breakout(symbol):
                continue
            summary = self.grid_manager.stop_grid(symbol, reason="breakout")
            if not summary:
                continue

            self.risk.on_session_closed(summary)
            self._breakout_cooldown[symbol] = time.time()
            # v3: a loss on breakout also counts toward daily loss + lockout
            net = summary.get("net_pnl", 0)
            if net < 0:
                self._daily_loss_usd += abs(net)
                self._register_coin_fail(symbol)

            active_symbols = set(self.grid_manager.sessions.keys())
            exclude = active_symbols | self._get_cooldown_symbols() | self._get_locked_symbols()
            best_sym, best_data = self.screener.get_best_coin(exclude=exclude)
            will_switch = bool(
                best_sym
                and best_data
                and best_data["score"] > self._MIN_ENTRY_SCORE
                and self._can_open_new_grid()
            )
            # Only name a target the bot is actually about to start.
            next_sym = best_sym if will_switch else None
            await self.telegram.send_message(
                f"🔄 *SWITCH*: {symbol} → {next_sym or 'searching...'}\n"
                f"Reason: breakout (BB+ADX/CHOP)\n"
                f"Session: {summary.get('round_trips', 0)} RTs | ${net:+.4f} | "
                f"Inv: peak={summary.get('peak_inventory', 0)}\n"
                f"⏸ {symbol} cooldown {BREAKOUT_COOLDOWN_MIN}min"
            )
            if not will_switch:
                continue
            spacing = best_data.get("atr_spacing") if ATR_SPACING_ENABLED else None
            session = self.grid_manager.start_grid(best_sym, spacing_pct=spacing)
            if session:
                await self._notify_grid_started(best_sym, best_data, session)
                self.tmm.on_grid_started(best_sym, best_data)

    def _calc_exposure_pct(self):
        """Return current exposure: open notional as a percentage of DEPOSIT_USD.

        Notional is summed as ``abs(positionAmt) * entryPrice`` over the
        positions returned by the exchange. Returns 0.0 when the deposit is not
        positive or when the positions cannot be fetched/parsed; the failure is
        logged because a 0.0 result lets the exposure check pass.
        """
        try:
            positions = self.exchange.get_positions()
            total_notional = sum(
                abs(float(p.get("positionAmt", 0))) * float(p.get("entryPrice", 0))
                for p in positions
            )
        except Exception as e:
            logger.warning(f"Exposure calculation failed, assuming 0%: {e}")
            return 0.0
        if DEPOSIT_USD <= 0:
            return 0.0
        return (total_notional / DEPOSIT_USD) * 100

    def _get_locked_symbols(self):
        """Return symbols in lockout.

        A symbol is locked once it has at least COIN_LOCKOUT_FAILS losing
        sessions within the last COIN_LOCKOUT_HOURS hours. Timestamps older
        than that window are dropped as a side effect.
        """
        now = time.time()
        window = COIN_LOCKOUT_HOURS * 3600
        locked = set()
        for sym, fail_times in list(self._coin_fails.items()):
            recent = [t for t in fail_times if now - t < window]
            self._coin_fails[sym] = recent
            if len(recent) >= COIN_LOCKOUT_FAILS:
                locked.add(sym)
        return locked

    def _register_coin_fail(self, symbol):
        """Record a losing session for ``symbol`` for lockout tracking."""
        fails = self._coin_fails.setdefault(symbol, [])
        fails.append(time.time())
        fails_count = len(fails)
        logger.info(f"Coin fail registered: {symbol} ({fails_count}/{COIN_LOCKOUT_FAILS})")

    async def _handle_grid_close_event(self, event_type, event):
        """v3: single handler for grid-close events.

        Reports the session summary to the risk manager, adds a negative net
        PnL to the running daily loss, registers a coin fail for losing
        ``soft_sl`` / ``max_loss`` closes, and sends a Telegram message.

        Args:
            event_type: Event name; known labels exist for ``profit_target``,
                ``soft_sl``, ``max_loss`` and ``daily_loss_limit``.
            event: Event dict; ``event["symbol"]`` is required, ``summary`` is
                optional.
        """
        summary = event.get("summary") or {}
        symbol = event["symbol"]
        net_pnl = summary.get("net_pnl", 0)
        self.risk.on_session_closed(summary)

        # Daily loss tracking
        if net_pnl < 0:
            self._daily_loss_usd += abs(net_pnl)
        # Coin lockout tracking (only for soft_sl / max_loss)
        if event_type in ("soft_sl", "max_loss") and net_pnl < 0:
            self._register_coin_fail(symbol)

        emoji_map = {
            "profit_target": "🟢",
            "soft_sl": "🟠",
            "max_loss": "🔴",
            "daily_loss_limit": "⛔",
        }
        label_map = {
            "profit_target": "PROFIT TARGET",
            "soft_sl": "SOFT SL",
            "max_loss": "MAX LOSS",
            "daily_loss_limit": "DAILY LIMIT",
        }
        emoji = emoji_map.get(event_type, "⚪")
        label = label_map.get(event_type, event_type.upper())
        await self.telegram.send_message(
            f"{emoji} *{label}*: {symbol}\n"
            f"RTs: {summary.get('round_trips', 0)} | PnL: ${net_pnl:+.4f}\n"
            f"Inv peak={summary.get('peak_inventory', 0)} | "
            f"partial={summary.get('partial_closes', 0)} unstuck={summary.get('unstuck_closes', 0)}\n"
            f"Duration: {summary.get('duration_min', 0):.0f}min | "
            f"Daily loss: ${self._daily_loss_usd:.2f}/${DAILY_LOSS_LIMIT_USD:.0f}"
        )

    async def _process_events(self, events):
        """Dispatch the events returned by ``grid_manager.tick()``.

        Round trips are forwarded to TMM and summarised in one log line; close
        events go to ``_handle_grid_close_event``; partial-close, unstuck and
        recenter events are only logged. Unknown event types are ignored.
        """
        rt_count = 0
        rt_pnl = 0
        for event in events:
            kind = event["type"]
            if kind == "round_trip":
                rt_count += 1
                rt_pnl += event["pnl"]
                self.tmm.on_round_trip(event["symbol"], event)
            # NOTE(review): "profit_target" has a label in the close handler
            # but is not dispatched here — confirm whether the grid manager
            # still emits it.
            elif kind in ("soft_sl", "max_loss", "daily_loss_limit"):
                await self._handle_grid_close_event(kind, event)
            elif kind == "partial_close":
                logger.info(
                    f"Partial close {event['symbol']}: {event['levels_closed']} lvls, "
                    f"pnl=${event['pnl']:.4f}, imb={event['remaining_imbalance']}"
                )
            elif kind == "unstuck":
                logger.info(
                    f"Unstuck {event['symbol']}: {event['levels_closed']} lvls near EMA"
                )
            elif kind == "recenter":
                logger.info(
                    f"Recenter {event['symbol']}: "
                    f"{event['old_center']:.6f} → {event['new_center']:.6f}"
                )
        if rt_count > 0:
            logger.info(f"Round-trips: {rt_count} | PnL: ${rt_pnl:+.6f}")

    async def _notify_grid_started(self, symbol, score_data, session):
        """Send the "GRID ON" Telegram message for a freshly started grid.

        Args:
            symbol: Symbol the grid was started on.
            score_data: Screener metrics for the symbol; missing keys show as 0.
            session: Object returned by ``start_grid``; ``spacing_pct`` is read.
        """
        score = score_data.get('score', 0)
        bb = score_data.get('bb_width', 0)
        adx = score_data.get('adx', 0)
        chop = score_data.get('chop', 0)
        mv = score_data.get('micro_vol', 0)
        natr = score_data.get('natr', 0)
        price = score_data.get('price', 0)
        sp = session.spacing_pct
        await self.telegram.send_message(
            f"🟢 *GRID ON*: {symbol}\n"
            f"Score: {score:.0f} | CHOP={chop:.0f} ADX={adx:.0f}\n"
            f"BB={bb:.2f}% | MV={mv:.0f} | NATR={natr:.2f}%\n"
            f"Price: ${price:.4f} | {GRID_LEVELS}+{GRID_LEVELS} lvls | "
            f"sp={sp:.2f}% | ${POSITION_SIZE_USD}×{LEVERAGE}x"
        )

    async def _notify_grid_closed(self, symbol, summary):
        """Send the "GRID OFF" Telegram message for a closed session.

        Missing summary keys fall back to neutral defaults instead of raising,
        matching ``_handle_grid_close_event``.
        """
        net_pnl = summary.get("net_pnl", 0)
        emoji = "🟢" if net_pnl >= 0 else "🔴"
        await self.telegram.send_message(
            f"{emoji} *GRID OFF*: {symbol}\n"
            f"Reason: {summary.get('close_reason', '?')}\n"
            f"RTs: {summary.get('round_trips', 0)} | PnL: ${net_pnl:+.4f}\n"
            f"Inv: peak={summary.get('peak_inventory', 0)} | "
            f"partial={summary.get('partial_closes', 0)} unstuck={summary.get('unstuck_closes', 0)}\n"
            f"Duration: {summary.get('duration_min', 0):.0f}min"
        )

    def _check_daily_reset(self):
        """Reset daily counters once per day during hour DAILY_RESET_HOUR (Vancouver time).

        Clears the risk manager's daily state, the running daily loss, the coin
        lockout history and the breakout cooldowns.
        """
        now = datetime.now(VANCOUVER_TZ)
        today = now.date()
        if self._last_daily_reset_day == today or now.hour != DAILY_RESET_HOUR:
            return
        self._last_daily_reset_day = today
        self.risk.reset_daily()
        self._daily_loss_usd = 0.0
        self._coin_fails = {}  # v3: lockout reset
        self._breakout_cooldown = {}
        logger.info("Daily counters reset (loss, lockout, cooldown)")

    async def _check_daily_report(self):
        """Send the daily summary once per day during hour 21 (Vancouver time)."""
        now = datetime.now(VANCOUVER_TZ)
        if now.hour == 21 and self._last_daily_report_hour != now.date():
            self._last_daily_report_hour = now.date()
            msg = self.risk.get_daily_summary()
            await self.telegram.send_message(msg)

    async def _shutdown(self):
        """Stop all grids, report them to the risk manager and stop Telegram.

        The Telegram bot is stopped even if the farewell message fails to send.
        """
        logger.info("Shutting down Grid Bot v2...")
        self.running = False
        await self._stop_all_grids("shutdown", notify=False)
        try:
            await self.telegram.send_message("🛑 *Grid Bot v2 stopped*")
        finally:
            await self.telegram.stop()
        logger.info("Grid Bot v2 stopped")
def main():
    """Build the bot and run it until it stops or the user interrupts it."""
    bot = GridBot()
    try:
        asyncio.run(bot.start())
    except KeyboardInterrupt:
        # Ctrl+C is an expected way to stop the bot: record it instead of
        # silently swallowing it, and exit without a traceback.
        logger.info("Interrupted by user, exiting")


if __name__ == "__main__":
    main()