← Назад"""
Squeeze-VWAP Bot — Technical Indicators
=========================================
LazyBear-inspired indicators. Чистые функции, numpy.
Индикаторы:
1. Squeeze Momentum (BB vs KC + LinReg histogram)
2. Z-Distance from VWAP (mean reversion signal)
3. Waddah Attar Explosion (trend strength + direction)
4. ATR, EMA — вспомогательные
Все работают с raw klines от Binance API.
"""
import numpy as np
from src.config import (
SQZ_BB_LEN, SQZ_BB_MULT, SQZ_KC_LEN, SQZ_KC_MULT, SQZ_LINREG_LEN,
ZVWAP_PERIOD, ZVWAP_ENTRY_THRESHOLD,
WAD_FAST_LEN, WAD_SLOW_LEN, WAD_BB_LEN, WAD_BB_MULT, WAD_SENSITIVITY,
ADX_PERIOD, ADX_TREND_THRESHOLD,
ATR_PERIOD,
)
# ============================================================
# HELPERS
# ============================================================
def _klines_to_arrays(klines):
"""Конвертирует Binance klines в numpy arrays."""
opens = np.array([float(k[1]) for k in klines])
highs = np.array([float(k[2]) for k in klines])
lows = np.array([float(k[3]) for k in klines])
closes = np.array([float(k[4]) for k in klines])
volumes = np.array([float(k[5]) for k in klines])
return opens, highs, lows, closes, volumes
def _ema(data, period):
"""Exponential Moving Average — full array."""
alpha = 2 / (period + 1)
ema = np.zeros_like(data, dtype=float)
ema[0] = data[0]
for i in range(1, len(data)):
ema[i] = alpha * data[i] + (1 - alpha) * ema[i - 1]
return ema
def _sma_array(data, period):
"""Simple Moving Average — rolling, full array."""
result = np.full_like(data, np.nan, dtype=float)
for i in range(period - 1, len(data)):
result[i] = np.mean(data[i - period + 1:i + 1])
return result
def _linreg(data, period):
"""
Linear Regression Value — LazyBear's method.
Вычисляет значение линейной регрессии для каждого бара.
Это НЕ slope, а predicted value на конце regression line.
"""
result = np.full_like(data, 0.0, dtype=float)
for i in range(period - 1, len(data)):
y = data[i - period + 1:i + 1]
x = np.arange(period, dtype=float)
# y = mx + b, predict y at x = period - 1
x_mean = np.mean(x)
y_mean = np.mean(y)
denom = np.sum((x - x_mean) ** 2)
if denom == 0:
result[i] = y_mean
else:
slope = np.sum((x - x_mean) * (y - y_mean)) / denom
intercept = y_mean - slope * x_mean
result[i] = slope * (period - 1) + intercept
return result
def _true_range(highs, lows, closes):
"""True Range array."""
tr = np.zeros(len(highs))
tr[0] = highs[0] - lows[0]
for i in range(1, len(highs)):
tr[i] = max(
highs[i] - lows[i],
abs(highs[i] - closes[i - 1]),
abs(lows[i] - closes[i - 1]),
)
return tr
# ============================================================
# 1. SQUEEZE MOMENTUM INDICATOR (LazyBear)
# ============================================================
def calc_squeeze_momentum(klines):
"""
Squeeze Momentum Indicator (LazyBear).
Логика:
- BB внутри KC = SQUEEZE (чёрный крестик, рынок сжат)
- BB выходит из KC = RELEASE (серый крестик, пружина разжалась)
- Гистограмма = LinReg(close - avg(highest, lowest), period)
- Яркий цвет = momentum нарастает, тёмный = затухает
Returns: dict {
'histogram': float — последнее значение гистограммы (>0 = bullish, <0 = bearish)
'hist_prev': float — предпоследнее значение
'is_squeeze': bool — сейчас в squeeze?
'was_squeeze': bool — предыдущий бар был squeeze?
'squeeze_released': bool — squeeze только что отпустил?
'momentum_rising': bool — momentum нарастает?
'histogram_series': array — для анализа
'squeeze_series': array — True/False для каждого бара
}
"""
min_len = max(SQZ_BB_LEN, SQZ_KC_LEN, SQZ_LINREG_LEN) + 20
if len(klines) < min_len:
return None
_, highs, lows, closes, _ = _klines_to_arrays(klines)
# --- Bollinger Bands ---
bb_basis = _sma_array(closes, SQZ_BB_LEN)
bb_std = np.full_like(closes, 0.0)
for i in range(SQZ_BB_LEN - 1, len(closes)):
bb_std[i] = np.std(closes[i - SQZ_BB_LEN + 1:i + 1], ddof=0)
bb_upper = bb_basis + SQZ_BB_MULT * bb_std
bb_lower = bb_basis - SQZ_BB_MULT * bb_std
# --- Keltner Channel (ATR-based) ---
kc_basis = _sma_array(closes, SQZ_KC_LEN)
tr = _true_range(highs, lows, closes)
atr = _sma_array(tr, SQZ_KC_LEN)
kc_upper = kc_basis + SQZ_KC_MULT * atr
kc_lower = kc_basis - SQZ_KC_MULT * atr
# --- Squeeze detection: BB inside KC ---
squeeze_on = (bb_lower > kc_lower) & (bb_upper < kc_upper)
# --- Momentum histogram: LinReg-based ---
# LazyBear: linreg(close - avg(avg(highest(high, len), lowest(low, len)), sma(close, len)), len)
highest = np.full_like(highs, 0.0)
lowest = np.full_like(lows, 0.0)
for i in range(SQZ_KC_LEN - 1, len(highs)):
highest[i] = np.max(highs[i - SQZ_KC_LEN + 1:i + 1])
lowest[i] = np.min(lows[i - SQZ_KC_LEN + 1:i + 1])
mid_hl = (highest + lowest) / 2
mid_all = (mid_hl + bb_basis) / 2
momentum_src = closes - mid_all
histogram = _linreg(momentum_src, SQZ_LINREG_LEN)
# Используем закрытые свечи: [-2] = последняя закрытая, [-1] = текущая (не закрыта)
idx = -2 # последняя закрытая свеча
return {
'histogram': float(histogram[idx]),
'hist_prev': float(histogram[idx - 1]),
'is_squeeze': bool(squeeze_on[idx]),
'was_squeeze': bool(squeeze_on[idx - 1]),
'squeeze_released': bool(squeeze_on[idx - 1] and not squeeze_on[idx]),
'momentum_rising': abs(histogram[idx]) > abs(histogram[idx - 1]),
'histogram_series': histogram,
'squeeze_series': squeeze_on,
}
# ============================================================
# 2. Z-DISTANCE FROM VWAP (LazyBear)
# ============================================================
def calc_z_vwap(klines, period=None):
"""
Z-Distance from VWAP.
VWAP = Σ(typical_price × volume) / Σ(volume)
Z = (close - VWAP) / std(close - VWAP)
Z > +2 → цена слишком далеко вверх от VWAP → шорт
Z < -2 → слишком далеко вниз → лонг
|Z| < 0.5 → цена у fair value → выход
Returns: dict {
'z_score': float — текущий Z-score
'z_prev': float — предыдущий Z-score
'vwap': float — текущий VWAP
'direction': int — 1=long setup, -1=short setup, 0=neutral
}
"""
if period is None:
period = ZVWAP_PERIOD
if len(klines) < period + 5:
return None
_, highs, lows, closes, volumes = _klines_to_arrays(klines)
# Рабочий окно: последние N свечей (закрытых, исключая текущую)
end = len(closes) - 1 # исключаем текущую незакрытую свечу
start = end - period
h = highs[start:end]
l = lows[start:end]
c = closes[start:end]
v = volumes[start:end]
typical_price = (h + l + c) / 3
# VWAP
cumul_tp_vol = np.cumsum(typical_price * v)
cumul_vol = np.cumsum(v)
cumul_vol_safe = np.where(cumul_vol == 0, 1, cumul_vol)
vwap_arr = cumul_tp_vol / cumul_vol_safe
vwap = vwap_arr[-1]
last_close = c[-1]
prev_close = c[-2]
# Z-score: (close - VWAP) / rolling_std
deviations = c - vwap_arr
std = np.std(deviations)
if std == 0:
return None
z_score = (last_close - vwap) / std
z_prev = (prev_close - vwap_arr[-2]) / std
# Direction
threshold = ZVWAP_ENTRY_THRESHOLD
direction = 0
if z_score < -threshold:
direction = 1 # oversold → long
elif z_score > threshold:
direction = -1 # overbought → short
return {
'z_score': float(z_score),
'z_prev': float(z_prev),
'vwap': float(vwap),
'direction': direction,
}
# ============================================================
# 3. WADDAH ATTAR EXPLOSION (LazyBear)
# ============================================================
def calc_waddah_attar(klines):
"""
Waddah Attar Explosion Indicator.
Логика:
- MACD показывает direction (fast EMA - slow EMA)
- BB width показывает explosion line (порог силы)
- Если trend > explosion → есть сила
- Если trend < dead_zone → рынок мёртвый, не торгуй
Returns: dict {
'trend_up': float — зелёная линия (бычья сила)
'trend_down': float — красная линия (медвежья сила)
'explosion': float — линия взрыва (порог)
'dead_zone': float — мёртвая зона
'is_strong': bool — trend > explosion (есть сила)
'is_dead': bool — обе линии < dead_zone (мёртвый рынок)
'direction': int — 1=bullish, -1=bearish, 0=dead
}
"""
min_len = max(WAD_SLOW_LEN, WAD_BB_LEN) + 10
if len(klines) < min_len:
return None
_, _, _, closes, _ = _klines_to_arrays(klines)
# --- MACD component ---
macd_fast = _ema(closes, WAD_FAST_LEN)
macd_slow = _ema(closes, WAD_SLOW_LEN)
macd = macd_fast - macd_slow
# Trend = (MACD[current] - MACD[prev]) * sensitivity
# LazyBear original: все в raw price terms, потом сравниваем
idx = -2 # последняя закрытая
t1 = (macd[idx] - macd[idx - 1]) * WAD_SENSITIVITY
trend_up = max(t1, 0)
trend_down = abs(min(t1, 0))
# --- Bollinger Bands explosion line = BB_upper - BB_lower ---
# Это ширина канала — показывает "нормальный" уровень волатильности
bb_basis = _sma_array(closes, WAD_BB_LEN)
bb_std = np.full_like(closes, 0.0)
for i in range(WAD_BB_LEN - 1, len(closes)):
bb_std[i] = np.std(closes[i - WAD_BB_LEN + 1:i + 1], ddof=0)
# explosion = (BB_upper - BB_lower) * sensitivity
# В оригинале LazyBear: e1 = (bb2(close,20,2)[1] - bb2(close,20,2)[2]) * sens
explosion = bb_std[idx] * WAD_BB_MULT * 2 # upper - lower = 2 * mult * std
dead_zone = explosion * 0.1 # 10% от explosion = dead zone
# Determine strength & direction (all in raw price terms)
is_strong = (trend_up > explosion) or (trend_down > explosion)
is_dead = (trend_up < dead_zone) and (trend_down < dead_zone)
direction = 0
if trend_up > trend_down and trend_up > dead_zone:
direction = 1
elif trend_down > trend_up and trend_down > dead_zone:
direction = -1
return {
'trend_up': float(trend_up),
'trend_down': float(trend_down),
'explosion': float(explosion),
'dead_zone': float(dead_zone),
'is_strong': is_strong,
'is_dead': is_dead,
'direction': direction,
}
# ============================================================
# 4. ADX — REGIME FILTER
# ============================================================
def calc_adx(klines, period=None):
"""
Average Directional Index.
ADX > 25 → trending market (не скальпим mean reversion)
ADX < 25 → ranging market (mean reversion ON)
Returns: dict {
'adx': float — текущий ADX
'is_trending': bool — ADX > threshold
'plus_di': float — +DI
'minus_di': float — -DI
}
"""
if period is None:
period = ADX_PERIOD
if len(klines) < period * 3:
return None
_, highs, lows, closes, _ = _klines_to_arrays(klines)
# +DM, -DM
plus_dm = np.zeros(len(highs))
minus_dm = np.zeros(len(highs))
for i in range(1, len(highs)):
up = highs[i] - highs[i - 1]
down = lows[i - 1] - lows[i]
plus_dm[i] = up if (up > down and up > 0) else 0
minus_dm[i] = down if (down > up and down > 0) else 0
tr = _true_range(highs, lows, closes)
# Smoothed TR, +DM, -DM (Wilder's smoothing = EMA with alpha=1/period)
atr = _ema(tr, period)
smooth_plus = _ema(plus_dm, period)
smooth_minus = _ema(minus_dm, period)
# +DI, -DI
atr_safe = np.where(atr == 0, 1, atr)
plus_di = 100 * smooth_plus / atr_safe
minus_di = 100 * smooth_minus / atr_safe
# DX
di_sum = plus_di + minus_di
di_sum_safe = np.where(di_sum == 0, 1, di_sum)
dx = 100 * np.abs(plus_di - minus_di) / di_sum_safe
# ADX = EMA(DX, period)
adx = _ema(dx, period)
idx = -2 # последняя закрытая свеча
return {
'adx': float(adx[idx]),
'is_trending': float(adx[idx]) > ADX_TREND_THRESHOLD,
'plus_di': float(plus_di[idx]),
'minus_di': float(minus_di[idx]),
}
# ============================================================
# 5. ATR & EMA — HELPERS
# ============================================================
def calc_atr_pct(klines, period=None):
"""ATR в процентах от цены."""
if period is None:
period = ATR_PERIOD
if len(klines) < period + 5:
return None
_, highs, lows, closes, _ = _klines_to_arrays(klines)
tr = _true_range(highs, lows, closes)
if len(tr) < period:
return None
atr = np.mean(tr[-period:])
last_price = closes[-1]
if last_price == 0:
return None
return (atr / last_price) * 100
def calc_ema_value(klines, period=200):
"""EMA от close. Возвращает последнее значение."""
if len(klines) < period:
return None
closes = np.array([float(k[4]) for k in klines])
ema = _ema(closes, period)
return float(ema[-1])
# ============================================================
# 6. COMBO SIGNAL — Все три индикатора вместе
# ============================================================
def calc_combo_signal(klines):
"""
Рассчитывает все индикаторы и возвращает composite signal.
Score system (v3, tuned on 50 trades):
+1 Z-VWAP direction matches (|Z| > 1.8)
+1 Squeeze released (был squeeze → вышел)
+1 Squeeze histogram matches direction (0 if diverges, was -1)
+1 Waddah Attar DIVERGES (weak trend = friend of MR, WR 86%)
+1 ADX < 25 (ranging — mean reversion friendly)
Range: 0 to 5. Entry requires ≥ 3.
Returns: dict {
'score': int — -1 to 5
'direction': int — 1=long, -1=short, 0=no signal
'squeeze': dict
'zvwap': dict
'waddah': dict
'adx': dict
'reasons': list[str]
}
"""
squeeze = calc_squeeze_momentum(klines)
zvwap = calc_z_vwap(klines)
waddah = calc_waddah_attar(klines)
adx = calc_adx(klines)
if not all([squeeze, zvwap, waddah, adx]):
return None
score = 0
direction = 0
reasons = []
# 1. Z-VWAP direction — base signal
if zvwap['direction'] != 0:
direction = zvwap['direction']
score += 1
z = zvwap['z_score']
reasons.append(f"Z-VWAP={z:.2f} ({'LONG' if direction == 1 else 'SHORT'})")
if direction == 0:
return {
'score': 0, 'direction': 0,
'squeeze': squeeze, 'zvwap': zvwap,
'waddah': waddah, 'adx': adx,
'reasons': ['No Z-VWAP signal'],
}
# 2. Squeeze released
if squeeze['squeeze_released']:
score += 1
reasons.append("Squeeze RELEASED")
elif squeeze['is_squeeze']:
# Всё ещё в squeeze — пружина не разжалась, но готовится
reasons.append("In squeeze (pending)")
# 3. Squeeze histogram matches direction (+1 confirm, 0 diverge)
# Data: confirms WR 62%, diverges WR 20% — but -1 penalty was too harsh
hist = squeeze['histogram']
if (direction == 1 and hist > 0) or (direction == -1 and hist < 0):
score += 1
reasons.append(f"Histogram confirms ({hist:.4f})")
else:
# Was -1, now 0: diverge is bad but penalty killed too many setups
reasons.append(f"Histogram diverges ({hist:.4f})")
# 4. Waddah Attar — DIVERGES = +1 (weak trend = friend of MR)
# Data: STRONG WR 27% (bad), DIVERGES WR 86% (good for MR)
# Strong trend = enemy of mean reversion, weak trend = friend
if waddah['direction'] == direction and waddah['is_strong']:
# Strong trend hurts MR (WR 27%) — neutral
reasons.append(f"Waddah STRONG {'UP' if direction == 1 else 'DOWN'}")
elif waddah['direction'] == direction:
reasons.append(f"Waddah direction OK, weak")
elif waddah['is_dead']:
reasons.append("Waddah DEAD market")
else:
# Diverges = GOOD for MR (WR 86%) — reward it
score += 1
reasons.append(f"Waddah DIVERGES +1 (MR-friendly)")
# 5. ADX regime filter
if not adx['is_trending']:
score += 1
reasons.append(f"ADX={adx['adx']:.1f} (ranging ✓)")
else:
reasons.append(f"ADX={adx['adx']:.1f} (trending ⚠️)")
return {
'score': score,
'direction': direction,
'squeeze': squeeze,
'zvwap': zvwap,
'waddah': waddah,
'adx': adx,
'reasons': reasons,
}