"""
Grid Bot — Screener v2 (Auto-Rotation)
=========================================
v2: Added Choppiness Index, 15m timeframe, ATR-adaptive spacing.
Score factors:
1. BB Width in sweet spot (0.3-1.5%)
2. ADX < 25 — no strong trend
3. Choppiness Index > 55 — choppy/ranging
4. Volume > $30M — liquidity
5. Micro-volatility — level crossings per hour
Breakout: BB expand + (ADX > 28 OR CHOP < 40)
"""
import time
import logging
import numpy as np
import pandas as pd
from src.config import (
SCREENER_INTERVAL_SEC, SCREENER_MIN_VOLUME,
SCREENER_BB_PERIOD, SCREENER_BB_STD,
SCREENER_BB_WIDTH_MIN, SCREENER_BB_WIDTH_MAX,
SCREENER_ADX_PERIOD, SCREENER_ADX_MAX,
SCREENER_BREAKOUT_BB_MULT, SCREENER_BREAKOUT_ADX,
SCREENER_CHOP_PERIOD, SCREENER_CHOP_MIN_ENTRY, SCREENER_CHOP_MAX_EXIT,
SCREENER_NATR_MAX, EMA_SLOPE_PERIOD, EMA_SLOPE_MAX_PCT, FUNDING_MAX_ABS,
SCREENER_TOP_N, GRID_CANDIDATES, GRID_BLACKLIST,
GRID_SPACING_PCT, SCREENER_TF,
ATR_PERIOD, ATR_SPACING_MULT, SPACING_MIN_PCT, SPACING_MAX_PCT,
)
logger = logging.getLogger("screener")
class Screener:
    def __init__(self, exchange):
        """Create a screener bound to an exchange client.

        Args:
            exchange: client exposing get_all_tickers_24h(), get_klines()
                and get_funding_rate() (used by the scan/scoring methods).
        """
        self.exchange = exchange
        self.scores = {}  # symbol -> score-component dict from the last scan
        self.last_scan = 0  # epoch seconds of the last completed scan
        self._ticker_cache = None  # cached 24h ticker list (refreshed <= 1/min)
        self._ticker_ts = 0  # epoch seconds when the ticker cache was filled
# ============================================================
# MAIN SCAN
# ============================================================
def scan(self):
"""Scan all candidates on 15m TF."""
self.scores = {}
candidates = self._get_candidates()
logger.info(f"Scanning {len(candidates)} candidates on {SCREENER_TF}...")
for symbol in candidates:
try:
score_data = self._score_symbol(symbol)
if score_data:
self.scores[symbol] = score_data
except Exception as e:
logger.debug(f"Score error {symbol}: {e}")
time.sleep(0.1)
self.last_scan = time.time()
sorted_scores = sorted(self.scores.items(), key=lambda x: x[1]["score"], reverse=True)
top5 = sorted_scores[:5]
if top5:
logger.info("Top 5 grid scores:")
for sym, data in top5:
logger.info(
f" {sym}: score={data['score']:.1f} "
f"CHOP={data['chop']:.1f} NATR={data['natr']:.3f}% "
f"slope={data.get('ema_slope', 0):+.2f}% fund={data.get('funding', 0)*100:+.4f}% "
f"Vol=${data['volume']/1e6:.0f}M sp={data['atr_spacing']:.3f}%"
)
return self.scores
def _get_candidates(self):
candidates = set()
for sym in GRID_CANDIDATES:
if sym not in GRID_BLACKLIST:
candidates.add(sym)
tickers = self._get_tickers()
usdt_tickers = [
t for t in tickers
if t["symbol"].endswith("USDT")
and t["symbol"] not in GRID_BLACKLIST
and float(t.get("quoteVolume", 0)) > SCREENER_MIN_VOLUME
]
usdt_tickers.sort(key=lambda t: float(t.get("quoteVolume", 0)), reverse=True)
for t in usdt_tickers[:SCREENER_TOP_N]:
candidates.add(t["symbol"])
return list(candidates)
def _get_tickers(self):
now = time.time()
if self._ticker_cache is None or now - self._ticker_ts > 60:
self._ticker_cache = self.exchange.get_all_tickers_24h()
self._ticker_ts = now
return self._ticker_cache
# ============================================================
# SCORING
# ============================================================
    def _score_symbol(self, symbol):
        """Score on 15m TF with BB + ADX + Choppiness + ATR spacing.

        Returns:
            dict of the score and its components, or None when the symbol
            fails any hard gate (no/short kline data, NaN indicator,
            NATR too high, EMA too sloped, or 24h volume too low).
        """
        try:
            klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=100)
        except Exception:
            return None
        if not klines or len(klines) < SCREENER_BB_PERIOD + 10:
            return None
        # Kline rows -> DataFrame; 12-column exchange layout assumed — TODO confirm against exchange client.
        df = pd.DataFrame(klines, columns=[
            'ts', 'open', 'high', 'low', 'close', 'volume',
            'close_time', 'quote_volume', 'trades', 'taker_buy_base',
            'taker_buy_quote', 'ignore'
        ])
        for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
            df[col] = df[col].astype(float)
        close = df['close']
        # BB Width as % of the middle band — current volatility envelope.
        bb_mid = close.rolling(SCREENER_BB_PERIOD).mean()
        bb_std = close.rolling(SCREENER_BB_PERIOD).std()
        bb_upper = bb_mid + SCREENER_BB_STD * bb_std
        bb_lower = bb_mid - SCREENER_BB_STD * bb_std
        bb_width = ((bb_upper - bb_lower) / bb_mid * 100).iloc[-1]
        if np.isnan(bb_width):
            return None
        # ADX — trend strength.
        adx = self._calc_adx(df, SCREENER_ADX_PERIOD)
        if np.isnan(adx):
            return None
        # Choppiness Index — ranging vs trending.
        chop = self._calc_choppiness(df, SCREENER_CHOP_PERIOD)
        if np.isnan(chop):
            return None
        # ATR for adaptive spacing; NATR = ATR as % of price (999 sentinel on bad price).
        atr = self._calc_atr(df, ATR_PERIOD)
        price = float(close.iloc[-1])
        atr_spacing = self._get_atr_spacing(atr, price)
        natr = (atr / price * 100) if price > 0 else 999
        # ============================================================
        # HARD GATE 1: NATR above SCREENER_NATR_MAX rejected (v3 — low volatility only)
        # ============================================================
        if natr > SCREENER_NATR_MAX:
            return None
        # ============================================================
        # HARD GATE 2: EMA20 slope beyond EMA_SLOPE_MAX_PCT rejected (no trend)
        # ============================================================
        ema_slope_pct = self._calc_ema_slope(close, EMA_SLOPE_PERIOD, 10)
        if abs(ema_slope_pct) > EMA_SLOPE_MAX_PCT:
            return None
        # 24h quote volume from the cached ticker snapshot.
        volume_24h = 0
        tickers = self._get_tickers()
        for t in tickers:
            if t["symbol"] == symbol:
                volume_24h = float(t.get("quoteVolume", 0))
                break
        if volume_24h < SCREENER_MIN_VOLUME:
            return None
        # Funding — informational only (hard filter removed at Rick's request).
        try:
            funding = self.exchange.get_funding_rate(symbol)
        except Exception:
            funding = 0.0
        # Micro-volatility (informational, not a hard gate).
        micro_vol = self._calc_micro_volatility(df, 60, atr_spacing)
        # ============================================================
        # SCORE CALCULATION (max 100 pts)
        # ============================================================
        score = 0
        # BB Width score (0-25 pts): peaks at the centre of the sweet spot.
        if SCREENER_BB_WIDTH_MIN <= bb_width <= SCREENER_BB_WIDTH_MAX:
            mid = (SCREENER_BB_WIDTH_MIN + SCREENER_BB_WIDTH_MAX) / 2
            distance = abs(bb_width - mid) / (SCREENER_BB_WIDTH_MAX - SCREENER_BB_WIDTH_MIN)
            score += 25 * (1 - distance)
        elif bb_width < SCREENER_BB_WIDTH_MIN:
            score += 5
        # else: 0 (too wide)
        # ADX score (0-25 pts): weaker trend scores higher; strong trend penalised.
        if adx < SCREENER_ADX_MAX:
            score += 25 * (1 - adx / SCREENER_ADX_MAX)
        else:
            score -= 10
        # Choppiness score (0-20 pts) — higher = more ranging = better
        if chop > SCREENER_CHOP_MIN_ENTRY:
            score += 20 * min((chop - SCREENER_CHOP_MIN_ENTRY) / 20, 1.0)
        elif chop > 45:
            score += 10  # moderate chop
        else:
            score -= 5  # trending
        # Volume score (0-15 pts): saturates at $200M 24h quote volume.
        vol_score = min(volume_24h / 200_000_000, 1.0)
        score += 15 * vol_score
        # Micro-volatility score (0-15 pts): saturates at 20 level crossings.
        mv_score = min(micro_vol / 20, 1.0)
        score += 15 * mv_score
        return {
            "score": round(score, 2),
            "bb_width": round(bb_width, 4),
            "adx": round(adx, 2),
            "chop": round(chop, 2),
            "volume": volume_24h,
            "micro_vol": round(micro_vol, 1),
            "natr": round(natr, 3),
            "ema_slope": round(ema_slope_pct, 3),
            "funding": round(funding, 6),
            "price": price,
            "atr_spacing": round(atr_spacing, 4),
            "atr": round(atr, 6),
            "ts": time.time(),
        }
def _calc_ema_slope(self, close_series, period=20, lookback=10):
"""EMA slope in %: (ema_now - ema_N_ago) / ema_N_ago * 100."""
mult = 2 / (period + 1)
ema_vals = []
prices = close_series.values
if len(prices) < period + lookback:
return 0.0
ema = float(prices[0])
for p in prices[1:]:
ema = float(p) * mult + ema * (1 - mult)
ema_vals.append(ema)
if len(ema_vals) < lookback + 1:
return 0.0
ema_now = ema_vals[-1]
ema_past = ema_vals[-lookback - 1]
if ema_past == 0:
return 0.0
return (ema_now - ema_past) / ema_past * 100
# ============================================================
# INDICATORS
# ============================================================
def _calc_adx(self, df, period=14):
high, low, close = df['high'], df['low'], df['close']
plus_dm = high.diff()
minus_dm = -low.diff()
plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0)
minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0)
tr = pd.concat([high - low, (high - close.shift(1)).abs(), (low - close.shift(1)).abs()], axis=1).max(axis=1)
atr = tr.ewm(alpha=1/period, min_periods=period).mean()
plus_di = 100 * (plus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
minus_di = 100 * (minus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
adx = dx.ewm(alpha=1/period, min_periods=period).mean()
return float(adx.iloc[-1])
def _calc_choppiness(self, df, period=14):
"""Choppiness Index: >62 = choppy/ranging, <38 = trending."""
high, low, close = df['high'], df['low'], df['close']
tr = pd.concat([
high - low,
(high - close.shift(1)).abs(),
(low - close.shift(1)).abs()
], axis=1).max(axis=1)
atr_sum = tr.rolling(period).sum()
highest = high.rolling(period).max()
lowest = low.rolling(period).min()
hl_range = highest - lowest
hl_range = hl_range.replace(0, np.nan)
chop = 100 * np.log10(atr_sum / hl_range) / np.log10(period)
val = chop.iloc[-1]
return float(val) if not np.isnan(val) else np.nan
def _calc_atr(self, df, period=14):
"""Calculate current ATR value."""
high, low, close = df['high'], df['low'], df['close']
tr = pd.concat([
high - low,
(high - close.shift(1)).abs(),
(low - close.shift(1)).abs()
], axis=1).max(axis=1)
atr = tr.ewm(alpha=1/period, min_periods=period).mean()
return float(atr.iloc[-1])
def _get_atr_spacing(self, atr_val, price):
"""ATR-adaptive spacing: clamped to [MIN, MAX]."""
if price <= 0:
return SPACING_MIN_PCT
raw_pct = (atr_val / price) * 100 * ATR_SPACING_MULT
return float(np.clip(raw_pct, SPACING_MIN_PCT, SPACING_MAX_PCT))
def get_atr_spacing_for_symbol(self, symbol):
"""Get ATR spacing for a specific symbol (used by grid_manager)."""
data = self.scores.get(symbol)
if data and "atr_spacing" in data:
return data["atr_spacing"]
# Fallback: calculate fresh
try:
klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=50)
if not klines or len(klines) < ATR_PERIOD + 5:
return GRID_SPACING_PCT
df = pd.DataFrame(klines, columns=[
'ts', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
for col in ['open', 'high', 'low', 'close']:
df[col] = df[col].astype(float)
atr = self._calc_atr(df, ATR_PERIOD)
price = float(df['close'].iloc[-1])
return self._get_atr_spacing(atr, price)
except Exception:
return GRID_SPACING_PCT
def _calc_micro_volatility(self, df, lookback=60, spacing_pct=None):
"""Level crossings using ATR spacing."""
if spacing_pct is None:
spacing_pct = GRID_SPACING_PCT
if len(df) < lookback:
lookback = len(df)
recent = df.tail(lookback)
closes = recent['close'].values
if len(closes) < 2:
return 0
avg_price = np.mean(closes)
spacing = avg_price * spacing_pct / 100
if spacing <= 0:
return 0
crossings = 0
prev_level = int(closes[0] / spacing)
for price in closes[1:]:
curr_level = int(price / spacing)
crossings += abs(curr_level - prev_level)
prev_level = curr_level
return crossings
# ============================================================
# BEST COIN
# ============================================================
def get_best_coin(self, exclude=None):
if not self.scores:
return None, None
# exclude can be a single symbol string or a set of symbols
if isinstance(exclude, str):
exclude = {exclude}
elif exclude is None:
exclude = set()
candidates = {
sym: data for sym, data in self.scores.items()
if sym not in exclude
and data["score"] > 20
and data["chop"] >= SCREENER_CHOP_MIN_ENTRY # CHOP >= 55 (ranging)
and data["natr"] <= SCREENER_NATR_MAX # NATR <= 0.65% (low vol)
and abs(data.get("ema_slope", 0)) <= EMA_SLOPE_MAX_PCT # flat EMA
}
if not candidates:
return None, None
best_sym = max(candidates, key=lambda s: candidates[s]["score"])
return best_sym, candidates[best_sym]
def should_scan(self):
return time.time() - self.last_scan >= SCREENER_INTERVAL_SEC
# ============================================================
# BREAKOUT DETECTION (BB + ADX + CHOP)
# ============================================================
def is_breakout(self, symbol):
"""Breakout: BB expand AND (ADX > threshold OR CHOP trending)."""
try:
klines = self.exchange.get_klines(symbol, SCREENER_TF, limit=50)
except Exception:
return False
if not klines or len(klines) < SCREENER_BB_PERIOD + 5:
return False
df = pd.DataFrame(klines, columns=[
'ts', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
df[col] = df[col].astype(float)
close = df['close']
bb_mid = close.rolling(SCREENER_BB_PERIOD).mean()
bb_std_val = close.rolling(SCREENER_BB_PERIOD).std()
bb_width = (((bb_mid + SCREENER_BB_STD * bb_std_val) -
(bb_mid - SCREENER_BB_STD * bb_std_val)) / bb_mid * 100).iloc[-1]
adx = self._calc_adx(df, SCREENER_ADX_PERIOD)
chop = self._calc_choppiness(df, SCREENER_CHOP_PERIOD)
if np.isnan(bb_width) or np.isnan(adx) or np.isnan(chop):
return False
bb_break = bb_width > SCREENER_BB_WIDTH_MAX * SCREENER_BREAKOUT_BB_MULT
adx_break = adx > SCREENER_BREAKOUT_ADX
chop_trend = chop < SCREENER_CHOP_MAX_EXIT
if bb_break and (adx_break or chop_trend):
logger.info(
f"BREAKOUT: {symbol} BB={bb_width:.3f}% ADX={adx:.1f} CHOP={chop:.1f}"
)
return True
return False
def get_scan_summary(self):
if not self.scores:
return "📭 No scan data"
sorted_scores = sorted(self.scores.items(), key=lambda x: x[1]["score"], reverse=True)
lines = [f"📊 *Grid Screener v2* ({len(self.scores)} coins, {SCREENER_TF})\n"]
for i, (sym, data) in enumerate(sorted_scores[:5]):
medal = ["🥇", "🥈", "🥉", "4.", "5."][i]
lines.append(
f"{medal} *{sym}* score={data['score']:.0f}\n"
f" BB={data['bb_width']:.2f}% ADX={data['adx']:.0f} "
f"CHOP={data['chop']:.0f} "
f"sp={data['atr_spacing']:.2f}%"
)
return "\n".join(lines)