← Back
"""
Grid Bot — Screener v2 (Auto-Rotation)
=========================================
v2: Added Choppiness Index, 15m timeframe, ATR-adaptive spacing.

Score factors:
1. BB Width in sweet spot (0.3-1.5%)
2. ADX < 25 — no strong trend
3. Choppiness Index > 55 — choppy/ranging
4. Volume > $30M — liquidity
5. Micro-volatility — level crossings per hour

Breakout: BB expand + (ADX > 28 OR CHOP < 40)
"""

import time
import logging
import numpy as np
import pandas as pd

from src.config import (
    SCREENER_INTERVAL_SEC, SCREENER_MIN_VOLUME,
    SCREENER_BB_PERIOD, SCREENER_BB_STD,
    SCREENER_BB_WIDTH_MIN, SCREENER_BB_WIDTH_MAX,
    SCREENER_ADX_PERIOD, SCREENER_ADX_MAX,
    SCREENER_BREAKOUT_BB_MULT, SCREENER_BREAKOUT_ADX,
    SCREENER_CHOP_PERIOD, SCREENER_CHOP_MIN_ENTRY, SCREENER_CHOP_MAX_EXIT,
    SCREENER_NATR_MAX, EMA_SLOPE_PERIOD, EMA_SLOPE_MAX_PCT, FUNDING_MAX_ABS,
    SCREENER_TOP_N, GRID_CANDIDATES, GRID_BLACKLIST,
    GRID_SPACING_PCT, SCREENER_TF,
    ATR_PERIOD, ATR_SPACING_MULT, SPACING_MIN_PCT, SPACING_MAX_PCT,
)

logger = logging.getLogger("screener")


class Screener:
    def __init__(self, exchange):
        self.exchange = exchange
        self.scores = {}
        self.last_scan = 0
        self._ticker_cache = None
        self._ticker_ts = 0

    # ============================================================
    # MAIN SCAN
    # ============================================================

    def scan(self):
        """Scan all candidates on 15m TF."""
        self.scores = {}
        candidates = self._get_candidates()

        logger.info(f"Scanning {len(candidates)} candidates on {SCREENER_TF}...")

        for symbol in candidates:
            try:
                score_data = self._score_symbol(symbol)
                if score_data:
                    self.scores[symbol] = score_data
            except Exception as e:
                logger.debug(f"Score error {symbol}: {e}")
            time.sleep(0.1)

        self.last_scan = time.time()

        sorted_scores = sorted(self.scores.items(), key=lambda x: x[1]["score"], reverse=True)
        top5 = sorted_scores[:5]
        if top5:
            logger.info("Top 5 grid scores:")
            for sym, data in top5:
                logger.info(
                    f"  {sym}: score={data['score']:.1f} "
                    f"CHOP={data['chop']:.1f} NATR={data['natr']:.3f}% "
                    f"slope={data.get('ema_slope', 0):+.2f}% fund={data.get('funding', 0)*100:+.4f}% "
                    f"Vol=${data['volume']/1e6:.0f}M sp={data['atr_spacing']:.3f}%"
                )

        return self.scores

    def _get_candidates(self):
        candidates = set()
        for sym in GRID_CANDIDATES:
            if sym not in GRID_BLACKLIST:
                candidates.add(sym)

        tickers = self._get_tickers()
        usdt_tickers = [
            t for t in tickers
            if t["symbol"].endswith("USDT")
            and t["symbol"] not in GRID_BLACKLIST
            and float(t.get("quoteVolume", 0)) > SCREENER_MIN_VOLUME
        ]
        usdt_tickers.sort(key=lambda t: float(t.get("quoteVolume", 0)), reverse=True)
        for t in usdt_tickers[:SCREENER_TOP_N]:
            candidates.add(t["symbol"])

        return list(candidates)

    def _get_tickers(self):
        now = time.time()
        if self._ticker_cache is None or now - self._ticker_ts > 60:
            self._ticker_cache = self.exchange.get_all_tickers_24h()
            self._ticker_ts = now
        return self._ticker_cache

    # ============================================================
    # SCORING
    # ============================================================

    def _score_symbol(self, symbol):
        """Score on 15m TF with BB + ADX + Choppiness + ATR spacing."""
        try:
            klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=100)
        except Exception:
            return None

        if not klines or len(klines) < SCREENER_BB_PERIOD + 10:
            return None

        df = pd.DataFrame(klines, columns=[
            'ts', 'open', 'high', 'low', 'close', 'volume',
            'close_time', 'quote_volume', 'trades', 'taker_buy_base',
            'taker_buy_quote', 'ignore'
        ])
        for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
            df[col] = df[col].astype(float)

        close = df['close']

        # BB Width
        bb_mid = close.rolling(SCREENER_BB_PERIOD).mean()
        bb_std = close.rolling(SCREENER_BB_PERIOD).std()
        bb_upper = bb_mid + SCREENER_BB_STD * bb_std
        bb_lower = bb_mid - SCREENER_BB_STD * bb_std
        bb_width = ((bb_upper - bb_lower) / bb_mid * 100).iloc[-1]
        if np.isnan(bb_width):
            return None

        # ADX
        adx = self._calc_adx(df, SCREENER_ADX_PERIOD)
        if np.isnan(adx):
            return None

        # Choppiness Index
        chop = self._calc_choppiness(df, SCREENER_CHOP_PERIOD)
        if np.isnan(chop):
            return None

        # ATR for adaptive spacing
        atr = self._calc_atr(df, ATR_PERIOD)
        price = float(close.iloc[-1])
        atr_spacing = self._get_atr_spacing(atr, price)
        natr = (atr / price * 100) if price > 0 else 999

        # ============================================================
        # HARD GATE 1: NATR < 0.65% (v3 — низкая вола)
        # ============================================================
        if natr > SCREENER_NATR_MAX:
            return None

        # ============================================================
        # HARD GATE 2: EMA20 slope < 0.4% (нет тренда)
        # ============================================================
        ema_slope_pct = self._calc_ema_slope(close, EMA_SLOPE_PERIOD, 10)
        if abs(ema_slope_pct) > EMA_SLOPE_MAX_PCT:
            return None

        # Volume
        volume_24h = 0
        tickers = self._get_tickers()
        for t in tickers:
            if t["symbol"] == symbol:
                volume_24h = float(t.get("quoteVolume", 0))
                break
        if volume_24h < SCREENER_MIN_VOLUME:
            return None

        # Funding — только для инфо (фильтр убран по запросу Rick'а)
        try:
            funding = self.exchange.get_funding_rate(symbol)
        except Exception:
            funding = 0.0

        # Micro-volatility (для инфо, не hard gate)
        micro_vol = self._calc_micro_volatility(df, 60, atr_spacing)

        # ============================================================
        # SCORE CALCULATION (max 100 pts)
        # ============================================================
        score = 0

        # BB Width score (0-25 pts)
        if SCREENER_BB_WIDTH_MIN <= bb_width <= SCREENER_BB_WIDTH_MAX:
            mid = (SCREENER_BB_WIDTH_MIN + SCREENER_BB_WIDTH_MAX) / 2
            distance = abs(bb_width - mid) / (SCREENER_BB_WIDTH_MAX - SCREENER_BB_WIDTH_MIN)
            score += 25 * (1 - distance)
        elif bb_width < SCREENER_BB_WIDTH_MIN:
            score += 5
        # else: 0 (too wide)

        # ADX score (0-25 pts)
        if adx < SCREENER_ADX_MAX:
            score += 25 * (1 - adx / SCREENER_ADX_MAX)
        else:
            score -= 10

        # Choppiness score (0-20 pts) — higher = more ranging = better
        if chop > SCREENER_CHOP_MIN_ENTRY:
            score += 20 * min((chop - SCREENER_CHOP_MIN_ENTRY) / 20, 1.0)
        elif chop > 45:
            score += 10  # moderate chop
        else:
            score -= 5  # trending

        # Volume score (0-15 pts)
        vol_score = min(volume_24h / 200_000_000, 1.0)
        score += 15 * vol_score

        # Micro-volatility score (0-15 pts)
        mv_score = min(micro_vol / 20, 1.0)
        score += 15 * mv_score

        return {
            "score": round(score, 2),
            "bb_width": round(bb_width, 4),
            "adx": round(adx, 2),
            "chop": round(chop, 2),
            "volume": volume_24h,
            "micro_vol": round(micro_vol, 1),
            "natr": round(natr, 3),
            "ema_slope": round(ema_slope_pct, 3),
            "funding": round(funding, 6),
            "price": price,
            "atr_spacing": round(atr_spacing, 4),
            "atr": round(atr, 6),
            "ts": time.time(),
        }

    def _calc_ema_slope(self, close_series, period=20, lookback=10):
        """EMA slope in %: (ema_now - ema_N_ago) / ema_N_ago * 100."""
        mult = 2 / (period + 1)
        ema_vals = []
        prices = close_series.values
        if len(prices) < period + lookback:
            return 0.0
        ema = float(prices[0])
        for p in prices[1:]:
            ema = float(p) * mult + ema * (1 - mult)
            ema_vals.append(ema)
        if len(ema_vals) < lookback + 1:
            return 0.0
        ema_now = ema_vals[-1]
        ema_past = ema_vals[-lookback - 1]
        if ema_past == 0:
            return 0.0
        return (ema_now - ema_past) / ema_past * 100

    # ============================================================
    # INDICATORS
    # ============================================================

    def _calc_adx(self, df, period=14):
        high, low, close = df['high'], df['low'], df['close']
        plus_dm = high.diff()
        minus_dm = -low.diff()
        plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0)
        minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0)
        tr = pd.concat([high - low, (high - close.shift(1)).abs(), (low - close.shift(1)).abs()], axis=1).max(axis=1)
        atr = tr.ewm(alpha=1/period, min_periods=period).mean()
        plus_di = 100 * (plus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
        minus_di = 100 * (minus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
        dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
        adx = dx.ewm(alpha=1/period, min_periods=period).mean()
        return float(adx.iloc[-1])

    def _calc_choppiness(self, df, period=14):
        """Choppiness Index: >62 = choppy/ranging, <38 = trending."""
        high, low, close = df['high'], df['low'], df['close']
        tr = pd.concat([
            high - low,
            (high - close.shift(1)).abs(),
            (low - close.shift(1)).abs()
        ], axis=1).max(axis=1)
        atr_sum = tr.rolling(period).sum()
        highest = high.rolling(period).max()
        lowest = low.rolling(period).min()
        hl_range = highest - lowest
        hl_range = hl_range.replace(0, np.nan)
        chop = 100 * np.log10(atr_sum / hl_range) / np.log10(period)
        val = chop.iloc[-1]
        return float(val) if not np.isnan(val) else np.nan

    def _calc_atr(self, df, period=14):
        """Calculate current ATR value."""
        high, low, close = df['high'], df['low'], df['close']
        tr = pd.concat([
            high - low,
            (high - close.shift(1)).abs(),
            (low - close.shift(1)).abs()
        ], axis=1).max(axis=1)
        atr = tr.ewm(alpha=1/period, min_periods=period).mean()
        return float(atr.iloc[-1])

    def _get_atr_spacing(self, atr_val, price):
        """ATR-adaptive spacing: clamped to [MIN, MAX]."""
        if price <= 0:
            return SPACING_MIN_PCT
        raw_pct = (atr_val / price) * 100 * ATR_SPACING_MULT
        return float(np.clip(raw_pct, SPACING_MIN_PCT, SPACING_MAX_PCT))

    def get_atr_spacing_for_symbol(self, symbol):
        """Get ATR spacing for a specific symbol (used by grid_manager)."""
        data = self.scores.get(symbol)
        if data and "atr_spacing" in data:
            return data["atr_spacing"]
        # Fallback: calculate fresh
        try:
            klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=50)
            if not klines or len(klines) < ATR_PERIOD + 5:
                return GRID_SPACING_PCT
            df = pd.DataFrame(klines, columns=[
                'ts', 'open', 'high', 'low', 'close', 'volume',
                'close_time', 'quote_volume', 'trades', 'taker_buy_base',
                'taker_buy_quote', 'ignore'
            ])
            for col in ['open', 'high', 'low', 'close']:
                df[col] = df[col].astype(float)
            atr = self._calc_atr(df, ATR_PERIOD)
            price = float(df['close'].iloc[-1])
            return self._get_atr_spacing(atr, price)
        except Exception:
            return GRID_SPACING_PCT

    def _calc_micro_volatility(self, df, lookback=60, spacing_pct=None):
        """Level crossings using ATR spacing."""
        if spacing_pct is None:
            spacing_pct = GRID_SPACING_PCT
        if len(df) < lookback:
            lookback = len(df)

        recent = df.tail(lookback)
        closes = recent['close'].values
        if len(closes) < 2:
            return 0

        avg_price = np.mean(closes)
        spacing = avg_price * spacing_pct / 100
        if spacing <= 0:
            return 0

        crossings = 0
        prev_level = int(closes[0] / spacing)
        for price in closes[1:]:
            curr_level = int(price / spacing)
            crossings += abs(curr_level - prev_level)
            prev_level = curr_level

        return crossings

    # ============================================================
    # BEST COIN
    # ============================================================

    def get_best_coin(self, exclude=None):
        if not self.scores:
            return None, None
        # exclude can be a single symbol string or a set of symbols
        if isinstance(exclude, str):
            exclude = {exclude}
        elif exclude is None:
            exclude = set()
        candidates = {
            sym: data for sym, data in self.scores.items()
            if sym not in exclude
            and data["score"] > 20
            and data["chop"] >= SCREENER_CHOP_MIN_ENTRY        # CHOP >= 55 (ranging)
            and data["natr"] <= SCREENER_NATR_MAX              # NATR <= 0.65% (low vol)
            and abs(data.get("ema_slope", 0)) <= EMA_SLOPE_MAX_PCT  # flat EMA
        }
        if not candidates:
            return None, None
        best_sym = max(candidates, key=lambda s: candidates[s]["score"])
        return best_sym, candidates[best_sym]

    def should_scan(self):
        return time.time() - self.last_scan >= SCREENER_INTERVAL_SEC

    # ============================================================
    # BREAKOUT DETECTION (BB + ADX + CHOP)
    # ============================================================

    def is_breakout(self, symbol):
        """Breakout: BB expand AND (ADX > threshold OR CHOP trending)."""
        try:
            klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=50)
        except Exception:
            return False

        if not klines or len(klines) < SCREENER_BB_PERIOD + 5:
            return False

        df = pd.DataFrame(klines, columns=[
            'ts', 'open', 'high', 'low', 'close', 'volume',
            'close_time', 'quote_volume', 'trades', 'taker_buy_base',
            'taker_buy_quote', 'ignore'
        ])
        for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
            df[col] = df[col].astype(float)

        close = df['close']
        bb_mid = close.rolling(SCREENER_BB_PERIOD).mean()
        bb_std_val = close.rolling(SCREENER_BB_PERIOD).std()
        bb_width = (((bb_mid + SCREENER_BB_STD * bb_std_val) -
                     (bb_mid - SCREENER_BB_STD * bb_std_val)) / bb_mid * 100).iloc[-1]

        adx = self._calc_adx(df, SCREENER_ADX_PERIOD)
        chop = self._calc_choppiness(df, SCREENER_CHOP_PERIOD)

        if np.isnan(bb_width) or np.isnan(adx) or np.isnan(chop):
            return False

        bb_break = bb_width > SCREENER_BB_WIDTH_MAX * SCREENER_BREAKOUT_BB_MULT
        adx_break = adx > SCREENER_BREAKOUT_ADX
        chop_trend = chop < SCREENER_CHOP_MAX_EXIT

        if bb_break and (adx_break or chop_trend):
            logger.info(
                f"BREAKOUT: {symbol} BB={bb_width:.3f}% ADX={adx:.1f} CHOP={chop:.1f}"
            )
            return True

        return False

    def get_scan_summary(self):
        if not self.scores:
            return "📭 No scan data"
        sorted_scores = sorted(self.scores.items(), key=lambda x: x[1]["score"], reverse=True)
        lines = [f"📊 *Grid Screener v2* ({len(self.scores)} coins, {SCREENER_TF})\n"]
        for i, (sym, data) in enumerate(sorted_scores[:5]):
            medal = ["🥇", "🥈", "🥉", "4.", "5."][i]
            lines.append(
                f"{medal} *{sym}* score={data['score']:.0f}\n"
                f"   BB={data['bb_width']:.2f}% ADX={data['adx']:.0f} "
                f"CHOP={data['chop']:.0f} "
                f"sp={data['atr_spacing']:.2f}%"
            )
        return "\n".join(lines)

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...