"""
Bybit Futures Client — authenticated wrapper for order execution.
Handles position opening, partial closes, leverage, and quantity calculation.
Uses pybit v5 (unified trading API).
Bybit quirks vs Binance:
- All qty/price must be strings
- Side is "Buy"/"Sell" (capitalized, not uppercase)
- Conditional orders ARE queryable (unlike Binance algo orders)
- UTA wallet: nested structure accountType=UNIFIED
- set_leverage requires buyLeverage + sellLeverage separately
"""
import logging
import math
import time as _time
from pybit.unified_trading import HTTP
from pybit.exceptions import InvalidRequestError
from src.config import BYBIT_API_KEY, BYBIT_API_SECRET, BYBIT_TESTNET
logger = logging.getLogger(__name__)
class BybitFuturesClient:
"""Authenticated Bybit Futures (linear USDT perps) client."""
def __init__(self, api_key: str = "", api_secret: str = "", testnet: bool | None = None):
"""
Initialize Bybit client.
Args:
api_key: Bybit API key (falls back to config)
api_secret: Bybit API secret (falls back to config)
testnet: Use testnet (falls back to config)
"""
self.api_key = api_key or BYBIT_API_KEY
self.api_secret = api_secret or BYBIT_API_SECRET
self.testnet = testnet if testnet is not None else BYBIT_TESTNET
if not self.api_key or not self.api_secret:
raise ValueError("BYBIT_API_KEY and BYBIT_API_SECRET must be set")
self.session = HTTP(
testnet=self.testnet,
api_key=self.api_key,
api_secret=self.api_secret,
)
self._instruments_cache: dict[str, dict] = {}
mode = "TESTNET" if self.testnet else "MAINNET"
logger.info(f"Bybit Futures client initialized ({mode})")
# ── Helpers ──────────────────────────────────────────────
@staticmethod
def _safe_float(value, default=0.0):
"""Safely convert to float — handles empty strings from Bybit API."""
if value is None or value == "":
return default
try:
return float(value)
except (ValueError, TypeError):
return default
def _check(self, response: dict, context: str = "") -> dict:
"""Check Bybit API response. Raises on error."""
ret_code = response.get("retCode", -1)
if ret_code != 0:
msg = response.get("retMsg", "Unknown error")
raise InvalidRequestError(f"{context}: [{ret_code}] {msg}")
return response.get("result") or {}
# ── Instrument Info (cached) ─────────────────────────────
def get_instrument_info(self, symbol: str) -> dict | None:
"""Fetch instrument info for a symbol (cached)."""
if symbol in self._instruments_cache:
return self._instruments_cache[symbol]
try:
resp = self.session.get_instruments_info(category="linear", symbol=symbol)
result = self._check(resp, f"instruments_info({symbol})")
items = result.get("list", [])
if items:
self._instruments_cache[symbol] = items[0]
return items[0]
except Exception as e:
logger.error(f"Failed to get instrument info for {symbol}: {e}")
return None
def get_qty_step(self, symbol: str) -> float:
"""Get quantity step size (lotSizeFilter.qtyStep)."""
info = self.get_instrument_info(symbol)
if info:
return float(info.get("lotSizeFilter", {}).get("qtyStep", "0.001"))
return 0.001
def get_min_qty(self, symbol: str) -> float:
"""Get minimum order quantity."""
info = self.get_instrument_info(symbol)
if info:
return float(info.get("lotSizeFilter", {}).get("minOrderQty", "0.001"))
return 0.001
def get_tick_size(self, symbol: str) -> float:
"""Get price tick size (priceFilter.tickSize)."""
info = self.get_instrument_info(symbol)
if info:
return float(info.get("priceFilter", {}).get("tickSize", "0.01"))
return 0.01
def get_min_notional(self, symbol: str) -> float:
"""Get minimum order value. Bybit doesn't always have this, default 5."""
# Bybit lotSizeFilter has minNotionalValue in some instruments
info = self.get_instrument_info(symbol)
if info:
lot_filter = info.get("lotSizeFilter", {})
if "minNotionalValue" in lot_filter:
return float(lot_filter["minNotionalValue"])
return 5.0
def round_quantity(self, symbol: str, qty: float) -> float:
"""Round quantity down to valid step size."""
step = self.get_qty_step(symbol)
if step > 0:
qty = math.floor(qty / step) * step
# Determine decimal places from step
decimals = max(0, -int(math.floor(math.log10(step)))) if step < 1 else 0
return round(qty, decimals)
def round_price(self, symbol: str, price: float) -> float:
"""Round price to valid tick size."""
tick = self.get_tick_size(symbol)
if tick > 0:
price = math.floor(price / tick) * tick
decimals = max(0, -int(math.floor(math.log10(tick)))) if tick < 1 else 0
return round(price, decimals)
def calculate_quantity(self, symbol: str, usdt_amount: float, price: float, leverage: int) -> float:
"""
Calculate order quantity.
qty = (usdt_margin * leverage) / price, rounded to valid step.
Returns 0 if below minimum.
"""
if price <= 0:
return 0
raw_qty = (usdt_amount * leverage) / price
qty = self.round_quantity(symbol, raw_qty)
min_qty = self.get_min_qty(symbol)
if qty < min_qty:
logger.warning(f"{symbol}: calculated qty {qty} < min {min_qty}")
return 0
# Check min notional
notional = qty * price
min_notional = self.get_min_notional(symbol)
if notional < min_notional:
logger.warning(f"{symbol}: notional {notional:.2f} < min {min_notional}")
return 0
return qty
# ── Account ──────────────────────────────────────────────
def get_account_balance(self) -> float:
"""Get available USDT balance (Unified Trading Account)."""
try:
resp = self.session.get_wallet_balance(accountType="UNIFIED")
result = self._check(resp, "wallet_balance")
for account in result.get("list", []):
for coin in account.get("coin", []):
if coin.get("coin") == "USDT":
# Try availableToWithdraw first, fallback to walletBalance
avail = coin.get("availableToWithdraw", "")
if avail and avail != "":
return float(avail)
# Fallback: walletBalance - totalOrderIM - totalPositionIM
wallet = float(coin.get("walletBalance", "0"))
order_im = float(coin.get("totalOrderIM", "0"))
pos_im = float(coin.get("totalPositionIM", "0"))
return wallet - order_im - pos_im
except Exception as e:
logger.error(f"Failed to get account balance: {e}")
return 0.0
def get_total_equity(self) -> float:
"""Get total equity in USDT (balance + unrealized PnL)."""
try:
resp = self.session.get_wallet_balance(accountType="UNIFIED")
result = self._check(resp, "wallet_balance")
for account in result.get("list", []):
return float(account.get("totalEquity", "0"))
except Exception as e:
logger.error(f"Failed to get total equity: {e}")
return 0.0
# ── Leverage & Margin ────────────────────────────────────
def set_leverage(self, symbol: str, leverage: int) -> bool:
"""Set leverage for a symbol (both buy and sell sides)."""
try:
self.session.set_leverage(
category="linear",
symbol=symbol,
buyLeverage=str(leverage),
sellLeverage=str(leverage),
)
logger.info(f"Leverage set to {leverage}x for {symbol}")
return True
except InvalidRequestError as e:
# 110043 = leverage already set to this value
if "110043" in str(e):
return True
logger.error(f"Failed to set leverage for {symbol}: {e}")
return False
except Exception as e:
logger.error(f"Failed to set leverage for {symbol}: {e}")
return False
def set_margin_mode(self, symbol: str, mode: int = 1) -> bool:
"""Set margin mode. 0 = cross, 1 = isolated."""
try:
self.session.switch_margin_mode(
category="linear",
symbol=symbol,
tradeMode=mode,
buyLeverage=str(3),
sellLeverage=str(3),
)
mode_name = "ISOLATED" if mode == 1 else "CROSS"
logger.info(f"Margin mode set to {mode_name} for {symbol}")
return True
except InvalidRequestError as e:
# 110026 = already set
if "110026" in str(e):
return True
logger.error(f"Failed to set margin mode for {symbol}: {e}")
return False
def set_position_mode(self, mode: int = 0) -> bool:
"""Set position mode. 0 = one-way (merged), 3 = hedge (both sides)."""
try:
self.session.switch_position_mode(
category="linear",
coin="USDT",
mode=mode,
)
logger.info(f"Position mode set to {'ONE-WAY' if mode == 0 else 'HEDGE'}")
return True
except InvalidRequestError as e:
# Already set
if "110025" in str(e):
return True
logger.error(f"Failed to set position mode: {e}")
return False
# ── Market Data ──────────────────────────────────────────
def get_mark_price(self, symbol: str) -> float | None:
"""Get current mark price for a symbol."""
try:
resp = self.session.get_tickers(category="linear", symbol=symbol)
result = self._check(resp, f"tickers({symbol})")
items = result.get("list", [])
if items:
return float(items[0].get("markPrice", "0"))
except Exception as e:
logger.error(f"Failed to get mark price for {symbol}: {e}")
return None
def get_last_price(self, symbol: str) -> float | None:
"""Get last traded price."""
try:
resp = self.session.get_tickers(category="linear", symbol=symbol)
result = self._check(resp, f"tickers({symbol})")
items = result.get("list", [])
if items:
return float(items[0].get("lastPrice", "0"))
except Exception as e:
logger.error(f"Failed to get last price for {symbol}: {e}")
return None
def get_klines(self, symbol: str, interval: str = "15", limit: int = 200) -> list[dict]:
"""
Get OHLCV klines.
Args:
symbol: e.g. "BTCUSDT"
interval: "1","3","5","15","30","60","120","240","360","720","D","W","M"
limit: max 1000
Returns:
List of dicts with open, high, low, close, volume, timestamp
"""
try:
resp = self.session.get_kline(
category="linear", symbol=symbol, interval=interval, limit=limit
)
result = self._check(resp, f"kline({symbol})")
klines = []
for item in result.get("list", []):
klines.append({
"timestamp": int(item[0]),
"open": float(item[1]),
"high": float(item[2]),
"low": float(item[3]),
"close": float(item[4]),
"volume": float(item[5]),
})
# Bybit returns newest first — reverse to chronological order
klines.reverse()
return klines
except Exception as e:
logger.error(f"Failed to get klines for {symbol}: {e}")
return []
def get_tickers_all(self) -> list[dict]:
"""Get all linear USDT perp tickers (for scanning)."""
try:
resp = self.session.get_tickers(category="linear")
result = self._check(resp, "tickers_all")
return result.get("list", [])
except Exception as e:
logger.error(f"Failed to get all tickers: {e}")
return []
# ── Positions ────────────────────────────────────────────
def get_position(self, symbol: str) -> dict | None:
"""Get current open position for a symbol. Returns None if no position."""
try:
resp = self.session.get_positions(category="linear", symbol=symbol)
result = self._check(resp, f"positions({symbol})")
for p in result.get("list", []):
size = self._safe_float(p.get("size"))
if size != 0:
side_raw = p.get("side", "")
return {
"symbol": p["symbol"],
"side": side_raw.upper(), # Normalize to "BUY"/"SELL"
"quantity": size,
"entry_price": self._safe_float(p.get("avgPrice")),
"unrealized_pnl": self._safe_float(p.get("unrealisedPnl")),
"leverage": int(self._safe_float(p.get("leverage"), 1)),
"margin_type": "isolated" if p.get("tradeMode") == "1" else "cross",
}
except Exception as e:
logger.error(f"Failed to get position for {symbol}: {e}")
return None
def get_all_positions(self) -> list[dict]:
"""Get all open positions."""
try:
resp = self.session.get_positions(category="linear", settleCoin="USDT")
result = self._check(resp, "all_positions")
positions = []
for p in result.get("list", []):
size = self._safe_float(p.get("size"))
if size != 0:
positions.append({
"symbol": p["symbol"],
"side": p.get("side", "").upper(),
"quantity": size,
"entry_price": self._safe_float(p.get("avgPrice")),
"unrealized_pnl": self._safe_float(p.get("unrealisedPnl")),
"leverage": int(self._safe_float(p.get("leverage"), 1)),
})
return positions
except Exception as e:
logger.error(f"Failed to get all positions: {e}")
return []
# ── Order Execution ──────────────────────────────────────
def _normalize_side(self, side: str) -> str:
"""Convert 'BUY'/'SELL' to Bybit's 'Buy'/'Sell'."""
return "Buy" if side.upper() == "BUY" else "Sell"
def _opposite_side(self, side: str) -> str:
"""Get closing side. Input: 'BUY' or 'SELL'."""
return "Sell" if side.upper() == "BUY" else "Buy"
def open_position(self, symbol: str, side: str, usdt_margin: float, leverage: int) -> dict | None:
"""
Open a new futures position (market order).
Args:
symbol: e.g. "BTCUSDT"
side: "BUY" (long) or "SELL" (short)
usdt_margin: margin amount in USDT
leverage: leverage multiplier
Returns:
dict with fill info or None on error
"""
try:
# Set leverage
if not self.set_leverage(symbol, leverage):
return None
# Get current price for qty calculation
price = self.get_mark_price(symbol)
if not price:
return None
# Calculate quantity
qty = self.calculate_quantity(symbol, usdt_margin, price, leverage)
if qty <= 0:
logger.error(f"Invalid quantity for {symbol}: margin={usdt_margin}, price={price}, lev={leverage}")
return None
# Place market order
resp = self.session.place_order(
category="linear",
symbol=symbol,
side=self._normalize_side(side),
orderType="Market",
qty=str(qty),
positionIdx=0, # One-way mode
)
result = self._check(resp, f"open_position({symbol})")
order_id = result.get("orderId", "")
# Get fill price from position (market orders fill instantly)
_time.sleep(0.3) # Brief wait for fill
pos = self.get_position(symbol)
fill_price = pos["entry_price"] if pos else price
result_dict = {
"orderId": order_id,
"symbol": symbol,
"side": side.upper(),
"quantity": qty,
"fill_price": fill_price,
"leverage": leverage,
"usdt_margin": usdt_margin,
"status": "FILLED",
}
logger.info(f"Position opened: {side} {qty} {symbol} @ ${fill_price:.6f} ({leverage}x)")
return result_dict
except Exception as e:
logger.error(f"Error opening position {side} {symbol}: {e}")
return None
def open_limit_order(self, symbol: str, side: str, quantity: float,
price: float, reduce_only: bool = False) -> dict | None:
"""Place a LIMIT order."""
try:
rounded_price = self.round_price(symbol, price)
qty = self.round_quantity(symbol, quantity)
if qty <= 0:
return None
params = {
"category": "linear",
"symbol": symbol,
"side": self._normalize_side(side),
"orderType": "Limit",
"qty": str(qty),
"price": str(rounded_price),
"timeInForce": "GTC",
"positionIdx": 0,
}
if reduce_only:
params["reduceOnly"] = True
resp = self.session.place_order(**params)
result = self._check(resp, f"limit_order({symbol})")
order_id = result.get("orderId", "")
logger.info(f"Limit order: {side} {qty} {symbol} @ ${rounded_price:.6f} → #{order_id}")
return {
"orderId": order_id,
"symbol": symbol,
"side": side.upper(),
"quantity": qty,
"price": rounded_price,
"status": "NEW",
}
except Exception as e:
logger.error(f"Error limit order {side} {symbol}: {e}")
return None
def place_stop_order(self, symbol: str, position_side: str, quantity: float,
trigger_price: float) -> dict | None:
"""
Place a conditional stop-loss order (market execution on trigger).
Bybit conditional orders are queryable (unlike Binance algo orders).
Args:
position_side: original position side ("BUY" for long, "SELL" for short)
quantity: amount to close
trigger_price: trigger price
"""
try:
closing_side = self._opposite_side(position_side)
tp = self.round_price(symbol, trigger_price)
qty = self.round_quantity(symbol, quantity)
if qty <= 0:
logger.error(f"Invalid SL quantity for {symbol}: {quantity}")
return None
# triggerDirection: 1 = rise above, 2 = fall below
if position_side.upper() == "BUY":
trigger_dir = 2 # SL for long = price falls below
else:
trigger_dir = 1 # SL for short = price rises above
resp = self.session.place_order(
category="linear",
symbol=symbol,
side=closing_side,
orderType="Market",
qty=str(qty),
triggerPrice=str(tp),
triggerDirection=trigger_dir,
triggerBy="MarkPrice",
reduceOnly=True,
positionIdx=0,
)
result = self._check(resp, f"stop_order({symbol})")
order_id = result.get("orderId", "")
logger.info(f"Stop order placed: {closing_side} {qty} {symbol} trigger=${tp} #{order_id}")
return {
"orderId": order_id,
"symbol": symbol,
"status": "Untriggered",
}
except Exception as e:
logger.error(f"Error stop order {symbol}: {e}")
return None
def place_take_profit_limit(self, symbol: str, position_side: str, quantity: float,
trigger_price: float, limit_price: float) -> dict | None:
"""
Place a conditional TP order (limit execution on trigger).
Earns maker fee (0.02%) instead of taker (0.055%).
"""
try:
closing_side = self._opposite_side(position_side)
tp = self.round_price(symbol, trigger_price)
lp = self.round_price(symbol, limit_price)
qty = self.round_quantity(symbol, quantity)
if qty <= 0:
return None
# triggerDirection: 1 = rise above (TP for long), 2 = fall below (TP for short)
if position_side.upper() == "BUY":
trigger_dir = 1 # TP for long = price rises above
else:
trigger_dir = 2 # TP for short = price falls below
resp = self.session.place_order(
category="linear",
symbol=symbol,
side=closing_side,
orderType="Limit",
qty=str(qty),
price=str(lp),
triggerPrice=str(tp),
triggerDirection=trigger_dir,
triggerBy="MarkPrice",
timeInForce="GTC",
reduceOnly=True,
positionIdx=0,
)
result = self._check(resp, f"tp_limit({symbol})")
order_id = result.get("orderId", "")
logger.info(f"TP limit placed: {closing_side} {qty} {symbol} trigger=${tp} limit=${lp} #{order_id}")
return {
"orderId": order_id,
"symbol": symbol,
"status": "Untriggered",
}
except Exception as e:
logger.error(f"Error TP limit {symbol}: {e}")
return None
# ── Order Management ─────────────────────────────────────
def get_order_status(self, symbol: str, order_id: str) -> dict | None:
"""Check status of an existing order (works for both regular and conditional)."""
try:
resp = self.session.get_open_orders(
category="linear", symbol=symbol, orderId=order_id
)
result = self._check(resp, f"order_status({symbol})")
items = result.get("list", [])
if items:
o = items[0]
return {
"orderId": o.get("orderId", ""),
"status": o.get("orderStatus", ""),
"fill_price": self._safe_float(o.get("avgPrice")),
"executedQty": self._safe_float(o.get("cumExecQty")),
"origQty": self._safe_float(o.get("qty")),
}
# Not in open orders — check history
resp = self.session.get_order_history(
category="linear", symbol=symbol, orderId=order_id
)
result = self._check(resp, f"order_history({symbol})")
items = result.get("list", [])
if items:
o = items[0]
return {
"orderId": o.get("orderId", ""),
"status": o.get("orderStatus", ""),
"fill_price": self._safe_float(o.get("avgPrice")),
"executedQty": self._safe_float(o.get("cumExecQty")),
"origQty": self._safe_float(o.get("qty")),
}
except Exception as e:
logger.error(f"Failed to get order status {symbol} #{order_id}: {e}")
return None
def get_open_orders(self, symbol: str) -> list[dict]:
"""Get all open orders for a symbol (regular + conditional)."""
orders = []
try:
# Regular orders
resp = self.session.get_open_orders(category="linear", symbol=symbol)
result = self._check(resp, f"open_orders({symbol})")
for o in result.get("list", []):
orders.append({
"orderId": o.get("orderId", ""),
"orderType": o.get("orderType", ""),
"side": o.get("side", ""),
"status": o.get("orderStatus", ""),
"triggerPrice": self._safe_float(o.get("triggerPrice")),
"price": self._safe_float(o.get("price")),
"qty": self._safe_float(o.get("qty")),
"cumExecQty": self._safe_float(o.get("cumExecQty")),
})
except Exception as e:
logger.error(f"Failed to get open orders {symbol}: {e}")
return orders
def cancel_order(self, symbol: str, order_id: str) -> bool:
"""Cancel a single order."""
try:
self.session.cancel_order(category="linear", symbol=symbol, orderId=order_id)
logger.info(f"Cancelled order {symbol} #{order_id}")
return True
except InvalidRequestError as e:
# 110001 = order doesn't exist (already filled/cancelled)
if "110001" in str(e):
return True
logger.error(f"Failed to cancel order {symbol} #{order_id}: {e}")
return False
except Exception as e:
logger.error(f"Error cancelling order {symbol} #{order_id}: {e}")
return False
def cancel_all_orders(self, symbol: str) -> bool:
"""Cancel ALL open orders for a symbol. Retries + verification."""
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
self.session.cancel_all_orders(category="linear", symbol=symbol)
logger.info(f"Cancelled all orders for {symbol} (attempt {attempt})")
except InvalidRequestError as e:
if "110001" in str(e): # No orders
return True
logger.warning(f"Cancel attempt {attempt} for {symbol}: {e}")
# Verify
_time.sleep(0.3)
remaining = self.get_open_orders(symbol)
if not remaining:
logger.info(f"Verified: 0 orders remaining for {symbol}")
return True
logger.warning(f"Cancel {symbol}: {len(remaining)} orders survived attempt {attempt}")
# Individual fallback
for order in remaining:
self.cancel_order(symbol, order["orderId"])
_time.sleep(0.3)
# Final check
remaining = self.get_open_orders(symbol)
if remaining:
logger.error(f"FAILED to cancel all orders for {symbol}: {len(remaining)} still alive")
return False
return True
def cancel_all_account_orders(self) -> int:
"""Cancel ALL open orders on ALL symbols. Used for startup cleanup."""
cleaned = 0
try:
# Get all open positions to find relevant symbols
positions = self.get_all_positions()
symbols = {p["symbol"] for p in positions}
# Also check for orphaned orders on symbols without positions
resp = self.session.get_open_orders(category="linear", settleCoin="USDT")
result = self._check(resp, "all_open_orders")
for o in result.get("list", []):
symbols.add(o.get("symbol", ""))
if not symbols:
logger.info("Startup cleanup: no open orders on account")
return 0
logger.info(f"Startup cleanup: checking {len(symbols)} symbols")
for sym in symbols:
if sym and self.cancel_all_orders(sym):
cleaned += 1
except Exception as e:
logger.error(f"Startup cleanup error: {e}")
return cleaned
# ── Close Positions ──────────────────────────────────────
def close_partial(self, symbol: str, side: str, quantity: float) -> dict | None:
"""Close part of a position (reduce-only market order)."""
try:
opposite = self._opposite_side(side)
qty = self.round_quantity(symbol, quantity)
if qty <= 0:
return None
resp = self.session.place_order(
category="linear",
symbol=symbol,
side=opposite,
orderType="Market",
qty=str(qty),
reduceOnly=True,
positionIdx=0,
)
result = self._check(resp, f"close_partial({symbol})")
order_id = result.get("orderId", "")
# Get fill price
_time.sleep(0.3)
fill_price = 0.0
status = self.get_order_status(symbol, order_id)
if status and status["fill_price"] > 0:
fill_price = status["fill_price"]
else:
fill_price = self.get_mark_price(symbol) or 0
logger.info(f"Partial close: {qty} {symbol} @ ${fill_price:.6f}")
return {
"orderId": order_id,
"symbol": symbol,
"quantity": qty,
"fill_price": fill_price,
"status": "FILLED",
}
except Exception as e:
logger.error(f"Error closing partial {symbol}: {e}")
return None
def close_full(self, symbol: str, side: str) -> dict | None:
"""Close entire position. Retries if residual dust remains."""
pos = self.get_position(symbol)
if not pos:
logger.warning(f"No open position to close for {symbol}")
return None
result = self.close_partial(symbol, side, pos["quantity"])
# Check for residual dust after close
_time.sleep(0.5)
remaining = self.get_position(symbol)
if remaining and remaining["quantity"] > 0:
dust_qty = remaining["quantity"]
logger.warning(f"Residual dust {dust_qty} {symbol} after close_full — cleaning up")
try:
# Use raw qty string from exchange (skip rounding)
opposite = self._opposite_side(side)
resp = self.session.place_order(
category="linear",
symbol=symbol,
side=opposite,
orderType="Market",
qty=str(dust_qty),
reduceOnly=True,
positionIdx=0,
)
self._check(resp, f"dust_cleanup({symbol})")
logger.info(f"Dust cleaned: {dust_qty} {symbol}")
except Exception as e:
logger.error(f"Failed to clean dust {symbol}: {e}")
return result