โ† ะะฐะทะฐะด
"""
Squeeze-VWAP Bot — Bybit Futures Client
=======================================
pybit v5 (unified trading API).

Bybit quirks vs Binance:
  1. All qty/price must be strings in API calls
  2. Side is "Buy"/"Sell" (capitalized, not uppercase)
  3. Conditional orders ARE queryable (unlike Binance algo orders!)
  4. UTA wallet: accountType=UNIFIED
  5. Klines returned newest-first → need reverse
  6. Tickers: turnover24h (not quoteVolume), price24hPcnt (decimal not %)
  7. Positions: size (unsigned) + side, not signed positionAmt

Interface matches original Binance exchange.py exactly —
screener, manager, indicators work without changes.
"""
import logging
import math
import time
from decimal import ROUND_FLOOR, Decimal, InvalidOperation

from pybit.unified_trading import HTTP
from pybit.exceptions import InvalidRequestError

from src.config import (
    BYBIT_API_KEY, BYBIT_API_SECRET, BYBIT_TESTNET,
    LEVERAGE, MAKER_FEE, TAKER_FEE,
)

logger = logging.getLogger("exchange")

# Timeframe conversion: Binance format → Bybit format
_TF_MAP = {
    "1m": "1", "3m": "3", "5m": "5", "15m": "15", "30m": "30",
    "1h": "60", "2h": "120", "4h": "240", "6h": "360", "12h": "720",
    "1d": "D", "1w": "W", "1M": "M",
}

# How long the bulk instrument cache is considered fresh.
_INSTRUMENT_TTL_SEC = 3600
# Pause that gives Bybit time to register an order/cancel before we query it.
_SETTLE_DELAY_SEC = 0.3
# Hard stop for cursor pagination so a misbehaving cursor can never loop forever.
_MAX_PAGES = 20


def _to_decimal(value):
    """Return ``value`` as an exact finite ``Decimal``, or ``None`` if impossible.

    Going through ``str()`` uses the float's shortest repr, so ``0.1`` becomes
    ``Decimal('0.1')`` rather than its long binary expansion.
    """
    try:
        number = Decimal(str(value))
    except (InvalidOperation, ValueError, TypeError):
        return None
    return number if number.is_finite() else None


def _decimal_places(step):
    """Number of decimal places needed to write ``step`` exactly (0.25 -> 2)."""
    number = _to_decimal(step)
    if number is None or number <= 0:
        return 0
    return max(0, -number.normalize().as_tuple().exponent)


def _floor_to_step(value, step):
    """Round ``value`` down to a multiple of ``step`` and return it as a float.

    Uses decimal arithmetic: with binary floats ``0.3 / 0.1`` evaluates to
    ``2.9999999999999996``, so ``math.floor`` would drop a whole step and turn
    a valid 0.3 into 0.2.

    Raises:
        ValueError: if ``value`` or ``step`` is not a finite number, or
            ``step`` is not positive.
    """
    dec_value = _to_decimal(value)
    dec_step = _to_decimal(step)
    if dec_value is None or dec_step is None or dec_step <= 0:
        raise ValueError(f"cannot round {value!r} to step {step!r}")
    multiples = (dec_value / dec_step).to_integral_value(rounding=ROUND_FLOOR)
    return float(multiples * dec_step)


def _plain_str(value):
    """Format a number for the API as a plain decimal string.

    ``str(0.00005)`` yields ``'5e-05'``; this returns ``'0.00005'`` instead.
    Values that cannot be parsed as a finite number fall back to ``str(value)``.
    """
    number = _to_decimal(value)
    return str(value) if number is None else format(number, "f")


class Exchange:
    """Bybit linear (USDT) futures client with a Binance-compatible interface."""

    def __init__(self):
        """Create the pybit HTTP session from the configured credentials.

        Raises:
            ValueError: if the API key or secret is not configured.
        """
        if not BYBIT_API_KEY or not BYBIT_API_SECRET:
            raise ValueError("BYBIT_API_KEY and BYBIT_API_SECRET must be set")

        self.session = HTTP(
            testnet=BYBIT_TESTNET,
            api_key=BYBIT_API_KEY,
            api_secret=BYBIT_API_SECRET,
        )
        self._instruments_cache = {}   # symbol -> raw instrument dict
        self._instruments_ts = 0       # time.time() of the last successful bulk load

        mode = "TESTNET" if BYBIT_TESTNET else "MAINNET"
        logger.info(f"Bybit client initialized ({mode})")

    def _check(self, resp, context=""):
        """Return the ``result`` payload of a Bybit response.

        Raises:
            Exception: if ``retCode`` is missing or non-zero; the message
                carries ``context``, the code and ``retMsg``.
        """
        ret_code = resp.get("retCode", -1)
        if ret_code != 0:
            msg = resp.get("retMsg", "Unknown error")
            raise Exception(f"{context}: [{ret_code}] {msg}")
        # A null "result" is normalised to {} so callers can always use .get().
        return resp.get("result") or {}

    def _fetch_all(self, fetch, context, limit, **params):
        """Collect every item of a cursor-paginated list endpoint.

        Args:
            fetch: bound session method to call (e.g. ``self.session.get_positions``).
            context: label used in error messages raised by ``_check``.
            limit: page size sent with every request.
            **params: remaining query parameters.

        Returns:
            list of raw items from all pages.

        Raises:
            Exception: whatever ``fetch`` or ``_check`` raises.
        """
        items = []
        cursor = ""
        for _ in range(_MAX_PAGES):
            query = dict(params, limit=limit)
            if cursor:
                query["cursor"] = cursor
            result = self._check(fetch(**query), context)
            page = result.get("list") or []
            items.extend(page)
            cursor = result.get("nextPageCursor") or ""
            # A short page is the last one; stopping here avoids an extra request
            # when the server still hands back a cursor.
            if not cursor or len(page) < limit:
                break
        else:
            logger.warning(f"{context}: stopped paginating after {_MAX_PAGES} pages")
        return items

    def _bybit_interval(self, binance_tf):
        """Convert Binance timeframe ('5m') → Bybit ('5'); unknown values pass through."""
        return _TF_MAP.get(binance_tf, binance_tf)

    def _bybit_side(self, side):
        """Convert 'BUY'/'SELL' → Bybit 'Buy'/'Sell' (anything but BUY maps to 'Sell')."""
        return "Buy" if side.upper() == "BUY" else "Sell"

    # ============================================================
    # INSTRUMENT INFO (cached, bulk load)
    # ============================================================

    def _load_instruments(self):
        """Load all USDT linear instruments into the cache. Refresh every 1h.

        Errors are logged and leave the existing cache untouched.
        """
        now = time.time()
        if self._instruments_cache and now - self._instruments_ts < _INSTRUMENT_TTL_SEC:
            return

        try:
            # Paginate explicitly: a single default-sized page can miss symbols.
            items = self._fetch_all(
                self.session.get_instruments_info, "instruments",
                limit=1000, category="linear",
            )
            fresh = {}
            for item in items:
                sym = item.get("symbol", "")
                if sym.endswith("USDT"):
                    fresh[sym] = item
            self._instruments_cache.update(fresh)
            self._instruments_ts = now
            logger.info(f"Loaded {len(fresh)} USDT linear instruments")
        except Exception as e:
            logger.error(f"Failed to load instruments: {e}")

    # ============================================================
    # MARKET DATA
    # ============================================================

    def get_all_tickers_24h(self):
        """All futures tickers (1 API call) in Binance-compatible format.

        Returns:
            list of dicts with ``symbol``, ``lastPrice``, ``quoteVolume``,
            ``count`` and ``priceChangePercent``; ``[]`` on error.
        """
        try:
            resp = self.session.get_tickers(category="linear")
            result = self._check(resp, "tickers")
            tickers = []
            for t in result.get("list") or []:
                # Bybit price24hPcnt is decimal (0.05 = 5%); '' is treated as 0
                # so one empty field cannot discard the whole ticker list.
                pct = self._safe_float(t.get("price24hPcnt", "0")) * 100
                tickers.append({
                    "symbol": t.get("symbol", ""),
                    "lastPrice": t.get("lastPrice", "0"),
                    "quoteVolume": t.get("turnover24h", "0"),
                    # Bybit doesn't expose trade count; use high value
                    # to pass screener filter (volume is the real filter)
                    "count": 999999,
                    "priceChangePercent": str(round(pct, 2)),
                })
            return tickers
        except Exception as e:
            logger.error(f"Failed to get tickers: {e}")
            return []

    def get_klines(self, symbol, interval, limit=500):
        """Get OHLCV klines in Binance list format, oldest first.

        Each row is ``[timestamp, open, high, low, close, volume, ...]``;
        indicators use k[1]=open, k[2]=high, k[3]=low, k[4]=close, k[5]=volume.

        Args:
            symbol: contract symbol, e.g. ``"BTCUSDT"``.
            interval: Binance-style timeframe (``"5m"``, ``"1h"``, ...).
            limit: number of candles requested.

        Returns:
            list of 12-element rows; ``[]`` on error.
        """
        bybit_interval = self._bybit_interval(interval)
        try:
            resp = self.session.get_kline(
                category="linear",
                symbol=symbol,
                interval=bybit_interval,
                limit=limit,
            )
            result = self._check(resp, f"kline({symbol})")
            klines = []
            for item in result.get("list") or []:
                # Bybit item: [timestamp, open, high, low, close, volume, turnover]
                open_ts = int(item[0])
                klines.append([
                    open_ts,                              # [0] timestamp
                    item[1],                              # [1] open (str)
                    item[2],                              # [2] high
                    item[3],                              # [3] low
                    item[4],                              # [4] close
                    item[5],                              # [5] volume
                    open_ts + 1,                          # [6] close_time (placeholder)
                    item[6] if len(item) > 6 else "0",    # [7] turnover
                    0, "0", "0", "0",                     # [8-11] padding for Binance compat
                ])
            # Bybit returns newest first → reverse to chronological
            klines.reverse()
            return klines
        except Exception as e:
            logger.error(f"Klines error {symbol}: {e}")
            return []

    def get_mark_price(self, symbol):
        """Current mark price as a float; 0.0 if unavailable or on error."""
        try:
            resp = self.session.get_tickers(category="linear", symbol=symbol)
            result = self._check(resp, f"mark({symbol})")
            items = result.get("list") or []
            if items:
                return self._safe_float(items[0].get("markPrice", "0"))
        except Exception as e:
            logger.error(f"Mark price error {symbol}: {e}")
        return 0.0

    # ============================================================
    # ACCOUNT
    # ============================================================

    def get_balance(self):
        """USDT available balance (Unified Trading Account).

        Prefers ``availableToWithdraw`` and falls back to ``walletBalance``
        when that field is empty. Returns 0.0 if USDT is absent or on error.
        """
        try:
            resp = self.session.get_wallet_balance(accountType="UNIFIED")
            result = self._check(resp, "balance")
            for acc in result.get("list") or []:
                for coin in acc.get("coin") or []:
                    if coin.get("coin") != "USDT":
                        continue
                    avail = coin.get("availableToWithdraw", "")
                    if avail:
                        return float(avail)
                    return self._safe_float(coin.get("walletBalance", "0"))
        except Exception as e:
            logger.error(f"Balance error: {e}")
        return 0.0

    def get_positions(self):
        """All open positions in Binance-compatible format.

        ``positionAmt`` is signed: positive=LONG, negative=SHORT.

        Returns:
            list of dicts with ``symbol``, ``positionAmt``, ``markPrice``,
            ``entryPrice`` and ``unRealizedProfit``; ``[]`` on error.
        """
        try:
            # Explicit page size + cursor: the default page is small and would
            # silently hide positions beyond it.
            raw = self._fetch_all(
                self.session.get_positions, "positions",
                limit=200, category="linear", settleCoin="USDT",
            )
            positions = []
            for p in raw:
                size = self._safe_float(p.get("size", "0"))
                symbol = p.get("symbol", "")
                if size == 0 or not symbol:
                    continue
                # Binance convention: positionAmt positive=LONG, negative=SHORT
                pos_amt = size if p.get("side", "") == "Buy" else -size
                positions.append({
                    "symbol": symbol,
                    "positionAmt": str(pos_amt),
                    "markPrice": p.get("markPrice", "0"),
                    "entryPrice": p.get("avgPrice", "0"),
                    "unRealizedProfit": p.get("unrealisedPnl", "0"),
                })
            return positions
        except Exception as e:
            logger.error(f"Positions error: {e}")
            return []

    def _capped_leverage(self, symbol):
        """Return ``LEVERAGE`` capped at the symbol's ``maxLeverage``.

        Falls back to ``LEVERAGE`` when the instrument or its limit is unknown.
        """
        info = self.get_instrument_info_raw(symbol)
        if not info:
            return LEVERAGE
        max_lev_str = (info.get("leverageFilter") or {}).get("maxLeverage", "")
        if not max_lev_str:
            return LEVERAGE
        try:
            max_lev = int(float(max_lev_str))
        except (ValueError, TypeError, OverflowError):
            return LEVERAGE
        return min(LEVERAGE, max_lev)

    def set_leverage(self, symbol):
        """Set leverage (both buy and sell sides). Caps at symbol's maxLeverage.

        Returns:
            the leverage value that was requested (after capping), also when
            the request failed — failures are only logged.
        """
        target_lev = self._capped_leverage(symbol)
        if target_lev < LEVERAGE:
            logger.info(f"{symbol}: leverage {LEVERAGE}x > max {target_lev}x, capping")

        try:
            self.session.set_leverage(
                category="linear",
                symbol=symbol,
                buyLeverage=str(target_lev),
                sellLeverage=str(target_lev),
            )
            logger.info(f"Leverage set to {target_lev}x for {symbol}")
        except InvalidRequestError as e:
            if "110043" not in str(e):  # Already set to this value
                logger.warning(f"Leverage error {symbol}: {e}")
        except Exception as e:
            logger.warning(f"Leverage error {symbol}: {e}")
        return target_lev

    def get_instrument_info_raw(self, symbol):
        """Get raw instrument info from cache, falling back to a direct fetch.

        Returns:
            the raw instrument dict, or ``None`` if it cannot be obtained.
        """
        self._load_instruments()
        info = self._instruments_cache.get(symbol)
        if info:
            return info

        try:
            resp = self.session.get_instruments_info(category="linear", symbol=symbol)
            result = self._check(resp, f"instrument({symbol})")
            items = result.get("list") or []
            if items:
                info = items[0]
                self._instruments_cache[symbol] = info
        except Exception as e:
            # Best effort: the caller handles a missing instrument.
            logger.warning(f"Instrument lookup failed {symbol}: {e}")
        return info

    def set_margin_type(self, symbol, margin_type="CROSSED"):
        """Set margin mode. CROSSED=cross, anything else=isolated.

        Sends the same capped leverage as ``set_leverage`` so the request is
        not rejected for symbols whose maxLeverage is below ``LEVERAGE``.
        Failures are logged, never raised.
        """
        mode = 0 if margin_type == "CROSSED" else 1
        leverage = self._capped_leverage(symbol)
        try:
            self.session.switch_margin_mode(
                category="linear",
                symbol=symbol,
                tradeMode=mode,
                buyLeverage=str(leverage),
                sellLeverage=str(leverage),
            )
        except InvalidRequestError as e:
            if "110026" not in str(e):  # Already set
                logger.warning(f"Margin mode error {symbol}: {e}")
        except Exception as e:
            logger.warning(f"Margin mode error {symbol}: {e}")

    # ============================================================
    # SYMBOL INFO
    # ============================================================

    def get_symbol_info(self, symbol):
        """Symbol precision/limits info.

        Returns:
            dict with ``price_precision``, ``qty_precision``, ``min_qty``,
            ``tick_size`` and ``qty_step``, or ``None`` if the instrument
            cannot be found.
        """
        info = self.get_instrument_info_raw(symbol)
        if not info:
            return None

        lot = info.get("lotSizeFilter") or {}
        price_filter = info.get("priceFilter") or {}

        # _safe_float tolerates the '' Bybit returns for unset fields.
        qty_step = self._safe_float(lot.get("qtyStep", "0.001"), "0.001")
        tick_size = self._safe_float(price_filter.get("tickSize", "0.01"), "0.01")
        min_qty = self._safe_float(lot.get("minOrderQty", "0.001"), "0.001")

        return {
            # Precision comes from the step's decimal exponent, so steps that
            # are not powers of ten (e.g. 0.25 -> 2 places) are handled too.
            "price_precision": _decimal_places(tick_size),
            "qty_precision": _decimal_places(qty_step),
            "min_qty": min_qty,
            "tick_size": tick_size,
            "qty_step": qty_step,
        }

    def round_price(self, symbol_info, price):
        """Round price down to valid tick size.

        Falls back to rounding to ``price_precision`` when no positive
        ``tick_size`` is available.

        Raises:
            ValueError: if ``price`` is not a finite number.
        """
        tick = symbol_info.get("tick_size")
        if tick and tick > 0:
            return _floor_to_step(price, tick)
        return round(price, symbol_info["price_precision"])

    def round_qty(self, symbol_info, qty):
        """Round quantity down to valid step size.

        Falls back to rounding to ``qty_precision`` when no positive
        ``qty_step`` is available.

        Raises:
            ValueError: if ``qty`` is not a finite number.
        """
        step = symbol_info.get("qty_step")
        if step and step > 0:
            return _floor_to_step(qty, step)
        return round(qty, symbol_info["qty_precision"])

    # ============================================================
    # ORDERS
    # ============================================================

    def _resolve_fill_price(self, symbol, order_id):
        """Best-effort average fill price of a just-placed market order.

        Tries order history, then the open position's entry price, then the
        current mark price. Never raises; may return 0.0 if all three fail.
        """
        try:
            hist = self.session.get_order_history(
                category="linear",
                symbol=symbol,
                orderId=order_id,
            )
            items = self._check(hist, f"order_hist({symbol})").get("list") or []
            if items:
                price = self._safe_float(items[0].get("avgPrice", "0"))
                if price > 0:
                    return price
        except Exception as e:
            logger.debug(f"Order history lookup failed {symbol}: {e}")

        # Fallback: get from position entry price
        try:
            pos_resp = self.session.get_positions(
                category="linear",
                symbol=symbol,
            )
            pos_items = self._check(pos_resp, f"pos_fill({symbol})").get("list") or []
            for p in pos_items:
                if self._safe_float(p.get("size", "0")) != 0:
                    price = self._safe_float(p.get("avgPrice", "0"))
                    if price > 0:
                        return price
                    break
        except Exception as e:
            logger.debug(f"Position lookup failed {symbol}: {e}")

        # Fallback 2: mark price
        price = self.get_mark_price(symbol)
        logger.warning(f"Using mark price as fill fallback: {price}")
        return price

    def open_market(self, symbol, side, qty):
        """Market order. Same interface as the Binance version.

        Args:
            symbol: contract symbol.
            side: ``"BUY"`` or ``"SELL"``.
            qty: order quantity (already rounded to the symbol's step).

        Returns:
            ``(order_dict, fill_price)`` where ``order_dict`` is
            ``{"orderId": ...}``.

        Raises:
            Exception: if placing the order fails (logged, then re-raised).
        """
        try:
            resp = self.session.place_order(
                category="linear",
                symbol=symbol,
                side=self._bybit_side(side),
                orderType="Market",
                qty=_plain_str(qty),
                positionIdx=0,  # One-way mode
            )
            result = self._check(resp, f"market({symbol})")
            order_id = result.get("orderId", "")

            # Give the order a moment to show up before asking for its fill.
            time.sleep(_SETTLE_DELAY_SEC)
            fill_price = self._resolve_fill_price(symbol, order_id)

            logger.info(f"MARKET {side} {symbol} qty={qty} fill={fill_price}")
            return {"orderId": order_id}, fill_price
        except Exception as e:
            logger.error(f"Market order failed {symbol}: {e}")
            raise

    def place_sl(self, symbol, side, qty, stop_price, symbol_info):
        """Stop-loss = conditional Market order with trigger.

        Bybit advantage: conditional orders ARE queryable!
        No invisible algo orders like Binance.

        Args:
            symbol: contract symbol.
            side: closing side ("SELL" for long SL, "BUY" for short SL).
            qty: quantity to close.
            stop_price: trigger price; rounded down to the tick size.
            symbol_info: dict from ``get_symbol_info``.

        Returns:
            the ``result`` payload of the order response.

        Raises:
            Exception: if placing the order fails (logged, then re-raised).
        """
        stop_price = self.round_price(symbol_info, stop_price)

        # triggerDirection: 1=rise above, 2=fall below
        # "SELL" = closing a LONG  → price falls below SL → triggerDir=2
        # "BUY"  = closing a SHORT → price rises above SL → triggerDir=1
        trigger_dir = 2 if side.upper() == "SELL" else 1

        try:
            resp = self.session.place_order(
                category="linear",
                symbol=symbol,
                side=self._bybit_side(side),
                orderType="Market",
                qty=_plain_str(qty),
                triggerPrice=_plain_str(stop_price),
                triggerDirection=trigger_dir,
                triggerBy="MarkPrice",
                reduceOnly=True,
                positionIdx=0,
            )
            result = self._check(resp, f"SL({symbol})")
            logger.info(f"SL placed {symbol} side={side} trigger={stop_price} qty={qty}")
            return result
        except Exception as e:
            logger.error(f"SL order failed {symbol}: {e}")
            raise

    def place_tp(self, symbol, side, qty, price, symbol_info):
        """Take-profit = LIMIT reduceOnly order. Earns maker fee.

        Args:
            symbol: contract symbol.
            side: closing side ("SELL" for long TP, "BUY" for short TP).
            qty: quantity to close.
            price: limit price; rounded down to the tick size.
            symbol_info: dict from ``get_symbol_info``.

        Returns:
            the ``result`` payload of the order response.

        Raises:
            Exception: if placing the order fails (logged, then re-raised).
        """
        price = self.round_price(symbol_info, price)
        try:
            resp = self.session.place_order(
                category="linear",
                symbol=symbol,
                side=self._bybit_side(side),
                orderType="Limit",
                qty=_plain_str(qty),
                price=_plain_str(price),
                timeInForce="GTC",
                reduceOnly=True,
                positionIdx=0,
            )
            result = self._check(resp, f"TP({symbol})")
            logger.info(f"TP placed {symbol} side={side} price={price} qty={qty}")
            return result
        except Exception as e:
            logger.error(f"TP order failed {symbol}: {e}")
            raise

    def cancel_all_orders(self, symbol, retries=3):
        """Cancel ALL orders on a symbol (regular + conditional) and verify.

        Bybit cancel_all_orders kills both — no invisible algo issue!

        Args:
            symbol: contract symbol.
            retries: number of cancel/verify attempts.

        Returns:
            True once no open orders are confirmed to remain; False if that
            could not be confirmed within ``retries`` attempts.
        """
        for attempt in range(retries):
            try:
                self.session.cancel_all_orders(
                    category="linear",
                    symbol=symbol,
                )
                # Verify
                time.sleep(_SETTLE_DELAY_SEC)
                remaining = self.get_open_orders(symbol)
                if remaining is None:
                    # None means the query itself failed, so nothing was
                    # verified; reporting success here could leave live orders.
                    logger.warning(
                        f"Cancel {symbol}: could not verify open orders, attempt {attempt+1}"
                    )
                    continue
                if not remaining:
                    logger.info(f"All orders cancelled {symbol}")
                    return True

                logger.warning(
                    f"Cancel {symbol}: {len(remaining)} survived attempt {attempt+1}"
                )
                # Individual cancel fallback
                for o in remaining:
                    oid = o.get("orderId", "")
                    if not oid:
                        continue
                    try:
                        self.session.cancel_order(
                            category="linear",
                            symbol=symbol,
                            orderId=oid,
                        )
                    except Exception as e:
                        # Best effort; the next attempt re-verifies.
                        logger.debug(f"Cancel order {oid} failed {symbol}: {e}")
                time.sleep(_SETTLE_DELAY_SEC)

            except InvalidRequestError as e:
                if "110001" in str(e):  # No orders to cancel
                    return True
                logger.warning(f"Cancel error {symbol} attempt {attempt+1}: {e}")
            except Exception as e:
                logger.warning(f"Cancel error {symbol} attempt {attempt+1}: {e}")

        logger.error(f"Failed to cancel all orders on {symbol} after {retries} retries")
        return False

    def nuclear_cleanup(self):
        """Startup: cancel ALL orders on ALL symbols.

        Prevents order accumulation after restarts. Covers symbols with open
        positions plus symbols that only have orphaned orders.

        Returns:
            number of symbols on which the cancel request succeeded.
        """
        symbols = {p["symbol"] for p in self.get_positions()}

        # Also find symbols with orphaned orders (all pages, not just the first)
        try:
            orders = self._fetch_all(
                self.session.get_open_orders, "all_open_orders",
                limit=50, category="linear", settleCoin="USDT",
            )
            symbols.update(o.get("symbol", "") for o in orders)
        except Exception as e:
            logger.warning(f"Error getting open orders: {e}")

        symbols.discard("")

        cleaned = 0
        for sym in symbols:
            try:
                self.session.cancel_all_orders(category="linear", symbol=sym)
                cleaned += 1
            except Exception as e:
                # Best effort at startup; keep going with the other symbols.
                logger.warning(f"Nuclear cleanup: cancel failed {sym}: {e}")

        logger.info(f"Nuclear cleanup: cancelled on {cleaned} symbols")
        return cleaned

    def close_position(self, symbol, side, qty):
        """Close position via market order after cancelling the symbol's orders.

        Args:
            symbol: contract symbol.
            side: closing side ("SELL" closes a long, "BUY" closes a short).
            qty: quantity to close.

        Returns:
            fill price of the closing order.

        Raises:
            Exception: if the market order fails.
        """
        self.cancel_all_orders(symbol)
        _, fill_price = self.open_market(symbol, side, qty)
        return fill_price

    @staticmethod
    def _safe_float(val, default="0"):
        """Parse float safely — Bybit returns '' for some fields."""
        try:
            return float(val) if val else float(default)
        except (ValueError, TypeError):
            return float(default)

    def get_open_orders(self, symbol):
        """Open orders in Binance-compatible format.

        ``type`` is "LIMIT" or "STOP_MARKET" (for manager order verification);
        other Bybit order types are passed through upper-cased.

        Returns:
            list of order dicts, or ``None`` on error (``[]`` means there
            genuinely are no orders).
        """
        try:
            raw = self._fetch_all(
                self.session.get_open_orders, f"open_orders({symbol})",
                limit=50, category="linear", symbol=symbol,
            )
            orders = []
            for o in raw:
                order_type = o.get("orderType", "")
                trigger = self._safe_float(o.get("triggerPrice", "0"))

                # Map to Binance-style types
                if trigger > 0 and order_type == "Market":
                    btype = "STOP_MARKET"
                elif order_type == "Limit":
                    btype = "LIMIT"
                else:
                    btype = order_type.upper()

                orders.append({
                    "orderId": o.get("orderId", ""),
                    "type": btype,
                    "side": o.get("side", "").upper(),
                    "price": str(self._safe_float(o.get("price", ""))),
                    "qty": str(self._safe_float(o.get("qty", ""))),
                    "status": o.get("orderStatus", ""),
                    "triggerPrice": str(trigger),
                })
            return orders
        except Exception as e:
            logger.error(f"Open orders error {symbol}: {e}")
            return None  # None = error (vs [] = genuinely no orders)