← Назад
""" Grid Bot — Grid Manager v2 ============================= v2 changes: - ATR-adaptive spacing (from screener) - Inventory tracking + cap + partial close - Passivbot-style unstucking at EMA - EMA trailing center (replaces hard recenter) - 5x leverage, 5 levels per side State persisted to JSON for PM2 restart recovery. """ import json import time import logging from datetime import datetime from pathlib import Path from zoneinfo import ZoneInfo from src.config import ( GRID_LEVELS, GRID_SPACING_PCT, POSITION_SIZE_USD, LEVERAGE, MAX_LOSS_PCT, MAKER_FEE, TAKER_FEE, GRID_STATE_FILE, SESSION_LOG_FILE, DATA_DIR, ATR_SPACING_ENABLED, INVENTORY_WARN_LEVELS, INVENTORY_MAX_LEVELS, UNSTUCK_ENABLED, UNSTUCK_THRESHOLD, UNSTUCK_CLOSE_PCT, EMA_CENTER_PERIOD, CENTER_UPDATE_SEC, CENTER_DRIFT_THRESHOLD, PROFIT_TARGET_USD, SOFT_SL_USD, ) logger = logging.getLogger("grid") VANCOUVER_TZ = ZoneInfo("America/Vancouver") class GridOrder: """Single grid order.""" __slots__ = ['side', 'price', 'qty', 'order_id', 'status', 'fill_price', 'fill_time', 'pair_order_id'] def __init__(self, side, price, qty, order_id=None): self.side = side self.price = price self.qty = qty self.order_id = order_id self.status = "pending" self.fill_price = None self.fill_time = None self.pair_order_id = None def to_dict(self): return { "side": self.side, "price": self.price, "qty": self.qty, "order_id": self.order_id, "status": self.status, "fill_price": self.fill_price, "fill_time": self.fill_time, "pair_order_id": self.pair_order_id, } @classmethod def from_dict(cls, d): o = cls(d["side"], d["price"], d["qty"], d.get("order_id")) o.status = d.get("status", "pending") o.fill_price = d.get("fill_price") o.fill_time = d.get("fill_time") o.pair_order_id = d.get("pair_order_id") return o class GridSession: """One grid session on one symbol.""" def __init__(self, symbol, center_price, exchange, symbol_info, spacing_pct=None): self.symbol = symbol self.center_price = center_price self.exchange = exchange self.symbol_info = symbol_info self.start_time = time.time() self.orders = {} self.round_trips = 0 self.session_pnl = 0.0 self.session_fees = 0.0 self.total_buys_filled = 0 self.total_sells_filled = 0 self.active = True self.close_reason = None # v2: ATR-adaptive spacing self.spacing_pct = spacing_pct or GRID_SPACING_PCT # v2: Inventory tracking self.inventory_imbalance = 0 # +N = long N levels, -N = short N levels self.peak_inventory = 0 self.partial_closes = 0 self.unstuck_closes = 0 # v2: EMA trailing center self._last_center_update = time.time() self._price_history = [] # for EMA calculation @property def spacing(self): """Absolute spacing in price terms.""" return self.center_price * self.spacing_pct / 100 # ============================================================ # GRID SETUP # ============================================================ def setup_grid(self): """Place initial grid: buy levels below, sell levels above center.""" spacing = self.spacing qty = self._calc_qty(self.center_price) if qty <= 0: logger.error(f"Qty too small for {self.symbol} at {self.center_price}") return False placed = 0 # Buy levels below center for i in range(1, GRID_LEVELS + 1): price = self.center_price - i * spacing order = self._place_grid_order("BUY", price, qty) if order: placed += 1 # Sell levels above center for i in range(1, GRID_LEVELS + 1): price = self.center_price + i * spacing order = self._place_grid_order("SELL", price, qty) if order: placed += 1 logger.info( f"Grid setup: {self.symbol} center={self.center_price:.6f} " f"spacing={self.spacing_pct:.3f}% ({spacing:.6f}) qty={qty} " f"placed={placed}/{GRID_LEVELS*2}" ) return placed > 0 def _calc_qty(self, price): notional = POSITION_SIZE_USD * LEVERAGE qty = notional / price qty = self.exchange.round_qty(self.symbol_info, qty) min_qty = self.symbol_info.get("min_qty", 0) if qty < min_qty: logger.warning(f"Qty {qty} < min {min_qty} for {self.symbol}") return 0 return qty def _place_grid_order(self, side, price, qty): result = self.exchange.place_limit_order( self.symbol, side, qty, price, self.symbol_info ) if result and result.get("orderId"): order_id = result["orderId"] grid_order = GridOrder(side, price, qty, order_id) grid_order.status = "open" self.orders[order_id] = grid_order return grid_order return None # ============================================================ # FILL DETECTION # ============================================================ def check_fills(self): if not self.active: return [] events = [] open_orders = self.exchange.get_open_orders(self.symbol) open_ids = {int(o["orderId"]) for o in open_orders} for order_id, grid_order in list(self.orders.items()): if grid_order.status != "open": continue if order_id not in open_ids: status = self.exchange.get_order_status(self.symbol, order_id) if not status: continue if status["status"] == "FILLED": fill_price = float(status.get("avgPrice", grid_order.price)) grid_order.status = "filled" grid_order.fill_price = fill_price grid_order.fill_time = time.time() fee = grid_order.qty * fill_price * MAKER_FEE self.session_fees += fee if grid_order.side == "BUY": self.total_buys_filled += 1 self.inventory_imbalance += 1 else: self.total_sells_filled += 1 self.inventory_imbalance -= 1 # Track peak inventory if abs(self.inventory_imbalance) > self.peak_inventory: self.peak_inventory = abs(self.inventory_imbalance) # v2: Check inventory cap BEFORE placing exit if self._should_block_side(grid_order.side): logger.info( f"INV CAP: {self.symbol} imbalance={self.inventory_imbalance}, " f"blocking new {grid_order.side} exit" ) else: exit_event = self._place_exit_order(grid_order) if exit_event: events.append(exit_event) events.append({ "type": "fill", "symbol": self.symbol, "side": grid_order.side, "price": fill_price, "qty": grid_order.qty, "inventory": self.inventory_imbalance, }) elif status["status"] in ("CANCELED", "EXPIRED"): grid_order.status = "cancelled" return events def _should_block_side(self, filled_side): """Check if inventory cap reached — cancel orders that increase imbalance.""" imb = abs(self.inventory_imbalance) if imb < INVENTORY_MAX_LEVELS: return False # At cap: cancel orders on the side that INCREASES imbalance # If we're LONG (imb > 0), cancel remaining BUYs (they make us more long) # If we're SHORT (imb < 0), cancel remaining SELLs (they make us more short) cancel_side = "BUY" if self.inventory_imbalance > 0 else "SELL" cancelled = 0 for oid, order in list(self.orders.items()): if order.status == "open" and order.side == cancel_side: try: self.exchange.cancel_order(self.symbol, oid) except Exception: pass order.status = "cancelled" cancelled += 1 if cancelled > 0: logger.warning( f"INV HARD CAP: {self.symbol} imb={self.inventory_imbalance}, " f"cancelled {cancelled} {cancel_side} orders" ) return False # still allow exit orders (they reduce inventory) def _place_exit_order(self, filled_order): spacing = self.spacing if filled_order.side == "BUY": exit_price = filled_order.fill_price + spacing exit_side = "SELL" else: exit_price = filled_order.fill_price - spacing exit_side = "BUY" exit_order = self._place_grid_order(exit_side, exit_price, filled_order.qty) if exit_order: filled_order.pair_order_id = exit_order.order_id exit_order.pair_order_id = filled_order.order_id return { "type": "exit_placed", "side": exit_side, "price": exit_price, "for_fill": filled_order.fill_price, } return None # ============================================================ # ROUND-TRIP DETECTION # ============================================================ def check_round_trips(self): completed = [] for order_id, order in list(self.orders.items()): if order.status != "filled" or not order.pair_order_id: continue pair = self.orders.get(order.pair_order_id) if not pair or pair.status != "filled": continue if order.side == "BUY" and pair.side == "SELL": buy_price = order.fill_price sell_price = pair.fill_price elif order.side == "SELL" and pair.side == "BUY": buy_price = pair.fill_price sell_price = order.fill_price else: continue qty = order.qty pnl = qty * (sell_price - buy_price) fee = qty * (buy_price + sell_price) * MAKER_FEE net_pnl = pnl - fee self.round_trips += 1 self.session_pnl += net_pnl completed.append({ "rt_num": self.round_trips, "buy_price": buy_price, "sell_price": sell_price, "qty": qty, "pnl": round(net_pnl, 6), "gross_pnl": round(pnl, 6), "fee": round(fee, 6), }) order.status = "closed" pair.status = "closed" return completed # ============================================================ # INVENTORY MANAGEMENT (v2) # ============================================================ def check_inventory(self, current_price): """ Check inventory and take action: 1. At WARN level → partial close excess 2. At UNSTUCK threshold → aggressive close (no EMA gate) Returns list of events. """ events = [] imb = abs(self.inventory_imbalance) if imb < INVENTORY_WARN_LEVELS: return events need_regrid = False # Partial close at warn level — more aggressive formula if imb >= INVENTORY_WARN_LEVELS: excess = imb - INVENTORY_WARN_LEVELS + 1 levels_to_close = max(1, (excess + 1) // 2) # ceil division: 1→1, 2→1, 3→2, 4→2, 5→3 pnl, fee = self._close_inventory_partial(current_price, levels_to_close) self.partial_closes += levels_to_close need_regrid = True events.append({ "type": "partial_close", "symbol": self.symbol, "levels_closed": levels_to_close, "pnl": round(pnl, 6), "fee": round(fee, 6), "remaining_imbalance": self.inventory_imbalance, }) # Passivbot unstucking at high imbalance — NO EMA proximity gate if UNSTUCK_ENABLED and imb >= UNSTUCK_THRESHOLD: levels_to_unstuck = max(1, int(imb * UNSTUCK_CLOSE_PCT)) pnl, fee = self._close_inventory_partial(current_price, levels_to_unstuck) self.unstuck_closes += levels_to_unstuck need_regrid = True events.append({ "type": "unstuck", "symbol": self.symbol, "levels_closed": levels_to_unstuck, "pnl": round(pnl, 6), "remaining_imbalance": self.inventory_imbalance, }) # Re-setup grid after partial close (orders were cancelled by close_position_market) if need_regrid: self.center_price = current_price for order in self.orders.values(): if order.status == "open": order.status = "cancelled" self.setup_grid() logger.info(f"Re-grid after inventory close: {self.symbol} center={current_price:.6f}") return events def _close_inventory_partial(self, current_price, levels): """Close N levels of inventory at market.""" if self.inventory_imbalance == 0 or levels <= 0: return 0, 0 qty_per_level = self._calc_qty(current_price) if qty_per_level <= 0: return 0, 0 total_qty = qty_per_level * levels if self.inventory_imbalance > 0: # Long → sell to reduce close_side = "SELL" else: # Short → buy to reduce close_side = "BUY" close_price = self.exchange.close_position_market( self.symbol, close_side, total_qty, self.symbol_info ) if close_price and close_price > 0: # Estimate PnL (simplified — actual tracked in binance) if self.inventory_imbalance > 0: pnl = total_qty * (close_price - self.center_price) else: pnl = total_qty * (self.center_price - close_price) fee = total_qty * close_price * TAKER_FEE self.session_pnl += pnl - fee self.session_fees += fee # Update imbalance if self.inventory_imbalance > 0: self.inventory_imbalance = max(0, self.inventory_imbalance - levels) else: self.inventory_imbalance = min(0, self.inventory_imbalance + levels) logger.info( f"INV CLOSE: {self.symbol} {close_side} {levels} lvls, " f"pnl=${pnl-fee:.4f}, imb={self.inventory_imbalance}" ) return pnl - fee, fee return 0, 0 # ============================================================ # EMA TRAILING CENTER (v2) # ============================================================ def update_price_history(self, price): """Add price point for EMA calculation.""" self._price_history.append(price) # Keep last 100 prices if len(self._price_history) > 100: self._price_history = self._price_history[-100:] def _get_ema_center(self): """Calculate EMA from price history.""" if len(self._price_history) < EMA_CENTER_PERIOD: return None prices = self._price_history[-EMA_CENTER_PERIOD * 2:] mult = 2 / (EMA_CENTER_PERIOD + 1) ema = prices[0] for p in prices[1:]: ema = p * mult + ema * (1 - mult) return ema def check_trailing_center(self, current_price): """Check if center should be updated via EMA trailing.""" now = time.time() if now - self._last_center_update < CENTER_UPDATE_SEC: return None self._last_center_update = now ema = self._get_ema_center() if not ema: return None # Check if price drifted enough from current center spacing = self.spacing drift_levels = abs(current_price - self.center_price) / spacing if drift_levels >= CENTER_DRIFT_THRESHOLD: old_center = self.center_price new_center = ema logger.info( f"EMA RECENTER: {self.symbol} {old_center:.6f} → {new_center:.6f} " f"(drift={drift_levels:.1f} levels) — keep position, replace orders" ) # Cancel open orders only — keep position alive, no market close self.exchange.cancel_all_orders(self.symbol) self.center_price = new_center for order in self.orders.values(): if order.status == "open": order.status = "cancelled" # Keep inventory — don't reset, position stays self.setup_grid() return {"old": old_center, "new": new_center, "drift": drift_levels} return None # ============================================================ # UNREALIZED PNL & MAX LOSS # ============================================================ def get_unrealized_pnl(self, current_price): unrealized = 0 for order in self.orders.values(): if order.status != "filled": continue if order.pair_order_id and self.orders.get(order.pair_order_id, None): pair = self.orders[order.pair_order_id] if pair.status in ("filled", "closed"): continue if order.side == "BUY": unrealized += order.qty * (current_price - order.fill_price) elif order.side == "SELL": unrealized += order.qty * (order.fill_price - current_price) return unrealized def check_max_loss(self, current_price): """Legacy: абсолютный лимит лосса (совместимость).""" unrealized = self.get_unrealized_pnl(current_price) net_pnl = self.session_pnl + unrealized - self.session_fees if net_pnl < -MAX_LOSS_PCT: logger.warning( f"MAX LOSS: {self.symbol} net=${net_pnl:.4f} < -${MAX_LOSS_PCT:.2f}" ) return True return False def check_exit_conditions(self, current_price): """ v3.1: Profit target убран — бот работает пока монета в боковике. Выход только при breakout (см. screener.is_breakout) или soft SL. - Soft SL (-$SOFT_SL_USD) → "soft_sl" → закрытие + coin lockout tick """ unrealized = self.get_unrealized_pnl(current_price) net_pnl = self.session_pnl + unrealized - self.session_fees if net_pnl <= -SOFT_SL_USD: logger.warning( f"SOFT SL: {self.symbol} net=${net_pnl:.4f} ≤ -${SOFT_SL_USD:.2f} → close & lockout" ) return "soft_sl" return None # ============================================================ # CLOSE GRID # ============================================================ def _close_net_position(self): positions = self.exchange.get_positions() for pos in positions: if pos["symbol"] != self.symbol: continue amt = float(pos["positionAmt"]) if amt == 0: continue entry = float(pos["entryPrice"]) close_side = "SELL" if amt > 0 else "BUY" close_price = self.exchange.close_position_market( self.symbol, close_side, abs(amt), self.symbol_info ) if close_price: if amt > 0: pnl = abs(amt) * (close_price - entry) else: pnl = abs(amt) * (entry - close_price) fee = abs(amt) * close_price * TAKER_FEE self.session_pnl += pnl - fee self.session_fees += fee logger.info(f"Closed net position {self.symbol}: amt={amt} pnl={pnl:.4f}") def close_grid(self, reason="manual"): logger.info(f"CLOSE GRID: {self.symbol} reason={reason}") self.active = False self.close_reason = reason self.exchange.cancel_all_orders(self.symbol) self._close_net_position() for order in self.orders.values(): if order.status == "open": order.status = "cancelled" return self.get_session_summary() def get_session_summary(self): duration_min = (time.time() - self.start_time) / 60 now_van = datetime.now(VANCOUVER_TZ).strftime("%H:%M") return { "symbol": self.symbol, "center_price": self.center_price, "spacing_pct": self.spacing_pct, "round_trips": self.round_trips, "session_pnl": round(self.session_pnl, 4), "session_fees": round(self.session_fees, 4), "net_pnl": round(self.session_pnl - self.session_fees, 4), "duration_min": round(duration_min, 1), "buys_filled": self.total_buys_filled, "sells_filled": self.total_sells_filled, "inventory_imbalance": self.inventory_imbalance, "peak_inventory": self.peak_inventory, "partial_closes": self.partial_closes, "unstuck_closes": self.unstuck_closes, "close_reason": self.close_reason, "time": now_van, } # ============================================================ # ACTIVE ORDERS COUNT # ============================================================ def get_open_order_count(self): return sum(1 for o in self.orders.values() if o.status == "open") def get_filled_unpaired_count(self): count = 0 for o in self.orders.values(): if o.status == "filled" and o.pair_order_id: pair = self.orders.get(o.pair_order_id) if pair and pair.status == "open": count += 1 return count # ============================================================ # STATE PERSISTENCE # ============================================================ def save_state(self): Path(DATA_DIR).mkdir(parents=True, exist_ok=True) state = { "symbol": self.symbol, "center_price": self.center_price, "spacing_pct": self.spacing_pct, "start_time": self.start_time, "round_trips": self.round_trips, "session_pnl": self.session_pnl, "session_fees": self.session_fees, "total_buys_filled": self.total_buys_filled, "total_sells_filled": self.total_sells_filled, "inventory_imbalance": self.inventory_imbalance, "peak_inventory": self.peak_inventory, "partial_closes": self.partial_closes, "unstuck_closes": self.unstuck_closes, "active": self.active, "close_reason": self.close_reason, "orders": {str(k): v.to_dict() for k, v in self.orders.items()}, "saved_at": datetime.now(VANCOUVER_TZ).isoformat(), } with open(GRID_STATE_FILE, 'w') as f: json.dump(state, f, indent=2) def log_session(self): Path(DATA_DIR).mkdir(parents=True, exist_ok=True) summary = self.get_session_summary() summary["start_time"] = datetime.fromtimestamp( self.start_time, VANCOUVER_TZ ).isoformat() log_path = Path(SESSION_LOG_FILE) sessions = [] if log_path.exists(): try: sessions = json.loads(log_path.read_text()) except Exception: sessions = [] sessions.append(summary) if len(sessions) > 500: sessions = sessions[-500:] with open(SESSION_LOG_FILE, 'w') as f: json.dump(sessions, f, indent=2) class GridManager: """Manages grid sessions with v2 inventory management.""" def __init__(self, exchange): self.exchange = exchange self.sessions = {} self.daily_pnl = 0.0 self.daily_stops = [] def start_grid(self, symbol, spacing_pct=None): """Start new grid session with ATR spacing.""" if symbol in self.sessions: logger.warning(f"Grid already running on {symbol}") return None symbol_info = self.exchange.get_symbol_info(symbol) if not symbol_info: logger.error(f"No symbol info for {symbol}") return None self.exchange.set_leverage(symbol) self.exchange.set_margin_type(symbol) price = self.exchange.get_mark_price(symbol) session = GridSession(symbol, price, self.exchange, symbol_info, spacing_pct) success = session.setup_grid() if success: self.sessions[symbol] = session session.save_state() logger.info( f"Grid started: {symbol} center={price:.6f} " f"spacing={session.spacing_pct:.3f}% levels={GRID_LEVELS}×2 " f"leverage={LEVERAGE}x" ) return session else: logger.error(f"Failed to setup grid for {symbol}") return None def stop_grid(self, symbol, reason="manual"): session = self.sessions.get(symbol) if not session: return None summary = session.close_grid(reason) session.log_session() self.daily_pnl += summary["net_pnl"] del self.sessions[symbol] try: Path(GRID_STATE_FILE).unlink(missing_ok=True) except Exception: pass return summary def tick(self): """Main loop tick with inventory management.""" all_events = [] for symbol, session in list(self.sessions.items()): if not session.active: continue # 1. Check fills fill_events = session.check_fills() all_events.extend(fill_events) # 2. Check round-trips rts = session.check_round_trips() for rt in rts: all_events.append({"type": "round_trip", "symbol": symbol, **rt}) # 3. Get current price try: current_price = self.exchange.get_mark_price(symbol) except Exception: continue # 4. Update price history for EMA session.update_price_history(current_price) # 5. v2: Check inventory management inv_events = session.check_inventory(current_price) all_events.extend(inv_events) # 6. v3: Check profit target / soft SL / max loss exit_reason = session.check_exit_conditions(current_price) if exit_reason is None and session.check_max_loss(current_price): exit_reason = "max_loss" if exit_reason: summary = self.stop_grid(symbol, reason=exit_reason) if exit_reason in ("soft_sl", "max_loss"): self.daily_stops.append(time.time()) all_events.append({ "type": exit_reason, # "profit_target" | "soft_sl" | "max_loss" "symbol": symbol, "summary": summary, }) continue # 7. v2: EMA trailing center recenter = session.check_trailing_center(current_price) if recenter: session.save_state() all_events.append({ "type": "recenter", "symbol": symbol, "old_center": recenter["old"], "new_center": recenter["new"], }) # 8. Save state session.save_state() return all_events def is_circuit_breaker_active(self): from src.config import CIRCUIT_BREAKER_STOPS one_hour_ago = time.time() - 3600 recent_stops = [t for t in self.daily_stops if t > one_hour_ago] return len(recent_stops) >= CIRCUIT_BREAKER_STOPS def get_status(self): lines = [] for symbol, session in self.sessions.items(): s = session.get_session_summary() inv = s['inventory_imbalance'] inv_str = f"+{inv}L" if inv > 0 else f"{inv}S" if inv < 0 else "0" lines.append( f"📊 {symbol}: {s['round_trips']} RTs | " f"PnL: ${s['net_pnl']:.4f} | " f"Inv: {inv_str} | " f"{s['duration_min']:.0f}min | " f"sp={s['spacing_pct']:.2f}%" ) if not lines: lines.append("📭 No active grids") return "\n".join(lines)