← Назад"""
Grid Bot — Grid Manager v2
=============================
v2 changes:
- ATR-adaptive spacing (from screener)
- Inventory tracking + cap + partial close
- Passivbot-style unstucking at EMA
- EMA trailing center (replaces hard recenter)
- 5x leverage, 5 levels per side
State persisted to JSON for PM2 restart recovery.
"""
import json
import time
import logging
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
from src.config import (
GRID_LEVELS, GRID_SPACING_PCT, POSITION_SIZE_USD, LEVERAGE,
MAX_LOSS_PCT, MAKER_FEE, TAKER_FEE,
GRID_STATE_FILE, SESSION_LOG_FILE, DATA_DIR,
ATR_SPACING_ENABLED,
INVENTORY_WARN_LEVELS, INVENTORY_MAX_LEVELS,
UNSTUCK_ENABLED, UNSTUCK_THRESHOLD, UNSTUCK_CLOSE_PCT,
EMA_CENTER_PERIOD, CENTER_UPDATE_SEC, CENTER_DRIFT_THRESHOLD,
PROFIT_TARGET_USD, SOFT_SL_USD,
)
logger = logging.getLogger("grid")
VANCOUVER_TZ = ZoneInfo("America/Vancouver")
class GridOrder:
"""Single grid order."""
__slots__ = ['side', 'price', 'qty', 'order_id', 'status', 'fill_price', 'fill_time', 'pair_order_id']
def __init__(self, side, price, qty, order_id=None):
self.side = side
self.price = price
self.qty = qty
self.order_id = order_id
self.status = "pending"
self.fill_price = None
self.fill_time = None
self.pair_order_id = None
def to_dict(self):
return {
"side": self.side, "price": self.price, "qty": self.qty,
"order_id": self.order_id, "status": self.status,
"fill_price": self.fill_price, "fill_time": self.fill_time,
"pair_order_id": self.pair_order_id,
}
@classmethod
def from_dict(cls, d):
o = cls(d["side"], d["price"], d["qty"], d.get("order_id"))
o.status = d.get("status", "pending")
o.fill_price = d.get("fill_price")
o.fill_time = d.get("fill_time")
o.pair_order_id = d.get("pair_order_id")
return o
class GridSession:
"""One grid session on one symbol."""
def __init__(self, symbol, center_price, exchange, symbol_info, spacing_pct=None):
self.symbol = symbol
self.center_price = center_price
self.exchange = exchange
self.symbol_info = symbol_info
self.start_time = time.time()
self.orders = {}
self.round_trips = 0
self.session_pnl = 0.0
self.session_fees = 0.0
self.total_buys_filled = 0
self.total_sells_filled = 0
self.active = True
self.close_reason = None
# v2: ATR-adaptive spacing
self.spacing_pct = spacing_pct or GRID_SPACING_PCT
# v2: Inventory tracking
self.inventory_imbalance = 0 # +N = long N levels, -N = short N levels
self.peak_inventory = 0
self.partial_closes = 0
self.unstuck_closes = 0
# v2: EMA trailing center
self._last_center_update = time.time()
self._price_history = [] # for EMA calculation
@property
def spacing(self):
"""Absolute spacing in price terms."""
return self.center_price * self.spacing_pct / 100
# ============================================================
# GRID SETUP
# ============================================================
def setup_grid(self):
"""Place initial grid: buy levels below, sell levels above center."""
spacing = self.spacing
qty = self._calc_qty(self.center_price)
if qty <= 0:
logger.error(f"Qty too small for {self.symbol} at {self.center_price}")
return False
placed = 0
# Buy levels below center
for i in range(1, GRID_LEVELS + 1):
price = self.center_price - i * spacing
order = self._place_grid_order("BUY", price, qty)
if order:
placed += 1
# Sell levels above center
for i in range(1, GRID_LEVELS + 1):
price = self.center_price + i * spacing
order = self._place_grid_order("SELL", price, qty)
if order:
placed += 1
logger.info(
f"Grid setup: {self.symbol} center={self.center_price:.6f} "
f"spacing={self.spacing_pct:.3f}% ({spacing:.6f}) qty={qty} "
f"placed={placed}/{GRID_LEVELS*2}"
)
return placed > 0
def _calc_qty(self, price):
notional = POSITION_SIZE_USD * LEVERAGE
qty = notional / price
qty = self.exchange.round_qty(self.symbol_info, qty)
min_qty = self.symbol_info.get("min_qty", 0)
if qty < min_qty:
logger.warning(f"Qty {qty} < min {min_qty} for {self.symbol}")
return 0
return qty
def _place_grid_order(self, side, price, qty):
result = self.exchange.place_limit_order(
self.symbol, side, qty, price, self.symbol_info
)
if result and result.get("orderId"):
order_id = result["orderId"]
grid_order = GridOrder(side, price, qty, order_id)
grid_order.status = "open"
self.orders[order_id] = grid_order
return grid_order
return None
# ============================================================
# FILL DETECTION
# ============================================================
def check_fills(self):
if not self.active:
return []
events = []
open_orders = self.exchange.get_open_orders(self.symbol)
open_ids = {int(o["orderId"]) for o in open_orders}
for order_id, grid_order in list(self.orders.items()):
if grid_order.status != "open":
continue
if order_id not in open_ids:
status = self.exchange.get_order_status(self.symbol, order_id)
if not status:
continue
if status["status"] == "FILLED":
fill_price = float(status.get("avgPrice", grid_order.price))
grid_order.status = "filled"
grid_order.fill_price = fill_price
grid_order.fill_time = time.time()
fee = grid_order.qty * fill_price * MAKER_FEE
self.session_fees += fee
if grid_order.side == "BUY":
self.total_buys_filled += 1
self.inventory_imbalance += 1
else:
self.total_sells_filled += 1
self.inventory_imbalance -= 1
# Track peak inventory
if abs(self.inventory_imbalance) > self.peak_inventory:
self.peak_inventory = abs(self.inventory_imbalance)
# v2: Check inventory cap BEFORE placing exit
if self._should_block_side(grid_order.side):
logger.info(
f"INV CAP: {self.symbol} imbalance={self.inventory_imbalance}, "
f"blocking new {grid_order.side} exit"
)
else:
exit_event = self._place_exit_order(grid_order)
if exit_event:
events.append(exit_event)
events.append({
"type": "fill",
"symbol": self.symbol,
"side": grid_order.side,
"price": fill_price,
"qty": grid_order.qty,
"inventory": self.inventory_imbalance,
})
elif status["status"] in ("CANCELED", "EXPIRED"):
grid_order.status = "cancelled"
return events
def _should_block_side(self, filled_side):
"""Check if inventory cap reached — cancel orders that increase imbalance."""
imb = abs(self.inventory_imbalance)
if imb < INVENTORY_MAX_LEVELS:
return False
# At cap: cancel orders on the side that INCREASES imbalance
# If we're LONG (imb > 0), cancel remaining BUYs (they make us more long)
# If we're SHORT (imb < 0), cancel remaining SELLs (they make us more short)
cancel_side = "BUY" if self.inventory_imbalance > 0 else "SELL"
cancelled = 0
for oid, order in list(self.orders.items()):
if order.status == "open" and order.side == cancel_side:
try:
self.exchange.cancel_order(self.symbol, oid)
except Exception:
pass
order.status = "cancelled"
cancelled += 1
if cancelled > 0:
logger.warning(
f"INV HARD CAP: {self.symbol} imb={self.inventory_imbalance}, "
f"cancelled {cancelled} {cancel_side} orders"
)
return False # still allow exit orders (they reduce inventory)
def _place_exit_order(self, filled_order):
spacing = self.spacing
if filled_order.side == "BUY":
exit_price = filled_order.fill_price + spacing
exit_side = "SELL"
else:
exit_price = filled_order.fill_price - spacing
exit_side = "BUY"
exit_order = self._place_grid_order(exit_side, exit_price, filled_order.qty)
if exit_order:
filled_order.pair_order_id = exit_order.order_id
exit_order.pair_order_id = filled_order.order_id
return {
"type": "exit_placed",
"side": exit_side,
"price": exit_price,
"for_fill": filled_order.fill_price,
}
return None
# ============================================================
# ROUND-TRIP DETECTION
# ============================================================
def check_round_trips(self):
completed = []
for order_id, order in list(self.orders.items()):
if order.status != "filled" or not order.pair_order_id:
continue
pair = self.orders.get(order.pair_order_id)
if not pair or pair.status != "filled":
continue
if order.side == "BUY" and pair.side == "SELL":
buy_price = order.fill_price
sell_price = pair.fill_price
elif order.side == "SELL" and pair.side == "BUY":
buy_price = pair.fill_price
sell_price = order.fill_price
else:
continue
qty = order.qty
pnl = qty * (sell_price - buy_price)
fee = qty * (buy_price + sell_price) * MAKER_FEE
net_pnl = pnl - fee
self.round_trips += 1
self.session_pnl += net_pnl
completed.append({
"rt_num": self.round_trips,
"buy_price": buy_price,
"sell_price": sell_price,
"qty": qty,
"pnl": round(net_pnl, 6),
"gross_pnl": round(pnl, 6),
"fee": round(fee, 6),
})
order.status = "closed"
pair.status = "closed"
return completed
# ============================================================
# INVENTORY MANAGEMENT (v2)
# ============================================================
def check_inventory(self, current_price):
"""
Check inventory and take action:
1. At WARN level → partial close excess
2. At UNSTUCK threshold → aggressive close (no EMA gate)
Returns list of events.
"""
events = []
imb = abs(self.inventory_imbalance)
if imb < INVENTORY_WARN_LEVELS:
return events
need_regrid = False
# Partial close at warn level — more aggressive formula
if imb >= INVENTORY_WARN_LEVELS:
excess = imb - INVENTORY_WARN_LEVELS + 1
levels_to_close = max(1, (excess + 1) // 2) # ceil division: 1→1, 2→1, 3→2, 4→2, 5→3
pnl, fee = self._close_inventory_partial(current_price, levels_to_close)
self.partial_closes += levels_to_close
need_regrid = True
events.append({
"type": "partial_close",
"symbol": self.symbol,
"levels_closed": levels_to_close,
"pnl": round(pnl, 6),
"fee": round(fee, 6),
"remaining_imbalance": self.inventory_imbalance,
})
# Passivbot unstucking at high imbalance — NO EMA proximity gate
if UNSTUCK_ENABLED and imb >= UNSTUCK_THRESHOLD:
levels_to_unstuck = max(1, int(imb * UNSTUCK_CLOSE_PCT))
pnl, fee = self._close_inventory_partial(current_price, levels_to_unstuck)
self.unstuck_closes += levels_to_unstuck
need_regrid = True
events.append({
"type": "unstuck",
"symbol": self.symbol,
"levels_closed": levels_to_unstuck,
"pnl": round(pnl, 6),
"remaining_imbalance": self.inventory_imbalance,
})
# Re-setup grid after partial close (orders were cancelled by close_position_market)
if need_regrid:
self.center_price = current_price
for order in self.orders.values():
if order.status == "open":
order.status = "cancelled"
self.setup_grid()
logger.info(f"Re-grid after inventory close: {self.symbol} center={current_price:.6f}")
return events
def _close_inventory_partial(self, current_price, levels):
"""Close N levels of inventory at market."""
if self.inventory_imbalance == 0 or levels <= 0:
return 0, 0
qty_per_level = self._calc_qty(current_price)
if qty_per_level <= 0:
return 0, 0
total_qty = qty_per_level * levels
if self.inventory_imbalance > 0:
# Long → sell to reduce
close_side = "SELL"
else:
# Short → buy to reduce
close_side = "BUY"
close_price = self.exchange.close_position_market(
self.symbol, close_side, total_qty, self.symbol_info
)
if close_price and close_price > 0:
# Estimate PnL (simplified — actual tracked in binance)
if self.inventory_imbalance > 0:
pnl = total_qty * (close_price - self.center_price)
else:
pnl = total_qty * (self.center_price - close_price)
fee = total_qty * close_price * TAKER_FEE
self.session_pnl += pnl - fee
self.session_fees += fee
# Update imbalance
if self.inventory_imbalance > 0:
self.inventory_imbalance = max(0, self.inventory_imbalance - levels)
else:
self.inventory_imbalance = min(0, self.inventory_imbalance + levels)
logger.info(
f"INV CLOSE: {self.symbol} {close_side} {levels} lvls, "
f"pnl=${pnl-fee:.4f}, imb={self.inventory_imbalance}"
)
return pnl - fee, fee
return 0, 0
# ============================================================
# EMA TRAILING CENTER (v2)
# ============================================================
def update_price_history(self, price):
"""Add price point for EMA calculation."""
self._price_history.append(price)
# Keep last 100 prices
if len(self._price_history) > 100:
self._price_history = self._price_history[-100:]
def _get_ema_center(self):
"""Calculate EMA from price history."""
if len(self._price_history) < EMA_CENTER_PERIOD:
return None
prices = self._price_history[-EMA_CENTER_PERIOD * 2:]
mult = 2 / (EMA_CENTER_PERIOD + 1)
ema = prices[0]
for p in prices[1:]:
ema = p * mult + ema * (1 - mult)
return ema
def check_trailing_center(self, current_price):
"""Check if center should be updated via EMA trailing."""
now = time.time()
if now - self._last_center_update < CENTER_UPDATE_SEC:
return None
self._last_center_update = now
ema = self._get_ema_center()
if not ema:
return None
# Check if price drifted enough from current center
spacing = self.spacing
drift_levels = abs(current_price - self.center_price) / spacing
if drift_levels >= CENTER_DRIFT_THRESHOLD:
old_center = self.center_price
new_center = ema
logger.info(
f"EMA RECENTER: {self.symbol} {old_center:.6f} → {new_center:.6f} "
f"(drift={drift_levels:.1f} levels) — keep position, replace orders"
)
# Cancel open orders only — keep position alive, no market close
self.exchange.cancel_all_orders(self.symbol)
self.center_price = new_center
for order in self.orders.values():
if order.status == "open":
order.status = "cancelled"
# Keep inventory — don't reset, position stays
self.setup_grid()
return {"old": old_center, "new": new_center, "drift": drift_levels}
return None
# ============================================================
# UNREALIZED PNL & MAX LOSS
# ============================================================
def get_unrealized_pnl(self, current_price):
unrealized = 0
for order in self.orders.values():
if order.status != "filled":
continue
if order.pair_order_id and self.orders.get(order.pair_order_id, None):
pair = self.orders[order.pair_order_id]
if pair.status in ("filled", "closed"):
continue
if order.side == "BUY":
unrealized += order.qty * (current_price - order.fill_price)
elif order.side == "SELL":
unrealized += order.qty * (order.fill_price - current_price)
return unrealized
def check_max_loss(self, current_price):
"""Legacy: абсолютный лимит лосса (совместимость)."""
unrealized = self.get_unrealized_pnl(current_price)
net_pnl = self.session_pnl + unrealized - self.session_fees
if net_pnl < -MAX_LOSS_PCT:
logger.warning(
f"MAX LOSS: {self.symbol} net=${net_pnl:.4f} < -${MAX_LOSS_PCT:.2f}"
)
return True
return False
def check_exit_conditions(self, current_price):
"""
v3.1: Profit target убран — бот работает пока монета в боковике.
Выход только при breakout (см. screener.is_breakout) или soft SL.
- Soft SL (-$SOFT_SL_USD) → "soft_sl" → закрытие + coin lockout tick
"""
unrealized = self.get_unrealized_pnl(current_price)
net_pnl = self.session_pnl + unrealized - self.session_fees
if net_pnl <= -SOFT_SL_USD:
logger.warning(
f"SOFT SL: {self.symbol} net=${net_pnl:.4f} ≤ -${SOFT_SL_USD:.2f} → close & lockout"
)
return "soft_sl"
return None
# ============================================================
# CLOSE GRID
# ============================================================
def _close_net_position(self):
positions = self.exchange.get_positions()
for pos in positions:
if pos["symbol"] != self.symbol:
continue
amt = float(pos["positionAmt"])
if amt == 0:
continue
entry = float(pos["entryPrice"])
close_side = "SELL" if amt > 0 else "BUY"
close_price = self.exchange.close_position_market(
self.symbol, close_side, abs(amt), self.symbol_info
)
if close_price:
if amt > 0:
pnl = abs(amt) * (close_price - entry)
else:
pnl = abs(amt) * (entry - close_price)
fee = abs(amt) * close_price * TAKER_FEE
self.session_pnl += pnl - fee
self.session_fees += fee
logger.info(f"Closed net position {self.symbol}: amt={amt} pnl={pnl:.4f}")
def close_grid(self, reason="manual"):
logger.info(f"CLOSE GRID: {self.symbol} reason={reason}")
self.active = False
self.close_reason = reason
self.exchange.cancel_all_orders(self.symbol)
self._close_net_position()
for order in self.orders.values():
if order.status == "open":
order.status = "cancelled"
return self.get_session_summary()
def get_session_summary(self):
duration_min = (time.time() - self.start_time) / 60
now_van = datetime.now(VANCOUVER_TZ).strftime("%H:%M")
return {
"symbol": self.symbol,
"center_price": self.center_price,
"spacing_pct": self.spacing_pct,
"round_trips": self.round_trips,
"session_pnl": round(self.session_pnl, 4),
"session_fees": round(self.session_fees, 4),
"net_pnl": round(self.session_pnl - self.session_fees, 4),
"duration_min": round(duration_min, 1),
"buys_filled": self.total_buys_filled,
"sells_filled": self.total_sells_filled,
"inventory_imbalance": self.inventory_imbalance,
"peak_inventory": self.peak_inventory,
"partial_closes": self.partial_closes,
"unstuck_closes": self.unstuck_closes,
"close_reason": self.close_reason,
"time": now_van,
}
# ============================================================
# ACTIVE ORDERS COUNT
# ============================================================
def get_open_order_count(self):
return sum(1 for o in self.orders.values() if o.status == "open")
def get_filled_unpaired_count(self):
count = 0
for o in self.orders.values():
if o.status == "filled" and o.pair_order_id:
pair = self.orders.get(o.pair_order_id)
if pair and pair.status == "open":
count += 1
return count
# ============================================================
# STATE PERSISTENCE
# ============================================================
def save_state(self):
Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
state = {
"symbol": self.symbol,
"center_price": self.center_price,
"spacing_pct": self.spacing_pct,
"start_time": self.start_time,
"round_trips": self.round_trips,
"session_pnl": self.session_pnl,
"session_fees": self.session_fees,
"total_buys_filled": self.total_buys_filled,
"total_sells_filled": self.total_sells_filled,
"inventory_imbalance": self.inventory_imbalance,
"peak_inventory": self.peak_inventory,
"partial_closes": self.partial_closes,
"unstuck_closes": self.unstuck_closes,
"active": self.active,
"close_reason": self.close_reason,
"orders": {str(k): v.to_dict() for k, v in self.orders.items()},
"saved_at": datetime.now(VANCOUVER_TZ).isoformat(),
}
with open(GRID_STATE_FILE, 'w') as f:
json.dump(state, f, indent=2)
def log_session(self):
Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
summary = self.get_session_summary()
summary["start_time"] = datetime.fromtimestamp(
self.start_time, VANCOUVER_TZ
).isoformat()
log_path = Path(SESSION_LOG_FILE)
sessions = []
if log_path.exists():
try:
sessions = json.loads(log_path.read_text())
except Exception:
sessions = []
sessions.append(summary)
if len(sessions) > 500:
sessions = sessions[-500:]
with open(SESSION_LOG_FILE, 'w') as f:
json.dump(sessions, f, indent=2)
class GridManager:
"""Manages grid sessions with v2 inventory management."""
def __init__(self, exchange):
self.exchange = exchange
self.sessions = {}
self.daily_pnl = 0.0
self.daily_stops = []
def start_grid(self, symbol, spacing_pct=None):
"""Start new grid session with ATR spacing."""
if symbol in self.sessions:
logger.warning(f"Grid already running on {symbol}")
return None
symbol_info = self.exchange.get_symbol_info(symbol)
if not symbol_info:
logger.error(f"No symbol info for {symbol}")
return None
self.exchange.set_leverage(symbol)
self.exchange.set_margin_type(symbol)
price = self.exchange.get_mark_price(symbol)
session = GridSession(symbol, price, self.exchange, symbol_info, spacing_pct)
success = session.setup_grid()
if success:
self.sessions[symbol] = session
session.save_state()
logger.info(
f"Grid started: {symbol} center={price:.6f} "
f"spacing={session.spacing_pct:.3f}% levels={GRID_LEVELS}×2 "
f"leverage={LEVERAGE}x"
)
return session
else:
logger.error(f"Failed to setup grid for {symbol}")
return None
def stop_grid(self, symbol, reason="manual"):
session = self.sessions.get(symbol)
if not session:
return None
summary = session.close_grid(reason)
session.log_session()
self.daily_pnl += summary["net_pnl"]
del self.sessions[symbol]
try:
Path(GRID_STATE_FILE).unlink(missing_ok=True)
except Exception:
pass
return summary
def tick(self):
"""Main loop tick with inventory management."""
all_events = []
for symbol, session in list(self.sessions.items()):
if not session.active:
continue
# 1. Check fills
fill_events = session.check_fills()
all_events.extend(fill_events)
# 2. Check round-trips
rts = session.check_round_trips()
for rt in rts:
all_events.append({"type": "round_trip", "symbol": symbol, **rt})
# 3. Get current price
try:
current_price = self.exchange.get_mark_price(symbol)
except Exception:
continue
# 4. Update price history for EMA
session.update_price_history(current_price)
# 5. v2: Check inventory management
inv_events = session.check_inventory(current_price)
all_events.extend(inv_events)
# 6. v3: Check profit target / soft SL / max loss
exit_reason = session.check_exit_conditions(current_price)
if exit_reason is None and session.check_max_loss(current_price):
exit_reason = "max_loss"
if exit_reason:
summary = self.stop_grid(symbol, reason=exit_reason)
if exit_reason in ("soft_sl", "max_loss"):
self.daily_stops.append(time.time())
all_events.append({
"type": exit_reason, # "profit_target" | "soft_sl" | "max_loss"
"symbol": symbol,
"summary": summary,
})
continue
# 7. v2: EMA trailing center
recenter = session.check_trailing_center(current_price)
if recenter:
session.save_state()
all_events.append({
"type": "recenter",
"symbol": symbol,
"old_center": recenter["old"],
"new_center": recenter["new"],
})
# 8. Save state
session.save_state()
return all_events
def is_circuit_breaker_active(self):
from src.config import CIRCUIT_BREAKER_STOPS
one_hour_ago = time.time() - 3600
recent_stops = [t for t in self.daily_stops if t > one_hour_ago]
return len(recent_stops) >= CIRCUIT_BREAKER_STOPS
def get_status(self):
lines = []
for symbol, session in self.sessions.items():
s = session.get_session_summary()
inv = s['inventory_imbalance']
inv_str = f"+{inv}L" if inv > 0 else f"{inv}S" if inv < 0 else "0"
lines.append(
f"📊 {symbol}: {s['round_trips']} RTs | "
f"PnL: ${s['net_pnl']:.4f} | "
f"Inv: {inv_str} | "
f"{s['duration_min']:.0f}min | "
f"sp={s['spacing_pct']:.2f}%"
)
if not lines:
lines.append("📭 No active grids")
return "\n".join(lines)