← Back
"""
Squeeze-VWAP Bot — Bybit Futures Client
==========================================
pybit v5 (unified trading API).

Bybit quirks vs Binance:
1. All qty/price must be strings in API calls
2. Side is "Buy"/"Sell" (capitalized, not uppercase)
3. Conditional orders ARE queryable (unlike Binance algo orders!)
4. UTA wallet: accountType=UNIFIED
5. Klines returned newest-first → need reverse
6. Tickers: turnover24h (not quoteVolume), price24hPcnt (decimal not %)
7. Positions: size (unsigned) + side, not signed positionAmt

Interface matches original Binance exchange.py exactly —
screener, manager, indicators work without changes.
"""

import math
import time
import logging

from pybit.unified_trading import HTTP
from pybit.exceptions import InvalidRequestError

from src.config import (
    BYBIT_API_KEY, BYBIT_API_SECRET, BYBIT_TESTNET,
    LEVERAGE, MAKER_FEE, TAKER_FEE,
)

logger = logging.getLogger("exchange")

# Timeframe conversion: Binance format → Bybit format
_TF_MAP = {
    "1m": "1", "3m": "3", "5m": "5", "15m": "15", "30m": "30",
    "1h": "60", "2h": "120", "4h": "240", "6h": "360", "12h": "720",
    "1d": "D", "1w": "W", "1M": "M",
}


class Exchange:
    def __init__(self):
        if not BYBIT_API_KEY or not BYBIT_API_SECRET:
            raise ValueError("BYBIT_API_KEY and BYBIT_API_SECRET must be set")

        self.session = HTTP(
            testnet=BYBIT_TESTNET,
            api_key=BYBIT_API_KEY,
            api_secret=BYBIT_API_SECRET,
        )
        self._instruments_cache = {}
        self._instruments_ts = 0
        mode = "TESTNET" if BYBIT_TESTNET else "MAINNET"
        logger.info(f"Bybit client initialized ({mode})")

    def _check(self, resp, context=""):
        """Check Bybit API response. Raises on error."""
        ret_code = resp.get("retCode", -1)
        if ret_code != 0:
            msg = resp.get("retMsg", "Unknown error")
            raise Exception(f"{context}: [{ret_code}] {msg}")
        return resp.get("result", {})

    def _bybit_interval(self, binance_tf):
        """Convert Binance timeframe ('5m') → Bybit ('5')."""
        return _TF_MAP.get(binance_tf, binance_tf)

    def _bybit_side(self, side):
        """Convert 'BUY'/'SELL' → Bybit 'Buy'/'Sell'."""
        return "Buy" if side.upper() == "BUY" else "Sell"

    # ============================================================
    # INSTRUMENT INFO (cached, bulk load)
    # ============================================================

    def _load_instruments(self):
        """Load all USDT linear instruments. Refresh every 1h."""
        now = time.time()
        if self._instruments_cache and now - self._instruments_ts < 3600:
            return
        try:
            resp = self.session.get_instruments_info(category="linear")
            result = self._check(resp, "instruments")
            count = 0
            for item in result.get("list", []):
                sym = item.get("symbol", "")
                if sym.endswith("USDT"):
                    self._instruments_cache[sym] = item
                    count += 1
            self._instruments_ts = now
            logger.info(f"Loaded {count} USDT linear instruments")
        except Exception as e:
            logger.error(f"Failed to load instruments: {e}")

    # ============================================================
    # MARKET DATA
    # ============================================================

    def get_all_tickers_24h(self):
        """
        All futures tickers (1 API call).
        Returns Binance-compatible format for screener.
        """
        try:
            resp = self.session.get_tickers(category="linear")
            result = self._check(resp, "tickers")
            tickers = []
            for t in result.get("list", []):
                # Bybit price24hPcnt is decimal (0.05 = 5%)
                pct = float(t.get("price24hPcnt", "0")) * 100
                tickers.append({
                    "symbol": t.get("symbol", ""),
                    "lastPrice": t.get("lastPrice", "0"),
                    "quoteVolume": t.get("turnover24h", "0"),
                    # Bybit doesn't expose trade count; use high value
                    # to pass screener filter (volume is the real filter)
                    "count": 999999,
                    "priceChangePercent": str(round(pct, 2)),
                })
            return tickers
        except Exception as e:
            logger.error(f"Failed to get tickers: {e}")
            return []

    def get_klines(self, symbol, interval, limit=500):
        """
        Get OHLCV klines in Binance list format:
        [timestamp, open, high, low, close, volume, ...]

        Indicators use k[1]=open, k[2]=high, k[3]=low, k[4]=close, k[5]=volume.
        """
        bybit_interval = self._bybit_interval(interval)
        try:
            resp = self.session.get_kline(
                category="linear", symbol=symbol,
                interval=bybit_interval, limit=limit,
            )
            result = self._check(resp, f"kline({symbol})")
            klines = []
            for item in result.get("list", []):
                # Bybit item: [timestamp, open, high, low, close, volume, turnover]
                klines.append([
                    int(item[0]),       # [0] timestamp
                    item[1],            # [1] open (str)
                    item[2],            # [2] high
                    item[3],            # [3] low
                    item[4],            # [4] close
                    item[5],            # [5] volume
                    int(item[0]) + 1,   # [6] close_time (placeholder)
                    item[6] if len(item) > 6 else "0",  # [7] turnover
                    0, "0", "0", "0",   # [8-11] padding for Binance compat
                ])
            # Bybit returns newest first → reverse to chronological
            klines.reverse()
            return klines
        except Exception as e:
            logger.error(f"Klines error {symbol}: {e}")
            return []

    def get_mark_price(self, symbol):
        """Current mark price."""
        try:
            resp = self.session.get_tickers(category="linear", symbol=symbol)
            result = self._check(resp, f"mark({symbol})")
            items = result.get("list", [])
            if items:
                return float(items[0].get("markPrice", "0"))
        except Exception as e:
            logger.error(f"Mark price error {symbol}: {e}")
        return 0.0

    # ============================================================
    # ACCOUNT
    # ============================================================

    def get_balance(self):
        """USDT available balance (Unified Trading Account)."""
        try:
            resp = self.session.get_wallet_balance(accountType="UNIFIED")
            result = self._check(resp, "balance")
            for acc in result.get("list", []):
                for coin in acc.get("coin", []):
                    if coin.get("coin") == "USDT":
                        avail = coin.get("availableToWithdraw", "")
                        if avail and avail != "":
                            return float(avail)
                        return float(coin.get("walletBalance", "0"))
        except Exception as e:
            logger.error(f"Balance error: {e}")
        return 0.0

    def get_positions(self):
        """
        All open positions in Binance-compatible format.
        positionAmt: positive=LONG, negative=SHORT (signed).
        """
        try:
            resp = self.session.get_positions(
                category="linear", settleCoin="USDT",
            )
            result = self._check(resp, "positions")
            positions = []
            for p in result.get("list", []):
                size = float(p.get("size", "0"))
                if size == 0:
                    continue
                side = p.get("side", "")
                # Binance convention: positionAmt positive=LONG, negative=SHORT
                pos_amt = size if side == "Buy" else -size
                positions.append({
                    "symbol": p["symbol"],
                    "positionAmt": str(pos_amt),
                    "markPrice": p.get("markPrice", "0"),
                    "entryPrice": p.get("avgPrice", "0"),
                    "unRealizedProfit": p.get("unrealisedPnl", "0"),
                })
            return positions
        except Exception as e:
            logger.error(f"Positions error: {e}")
            return []

    def set_leverage(self, symbol):
        """Set leverage (both buy and sell sides). Caps at symbol's maxLeverage."""
        target_lev = LEVERAGE

        # Check maxLeverage from instrument info
        info = self.get_instrument_info_raw(symbol)
        if info:
            max_lev_str = info.get("leverageFilter", {}).get("maxLeverage", "")
            if max_lev_str:
                try:
                    max_lev = int(float(max_lev_str))
                    if target_lev > max_lev:
                        logger.info(f"{symbol}: leverage {target_lev}x > max {max_lev}x, capping")
                        target_lev = max_lev
                except (ValueError, TypeError):
                    pass

        try:
            self.session.set_leverage(
                category="linear", symbol=symbol,
                buyLeverage=str(target_lev), sellLeverage=str(target_lev),
            )
            logger.info(f"Leverage set to {target_lev}x for {symbol}")
            return target_lev
        except InvalidRequestError as e:
            if "110043" not in str(e):  # Already set to this value
                logger.warning(f"Leverage error {symbol}: {e}")
            return target_lev
        except Exception as e:
            logger.warning(f"Leverage error {symbol}: {e}")
            return target_lev

    def get_instrument_info_raw(self, symbol):
        """Get raw instrument info from cache or API."""
        self._load_instruments()
        info = self._instruments_cache.get(symbol)
        if not info:
            try:
                resp = self.session.get_instruments_info(category="linear", symbol=symbol)
                result = self._check(resp, f"instrument({symbol})")
                items = result.get("list", [])
                if items:
                    info = items[0]
                    self._instruments_cache[symbol] = info
            except Exception:
                pass
        return info

    def set_margin_type(self, symbol, margin_type="CROSSED"):
        """Set margin mode. CROSSED=cross, ISOLATED=isolated."""
        mode = 0 if margin_type == "CROSSED" else 1
        try:
            self.session.switch_margin_mode(
                category="linear", symbol=symbol, tradeMode=mode,
                buyLeverage=str(LEVERAGE), sellLeverage=str(LEVERAGE),
            )
        except InvalidRequestError as e:
            if "110026" not in str(e):  # Already set
                logger.warning(f"Margin mode error {symbol}: {e}")
        except Exception as e:
            logger.warning(f"Margin mode error {symbol}: {e}")

    # ============================================================
    # SYMBOL INFO
    # ============================================================

    def get_symbol_info(self, symbol):
        """
        Symbol precision/limits info.
        Returns dict with: price_precision, qty_precision, min_qty, tick_size, qty_step.
        """
        self._load_instruments()
        info = self._instruments_cache.get(symbol)

        if not info:
            # Direct fetch fallback
            try:
                resp = self.session.get_instruments_info(
                    category="linear", symbol=symbol,
                )
                result = self._check(resp, f"instrument({symbol})")
                items = result.get("list", [])
                if items:
                    info = items[0]
                    self._instruments_cache[symbol] = info
            except Exception:
                pass

        if not info:
            return None

        lot = info.get("lotSizeFilter", {})
        price_filter = info.get("priceFilter", {})

        qty_step = float(lot.get("qtyStep", "0.001"))
        tick_size = float(price_filter.get("tickSize", "0.01"))
        min_qty = float(lot.get("minOrderQty", "0.001"))

        # Derive precision from step sizes
        qty_prec = max(0, -int(math.floor(math.log10(qty_step)))) if qty_step < 1 else 0
        price_prec = max(0, -int(math.floor(math.log10(tick_size)))) if tick_size < 1 else 0

        return {
            "price_precision": price_prec,
            "qty_precision": qty_prec,
            "min_qty": min_qty,
            "tick_size": tick_size,
            "qty_step": qty_step,
        }

    def round_price(self, symbol_info, price):
        """Round price down to valid tick size."""
        tick = symbol_info.get("tick_size")
        if tick and tick > 0:
            price = math.floor(price / tick) * tick
            prec = max(0, -int(math.floor(math.log10(tick)))) if tick < 1 else 0
            return round(price, prec)
        return round(price, symbol_info["price_precision"])

    def round_qty(self, symbol_info, qty):
        """Round quantity down to valid step size."""
        step = symbol_info.get("qty_step")
        if step and step > 0:
            qty = math.floor(qty / step) * step
            prec = max(0, -int(math.floor(math.log10(step)))) if step < 1 else 0
            return round(qty, prec)
        return round(qty, symbol_info["qty_precision"])

    # ============================================================
    # ORDERS
    # ============================================================

    def open_market(self, symbol, side, qty):
        """
        Market order. Returns (order_dict, fill_price).
        Same interface as Binance version.
        """
        try:
            resp = self.session.place_order(
                category="linear",
                symbol=symbol,
                side=self._bybit_side(side),
                orderType="Market",
                qty=str(qty),
                positionIdx=0,  # One-way mode
            )
            result = self._check(resp, f"market({symbol})")
            order_id = result.get("orderId", "")

            # Get fill price from order history
            time.sleep(0.3)
            fill_price = 0.0

            try:
                hist = self.session.get_order_history(
                    category="linear", symbol=symbol, orderId=order_id,
                )
                hist_result = self._check(hist, f"order_hist({symbol})")
                items = hist_result.get("list", [])
                if items:
                    fill_price = float(items[0].get("avgPrice", "0"))
            except Exception:
                pass

            # Fallback: get from position entry price
            if fill_price == 0:
                try:
                    pos_resp = self.session.get_positions(
                        category="linear", symbol=symbol,
                    )
                    pos_result = self._check(pos_resp, f"pos_fill({symbol})")
                    for p in pos_result.get("list", []):
                        if float(p.get("size", "0")) != 0:
                            fill_price = float(p.get("avgPrice", "0"))
                            break
                except Exception:
                    pass

            # Fallback 2: mark price
            if fill_price == 0:
                fill_price = self.get_mark_price(symbol)
                logger.warning(f"Using mark price as fill fallback: {fill_price}")

            logger.info(f"MARKET {side} {symbol} qty={qty} fill={fill_price}")
            return {"orderId": order_id}, fill_price

        except Exception as e:
            logger.error(f"Market order failed {symbol}: {e}")
            raise

    def place_sl(self, symbol, side, qty, stop_price, symbol_info):
        """
        Stop-loss = conditional Market order with trigger.

        Bybit advantage: conditional orders ARE queryable!
        No invisible algo orders like Binance.

        Args:
            side: closing side ("SELL" for long SL, "BUY" for short SL)
        """
        stop_price = self.round_price(symbol_info, stop_price)

        # triggerDirection: 1=rise above, 2=fall below
        # "SELL" = closing a LONG → price falls below SL → triggerDir=2
        # "BUY" = closing a SHORT → price rises above SL → triggerDir=1
        if side.upper() == "SELL":
            trigger_dir = 2  # Long SL: price falls below
        else:
            trigger_dir = 1  # Short SL: price rises above

        try:
            resp = self.session.place_order(
                category="linear",
                symbol=symbol,
                side=self._bybit_side(side),
                orderType="Market",
                qty=str(qty),
                triggerPrice=str(stop_price),
                triggerDirection=trigger_dir,
                triggerBy="MarkPrice",
                reduceOnly=True,
                positionIdx=0,
            )
            result = self._check(resp, f"SL({symbol})")
            logger.info(f"SL placed {symbol} side={side} trigger={stop_price} qty={qty}")
            return result
        except Exception as e:
            logger.error(f"SL order failed {symbol}: {e}")
            raise

    def place_tp(self, symbol, side, qty, price, symbol_info):
        """
        Take-profit = LIMIT reduceOnly order. Earns maker fee.

        Args:
            side: closing side ("SELL" for long TP, "BUY" for short TP)
        """
        price = self.round_price(symbol_info, price)
        try:
            resp = self.session.place_order(
                category="linear",
                symbol=symbol,
                side=self._bybit_side(side),
                orderType="Limit",
                qty=str(qty),
                price=str(price),
                timeInForce="GTC",
                reduceOnly=True,
                positionIdx=0,
            )
            result = self._check(resp, f"TP({symbol})")
            logger.info(f"TP placed {symbol} side={side} price={price} qty={qty}")
            return result
        except Exception as e:
            logger.error(f"TP order failed {symbol}: {e}")
            raise

    def cancel_all_orders(self, symbol, retries=3):
        """
        Cancel ALL orders on a symbol (regular + conditional).
        Bybit cancel_all_orders kills both — no invisible algo issue!
        """
        for attempt in range(retries):
            try:
                self.session.cancel_all_orders(
                    category="linear", symbol=symbol,
                )

                # Verify
                time.sleep(0.3)
                remaining = self.get_open_orders(symbol)
                if remaining is None or len(remaining) == 0:
                    logger.info(f"All orders cancelled {symbol}")
                    return True

                logger.warning(
                    f"Cancel {symbol}: {len(remaining)} survived attempt {attempt+1}"
                )

                # Individual cancel fallback
                for o in remaining:
                    oid = o.get("orderId", "")
                    if oid:
                        try:
                            self.session.cancel_order(
                                category="linear", symbol=symbol, orderId=oid,
                            )
                        except Exception:
                            pass
                time.sleep(0.3)

            except InvalidRequestError as e:
                if "110001" in str(e):  # No orders to cancel
                    return True
                logger.warning(f"Cancel error {symbol} attempt {attempt+1}: {e}")
            except Exception as e:
                logger.warning(f"Cancel error {symbol} attempt {attempt+1}: {e}")

        logger.error(f"Failed to cancel all orders on {symbol} after {retries} retries")
        return False

    def nuclear_cleanup(self):
        """
        Startup: cancel ALL orders on ALL symbols.
        Prevents order accumulation after restarts.
        """
        positions = self.get_positions()
        symbols = {p["symbol"] for p in positions}

        # Also find symbols with orphaned orders
        try:
            resp = self.session.get_open_orders(
                category="linear", settleCoin="USDT",
            )
            result = self._check(resp, "all_open_orders")
            for o in result.get("list", []):
                sym = o.get("symbol", "")
                if sym:
                    symbols.add(sym)
        except Exception as e:
            logger.warning(f"Error getting open orders: {e}")

        cleaned = 0
        for sym in symbols:
            if sym:
                try:
                    self.session.cancel_all_orders(category="linear", symbol=sym)
                    cleaned += 1
                except Exception:
                    pass

        logger.info(f"Nuclear cleanup: cancelled on {cleaned} symbols")
        return cleaned

    def close_position(self, symbol, side, qty):
        """
        Close position via market.
        Returns fill_price.
        """
        self.cancel_all_orders(symbol)
        _, fill_price = self.open_market(symbol, side, qty)
        return fill_price

    @staticmethod
    def _safe_float(val, default="0"):
        """Parse float safely — Bybit returns '' for some fields."""
        try:
            return float(val) if val and val != "" else float(default)
        except (ValueError, TypeError):
            return float(default)

    def get_open_orders(self, symbol):
        """
        Open orders in Binance-compatible format.
        type: "LIMIT" or "STOP_MARKET" (for manager order verification).
        """
        try:
            resp = self.session.get_open_orders(
                category="linear", symbol=symbol,
            )
            result = self._check(resp, f"open_orders({symbol})")
            orders = []
            for o in result.get("list", []):
                order_type = o.get("orderType", "")
                trigger = self._safe_float(o.get("triggerPrice", "0"))

                # Map to Binance-style types
                if trigger > 0 and order_type == "Market":
                    btype = "STOP_MARKET"
                elif order_type == "Limit":
                    btype = "LIMIT"
                else:
                    btype = order_type.upper()

                orders.append({
                    "orderId": o.get("orderId", ""),
                    "type": btype,
                    "side": o.get("side", "").upper(),
                    "price": str(self._safe_float(o.get("price", ""))),
                    "qty": str(self._safe_float(o.get("qty", ""))),
                    "status": o.get("orderStatus", ""),
                    "triggerPrice": str(self._safe_float(o.get("triggerPrice", ""))),
                })
            return orders
        except Exception as e:
            logger.error(f"Open orders error {symbol}: {e}")
            return None  # None = error (vs [] = genuinely no orders)

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...