# ← Back
""" Grid Bot — Screener v2 (Auto-Rotation) ========================================= v2: Added Choppiness Index, 15m timeframe, ATR-adaptive spacing. Score factors: 1. BB Width in sweet spot (0.3-1.5%) 2. ADX < 25 — no strong trend 3. Choppiness Index > 55 — choppy/ranging 4. Volume > $30M — liquidity 5. Micro-volatility — level crossings per hour Breakout: BB expand + (ADX > 28 OR CHOP < 40) """ import time import logging import numpy as np import pandas as pd from src.config import ( SCREENER_INTERVAL_SEC, SCREENER_MIN_VOLUME, SCREENER_BB_PERIOD, SCREENER_BB_STD, SCREENER_BB_WIDTH_MIN, SCREENER_BB_WIDTH_MAX, SCREENER_ADX_PERIOD, SCREENER_ADX_MAX, SCREENER_BREAKOUT_BB_MULT, SCREENER_BREAKOUT_ADX, SCREENER_CHOP_PERIOD, SCREENER_CHOP_MIN_ENTRY, SCREENER_CHOP_MAX_EXIT, SCREENER_NATR_MAX, EMA_SLOPE_PERIOD, EMA_SLOPE_MAX_PCT, FUNDING_MAX_ABS, SCREENER_TOP_N, GRID_CANDIDATES, GRID_BLACKLIST, GRID_SPACING_PCT, SCREENER_TF, ATR_PERIOD, ATR_SPACING_MULT, SPACING_MIN_PCT, SPACING_MAX_PCT, ) logger = logging.getLogger("screener") class Screener: def __init__(self, exchange): self.exchange = exchange self.scores = {} self.last_scan = 0 self._ticker_cache = None self._ticker_ts = 0 # ============================================================ # MAIN SCAN # ============================================================ def scan(self): """Scan all candidates on 15m TF.""" self.scores = {} candidates = self._get_candidates() logger.info(f"Scanning {len(candidates)} candidates on {SCREENER_TF}...") for symbol in candidates: try: score_data = self._score_symbol(symbol) if score_data: self.scores[symbol] = score_data except Exception as e: logger.debug(f"Score error {symbol}: {e}") time.sleep(0.1) self.last_scan = time.time() sorted_scores = sorted(self.scores.items(), key=lambda x: x[1]["score"], reverse=True) top5 = sorted_scores[:5] if top5: logger.info("Top 5 grid scores:") for sym, data in top5: logger.info( f" {sym}: score={data['score']:.1f} " f"CHOP={data['chop']:.1f} 
NATR={data['natr']:.3f}% " f"slope={data.get('ema_slope', 0):+.2f}% fund={data.get('funding', 0)*100:+.4f}% " f"Vol=${data['volume']/1e6:.0f}M sp={data['atr_spacing']:.3f}%" ) return self.scores def _get_candidates(self): candidates = set() for sym in GRID_CANDIDATES: if sym not in GRID_BLACKLIST: candidates.add(sym) tickers = self._get_tickers() usdt_tickers = [ t for t in tickers if t["symbol"].endswith("USDT") and t["symbol"] not in GRID_BLACKLIST and float(t.get("quoteVolume", 0)) > SCREENER_MIN_VOLUME ] usdt_tickers.sort(key=lambda t: float(t.get("quoteVolume", 0)), reverse=True) for t in usdt_tickers[:SCREENER_TOP_N]: candidates.add(t["symbol"]) return list(candidates) def _get_tickers(self): now = time.time() if self._ticker_cache is None or now - self._ticker_ts > 60: self._ticker_cache = self.exchange.get_all_tickers_24h() self._ticker_ts = now return self._ticker_cache # ============================================================ # SCORING # ============================================================ def _score_symbol(self, symbol): """Score on 15m TF with BB + ADX + Choppiness + ATR spacing.""" try: klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=100) except Exception: return None if not klines or len(klines) < SCREENER_BB_PERIOD + 10: return None df = pd.DataFrame(klines, columns=[ 'ts', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']: df[col] = df[col].astype(float) close = df['close'] # BB Width bb_mid = close.rolling(SCREENER_BB_PERIOD).mean() bb_std = close.rolling(SCREENER_BB_PERIOD).std() bb_upper = bb_mid + SCREENER_BB_STD * bb_std bb_lower = bb_mid - SCREENER_BB_STD * bb_std bb_width = ((bb_upper - bb_lower) / bb_mid * 100).iloc[-1] if np.isnan(bb_width): return None # ADX adx = self._calc_adx(df, SCREENER_ADX_PERIOD) if np.isnan(adx): return None # Choppiness Index chop = 
self._calc_choppiness(df, SCREENER_CHOP_PERIOD) if np.isnan(chop): return None # ATR for adaptive spacing atr = self._calc_atr(df, ATR_PERIOD) price = float(close.iloc[-1]) atr_spacing = self._get_atr_spacing(atr, price) natr = (atr / price * 100) if price > 0 else 999 # ============================================================ # HARD GATE 1: NATR < 0.65% (v3 — низкая вола) # ============================================================ if natr > SCREENER_NATR_MAX: return None # ============================================================ # HARD GATE 2: EMA20 slope < 0.4% (нет тренда) # ============================================================ ema_slope_pct = self._calc_ema_slope(close, EMA_SLOPE_PERIOD, 10) if abs(ema_slope_pct) > EMA_SLOPE_MAX_PCT: return None # Volume volume_24h = 0 tickers = self._get_tickers() for t in tickers: if t["symbol"] == symbol: volume_24h = float(t.get("quoteVolume", 0)) break if volume_24h < SCREENER_MIN_VOLUME: return None # Funding — только для инфо (фильтр убран по запросу Rick'а) try: funding = self.exchange.get_funding_rate(symbol) except Exception: funding = 0.0 # Micro-volatility (для инфо, не hard gate) micro_vol = self._calc_micro_volatility(df, 60, atr_spacing) # ============================================================ # SCORE CALCULATION (max 100 pts) # ============================================================ score = 0 # BB Width score (0-25 pts) if SCREENER_BB_WIDTH_MIN <= bb_width <= SCREENER_BB_WIDTH_MAX: mid = (SCREENER_BB_WIDTH_MIN + SCREENER_BB_WIDTH_MAX) / 2 distance = abs(bb_width - mid) / (SCREENER_BB_WIDTH_MAX - SCREENER_BB_WIDTH_MIN) score += 25 * (1 - distance) elif bb_width < SCREENER_BB_WIDTH_MIN: score += 5 # else: 0 (too wide) # ADX score (0-25 pts) if adx < SCREENER_ADX_MAX: score += 25 * (1 - adx / SCREENER_ADX_MAX) else: score -= 10 # Choppiness score (0-20 pts) — higher = more ranging = better if chop > SCREENER_CHOP_MIN_ENTRY: score += 20 * min((chop - SCREENER_CHOP_MIN_ENTRY) / 20, 1.0) 
elif chop > 45: score += 10 # moderate chop else: score -= 5 # trending # Volume score (0-15 pts) vol_score = min(volume_24h / 200_000_000, 1.0) score += 15 * vol_score # Micro-volatility score (0-15 pts) mv_score = min(micro_vol / 20, 1.0) score += 15 * mv_score return { "score": round(score, 2), "bb_width": round(bb_width, 4), "adx": round(adx, 2), "chop": round(chop, 2), "volume": volume_24h, "micro_vol": round(micro_vol, 1), "natr": round(natr, 3), "ema_slope": round(ema_slope_pct, 3), "funding": round(funding, 6), "price": price, "atr_spacing": round(atr_spacing, 4), "atr": round(atr, 6), "ts": time.time(), } def _calc_ema_slope(self, close_series, period=20, lookback=10): """EMA slope in %: (ema_now - ema_N_ago) / ema_N_ago * 100.""" mult = 2 / (period + 1) ema_vals = [] prices = close_series.values if len(prices) < period + lookback: return 0.0 ema = float(prices[0]) for p in prices[1:]: ema = float(p) * mult + ema * (1 - mult) ema_vals.append(ema) if len(ema_vals) < lookback + 1: return 0.0 ema_now = ema_vals[-1] ema_past = ema_vals[-lookback - 1] if ema_past == 0: return 0.0 return (ema_now - ema_past) / ema_past * 100 # ============================================================ # INDICATORS # ============================================================ def _calc_adx(self, df, period=14): high, low, close = df['high'], df['low'], df['close'] plus_dm = high.diff() minus_dm = -low.diff() plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0) minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0) tr = pd.concat([high - low, (high - close.shift(1)).abs(), (low - close.shift(1)).abs()], axis=1).max(axis=1) atr = tr.ewm(alpha=1/period, min_periods=period).mean() plus_di = 100 * (plus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr) minus_di = 100 * (minus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr) dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10) adx = dx.ewm(alpha=1/period, 
min_periods=period).mean() return float(adx.iloc[-1]) def _calc_choppiness(self, df, period=14): """Choppiness Index: >62 = choppy/ranging, <38 = trending.""" high, low, close = df['high'], df['low'], df['close'] tr = pd.concat([ high - low, (high - close.shift(1)).abs(), (low - close.shift(1)).abs() ], axis=1).max(axis=1) atr_sum = tr.rolling(period).sum() highest = high.rolling(period).max() lowest = low.rolling(period).min() hl_range = highest - lowest hl_range = hl_range.replace(0, np.nan) chop = 100 * np.log10(atr_sum / hl_range) / np.log10(period) val = chop.iloc[-1] return float(val) if not np.isnan(val) else np.nan def _calc_atr(self, df, period=14): """Calculate current ATR value.""" high, low, close = df['high'], df['low'], df['close'] tr = pd.concat([ high - low, (high - close.shift(1)).abs(), (low - close.shift(1)).abs() ], axis=1).max(axis=1) atr = tr.ewm(alpha=1/period, min_periods=period).mean() return float(atr.iloc[-1]) def _get_atr_spacing(self, atr_val, price): """ATR-adaptive spacing: clamped to [MIN, MAX].""" if price <= 0: return SPACING_MIN_PCT raw_pct = (atr_val / price) * 100 * ATR_SPACING_MULT return float(np.clip(raw_pct, SPACING_MIN_PCT, SPACING_MAX_PCT)) def get_atr_spacing_for_symbol(self, symbol): """Get ATR spacing for a specific symbol (used by grid_manager).""" data = self.scores.get(symbol) if data and "atr_spacing" in data: return data["atr_spacing"] # Fallback: calculate fresh try: klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=50) if not klines or len(klines) < ATR_PERIOD + 5: return GRID_SPACING_PCT df = pd.DataFrame(klines, columns=[ 'ts', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) for col in ['open', 'high', 'low', 'close']: df[col] = df[col].astype(float) atr = self._calc_atr(df, ATR_PERIOD) price = float(df['close'].iloc[-1]) return self._get_atr_spacing(atr, price) except Exception: return GRID_SPACING_PCT def 
_calc_micro_volatility(self, df, lookback=60, spacing_pct=None): """Level crossings using ATR spacing.""" if spacing_pct is None: spacing_pct = GRID_SPACING_PCT if len(df) < lookback: lookback = len(df) recent = df.tail(lookback) closes = recent['close'].values if len(closes) < 2: return 0 avg_price = np.mean(closes) spacing = avg_price * spacing_pct / 100 if spacing <= 0: return 0 crossings = 0 prev_level = int(closes[0] / spacing) for price in closes[1:]: curr_level = int(price / spacing) crossings += abs(curr_level - prev_level) prev_level = curr_level return crossings # ============================================================ # BEST COIN # ============================================================ def get_best_coin(self, exclude=None): if not self.scores: return None, None # exclude can be a single symbol string or a set of symbols if isinstance(exclude, str): exclude = {exclude} elif exclude is None: exclude = set() candidates = { sym: data for sym, data in self.scores.items() if sym not in exclude and data["score"] > 20 and data["chop"] >= SCREENER_CHOP_MIN_ENTRY # CHOP >= 55 (ranging) and data["natr"] <= SCREENER_NATR_MAX # NATR <= 0.65% (low vol) and abs(data.get("ema_slope", 0)) <= EMA_SLOPE_MAX_PCT # flat EMA } if not candidates: return None, None best_sym = max(candidates, key=lambda s: candidates[s]["score"]) return best_sym, candidates[best_sym] def should_scan(self): return time.time() - self.last_scan >= SCREENER_INTERVAL_SEC # ============================================================ # BREAKOUT DETECTION (BB + ADX + CHOP) # ============================================================ def is_breakout(self, symbol): """Breakout: BB expand AND (ADX > threshold OR CHOP trending).""" try: klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=50) except Exception: return False if not klines or len(klines) < SCREENER_BB_PERIOD + 5: return False df = pd.DataFrame(klines, columns=[ 'ts', 'open', 'high', 'low', 'close', 'volume', 'close_time', 
'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']: df[col] = df[col].astype(float) close = df['close'] bb_mid = close.rolling(SCREENER_BB_PERIOD).mean() bb_std_val = close.rolling(SCREENER_BB_PERIOD).std() bb_width = (((bb_mid + SCREENER_BB_STD * bb_std_val) - (bb_mid - SCREENER_BB_STD * bb_std_val)) / bb_mid * 100).iloc[-1] adx = self._calc_adx(df, SCREENER_ADX_PERIOD) chop = self._calc_choppiness(df, SCREENER_CHOP_PERIOD) if np.isnan(bb_width) or np.isnan(adx) or np.isnan(chop): return False bb_break = bb_width > SCREENER_BB_WIDTH_MAX * SCREENER_BREAKOUT_BB_MULT adx_break = adx > SCREENER_BREAKOUT_ADX chop_trend = chop < SCREENER_CHOP_MAX_EXIT if bb_break and (adx_break or chop_trend): logger.info( f"BREAKOUT: {symbol} BB={bb_width:.3f}% ADX={adx:.1f} CHOP={chop:.1f}" ) return True return False def get_scan_summary(self): if not self.scores: return "📭 No scan data" sorted_scores = sorted(self.scores.items(), key=lambda x: x[1]["score"], reverse=True) lines = [f"📊 *Grid Screener v2* ({len(self.scores)} coins, {SCREENER_TF})\n"] for i, (sym, data) in enumerate(sorted_scores[:5]): medal = ["🥇", "🥈", "🥉", "4.", "5."][i] lines.append( f"{medal} *{sym}* score={data['score']:.0f}\n" f" BB={data['bb_width']:.2f}% ADX={data['adx']:.0f} " f"CHOP={data['chop']:.0f} " f"sp={data['atr_spacing']:.2f}%" ) return "\n".join(lines)