← Назад
""" Squeeze-VWAP Bot — Technical Indicators ========================================= LazyBear-inspired indicators. Чистые функции, numpy. Индикаторы: 1. Squeeze Momentum (BB vs KC + LinReg histogram) 2. Z-Distance from VWAP (mean reversion signal) 3. Waddah Attar Explosion (trend strength + direction) 4. ATR, EMA — вспомогательные Все работают с raw klines от Binance API. """ import numpy as np from src.config import ( SQZ_BB_LEN, SQZ_BB_MULT, SQZ_KC_LEN, SQZ_KC_MULT, SQZ_LINREG_LEN, ZVWAP_PERIOD, ZVWAP_ENTRY_THRESHOLD, WAD_FAST_LEN, WAD_SLOW_LEN, WAD_BB_LEN, WAD_BB_MULT, WAD_SENSITIVITY, ADX_PERIOD, ADX_TREND_THRESHOLD, ATR_PERIOD, ) # ============================================================ # HELPERS # ============================================================ def _klines_to_arrays(klines): """Конвертирует Binance klines в numpy arrays.""" opens = np.array([float(k[1]) for k in klines]) highs = np.array([float(k[2]) for k in klines]) lows = np.array([float(k[3]) for k in klines]) closes = np.array([float(k[4]) for k in klines]) volumes = np.array([float(k[5]) for k in klines]) return opens, highs, lows, closes, volumes def _ema(data, period): """Exponential Moving Average — full array.""" alpha = 2 / (period + 1) ema = np.zeros_like(data, dtype=float) ema[0] = data[0] for i in range(1, len(data)): ema[i] = alpha * data[i] + (1 - alpha) * ema[i - 1] return ema def _sma_array(data, period): """Simple Moving Average — rolling, full array.""" result = np.full_like(data, np.nan, dtype=float) for i in range(period - 1, len(data)): result[i] = np.mean(data[i - period + 1:i + 1]) return result def _linreg(data, period): """ Linear Regression Value — LazyBear's method. Вычисляет значение линейной регрессии для каждого бара. Это НЕ slope, а predicted value на конце regression line. """ result = np.full_like(data, 0.0, dtype=float) for i in range(period - 1, len(data)): y = data[i - period + 1:i + 1] x = np.arange(period, dtype=float) # y = mx + b, predict y at x = period - 1 x_mean = np.mean(x) y_mean = np.mean(y) denom = np.sum((x - x_mean) ** 2) if denom == 0: result[i] = y_mean else: slope = np.sum((x - x_mean) * (y - y_mean)) / denom intercept = y_mean - slope * x_mean result[i] = slope * (period - 1) + intercept return result def _true_range(highs, lows, closes): """True Range array.""" tr = np.zeros(len(highs)) tr[0] = highs[0] - lows[0] for i in range(1, len(highs)): tr[i] = max( highs[i] - lows[i], abs(highs[i] - closes[i - 1]), abs(lows[i] - closes[i - 1]), ) return tr # ============================================================ # 1. SQUEEZE MOMENTUM INDICATOR (LazyBear) # ============================================================ def calc_squeeze_momentum(klines): """ Squeeze Momentum Indicator (LazyBear). Логика: - BB внутри KC = SQUEEZE (чёрный крестик, рынок сжат) - BB выходит из KC = RELEASE (серый крестик, пружина разжалась) - Гистограмма = LinReg(close - avg(highest, lowest), period) - Яркий цвет = momentum нарастает, тёмный = затухает Returns: dict { 'histogram': float — последнее значение гистограммы (>0 = bullish, <0 = bearish) 'hist_prev': float — предпоследнее значение 'is_squeeze': bool — сейчас в squeeze? 'was_squeeze': bool — предыдущий бар был squeeze? 'squeeze_released': bool — squeeze только что отпустил? 'momentum_rising': bool — momentum нарастает? 'histogram_series': array — для анализа 'squeeze_series': array — True/False для каждого бара } """ min_len = max(SQZ_BB_LEN, SQZ_KC_LEN, SQZ_LINREG_LEN) + 20 if len(klines) < min_len: return None _, highs, lows, closes, _ = _klines_to_arrays(klines) # --- Bollinger Bands --- bb_basis = _sma_array(closes, SQZ_BB_LEN) bb_std = np.full_like(closes, 0.0) for i in range(SQZ_BB_LEN - 1, len(closes)): bb_std[i] = np.std(closes[i - SQZ_BB_LEN + 1:i + 1], ddof=0) bb_upper = bb_basis + SQZ_BB_MULT * bb_std bb_lower = bb_basis - SQZ_BB_MULT * bb_std # --- Keltner Channel (ATR-based) --- kc_basis = _sma_array(closes, SQZ_KC_LEN) tr = _true_range(highs, lows, closes) atr = _sma_array(tr, SQZ_KC_LEN) kc_upper = kc_basis + SQZ_KC_MULT * atr kc_lower = kc_basis - SQZ_KC_MULT * atr # --- Squeeze detection: BB inside KC --- squeeze_on = (bb_lower > kc_lower) & (bb_upper < kc_upper) # --- Momentum histogram: LinReg-based --- # LazyBear: linreg(close - avg(avg(highest(high, len), lowest(low, len)), sma(close, len)), len) highest = np.full_like(highs, 0.0) lowest = np.full_like(lows, 0.0) for i in range(SQZ_KC_LEN - 1, len(highs)): highest[i] = np.max(highs[i - SQZ_KC_LEN + 1:i + 1]) lowest[i] = np.min(lows[i - SQZ_KC_LEN + 1:i + 1]) mid_hl = (highest + lowest) / 2 mid_all = (mid_hl + bb_basis) / 2 momentum_src = closes - mid_all histogram = _linreg(momentum_src, SQZ_LINREG_LEN) # Используем закрытые свечи: [-2] = последняя закрытая, [-1] = текущая (не закрыта) idx = -2 # последняя закрытая свеча return { 'histogram': float(histogram[idx]), 'hist_prev': float(histogram[idx - 1]), 'is_squeeze': bool(squeeze_on[idx]), 'was_squeeze': bool(squeeze_on[idx - 1]), 'squeeze_released': bool(squeeze_on[idx - 1] and not squeeze_on[idx]), 'momentum_rising': abs(histogram[idx]) > abs(histogram[idx - 1]), 'histogram_series': histogram, 'squeeze_series': squeeze_on, } # ============================================================ # 2. Z-DISTANCE FROM VWAP (LazyBear) # ============================================================ def calc_z_vwap(klines, period=None): """ Z-Distance from VWAP. VWAP = Σ(typical_price × volume) / Σ(volume) Z = (close - VWAP) / std(close - VWAP) Z > +2 → цена слишком далеко вверх от VWAP → шорт Z < -2 → слишком далеко вниз → лонг |Z| < 0.5 → цена у fair value → выход Returns: dict { 'z_score': float — текущий Z-score 'z_prev': float — предыдущий Z-score 'vwap': float — текущий VWAP 'direction': int — 1=long setup, -1=short setup, 0=neutral } """ if period is None: period = ZVWAP_PERIOD if len(klines) < period + 5: return None _, highs, lows, closes, volumes = _klines_to_arrays(klines) # Рабочий окно: последние N свечей (закрытых, исключая текущую) end = len(closes) - 1 # исключаем текущую незакрытую свечу start = end - period h = highs[start:end] l = lows[start:end] c = closes[start:end] v = volumes[start:end] typical_price = (h + l + c) / 3 # VWAP cumul_tp_vol = np.cumsum(typical_price * v) cumul_vol = np.cumsum(v) cumul_vol_safe = np.where(cumul_vol == 0, 1, cumul_vol) vwap_arr = cumul_tp_vol / cumul_vol_safe vwap = vwap_arr[-1] last_close = c[-1] prev_close = c[-2] # Z-score: (close - VWAP) / rolling_std deviations = c - vwap_arr std = np.std(deviations) if std == 0: return None z_score = (last_close - vwap) / std z_prev = (prev_close - vwap_arr[-2]) / std # Direction threshold = ZVWAP_ENTRY_THRESHOLD direction = 0 if z_score < -threshold: direction = 1 # oversold → long elif z_score > threshold: direction = -1 # overbought → short return { 'z_score': float(z_score), 'z_prev': float(z_prev), 'vwap': float(vwap), 'direction': direction, } # ============================================================ # 3. WADDAH ATTAR EXPLOSION (LazyBear) # ============================================================ def calc_waddah_attar(klines): """ Waddah Attar Explosion Indicator. Логика: - MACD показывает direction (fast EMA - slow EMA) - BB width показывает explosion line (порог силы) - Если trend > explosion → есть сила - Если trend < dead_zone → рынок мёртвый, не торгуй Returns: dict { 'trend_up': float — зелёная линия (бычья сила) 'trend_down': float — красная линия (медвежья сила) 'explosion': float — линия взрыва (порог) 'dead_zone': float — мёртвая зона 'is_strong': bool — trend > explosion (есть сила) 'is_dead': bool — обе линии < dead_zone (мёртвый рынок) 'direction': int — 1=bullish, -1=bearish, 0=dead } """ min_len = max(WAD_SLOW_LEN, WAD_BB_LEN) + 10 if len(klines) < min_len: return None _, _, _, closes, _ = _klines_to_arrays(klines) # --- MACD component --- macd_fast = _ema(closes, WAD_FAST_LEN) macd_slow = _ema(closes, WAD_SLOW_LEN) macd = macd_fast - macd_slow # Trend = (MACD[current] - MACD[prev]) * sensitivity # LazyBear original: все в raw price terms, потом сравниваем idx = -2 # последняя закрытая t1 = (macd[idx] - macd[idx - 1]) * WAD_SENSITIVITY trend_up = max(t1, 0) trend_down = abs(min(t1, 0)) # --- Bollinger Bands explosion line = BB_upper - BB_lower --- # Это ширина канала — показывает "нормальный" уровень волатильности bb_basis = _sma_array(closes, WAD_BB_LEN) bb_std = np.full_like(closes, 0.0) for i in range(WAD_BB_LEN - 1, len(closes)): bb_std[i] = np.std(closes[i - WAD_BB_LEN + 1:i + 1], ddof=0) # explosion = (BB_upper - BB_lower) * sensitivity # В оригинале LazyBear: e1 = (bb2(close,20,2)[1] - bb2(close,20,2)[2]) * sens explosion = bb_std[idx] * WAD_BB_MULT * 2 # upper - lower = 2 * mult * std dead_zone = explosion * 0.1 # 10% от explosion = dead zone # Determine strength & direction (all in raw price terms) is_strong = (trend_up > explosion) or (trend_down > explosion) is_dead = (trend_up < dead_zone) and (trend_down < dead_zone) direction = 0 if trend_up > trend_down and trend_up > dead_zone: direction = 1 elif trend_down > trend_up and trend_down > dead_zone: direction = -1 return { 'trend_up': float(trend_up), 'trend_down': float(trend_down), 'explosion': float(explosion), 'dead_zone': float(dead_zone), 'is_strong': is_strong, 'is_dead': is_dead, 'direction': direction, } # ============================================================ # 4. ADX — REGIME FILTER # ============================================================ def calc_adx(klines, period=None): """ Average Directional Index. ADX > 25 → trending market (не скальпим mean reversion) ADX < 25 → ranging market (mean reversion ON) Returns: dict { 'adx': float — текущий ADX 'is_trending': bool — ADX > threshold 'plus_di': float — +DI 'minus_di': float — -DI } """ if period is None: period = ADX_PERIOD if len(klines) < period * 3: return None _, highs, lows, closes, _ = _klines_to_arrays(klines) # +DM, -DM plus_dm = np.zeros(len(highs)) minus_dm = np.zeros(len(highs)) for i in range(1, len(highs)): up = highs[i] - highs[i - 1] down = lows[i - 1] - lows[i] plus_dm[i] = up if (up > down and up > 0) else 0 minus_dm[i] = down if (down > up and down > 0) else 0 tr = _true_range(highs, lows, closes) # Smoothed TR, +DM, -DM (Wilder's smoothing = EMA with alpha=1/period) atr = _ema(tr, period) smooth_plus = _ema(plus_dm, period) smooth_minus = _ema(minus_dm, period) # +DI, -DI atr_safe = np.where(atr == 0, 1, atr) plus_di = 100 * smooth_plus / atr_safe minus_di = 100 * smooth_minus / atr_safe # DX di_sum = plus_di + minus_di di_sum_safe = np.where(di_sum == 0, 1, di_sum) dx = 100 * np.abs(plus_di - minus_di) / di_sum_safe # ADX = EMA(DX, period) adx = _ema(dx, period) idx = -2 # последняя закрытая свеча return { 'adx': float(adx[idx]), 'is_trending': float(adx[idx]) > ADX_TREND_THRESHOLD, 'plus_di': float(plus_di[idx]), 'minus_di': float(minus_di[idx]), } # ============================================================ # 5. ATR & EMA — HELPERS # ============================================================ def calc_atr_pct(klines, period=None): """ATR в процентах от цены.""" if period is None: period = ATR_PERIOD if len(klines) < period + 5: return None _, highs, lows, closes, _ = _klines_to_arrays(klines) tr = _true_range(highs, lows, closes) if len(tr) < period: return None atr = np.mean(tr[-period:]) last_price = closes[-1] if last_price == 0: return None return (atr / last_price) * 100 def calc_ema_value(klines, period=200): """EMA от close. Возвращает последнее значение.""" if len(klines) < period: return None closes = np.array([float(k[4]) for k in klines]) ema = _ema(closes, period) return float(ema[-1]) # ============================================================ # 6. COMBO SIGNAL — Все три индикатора вместе # ============================================================ def calc_combo_signal(klines): """ Рассчитывает все индикаторы и возвращает composite signal. Score system (v3, tuned on 50 trades): +1 Z-VWAP direction matches (|Z| > 1.8) +1 Squeeze released (был squeeze → вышел) +1 Squeeze histogram matches direction (0 if diverges, was -1) +1 Waddah Attar DIVERGES (weak trend = friend of MR, WR 86%) +1 ADX < 25 (ranging — mean reversion friendly) Range: 0 to 5. Entry requires ≥ 3. Returns: dict { 'score': int — -1 to 5 'direction': int — 1=long, -1=short, 0=no signal 'squeeze': dict 'zvwap': dict 'waddah': dict 'adx': dict 'reasons': list[str] } """ squeeze = calc_squeeze_momentum(klines) zvwap = calc_z_vwap(klines) waddah = calc_waddah_attar(klines) adx = calc_adx(klines) if not all([squeeze, zvwap, waddah, adx]): return None score = 0 direction = 0 reasons = [] # 1. Z-VWAP direction — base signal if zvwap['direction'] != 0: direction = zvwap['direction'] score += 1 z = zvwap['z_score'] reasons.append(f"Z-VWAP={z:.2f} ({'LONG' if direction == 1 else 'SHORT'})") if direction == 0: return { 'score': 0, 'direction': 0, 'squeeze': squeeze, 'zvwap': zvwap, 'waddah': waddah, 'adx': adx, 'reasons': ['No Z-VWAP signal'], } # 2. Squeeze released if squeeze['squeeze_released']: score += 1 reasons.append("Squeeze RELEASED") elif squeeze['is_squeeze']: # Всё ещё в squeeze — пружина не разжалась, но готовится reasons.append("In squeeze (pending)") # 3. Squeeze histogram matches direction (+1 confirm, 0 diverge) # Data: confirms WR 62%, diverges WR 20% — but -1 penalty was too harsh hist = squeeze['histogram'] if (direction == 1 and hist > 0) or (direction == -1 and hist < 0): score += 1 reasons.append(f"Histogram confirms ({hist:.4f})") else: # Was -1, now 0: diverge is bad but penalty killed too many setups reasons.append(f"Histogram diverges ({hist:.4f})") # 4. Waddah Attar — DIVERGES = +1 (weak trend = friend of MR) # Data: STRONG WR 27% (bad), DIVERGES WR 86% (good for MR) # Strong trend = enemy of mean reversion, weak trend = friend if waddah['direction'] == direction and waddah['is_strong']: # Strong trend hurts MR (WR 27%) — neutral reasons.append(f"Waddah STRONG {'UP' if direction == 1 else 'DOWN'}") elif waddah['direction'] == direction: reasons.append(f"Waddah direction OK, weak") elif waddah['is_dead']: reasons.append("Waddah DEAD market") else: # Diverges = GOOD for MR (WR 86%) — reward it score += 1 reasons.append(f"Waddah DIVERGES +1 (MR-friendly)") # 5. ADX regime filter if not adx['is_trending']: score += 1 reasons.append(f"ADX={adx['adx']:.1f} (ranging ✓)") else: reasons.append(f"ADX={adx['adx']:.1f} (trending ⚠️)") return { 'score': score, 'direction': direction, 'squeeze': squeeze, 'zvwap': zvwap, 'waddah': waddah, 'adx': adx, 'reasons': reasons, }