"""
Zatochki (Knife Catcher) — Market Screener
=============================================
Three-phase scan, run every 60 seconds:
1. Volume + NATR filter (quick)
2. Signal detection (vol spike + exhaustion + VWAP + RSI)
3. OI change filter (liquidation confirmation)

Looks for coins where:
- There was a volume spike >= 7x
- Volume is drying up (2 declining bars)
- Price is stretched away from VWAP by >= 2%
- RSI is at an extreme
- OI is falling (liquidations, not conviction)
"""
import time
import json
import logging
import requests
from datetime import datetime, timezone
from src.zatochki_config import (
BLACKLIST, MIN_VOLUME_24H, NATR_5M_MIN,
TIMEFRAME, Z_COOLDOWNS_FILE, COOLDOWN_CANDLES,
VOL_SPIKE_MULT, VOL_SMA_PERIOD, SPIKE_LOOKBACK,
EXHAUSTION_BARS, VWAP_PERIOD, VWAP_EXT_PCT,
RSI_PERIOD, RSI_OVERSOLD, RSI_OVERBOUGHT,
SL_BUFFER_PCT, SL_CAP_PCT, SL_MIN_PCT,
OI_DROP_PCT, OI_CHECK_ENABLED,
)
from src.zatochki_indicators import check_zatochki_signal
from src.indicators import calc_atr_pct # reuse ATR from squeeze
logger = logging.getLogger("zatochki.screener")
class ZatochkiScreener:
    """Three-phase market screener for the Zatochki (knife-catcher) strategy.

    A scan runs these stages in order and stops early when a stage yields
    nothing:

    1.  ``get_candidates``  - USDT symbols, not blacklisted, with enough
        24h quote volume.
    1b. ``filter_by_natr``  - keep symbols whose 5m NATR reaches the
        configured minimum.
    2.  ``check_signals``   - run ``check_zatochki_signal`` on klines of the
        configured timeframe.
    3.  ``filter_by_oi``    - keep signals whose open interest is dropping.

    Attributes:
        exchange: Object providing ``get_all_tickers_24h()`` and
            ``get_klines(symbol, timeframe, limit=...)``.
        notifier: Optional callable that receives one formatted text message
            per signal.
        signals: Result of the most recent ``run_scan`` call.
        get_open_positions: Optional zero-argument callback (set by the
            manager) returning the currently open positions.
    """

    # Pause after each klines request so the exchange API is not hammered.
    NATR_REQUEST_PAUSE_SEC = 0.02
    SIGNAL_REQUEST_PAUSE_SEC = 0.05

    def __init__(self, exchange, notifier=None):
        self.exchange = exchange
        self.notifier = notifier
        self.signals = []  # signals produced by the most recent scan
        self.get_open_positions = None  # callback from manager

    # ============================================================
    # Small pure helpers
    # ============================================================
    @staticmethod
    def _to_float(value, default=0.0):
        """Convert *value* to float, returning *default* if it cannot be parsed."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _collect_open_symbols(positions):
        """Return the set of symbols found in *positions*.

        Accepts a dict (its values are inspected) or a list/tuple/set.
        Each entry may be a dict with a ``"symbol"`` key or a plain symbol
        string. Anything else is ignored.
        """
        if isinstance(positions, dict):
            entries = positions.values()
        elif isinstance(positions, (list, tuple, set, frozenset)):
            entries = positions
        else:
            return set()
        symbols = set()
        for entry in entries:
            if isinstance(entry, dict):
                symbol = entry.get("symbol")
            elif isinstance(entry, str):
                symbol = entry
            else:
                symbol = None
            if symbol:
                symbols.add(symbol)
        return symbols

    @staticmethod
    def _oi_change_from_rows(rows):
        """Return the fractional OI change between the first two *rows*.

        ``rows[0]`` is treated as the latest sample and ``rows[1]`` as the
        previous one. Returns ``None`` when fewer than two rows are given or
        the previous value is zero.
        """
        if len(rows) < 2:
            return None
        oi_now = float(rows[0].get("openInterest", 0))
        oi_prev = float(rows[1].get("openInterest", 0))
        if oi_prev == 0:
            return None
        return (oi_now - oi_prev) / oi_prev

    @staticmethod
    def _format_signal_message(sig):
        """Build the notification text for one signal.

        Raises ``KeyError`` if a mandatory field is missing from *sig*.
        """
        oi_value = sig.get("oi_change_pct")
        # OI may be missing (filter disabled) or None (data unavailable);
        # show a clean "N/A" instead of "None%" / "N/A%".
        oi_text = "N/A" if oi_value is None else f"{oi_value}%"
        return (
            f"\U0001f52a *ZATOCHKI Signal*\n"
            f"`{sig['symbol']}` {sig['direction']}\n"
            f"Vol spike: {sig['vol_spike_ratio']}x | "
            f"VWAP dist: {sig['vwap_dist_pct']}%\n"
            f"RSI: {sig['rsi']} | SL: {sig['sl_pct']}%\n"
            f"OI: {oi_text}"
        )

    # ============================================================
    # PHASE 1: Quick filter — Volume + NATR
    # ============================================================
    def get_candidates(self):
        """Volume + blacklist filter over all 24h tickers.

        Keeps symbols ending in ``USDT`` that are not in ``BLACKLIST`` and
        whose ``quoteVolume`` is at least ``MIN_VOLUME_24H``. Malformed
        tickers are skipped instead of aborting the scan.

        Returns:
            list[dict]: one dict per candidate with keys ``symbol``,
            ``volume_24h``, ``price`` and ``change_pct``.
        """
        tickers = self.exchange.get_all_tickers_24h() or []
        candidates = []
        for t in tickers:
            try:
                symbol = t["symbol"]
            except (KeyError, TypeError, IndexError):
                continue
            if not isinstance(symbol, str) or not symbol.endswith("USDT"):
                continue
            if symbol in BLACKLIST:
                continue
            volume_24h = self._to_float(t.get("quoteVolume", 0))
            # Written as "not >=" so that a NaN volume is rejected as well.
            if not volume_24h >= MIN_VOLUME_24H:
                continue
            candidates.append({
                "symbol": symbol,
                "volume_24h": volume_24h,
                "price": self._to_float(t.get("lastPrice", 0)),
                "change_pct": self._to_float(t.get("priceChangePercent", 0)),
            })
        logger.info(f"Phase 1: {len(candidates)} candidates (vol>${MIN_VOLUME_24H/1e6:.0f}M)")
        return candidates

    def filter_by_natr(self, candidates):
        """NATR/5m >= threshold to skip sleeping coins.

        For each candidate, fetches 20 five-minute klines and computes the
        14-period ATR percentage. Candidates that pass get a rounded
        ``natr_5m`` field added in place. Per-symbol errors are logged and
        the symbol is skipped.

        Returns:
            list[dict]: the candidates that passed.
        """
        passed = []
        for c in candidates:
            symbol = c["symbol"]
            try:
                klines_5m = self.exchange.get_klines(symbol, "5m", limit=20)
                if len(klines_5m) < 15:
                    continue
                natr_5m = calc_atr_pct(klines_5m, 14)
                if natr_5m is None or natr_5m < NATR_5M_MIN:
                    continue
                c["natr_5m"] = round(natr_5m, 2)
                passed.append(c)
            except Exception as e:
                logger.debug(f"NATR error {symbol}: {e}")
            finally:
                # Throttle after every request, including rejected and
                # failed ones, not only after a candidate passes.
                time.sleep(self.NATR_REQUEST_PAUSE_SEC)
        logger.info(f"Phase 1b: {len(passed)} passed NATR/5m>={NATR_5M_MIN}%")
        return passed

    # ============================================================
    # PHASE 2: Signal detection
    # ============================================================
    def check_signals(self, candidates):
        """Check each candidate for a Zatochki entry signal.

        Symbols that already have an open position (per the
        ``get_open_positions`` callback) or are still on cooldown are
        skipped without any API call.

        Returns:
            list[dict]: signals returned by ``check_zatochki_signal``,
            each extended with ``symbol``, ``volume_24h``, ``natr_5m`` and
            ``timestamp``.
        """
        new_signals = []
        cooldowns = self._load_cooldowns()
        # Get open positions to avoid duplicates
        open_symbols = set()
        if self.get_open_positions:
            open_symbols = self._collect_open_symbols(self.get_open_positions())
        for c in candidates:
            symbol = c["symbol"]
            # Skip if already in position
            if symbol in open_symbols:
                continue
            # Cooldown check
            if symbol in cooldowns and time.time() < cooldowns[symbol]:
                continue
            try:
                # Need enough klines for all indicators
                klines = self.exchange.get_klines(symbol, TIMEFRAME, limit=200)
                if len(klines) < 80:
                    continue
                signal = check_zatochki_signal(
                    klines,
                    rsi_period=RSI_PERIOD,
                    rsi_os=RSI_OVERSOLD,
                    rsi_ob=RSI_OVERBOUGHT,
                    vwap_period=VWAP_PERIOD,
                    vwap_ext=VWAP_EXT_PCT,
                    vol_sma_period=VOL_SMA_PERIOD,
                    vol_spike_mult=VOL_SPIKE_MULT,
                    spike_lookback=SPIKE_LOOKBACK,
                    exhaustion_bars=EXHAUSTION_BARS,
                    sl_buffer=SL_BUFFER_PCT,
                    sl_cap=SL_CAP_PCT,
                    sl_min=SL_MIN_PCT,
                )
                if signal is None:
                    continue
                signal["symbol"] = symbol
                signal["volume_24h"] = c["volume_24h"]
                signal["natr_5m"] = c.get("natr_5m", 0)
                signal["timestamp"] = time.time()
                new_signals.append(signal)
            except Exception as e:
                logger.debug(f"Signal check error {symbol}: {e}")
            finally:
                # Throttle after every request; signals are rare, so
                # sleeping only after a hit would not limit the rate.
                time.sleep(self.SIGNAL_REQUEST_PAUSE_SEC)
        logger.info(f"Phase 2: {len(new_signals)} signals found")
        return new_signals

    # ============================================================
    # PHASE 3: OI Change Filter
    # ============================================================
    def filter_by_oi(self, signals):
        """Keep signals whose open interest is dropping (liquidation cascade).

        A signal passes when its OI change is <= ``OI_DROP_PCT``. When the
        OI data is unavailable or an error occurs, the signal is let through
        with ``oi_change_pct`` set to ``None``. When ``OI_CHECK_ENABLED`` is
        false, all signals are returned untouched.

        Returns:
            list[dict]: the signals that passed, with ``oi_change_pct``
            (in percent, rounded to 3 decimals, or ``None``) added.
        """
        if not OI_CHECK_ENABLED:
            logger.debug("OI filter disabled, passing all signals")
            return signals
        passed = []
        for sig in signals:
            symbol = sig["symbol"]
            try:
                oi_change = self._get_oi_change(symbol)
                if oi_change is None:
                    # Can't get OI data — let it through with warning
                    logger.warning(f"OI data unavailable for {symbol}, allowing signal")
                    sig["oi_change_pct"] = None
                    passed.append(sig)
                    continue
                sig["oi_change_pct"] = round(oi_change * 100, 3)
                if oi_change <= OI_DROP_PCT:
                    # OI dropping — liquidation confirmed
                    passed.append(sig)
                    logger.info(f"OI confirmed {symbol}: {oi_change*100:+.2f}%")
                else:
                    logger.info(f"OI filter skip {symbol}: {oi_change*100:+.2f}% (need <={OI_DROP_PCT*100}%)")
            except Exception as e:
                logger.debug(f"OI error {symbol}: {e}")
                # On error, let signal through
                sig["oi_change_pct"] = None
                passed.append(sig)
        logger.info(f"Phase 3: {len(passed)}/{len(signals)} passed OI filter")
        return passed

    def _get_oi_change(self, symbol, interval="5min", limit=2):
        """Get the fractional OI change over the last interval.

        Queries the Bybit API ``/v5/market/open-interest`` for the linear
        category. The first returned row is treated as the latest sample.

        Returns:
            float | None: ``(oi_now - oi_prev) / oi_prev``, or ``None`` on
            any HTTP/API/parsing error, when fewer than two rows are
            returned, or when the previous OI is zero.
        """
        try:
            url = "https://api.bybit.com/v5/market/open-interest"
            params = {
                "category": "linear",
                "symbol": symbol,
                "intervalTime": interval,
                "limit": limit,
            }
            resp = requests.get(url, params=params, timeout=5)
            resp.raise_for_status()
            data = resp.json()
            ret_code = data.get("retCode", 0)
            if ret_code not in (0, "0", None):
                # API-level failure: report it rather than silently seeing
                # an empty list.
                logger.debug(f"OI API retCode {ret_code} for {symbol}: {data.get('retMsg')}")
                return None
            # "result" or "list" may be null, so normalise to empty containers.
            rows = (data.get("result") or {}).get("list") or []
            return self._oi_change_from_rows(rows)
        except Exception as e:
            logger.debug(f"OI API error {symbol}: {e}")
            return None

    # ============================================================
    # FULL SCAN
    # ============================================================
    def run_scan(self):
        """Run the full 3-phase scan and return the ready-to-trade signals.

        Intended to be called periodically from the main loop. Never raises:
        any error during the scan is logged and yields an empty list.
        ``self.signals`` always reflects the result of this call, so an
        empty or failed scan does not leave stale signals behind.
        Notification failures are logged and do not discard signals.

        Returns:
            list[dict]: signals that passed all phases (possibly empty).
        """
        try:
            signals = self._run_phases()
        except Exception as e:
            logger.error(f"Scan error: {e}", exc_info=True)
            signals = []
        self.signals = signals
        self._notify(signals)
        return signals

    def _run_phases(self):
        """Run phases 1-3 in order, stopping as soon as one yields nothing."""
        # Phase 1: Volume filter
        candidates = self.get_candidates()
        if not candidates:
            return []
        # Phase 1b: NATR filter
        candidates = self.filter_by_natr(candidates)
        if not candidates:
            return []
        # Phase 2: Signal detection
        signals = self.check_signals(candidates)
        if not signals:
            return []
        # Phase 3: OI filter
        return self.filter_by_oi(signals)

    def _notify(self, signals):
        """Send one message per signal; a failure never affects the scan result."""
        if not self.notifier:
            return
        for sig in signals:
            try:
                self.notifier(self._format_signal_message(sig))
            except Exception as e:
                logger.warning(f"Notify error: {e}")

    # ============================================================
    # COOLDOWNS
    # ============================================================
    def set_cooldown(self, symbol):
        """Put *symbol* on cooldown for ``COOLDOWN_CANDLES * 60`` seconds.

        The expiry timestamp is persisted to ``Z_COOLDOWNS_FILE``. Expired
        entries are dropped as a side effect of reloading the file.
        """
        cooldowns = self._load_cooldowns()
        cooldowns[symbol] = time.time() + COOLDOWN_CANDLES * 60
        self._save_cooldowns(cooldowns)

    def _load_cooldowns(self):
        """Load the still-active cooldowns from ``Z_COOLDOWNS_FILE``.

        Returns:
            dict: symbol -> expiry timestamp for entries that have not yet
            expired. A missing file yields ``{}`` silently; an unreadable or
            malformed file yields ``{}`` with a warning.
        """
        try:
            with open(Z_COOLDOWNS_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning(f"Load cooldowns error: {e}")
            return {}
        if not isinstance(data, dict):
            logger.warning("Load cooldowns error: unexpected file format")
            return {}
        # Cleanup expired (and ignore entries with non-numeric expiry)
        now = time.time()
        return {
            k: v for k, v in data.items()
            if isinstance(v, (int, float)) and v > now
        }

    def _save_cooldowns(self, cooldowns):
        """Write *cooldowns* to ``Z_COOLDOWNS_FILE`` as JSON (best effort)."""
        try:
            with open(Z_COOLDOWNS_FILE, "w", encoding="utf-8") as f:
                json.dump(cooldowns, f, indent=2)
        except Exception as e:
            logger.error(f"Save cooldowns error: {e}")