โ ะะฐะทะฐะด"""
Squeeze-VWAP Bot โ Bybit Futures Client
==========================================
pybit v5 (unified trading API).
Bybit quirks vs Binance:
1. All qty/price must be strings in API calls
2. Side is "Buy"/"Sell" (capitalized, not uppercase)
3. Conditional orders ARE queryable (unlike Binance algo orders!)
4. UTA wallet: accountType=UNIFIED
5. Klines returned newest-first โ need reverse
6. Tickers: turnover24h (not quoteVolume), price24hPcnt (decimal not %)
7. Positions: size (unsigned) + side, not signed positionAmt
Interface matches original Binance exchange.py exactly โ
screener, manager, indicators work without changes.
"""
import math
import time
import logging
from pybit.unified_trading import HTTP
from pybit.exceptions import InvalidRequestError
from src.config import (
BYBIT_API_KEY, BYBIT_API_SECRET, BYBIT_TESTNET,
LEVERAGE, MAKER_FEE, TAKER_FEE,
)
logger = logging.getLogger("exchange")
# Timeframe conversion: Binance format โ Bybit format
_TF_MAP = {
"1m": "1", "3m": "3", "5m": "5", "15m": "15", "30m": "30",
"1h": "60", "2h": "120", "4h": "240", "6h": "360", "12h": "720",
"1d": "D", "1w": "W", "1M": "M",
}
class Exchange:
def __init__(self):
if not BYBIT_API_KEY or not BYBIT_API_SECRET:
raise ValueError("BYBIT_API_KEY and BYBIT_API_SECRET must be set")
self.session = HTTP(
testnet=BYBIT_TESTNET,
api_key=BYBIT_API_KEY,
api_secret=BYBIT_API_SECRET,
)
self._instruments_cache = {}
self._instruments_ts = 0
mode = "TESTNET" if BYBIT_TESTNET else "MAINNET"
logger.info(f"Bybit client initialized ({mode})")
def _check(self, resp, context=""):
"""Check Bybit API response. Raises on error."""
ret_code = resp.get("retCode", -1)
if ret_code != 0:
msg = resp.get("retMsg", "Unknown error")
raise Exception(f"{context}: [{ret_code}] {msg}")
return resp.get("result", {})
def _bybit_interval(self, binance_tf):
"""Convert Binance timeframe ('5m') โ Bybit ('5')."""
return _TF_MAP.get(binance_tf, binance_tf)
def _bybit_side(self, side):
"""Convert 'BUY'/'SELL' โ Bybit 'Buy'/'Sell'."""
return "Buy" if side.upper() == "BUY" else "Sell"
# ============================================================
# INSTRUMENT INFO (cached, bulk load)
# ============================================================
def _load_instruments(self):
"""Load all USDT linear instruments. Refresh every 1h."""
now = time.time()
if self._instruments_cache and now - self._instruments_ts < 3600:
return
try:
resp = self.session.get_instruments_info(category="linear")
result = self._check(resp, "instruments")
count = 0
for item in result.get("list", []):
sym = item.get("symbol", "")
if sym.endswith("USDT"):
self._instruments_cache[sym] = item
count += 1
self._instruments_ts = now
logger.info(f"Loaded {count} USDT linear instruments")
except Exception as e:
logger.error(f"Failed to load instruments: {e}")
# ============================================================
# MARKET DATA
# ============================================================
def get_all_tickers_24h(self):
"""
All futures tickers (1 API call).
Returns Binance-compatible format for screener.
"""
try:
resp = self.session.get_tickers(category="linear")
result = self._check(resp, "tickers")
tickers = []
for t in result.get("list", []):
# Bybit price24hPcnt is decimal (0.05 = 5%)
pct = float(t.get("price24hPcnt", "0")) * 100
tickers.append({
"symbol": t.get("symbol", ""),
"lastPrice": t.get("lastPrice", "0"),
"quoteVolume": t.get("turnover24h", "0"),
# Bybit doesn't expose trade count; use high value
# to pass screener filter (volume is the real filter)
"count": 999999,
"priceChangePercent": str(round(pct, 2)),
})
return tickers
except Exception as e:
logger.error(f"Failed to get tickers: {e}")
return []
def get_klines(self, symbol, interval, limit=500):
"""
Get OHLCV klines in Binance list format:
[timestamp, open, high, low, close, volume, ...]
Indicators use k[1]=open, k[2]=high, k[3]=low, k[4]=close, k[5]=volume.
"""
bybit_interval = self._bybit_interval(interval)
try:
resp = self.session.get_kline(
category="linear", symbol=symbol,
interval=bybit_interval, limit=limit,
)
result = self._check(resp, f"kline({symbol})")
klines = []
for item in result.get("list", []):
# Bybit item: [timestamp, open, high, low, close, volume, turnover]
klines.append([
int(item[0]), # [0] timestamp
item[1], # [1] open (str)
item[2], # [2] high
item[3], # [3] low
item[4], # [4] close
item[5], # [5] volume
int(item[0]) + 1, # [6] close_time (placeholder)
item[6] if len(item) > 6 else "0", # [7] turnover
0, "0", "0", "0", # [8-11] padding for Binance compat
])
# Bybit returns newest first โ reverse to chronological
klines.reverse()
return klines
except Exception as e:
logger.error(f"Klines error {symbol}: {e}")
return []
def get_mark_price(self, symbol):
"""Current mark price."""
try:
resp = self.session.get_tickers(category="linear", symbol=symbol)
result = self._check(resp, f"mark({symbol})")
items = result.get("list", [])
if items:
return float(items[0].get("markPrice", "0"))
except Exception as e:
logger.error(f"Mark price error {symbol}: {e}")
return 0.0
# ============================================================
# ACCOUNT
# ============================================================
def get_balance(self):
"""USDT available balance (Unified Trading Account)."""
try:
resp = self.session.get_wallet_balance(accountType="UNIFIED")
result = self._check(resp, "balance")
for acc in result.get("list", []):
for coin in acc.get("coin", []):
if coin.get("coin") == "USDT":
avail = coin.get("availableToWithdraw", "")
if avail and avail != "":
return float(avail)
return float(coin.get("walletBalance", "0"))
except Exception as e:
logger.error(f"Balance error: {e}")
return 0.0
def get_positions(self):
"""
All open positions in Binance-compatible format.
positionAmt: positive=LONG, negative=SHORT (signed).
"""
try:
resp = self.session.get_positions(
category="linear", settleCoin="USDT",
)
result = self._check(resp, "positions")
positions = []
for p in result.get("list", []):
size = float(p.get("size", "0"))
if size == 0:
continue
side = p.get("side", "")
# Binance convention: positionAmt positive=LONG, negative=SHORT
pos_amt = size if side == "Buy" else -size
positions.append({
"symbol": p["symbol"],
"positionAmt": str(pos_amt),
"markPrice": p.get("markPrice", "0"),
"entryPrice": p.get("avgPrice", "0"),
"unRealizedProfit": p.get("unrealisedPnl", "0"),
})
return positions
except Exception as e:
logger.error(f"Positions error: {e}")
return []
def set_leverage(self, symbol):
"""Set leverage (both buy and sell sides). Caps at symbol's maxLeverage."""
target_lev = LEVERAGE
# Check maxLeverage from instrument info
info = self.get_instrument_info_raw(symbol)
if info:
max_lev_str = info.get("leverageFilter", {}).get("maxLeverage", "")
if max_lev_str:
try:
max_lev = int(float(max_lev_str))
if target_lev > max_lev:
logger.info(f"{symbol}: leverage {target_lev}x > max {max_lev}x, capping")
target_lev = max_lev
except (ValueError, TypeError):
pass
try:
self.session.set_leverage(
category="linear", symbol=symbol,
buyLeverage=str(target_lev), sellLeverage=str(target_lev),
)
logger.info(f"Leverage set to {target_lev}x for {symbol}")
return target_lev
except InvalidRequestError as e:
if "110043" not in str(e): # Already set to this value
logger.warning(f"Leverage error {symbol}: {e}")
return target_lev
except Exception as e:
logger.warning(f"Leverage error {symbol}: {e}")
return target_lev
def get_instrument_info_raw(self, symbol):
"""Get raw instrument info from cache or API."""
self._load_instruments()
info = self._instruments_cache.get(symbol)
if not info:
try:
resp = self.session.get_instruments_info(category="linear", symbol=symbol)
result = self._check(resp, f"instrument({symbol})")
items = result.get("list", [])
if items:
info = items[0]
self._instruments_cache[symbol] = info
except Exception:
pass
return info
def set_margin_type(self, symbol, margin_type="CROSSED"):
"""Set margin mode. CROSSED=cross, ISOLATED=isolated."""
mode = 0 if margin_type == "CROSSED" else 1
try:
self.session.switch_margin_mode(
category="linear", symbol=symbol, tradeMode=mode,
buyLeverage=str(LEVERAGE), sellLeverage=str(LEVERAGE),
)
except InvalidRequestError as e:
if "110026" not in str(e): # Already set
logger.warning(f"Margin mode error {symbol}: {e}")
except Exception as e:
logger.warning(f"Margin mode error {symbol}: {e}")
# ============================================================
# SYMBOL INFO
# ============================================================
def get_symbol_info(self, symbol):
"""
Symbol precision/limits info.
Returns dict with: price_precision, qty_precision, min_qty, tick_size, qty_step.
"""
self._load_instruments()
info = self._instruments_cache.get(symbol)
if not info:
# Direct fetch fallback
try:
resp = self.session.get_instruments_info(
category="linear", symbol=symbol,
)
result = self._check(resp, f"instrument({symbol})")
items = result.get("list", [])
if items:
info = items[0]
self._instruments_cache[symbol] = info
except Exception:
pass
if not info:
return None
lot = info.get("lotSizeFilter", {})
price_filter = info.get("priceFilter", {})
qty_step = float(lot.get("qtyStep", "0.001"))
tick_size = float(price_filter.get("tickSize", "0.01"))
min_qty = float(lot.get("minOrderQty", "0.001"))
# Derive precision from step sizes
qty_prec = max(0, -int(math.floor(math.log10(qty_step)))) if qty_step < 1 else 0
price_prec = max(0, -int(math.floor(math.log10(tick_size)))) if tick_size < 1 else 0
return {
"price_precision": price_prec,
"qty_precision": qty_prec,
"min_qty": min_qty,
"tick_size": tick_size,
"qty_step": qty_step,
}
def round_price(self, symbol_info, price):
"""Round price down to valid tick size."""
tick = symbol_info.get("tick_size")
if tick and tick > 0:
price = math.floor(price / tick) * tick
prec = max(0, -int(math.floor(math.log10(tick)))) if tick < 1 else 0
return round(price, prec)
return round(price, symbol_info["price_precision"])
def round_qty(self, symbol_info, qty):
"""Round quantity down to valid step size."""
step = symbol_info.get("qty_step")
if step and step > 0:
qty = math.floor(qty / step) * step
prec = max(0, -int(math.floor(math.log10(step)))) if step < 1 else 0
return round(qty, prec)
return round(qty, symbol_info["qty_precision"])
# ============================================================
# ORDERS
# ============================================================
def open_market(self, symbol, side, qty):
"""
Market order. Returns (order_dict, fill_price).
Same interface as Binance version.
"""
try:
resp = self.session.place_order(
category="linear",
symbol=symbol,
side=self._bybit_side(side),
orderType="Market",
qty=str(qty),
positionIdx=0, # One-way mode
)
result = self._check(resp, f"market({symbol})")
order_id = result.get("orderId", "")
# Get fill price from order history
time.sleep(0.3)
fill_price = 0.0
try:
hist = self.session.get_order_history(
category="linear", symbol=symbol, orderId=order_id,
)
hist_result = self._check(hist, f"order_hist({symbol})")
items = hist_result.get("list", [])
if items:
fill_price = float(items[0].get("avgPrice", "0"))
except Exception:
pass
# Fallback: get from position entry price
if fill_price == 0:
try:
pos_resp = self.session.get_positions(
category="linear", symbol=symbol,
)
pos_result = self._check(pos_resp, f"pos_fill({symbol})")
for p in pos_result.get("list", []):
if float(p.get("size", "0")) != 0:
fill_price = float(p.get("avgPrice", "0"))
break
except Exception:
pass
# Fallback 2: mark price
if fill_price == 0:
fill_price = self.get_mark_price(symbol)
logger.warning(f"Using mark price as fill fallback: {fill_price}")
logger.info(f"MARKET {side} {symbol} qty={qty} fill={fill_price}")
return {"orderId": order_id}, fill_price
except Exception as e:
logger.error(f"Market order failed {symbol}: {e}")
raise
def place_sl(self, symbol, side, qty, stop_price, symbol_info):
"""
Stop-loss = conditional Market order with trigger.
Bybit advantage: conditional orders ARE queryable!
No invisible algo orders like Binance.
Args:
side: closing side ("SELL" for long SL, "BUY" for short SL)
"""
stop_price = self.round_price(symbol_info, stop_price)
# triggerDirection: 1=rise above, 2=fall below
# "SELL" = closing a LONG โ price falls below SL โ triggerDir=2
# "BUY" = closing a SHORT โ price rises above SL โ triggerDir=1
if side.upper() == "SELL":
trigger_dir = 2 # Long SL: price falls below
else:
trigger_dir = 1 # Short SL: price rises above
try:
resp = self.session.place_order(
category="linear",
symbol=symbol,
side=self._bybit_side(side),
orderType="Market",
qty=str(qty),
triggerPrice=str(stop_price),
triggerDirection=trigger_dir,
triggerBy="MarkPrice",
reduceOnly=True,
positionIdx=0,
)
result = self._check(resp, f"SL({symbol})")
logger.info(f"SL placed {symbol} side={side} trigger={stop_price} qty={qty}")
return result
except Exception as e:
logger.error(f"SL order failed {symbol}: {e}")
raise
def place_tp(self, symbol, side, qty, price, symbol_info):
"""
Take-profit = LIMIT reduceOnly order. Earns maker fee.
Args:
side: closing side ("SELL" for long TP, "BUY" for short TP)
"""
price = self.round_price(symbol_info, price)
try:
resp = self.session.place_order(
category="linear",
symbol=symbol,
side=self._bybit_side(side),
orderType="Limit",
qty=str(qty),
price=str(price),
timeInForce="GTC",
reduceOnly=True,
positionIdx=0,
)
result = self._check(resp, f"TP({symbol})")
logger.info(f"TP placed {symbol} side={side} price={price} qty={qty}")
return result
except Exception as e:
logger.error(f"TP order failed {symbol}: {e}")
raise
def cancel_all_orders(self, symbol, retries=3):
"""
Cancel ALL orders on a symbol (regular + conditional).
Bybit cancel_all_orders kills both โ no invisible algo issue!
"""
for attempt in range(retries):
try:
self.session.cancel_all_orders(
category="linear", symbol=symbol,
)
# Verify
time.sleep(0.3)
remaining = self.get_open_orders(symbol)
if remaining is None or len(remaining) == 0:
logger.info(f"All orders cancelled {symbol}")
return True
logger.warning(
f"Cancel {symbol}: {len(remaining)} survived attempt {attempt+1}"
)
# Individual cancel fallback
for o in remaining:
oid = o.get("orderId", "")
if oid:
try:
self.session.cancel_order(
category="linear", symbol=symbol, orderId=oid,
)
except Exception:
pass
time.sleep(0.3)
except InvalidRequestError as e:
if "110001" in str(e): # No orders to cancel
return True
logger.warning(f"Cancel error {symbol} attempt {attempt+1}: {e}")
except Exception as e:
logger.warning(f"Cancel error {symbol} attempt {attempt+1}: {e}")
logger.error(f"Failed to cancel all orders on {symbol} after {retries} retries")
return False
def nuclear_cleanup(self):
"""
Startup: cancel ALL orders on ALL symbols.
Prevents order accumulation after restarts.
"""
positions = self.get_positions()
symbols = {p["symbol"] for p in positions}
# Also find symbols with orphaned orders
try:
resp = self.session.get_open_orders(
category="linear", settleCoin="USDT",
)
result = self._check(resp, "all_open_orders")
for o in result.get("list", []):
sym = o.get("symbol", "")
if sym:
symbols.add(sym)
except Exception as e:
logger.warning(f"Error getting open orders: {e}")
cleaned = 0
for sym in symbols:
if sym:
try:
self.session.cancel_all_orders(category="linear", symbol=sym)
cleaned += 1
except Exception:
pass
logger.info(f"Nuclear cleanup: cancelled on {cleaned} symbols")
return cleaned
def close_position(self, symbol, side, qty):
"""
Close position via market.
Returns fill_price.
"""
self.cancel_all_orders(symbol)
_, fill_price = self.open_market(symbol, side, qty)
return fill_price
@staticmethod
def _safe_float(val, default="0"):
"""Parse float safely โ Bybit returns '' for some fields."""
try:
return float(val) if val and val != "" else float(default)
except (ValueError, TypeError):
return float(default)
def get_open_orders(self, symbol):
"""
Open orders in Binance-compatible format.
type: "LIMIT" or "STOP_MARKET" (for manager order verification).
"""
try:
resp = self.session.get_open_orders(
category="linear", symbol=symbol,
)
result = self._check(resp, f"open_orders({symbol})")
orders = []
for o in result.get("list", []):
order_type = o.get("orderType", "")
trigger = self._safe_float(o.get("triggerPrice", "0"))
# Map to Binance-style types
if trigger > 0 and order_type == "Market":
btype = "STOP_MARKET"
elif order_type == "Limit":
btype = "LIMIT"
else:
btype = order_type.upper()
orders.append({
"orderId": o.get("orderId", ""),
"type": btype,
"side": o.get("side", "").upper(),
"price": str(self._safe_float(o.get("price", ""))),
"qty": str(self._safe_float(o.get("qty", ""))),
"status": o.get("orderStatus", ""),
"triggerPrice": str(self._safe_float(o.get("triggerPrice", ""))),
})
return orders
except Exception as e:
logger.error(f"Open orders error {symbol}: {e}")
return None # None = error (vs [] = genuinely no orders)