← Back
"""
Squeeze-VWAP Bot — Technical Indicators
=========================================
LazyBear-inspired indicators. Чистые функции, numpy.

Индикаторы:
1. Squeeze Momentum (BB vs KC + LinReg histogram)
2. Z-Distance from VWAP (mean reversion signal)
3. Waddah Attar Explosion (trend strength + direction)
4. ATR, EMA — вспомогательные

Все работают с raw klines от Binance API.
"""

import numpy as np
from src.config import (
    SQZ_BB_LEN, SQZ_BB_MULT, SQZ_KC_LEN, SQZ_KC_MULT, SQZ_LINREG_LEN,
    ZVWAP_PERIOD, ZVWAP_ENTRY_THRESHOLD,
    WAD_FAST_LEN, WAD_SLOW_LEN, WAD_BB_LEN, WAD_BB_MULT, WAD_SENSITIVITY,
    ADX_PERIOD, ADX_TREND_THRESHOLD,
    ATR_PERIOD,
)


# ============================================================
# HELPERS
# ============================================================

def _klines_to_arrays(klines):
    """Конвертирует Binance klines в numpy arrays."""
    opens = np.array([float(k[1]) for k in klines])
    highs = np.array([float(k[2]) for k in klines])
    lows = np.array([float(k[3]) for k in klines])
    closes = np.array([float(k[4]) for k in klines])
    volumes = np.array([float(k[5]) for k in klines])
    return opens, highs, lows, closes, volumes


def _ema(data, period):
    """Exponential Moving Average — full array."""
    alpha = 2 / (period + 1)
    ema = np.zeros_like(data, dtype=float)
    ema[0] = data[0]
    for i in range(1, len(data)):
        ema[i] = alpha * data[i] + (1 - alpha) * ema[i - 1]
    return ema


def _sma_array(data, period):
    """Simple Moving Average — rolling, full array."""
    result = np.full_like(data, np.nan, dtype=float)
    for i in range(period - 1, len(data)):
        result[i] = np.mean(data[i - period + 1:i + 1])
    return result


def _linreg(data, period):
    """
    Linear Regression Value — LazyBear's method.
    Вычисляет значение линейной регрессии для каждого бара.
    Это НЕ slope, а predicted value на конце regression line.
    """
    result = np.full_like(data, 0.0, dtype=float)
    for i in range(period - 1, len(data)):
        y = data[i - period + 1:i + 1]
        x = np.arange(period, dtype=float)
        # y = mx + b, predict y at x = period - 1
        x_mean = np.mean(x)
        y_mean = np.mean(y)
        denom = np.sum((x - x_mean) ** 2)
        if denom == 0:
            result[i] = y_mean
        else:
            slope = np.sum((x - x_mean) * (y - y_mean)) / denom
            intercept = y_mean - slope * x_mean
            result[i] = slope * (period - 1) + intercept
    return result


def _true_range(highs, lows, closes):
    """True Range array."""
    tr = np.zeros(len(highs))
    tr[0] = highs[0] - lows[0]
    for i in range(1, len(highs)):
        tr[i] = max(
            highs[i] - lows[i],
            abs(highs[i] - closes[i - 1]),
            abs(lows[i] - closes[i - 1]),
        )
    return tr


# ============================================================
# 1. SQUEEZE MOMENTUM INDICATOR (LazyBear)
# ============================================================

def calc_squeeze_momentum(klines):
    """
    Squeeze Momentum Indicator (LazyBear).

    Логика:
    - BB внутри KC = SQUEEZE (чёрный крестик, рынок сжат)
    - BB выходит из KC = RELEASE (серый крестик, пружина разжалась)
    - Гистограмма = LinReg(close - avg(highest, lowest), period)
    - Яркий цвет = momentum нарастает, тёмный = затухает

    Returns: dict {
        'histogram': float — последнее значение гистограммы (>0 = bullish, <0 = bearish)
        'hist_prev': float — предпоследнее значение
        'is_squeeze': bool — сейчас в squeeze?
        'was_squeeze': bool — предыдущий бар был squeeze?
        'squeeze_released': bool — squeeze только что отпустил?
        'momentum_rising': bool — momentum нарастает?
        'histogram_series': array — для анализа
        'squeeze_series': array — True/False для каждого бара
    }
    """
    min_len = max(SQZ_BB_LEN, SQZ_KC_LEN, SQZ_LINREG_LEN) + 20
    if len(klines) < min_len:
        return None

    _, highs, lows, closes, _ = _klines_to_arrays(klines)

    # --- Bollinger Bands ---
    bb_basis = _sma_array(closes, SQZ_BB_LEN)
    bb_std = np.full_like(closes, 0.0)
    for i in range(SQZ_BB_LEN - 1, len(closes)):
        bb_std[i] = np.std(closes[i - SQZ_BB_LEN + 1:i + 1], ddof=0)
    bb_upper = bb_basis + SQZ_BB_MULT * bb_std
    bb_lower = bb_basis - SQZ_BB_MULT * bb_std

    # --- Keltner Channel (ATR-based) ---
    kc_basis = _sma_array(closes, SQZ_KC_LEN)
    tr = _true_range(highs, lows, closes)
    atr = _sma_array(tr, SQZ_KC_LEN)
    kc_upper = kc_basis + SQZ_KC_MULT * atr
    kc_lower = kc_basis - SQZ_KC_MULT * atr

    # --- Squeeze detection: BB inside KC ---
    squeeze_on = (bb_lower > kc_lower) & (bb_upper < kc_upper)

    # --- Momentum histogram: LinReg-based ---
    # LazyBear: linreg(close - avg(avg(highest(high, len), lowest(low, len)), sma(close, len)), len)
    highest = np.full_like(highs, 0.0)
    lowest = np.full_like(lows, 0.0)
    for i in range(SQZ_KC_LEN - 1, len(highs)):
        highest[i] = np.max(highs[i - SQZ_KC_LEN + 1:i + 1])
        lowest[i] = np.min(lows[i - SQZ_KC_LEN + 1:i + 1])

    mid_hl = (highest + lowest) / 2
    mid_all = (mid_hl + bb_basis) / 2
    momentum_src = closes - mid_all
    histogram = _linreg(momentum_src, SQZ_LINREG_LEN)

    # Используем закрытые свечи: [-2] = последняя закрытая, [-1] = текущая (не закрыта)
    idx = -2  # последняя закрытая свеча

    return {
        'histogram': float(histogram[idx]),
        'hist_prev': float(histogram[idx - 1]),
        'is_squeeze': bool(squeeze_on[idx]),
        'was_squeeze': bool(squeeze_on[idx - 1]),
        'squeeze_released': bool(squeeze_on[idx - 1] and not squeeze_on[idx]),
        'momentum_rising': abs(histogram[idx]) > abs(histogram[idx - 1]),
        'histogram_series': histogram,
        'squeeze_series': squeeze_on,
    }


# ============================================================
# 2. Z-DISTANCE FROM VWAP (LazyBear)
# ============================================================

def calc_z_vwap(klines, period=None):
    """
    Z-Distance from VWAP.

    VWAP = Σ(typical_price × volume) / Σ(volume)
    Z = (close - VWAP) / std(close - VWAP)

    Z > +2  → цена слишком далеко вверх от VWAP → шорт
    Z < -2  → слишком далеко вниз → лонг
    |Z| < 0.5 → цена у fair value → выход

    Returns: dict {
        'z_score': float — текущий Z-score
        'z_prev': float — предыдущий Z-score
        'vwap': float — текущий VWAP
        'direction': int — 1=long setup, -1=short setup, 0=neutral
    }
    """
    if period is None:
        period = ZVWAP_PERIOD

    if len(klines) < period + 5:
        return None

    _, highs, lows, closes, volumes = _klines_to_arrays(klines)

    # Рабочий окно: последние N свечей (закрытых, исключая текущую)
    end = len(closes) - 1  # исключаем текущую незакрытую свечу
    start = end - period

    h = highs[start:end]
    l = lows[start:end]
    c = closes[start:end]
    v = volumes[start:end]

    typical_price = (h + l + c) / 3

    # VWAP
    cumul_tp_vol = np.cumsum(typical_price * v)
    cumul_vol = np.cumsum(v)
    cumul_vol_safe = np.where(cumul_vol == 0, 1, cumul_vol)
    vwap_arr = cumul_tp_vol / cumul_vol_safe

    vwap = vwap_arr[-1]
    last_close = c[-1]
    prev_close = c[-2]

    # Z-score: (close - VWAP) / rolling_std
    deviations = c - vwap_arr
    std = np.std(deviations)
    if std == 0:
        return None

    z_score = (last_close - vwap) / std
    z_prev = (prev_close - vwap_arr[-2]) / std

    # Direction
    threshold = ZVWAP_ENTRY_THRESHOLD
    direction = 0
    if z_score < -threshold:
        direction = 1   # oversold → long
    elif z_score > threshold:
        direction = -1  # overbought → short

    return {
        'z_score': float(z_score),
        'z_prev': float(z_prev),
        'vwap': float(vwap),
        'direction': direction,
    }


# ============================================================
# 3. WADDAH ATTAR EXPLOSION (LazyBear)
# ============================================================

def calc_waddah_attar(klines):
    """
    Waddah Attar Explosion Indicator.

    Логика:
    - MACD показывает direction (fast EMA - slow EMA)
    - BB width показывает explosion line (порог силы)
    - Если trend > explosion → есть сила
    - Если trend < dead_zone → рынок мёртвый, не торгуй

    Returns: dict {
        'trend_up': float — зелёная линия (бычья сила)
        'trend_down': float — красная линия (медвежья сила)
        'explosion': float — линия взрыва (порог)
        'dead_zone': float — мёртвая зона
        'is_strong': bool — trend > explosion (есть сила)
        'is_dead': bool — обе линии < dead_zone (мёртвый рынок)
        'direction': int — 1=bullish, -1=bearish, 0=dead
    }
    """
    min_len = max(WAD_SLOW_LEN, WAD_BB_LEN) + 10
    if len(klines) < min_len:
        return None

    _, _, _, closes, _ = _klines_to_arrays(klines)

    # --- MACD component ---
    macd_fast = _ema(closes, WAD_FAST_LEN)
    macd_slow = _ema(closes, WAD_SLOW_LEN)
    macd = macd_fast - macd_slow

    # Trend = (MACD[current] - MACD[prev]) * sensitivity
    # LazyBear original: все в raw price terms, потом сравниваем
    idx = -2  # последняя закрытая
    t1 = (macd[idx] - macd[idx - 1]) * WAD_SENSITIVITY
    trend_up = max(t1, 0)
    trend_down = abs(min(t1, 0))

    # --- Bollinger Bands explosion line = BB_upper - BB_lower ---
    # Это ширина канала — показывает "нормальный" уровень волатильности
    bb_basis = _sma_array(closes, WAD_BB_LEN)
    bb_std = np.full_like(closes, 0.0)
    for i in range(WAD_BB_LEN - 1, len(closes)):
        bb_std[i] = np.std(closes[i - WAD_BB_LEN + 1:i + 1], ddof=0)

    # explosion = (BB_upper - BB_lower) * sensitivity
    # В оригинале LazyBear: e1 = (bb2(close,20,2)[1] - bb2(close,20,2)[2]) * sens
    explosion = bb_std[idx] * WAD_BB_MULT * 2  # upper - lower = 2 * mult * std
    dead_zone = explosion * 0.1  # 10% от explosion = dead zone

    # Determine strength & direction (all in raw price terms)
    is_strong = (trend_up > explosion) or (trend_down > explosion)
    is_dead = (trend_up < dead_zone) and (trend_down < dead_zone)

    direction = 0
    if trend_up > trend_down and trend_up > dead_zone:
        direction = 1
    elif trend_down > trend_up and trend_down > dead_zone:
        direction = -1

    return {
        'trend_up': float(trend_up),
        'trend_down': float(trend_down),
        'explosion': float(explosion),
        'dead_zone': float(dead_zone),
        'is_strong': is_strong,
        'is_dead': is_dead,
        'direction': direction,
    }


# ============================================================
# 4. ADX — REGIME FILTER
# ============================================================

def calc_adx(klines, period=None):
    """
    Average Directional Index.
    ADX > 25 → trending market (не скальпим mean reversion)
    ADX < 25 → ranging market (mean reversion ON)

    Returns: dict {
        'adx': float — текущий ADX
        'is_trending': bool — ADX > threshold
        'plus_di': float — +DI
        'minus_di': float — -DI
    }
    """
    if period is None:
        period = ADX_PERIOD

    if len(klines) < period * 3:
        return None

    _, highs, lows, closes, _ = _klines_to_arrays(klines)

    # +DM, -DM
    plus_dm = np.zeros(len(highs))
    minus_dm = np.zeros(len(highs))
    for i in range(1, len(highs)):
        up = highs[i] - highs[i - 1]
        down = lows[i - 1] - lows[i]
        plus_dm[i] = up if (up > down and up > 0) else 0
        minus_dm[i] = down if (down > up and down > 0) else 0

    tr = _true_range(highs, lows, closes)

    # Smoothed TR, +DM, -DM (Wilder's smoothing = EMA with alpha=1/period)
    atr = _ema(tr, period)
    smooth_plus = _ema(plus_dm, period)
    smooth_minus = _ema(minus_dm, period)

    # +DI, -DI
    atr_safe = np.where(atr == 0, 1, atr)
    plus_di = 100 * smooth_plus / atr_safe
    minus_di = 100 * smooth_minus / atr_safe

    # DX
    di_sum = plus_di + minus_di
    di_sum_safe = np.where(di_sum == 0, 1, di_sum)
    dx = 100 * np.abs(plus_di - minus_di) / di_sum_safe

    # ADX = EMA(DX, period)
    adx = _ema(dx, period)

    idx = -2  # последняя закрытая свеча
    return {
        'adx': float(adx[idx]),
        'is_trending': float(adx[idx]) > ADX_TREND_THRESHOLD,
        'plus_di': float(plus_di[idx]),
        'minus_di': float(minus_di[idx]),
    }


# ============================================================
# 5. ATR & EMA — HELPERS
# ============================================================

def calc_atr_pct(klines, period=None):
    """ATR в процентах от цены."""
    if period is None:
        period = ATR_PERIOD

    if len(klines) < period + 5:
        return None

    _, highs, lows, closes, _ = _klines_to_arrays(klines)
    tr = _true_range(highs, lows, closes)

    if len(tr) < period:
        return None

    atr = np.mean(tr[-period:])
    last_price = closes[-1]
    if last_price == 0:
        return None

    return (atr / last_price) * 100


def calc_ema_value(klines, period=200):
    """EMA от close. Возвращает последнее значение."""
    if len(klines) < period:
        return None
    closes = np.array([float(k[4]) for k in klines])
    ema = _ema(closes, period)
    return float(ema[-1])


# ============================================================
# 6. COMBO SIGNAL — Все три индикатора вместе
# ============================================================

def calc_combo_signal(klines):
    """
    Рассчитывает все индикаторы и возвращает composite signal.

    Score system (v3, tuned on 50 trades):
    +1  Z-VWAP direction matches (|Z| > 1.8)
    +1  Squeeze released (был squeeze → вышел)
    +1  Squeeze histogram matches direction (0 if diverges, was -1)
    +1  Waddah Attar DIVERGES (weak trend = friend of MR, WR 86%)
    +1  ADX < 25 (ranging — mean reversion friendly)

    Range: 0 to 5.  Entry requires ≥ 3.

    Returns: dict {
        'score': int — -1 to 5
        'direction': int — 1=long, -1=short, 0=no signal
        'squeeze': dict
        'zvwap': dict
        'waddah': dict
        'adx': dict
        'reasons': list[str]
    }
    """
    squeeze = calc_squeeze_momentum(klines)
    zvwap = calc_z_vwap(klines)
    waddah = calc_waddah_attar(klines)
    adx = calc_adx(klines)

    if not all([squeeze, zvwap, waddah, adx]):
        return None

    score = 0
    direction = 0
    reasons = []

    # 1. Z-VWAP direction — base signal
    if zvwap['direction'] != 0:
        direction = zvwap['direction']
        score += 1
        z = zvwap['z_score']
        reasons.append(f"Z-VWAP={z:.2f} ({'LONG' if direction == 1 else 'SHORT'})")

    if direction == 0:
        return {
            'score': 0, 'direction': 0,
            'squeeze': squeeze, 'zvwap': zvwap,
            'waddah': waddah, 'adx': adx,
            'reasons': ['No Z-VWAP signal'],
        }

    # 2. Squeeze released
    if squeeze['squeeze_released']:
        score += 1
        reasons.append("Squeeze RELEASED")
    elif squeeze['is_squeeze']:
        # Всё ещё в squeeze — пружина не разжалась, но готовится
        reasons.append("In squeeze (pending)")

    # 3. Squeeze histogram matches direction (+1 confirm, 0 diverge)
    # Data: confirms WR 62%, diverges WR 20% — but -1 penalty was too harsh
    hist = squeeze['histogram']
    if (direction == 1 and hist > 0) or (direction == -1 and hist < 0):
        score += 1
        reasons.append(f"Histogram confirms ({hist:.4f})")
    else:
        # Was -1, now 0: diverge is bad but penalty killed too many setups
        reasons.append(f"Histogram diverges ({hist:.4f})")

    # 4. Waddah Attar — DIVERGES = +1 (weak trend = friend of MR)
    # Data: STRONG WR 27% (bad), DIVERGES WR 86% (good for MR)
    # Strong trend = enemy of mean reversion, weak trend = friend
    if waddah['direction'] == direction and waddah['is_strong']:
        # Strong trend hurts MR (WR 27%) — neutral
        reasons.append(f"Waddah STRONG {'UP' if direction == 1 else 'DOWN'}")
    elif waddah['direction'] == direction:
        reasons.append(f"Waddah direction OK, weak")
    elif waddah['is_dead']:
        reasons.append("Waddah DEAD market")
    else:
        # Diverges = GOOD for MR (WR 86%) — reward it
        score += 1
        reasons.append(f"Waddah DIVERGES +1 (MR-friendly)")

    # 5. ADX regime filter
    if not adx['is_trending']:
        score += 1
        reasons.append(f"ADX={adx['adx']:.1f} (ranging ✓)")
    else:
        reasons.append(f"ADX={adx['adx']:.1f} (trending ⚠️)")

    return {
        'score': score,
        'direction': direction,
        'squeeze': squeeze,
        'zvwap': zvwap,
        'waddah': waddah,
        'adx': adx,
        'reasons': reasons,
    }

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...