← Назад
""" Zatochki (Knife Catcher) — Market Screener ============================================= 3-фазное сканирование каждые 60 сек: 1. Volume + NATR фильтр (quick) 2. Signal detection (vol spike + exhaustion + VWAP + RSI) 3. OI change filter (liquidation confirmation) Ищет монеты где: - Был volume spike >= 7x - Объём иссякает (2 declining bars) - Цена перетянута от VWAP >= 2% - RSI в экстремуме - OI снижается (ликвидации, не conviction) """ import time import json import logging import requests from datetime import datetime, timezone from src.zatochki_config import ( BLACKLIST, MIN_VOLUME_24H, NATR_5M_MIN, TIMEFRAME, Z_COOLDOWNS_FILE, COOLDOWN_CANDLES, VOL_SPIKE_MULT, VOL_SMA_PERIOD, SPIKE_LOOKBACK, EXHAUSTION_BARS, VWAP_PERIOD, VWAP_EXT_PCT, RSI_PERIOD, RSI_OVERSOLD, RSI_OVERBOUGHT, SL_BUFFER_PCT, SL_CAP_PCT, SL_MIN_PCT, OI_DROP_PCT, OI_CHECK_ENABLED, ) from src.zatochki_indicators import check_zatochki_signal from src.indicators import calc_atr_pct # reuse ATR from squeeze logger = logging.getLogger("zatochki.screener") class ZatochkiScreener: def __init__(self, exchange, notifier=None): self.exchange = exchange self.notifier = notifier self.signals = [] # current pending signals self.get_open_positions = None # callback from manager # ============================================================ # PHASE 1: Quick filter — Volume + NATR # ============================================================ def get_candidates(self): """Volume + NATR + blacklist filter.""" tickers = self.exchange.get_all_tickers_24h() candidates = [] for t in tickers: symbol = t["symbol"] if not symbol.endswith("USDT"): continue if symbol in BLACKLIST: continue volume_24h = float(t.get("quoteVolume", 0)) if volume_24h < MIN_VOLUME_24H: continue candidates.append({ "symbol": symbol, "volume_24h": volume_24h, "price": float(t.get("lastPrice", 0)), "change_pct": float(t.get("priceChangePercent", 0)), }) logger.info(f"Phase 1: {len(candidates)} candidates (vol>${MIN_VOLUME_24H/1e6:.0f}M)") return candidates def filter_by_natr(self, candidates): """NATR/5m >= threshold to skip sleeping coins.""" passed = [] for c in candidates: symbol = c["symbol"] try: klines_5m = self.exchange.get_klines(symbol, "5m", limit=20) if len(klines_5m) < 15: continue natr_5m = calc_atr_pct(klines_5m, 14) if natr_5m is None or natr_5m < NATR_5M_MIN: continue c["natr_5m"] = round(natr_5m, 2) passed.append(c) time.sleep(0.02) except Exception as e: logger.debug(f"NATR error {symbol}: {e}") continue logger.info(f"Phase 1b: {len(passed)} passed NATR/5m>={NATR_5M_MIN}%") return passed # ============================================================ # PHASE 2: Signal detection # ============================================================ def check_signals(self, candidates): """Check each candidate for Zatochki entry signal.""" new_signals = [] cooldowns = self._load_cooldowns() # Get open positions to avoid duplicates open_symbols = set() if self.get_open_positions: positions = self.get_open_positions() open_symbols = {p.get("symbol") for p in positions.values()} if isinstance(positions, dict) else set() for c in candidates: symbol = c["symbol"] # Skip if already in position if symbol in open_symbols: continue # Cooldown check if symbol in cooldowns and time.time() < cooldowns[symbol]: continue try: # Get 1m klines (need enough for all indicators) klines = self.exchange.get_klines(symbol, TIMEFRAME, limit=200) if len(klines) < 80: continue signal = check_zatochki_signal( klines, rsi_period=RSI_PERIOD, rsi_os=RSI_OVERSOLD, rsi_ob=RSI_OVERBOUGHT, vwap_period=VWAP_PERIOD, vwap_ext=VWAP_EXT_PCT, vol_sma_period=VOL_SMA_PERIOD, vol_spike_mult=VOL_SPIKE_MULT, spike_lookback=SPIKE_LOOKBACK, exhaustion_bars=EXHAUSTION_BARS, sl_buffer=SL_BUFFER_PCT, sl_cap=SL_CAP_PCT, sl_min=SL_MIN_PCT, ) if signal is None: continue signal["symbol"] = symbol signal["volume_24h"] = c["volume_24h"] signal["natr_5m"] = c.get("natr_5m", 0) signal["timestamp"] = time.time() new_signals.append(signal) time.sleep(0.05) except Exception as e: logger.debug(f"Signal check error {symbol}: {e}") continue logger.info(f"Phase 2: {len(new_signals)} signals found") return new_signals # ============================================================ # PHASE 3: OI Change Filter # ============================================================ def filter_by_oi(self, signals): """ Check OI change — must be dropping (liquidation cascade). Uses Bybit open-interest API. """ if not OI_CHECK_ENABLED: logger.debug("OI filter disabled, passing all signals") return signals passed = [] for sig in signals: symbol = sig["symbol"] try: oi_change = self._get_oi_change(symbol) if oi_change is None: # Can't get OI data — let it through with warning logger.warning(f"OI data unavailable for {symbol}, allowing signal") sig["oi_change_pct"] = None passed.append(sig) continue sig["oi_change_pct"] = round(oi_change * 100, 3) if oi_change <= OI_DROP_PCT: # OI dropping — liquidation confirmed passed.append(sig) logger.info(f"OI confirmed {symbol}: {oi_change*100:+.2f}%") else: logger.info(f"OI filter skip {symbol}: {oi_change*100:+.2f}% (need <={OI_DROP_PCT*100}%)") except Exception as e: logger.debug(f"OI error {symbol}: {e}") # On error, let signal through sig["oi_change_pct"] = None passed.append(sig) logger.info(f"Phase 3: {len(passed)}/{len(signals)} passed OI filter") return passed def _get_oi_change(self, symbol, interval="5min", limit=2): """ Get OI change % over last interval. Bybit API: /v5/market/open-interest """ try: url = "https://api.bybit.com/v5/market/open-interest" params = { "category": "linear", "symbol": symbol, "intervalTime": interval, "limit": limit, } resp = requests.get(url, params=params, timeout=5) data = resp.json() rows = data.get("result", {}).get("list", []) if len(rows) < 2: return None # rows[0] = latest, rows[1] = previous oi_now = float(rows[0].get("openInterest", 0)) oi_prev = float(rows[1].get("openInterest", 0)) if oi_prev == 0: return None return (oi_now - oi_prev) / oi_prev except Exception as e: logger.debug(f"OI API error {symbol}: {e}") return None # ============================================================ # FULL SCAN # ============================================================ def run_scan(self): """ Full 3-phase scan. Returns list of ready-to-trade signals. Called every 60 seconds from main loop. """ try: # Phase 1: Volume filter candidates = self.get_candidates() if not candidates: return [] # Phase 1b: NATR filter candidates = self.filter_by_natr(candidates) if not candidates: return [] # Phase 2: Signal detection signals = self.check_signals(candidates) if not signals: return [] # Phase 3: OI filter signals = self.filter_by_oi(signals) self.signals = signals if signals and self.notifier: for sig in signals: self.notifier( f"\U0001f52a *ZATOCHKI Signal*\n" f"`{sig['symbol']}` {sig['direction']}\n" f"Vol spike: {sig['vol_spike_ratio']}x | " f"VWAP dist: {sig['vwap_dist_pct']}%\n" f"RSI: {sig['rsi']} | SL: {sig['sl_pct']}%\n" f"OI: {sig.get('oi_change_pct', 'N/A')}%" ) return signals except Exception as e: logger.error(f"Scan error: {e}", exc_info=True) return [] # ============================================================ # COOLDOWNS # ============================================================ def set_cooldown(self, symbol): """Set cooldown for symbol (COOLDOWN_CANDLES minutes on 1m).""" cooldowns = self._load_cooldowns() cooldowns[symbol] = time.time() + COOLDOWN_CANDLES * 60 self._save_cooldowns(cooldowns) def _load_cooldowns(self): try: with open(Z_COOLDOWNS_FILE, "r") as f: data = json.load(f) # Cleanup expired now = time.time() return {k: v for k, v in data.items() if v > now} except (FileNotFoundError, json.JSONDecodeError): return {} def _save_cooldowns(self, cooldowns): try: with open(Z_COOLDOWNS_FILE, "w") as f: json.dump(cooldowns, f, indent=2) except Exception as e: logger.error(f"Save cooldowns error: {e}")