← Back
"""
Zatochki (Knife Catcher) — Market Screener
=============================================
3-фазное сканирование каждые 60 сек:
1. Volume + NATR фильтр (quick)
2. Signal detection (vol spike + exhaustion + VWAP + RSI)
3. OI change filter (liquidation confirmation)

Ищет монеты где:
- Был volume spike >= 7x
- Объём иссякает (2 declining bars)
- Цена перетянута от VWAP >= 2%
- RSI в экстремуме
- OI снижается (ликвидации, не conviction)
"""

import time
import json
import logging
import requests
from datetime import datetime, timezone

from src.zatochki_config import (
    BLACKLIST, MIN_VOLUME_24H, NATR_5M_MIN,
    TIMEFRAME, Z_COOLDOWNS_FILE, COOLDOWN_CANDLES,
    VOL_SPIKE_MULT, VOL_SMA_PERIOD, SPIKE_LOOKBACK,
    EXHAUSTION_BARS, VWAP_PERIOD, VWAP_EXT_PCT,
    RSI_PERIOD, RSI_OVERSOLD, RSI_OVERBOUGHT,
    SL_BUFFER_PCT, SL_CAP_PCT, SL_MIN_PCT,
    OI_DROP_PCT, OI_CHECK_ENABLED,
)
from src.zatochki_indicators import check_zatochki_signal
from src.indicators import calc_atr_pct  # reuse ATR from squeeze

logger = logging.getLogger("zatochki.screener")


class ZatochkiScreener:
    def __init__(self, exchange, notifier=None):
        self.exchange = exchange
        self.notifier = notifier
        self.signals = []  # current pending signals
        self.get_open_positions = None  # callback from manager

    # ============================================================
    # PHASE 1: Quick filter — Volume + NATR
    # ============================================================

    def get_candidates(self):
        """Volume + NATR + blacklist filter."""
        tickers = self.exchange.get_all_tickers_24h()
        candidates = []

        for t in tickers:
            symbol = t["symbol"]
            if not symbol.endswith("USDT"):
                continue
            if symbol in BLACKLIST:
                continue

            volume_24h = float(t.get("quoteVolume", 0))
            if volume_24h < MIN_VOLUME_24H:
                continue

            candidates.append({
                "symbol": symbol,
                "volume_24h": volume_24h,
                "price": float(t.get("lastPrice", 0)),
                "change_pct": float(t.get("priceChangePercent", 0)),
            })

        logger.info(f"Phase 1: {len(candidates)} candidates (vol>${MIN_VOLUME_24H/1e6:.0f}M)")
        return candidates

    def filter_by_natr(self, candidates):
        """NATR/5m >= threshold to skip sleeping coins."""
        passed = []

        for c in candidates:
            symbol = c["symbol"]
            try:
                klines_5m = self.exchange.get_klines(symbol, "5m", limit=20)
                if len(klines_5m) < 15:
                    continue

                natr_5m = calc_atr_pct(klines_5m, 14)
                if natr_5m is None or natr_5m < NATR_5M_MIN:
                    continue

                c["natr_5m"] = round(natr_5m, 2)
                passed.append(c)
                time.sleep(0.02)

            except Exception as e:
                logger.debug(f"NATR error {symbol}: {e}")
                continue

        logger.info(f"Phase 1b: {len(passed)} passed NATR/5m>={NATR_5M_MIN}%")
        return passed

    # ============================================================
    # PHASE 2: Signal detection
    # ============================================================

    def check_signals(self, candidates):
        """Check each candidate for Zatochki entry signal."""
        new_signals = []
        cooldowns = self._load_cooldowns()

        # Get open positions to avoid duplicates
        open_symbols = set()
        if self.get_open_positions:
            positions = self.get_open_positions()
            open_symbols = {p.get("symbol") for p in positions.values()} if isinstance(positions, dict) else set()

        for c in candidates:
            symbol = c["symbol"]

            # Skip if already in position
            if symbol in open_symbols:
                continue

            # Cooldown check
            if symbol in cooldowns and time.time() < cooldowns[symbol]:
                continue

            try:
                # Get 1m klines (need enough for all indicators)
                klines = self.exchange.get_klines(symbol, TIMEFRAME, limit=200)
                if len(klines) < 80:
                    continue

                signal = check_zatochki_signal(
                    klines,
                    rsi_period=RSI_PERIOD,
                    rsi_os=RSI_OVERSOLD,
                    rsi_ob=RSI_OVERBOUGHT,
                    vwap_period=VWAP_PERIOD,
                    vwap_ext=VWAP_EXT_PCT,
                    vol_sma_period=VOL_SMA_PERIOD,
                    vol_spike_mult=VOL_SPIKE_MULT,
                    spike_lookback=SPIKE_LOOKBACK,
                    exhaustion_bars=EXHAUSTION_BARS,
                    sl_buffer=SL_BUFFER_PCT,
                    sl_cap=SL_CAP_PCT,
                    sl_min=SL_MIN_PCT,
                )

                if signal is None:
                    continue

                signal["symbol"] = symbol
                signal["volume_24h"] = c["volume_24h"]
                signal["natr_5m"] = c.get("natr_5m", 0)
                signal["timestamp"] = time.time()

                new_signals.append(signal)
                time.sleep(0.05)

            except Exception as e:
                logger.debug(f"Signal check error {symbol}: {e}")
                continue

        logger.info(f"Phase 2: {len(new_signals)} signals found")
        return new_signals

    # ============================================================
    # PHASE 3: OI Change Filter
    # ============================================================

    def filter_by_oi(self, signals):
        """
        Check OI change — must be dropping (liquidation cascade).
        Uses Bybit open-interest API.
        """
        if not OI_CHECK_ENABLED:
            logger.debug("OI filter disabled, passing all signals")
            return signals

        passed = []

        for sig in signals:
            symbol = sig["symbol"]
            try:
                oi_change = self._get_oi_change(symbol)

                if oi_change is None:
                    # Can't get OI data — let it through with warning
                    logger.warning(f"OI data unavailable for {symbol}, allowing signal")
                    sig["oi_change_pct"] = None
                    passed.append(sig)
                    continue

                sig["oi_change_pct"] = round(oi_change * 100, 3)

                if oi_change <= OI_DROP_PCT:
                    # OI dropping — liquidation confirmed
                    passed.append(sig)
                    logger.info(f"OI confirmed {symbol}: {oi_change*100:+.2f}%")
                else:
                    logger.info(f"OI filter skip {symbol}: {oi_change*100:+.2f}% (need <={OI_DROP_PCT*100}%)")

            except Exception as e:
                logger.debug(f"OI error {symbol}: {e}")
                # On error, let signal through
                sig["oi_change_pct"] = None
                passed.append(sig)

        logger.info(f"Phase 3: {len(passed)}/{len(signals)} passed OI filter")
        return passed

    def _get_oi_change(self, symbol, interval="5min", limit=2):
        """
        Get OI change % over last interval.
        Bybit API: /v5/market/open-interest
        """
        try:
            url = "https://api.bybit.com/v5/market/open-interest"
            params = {
                "category": "linear",
                "symbol": symbol,
                "intervalTime": interval,
                "limit": limit,
            }
            resp = requests.get(url, params=params, timeout=5)
            data = resp.json()

            rows = data.get("result", {}).get("list", [])
            if len(rows) < 2:
                return None

            # rows[0] = latest, rows[1] = previous
            oi_now = float(rows[0].get("openInterest", 0))
            oi_prev = float(rows[1].get("openInterest", 0))

            if oi_prev == 0:
                return None

            return (oi_now - oi_prev) / oi_prev

        except Exception as e:
            logger.debug(f"OI API error {symbol}: {e}")
            return None

    # ============================================================
    # FULL SCAN
    # ============================================================

    def run_scan(self):
        """
        Full 3-phase scan. Returns list of ready-to-trade signals.
        Called every 60 seconds from main loop.
        """
        try:
            # Phase 1: Volume filter
            candidates = self.get_candidates()
            if not candidates:
                return []

            # Phase 1b: NATR filter
            candidates = self.filter_by_natr(candidates)
            if not candidates:
                return []

            # Phase 2: Signal detection
            signals = self.check_signals(candidates)
            if not signals:
                return []

            # Phase 3: OI filter
            signals = self.filter_by_oi(signals)

            self.signals = signals

            if signals and self.notifier:
                for sig in signals:
                    self.notifier(
                        f"\U0001f52a *ZATOCHKI Signal*\n"
                        f"`{sig['symbol']}` {sig['direction']}\n"
                        f"Vol spike: {sig['vol_spike_ratio']}x | "
                        f"VWAP dist: {sig['vwap_dist_pct']}%\n"
                        f"RSI: {sig['rsi']} | SL: {sig['sl_pct']}%\n"
                        f"OI: {sig.get('oi_change_pct', 'N/A')}%"
                    )

            return signals

        except Exception as e:
            logger.error(f"Scan error: {e}", exc_info=True)
            return []

    # ============================================================
    # COOLDOWNS
    # ============================================================

    def set_cooldown(self, symbol):
        """Set cooldown for symbol (COOLDOWN_CANDLES minutes on 1m)."""
        cooldowns = self._load_cooldowns()
        cooldowns[symbol] = time.time() + COOLDOWN_CANDLES * 60
        self._save_cooldowns(cooldowns)

    def _load_cooldowns(self):
        try:
            with open(Z_COOLDOWNS_FILE, "r") as f:
                data = json.load(f)
            # Cleanup expired
            now = time.time()
            return {k: v for k, v in data.items() if v > now}
        except (FileNotFoundError, json.JSONDecodeError):
            return {}

    def _save_cooldowns(self, cooldowns):
        try:
            with open(Z_COOLDOWNS_FILE, "w") as f:
                json.dump(cooldowns, f, indent=2)
        except Exception as e:
            logger.error(f"Save cooldowns error: {e}")

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...