← Back
"""
Zatochki (Knife Catcher) — Indicators
=======================================
Pure functions for volume exhaustion reversal detection.
Works with Bybit klines in Binance-compat format from exchange.py.

Kline format: [timestamp, open, high, low, close, volume, ...]
"""

import numpy as np
import logging

logger = logging.getLogger("zatochki.indicators")


# ============================================================
# HELPERS
# ============================================================

def _to_arrays(klines):
    """Split klines into per-field float arrays.

    Each kline is read positionally: index 1 = open, 2 = high, 3 = low,
    4 = close, 5 = volume. Index 0 (timestamp) and anything past index 5
    are ignored. Every value goes through ``float()``, so numeric strings
    are accepted.

    Returns a tuple ``(opens, highs, lows, closes, volumes)`` of 1-D numpy
    arrays with one element per kline.
    """
    def _column(field_idx):
        # One pass over klines per field, same as pulling each column by hand.
        return np.array([float(row[field_idx]) for row in klines])

    return tuple(_column(field_idx) for field_idx in range(1, 6))


# ============================================================
# RSI
# ============================================================

def calc_rsi(klines, period=14):
    """Wilder-smoothed RSI over closing prices.

    The first `period` close-to-close changes seed the average gain and
    average loss with a simple mean; every later bar applies the recursive
    update ``avg = (prev_avg * (period - 1) + current) / period``.

    Bars before index `period` keep the neutral value 50.0, and so does the
    whole array when fewer than ``period + 1`` closes are available. A bar
    whose average loss is exactly zero is reported as 100.0.

    Returns array same length as klines.
    """
    closes = _to_arrays(klines)[3]
    n = len(closes)
    rsi = np.full(n, 50.0)

    # Not enough history to seed the averages: everything stays neutral.
    if n < period + 1:
        return rsi

    change = np.diff(closes)
    up_moves = np.where(change > 0, change, 0)
    down_moves = np.where(change < 0, -change, 0)

    # Seed values belong to bar index `period`.
    mean_up = np.mean(up_moves[:period])
    mean_down = np.mean(down_moves[:period])

    for bar in range(period, n):
        if bar > period:
            # change[bar - 1] is the move from close[bar - 1] to close[bar].
            mean_up = (mean_up * (period - 1) + up_moves[bar - 1]) / period
            mean_down = (mean_down * (period - 1) + down_moves[bar - 1]) / period

        if mean_down == 0:
            rsi[bar] = 100.0
        else:
            strength = mean_up / mean_down
            rsi[bar] = 100 - (100 / (1 + strength))

    return rsi


# ============================================================
# ROLLING VWAP
# ============================================================

def calc_rolling_vwap(klines, period=50):
    """Rolling volume-weighted average of the typical price.

    The typical price of a bar is ``(high + low + close) / 3``. For each bar
    with a full `period`-bar window ending on it, the result is
    ``sum(typical * volume) / sum(volume)`` over that window.

    Bars without a full window, and bars whose window volume is not
    positive, are left as NaN.

    Returns array same length as klines.
    """
    _, highs, lows, closes, volumes = _to_arrays(klines)
    weighted_price = (highs + lows + closes) / 3 * volumes

    bar_count = len(closes)
    result = np.full(bar_count, np.nan)

    # `stop` is the exclusive end of the window; the value lands on bar stop - 1.
    for stop in range(period, bar_count + 1):
        start = stop - period
        window_volume = np.sum(volumes[start:stop])
        if window_volume > 0:
            result[stop - 1] = np.sum(weighted_price[start:stop]) / window_volume

    return result


# ============================================================
# VOLUME SMA
# ============================================================

def calc_volume_sma(klines, period=20):
    """Simple moving average of volume over a trailing `period`-bar window.

    The window for bar ``i`` covers bars ``i - period + 1`` through ``i``
    inclusive, so the bar's own volume is part of its average. Bars without
    a full window are NaN.

    Returns array same length as klines.
    """
    volumes = _to_arrays(klines)[4]
    bar_count = len(volumes)
    result = np.full(bar_count, np.nan)

    # `stop` is the exclusive end of the window; the value lands on bar stop - 1.
    for stop in range(period, bar_count + 1):
        result[stop - 1] = np.mean(volumes[stop - period:stop])

    return result


# ============================================================
# VOLUME SPIKE DETECTION
# ============================================================

def detect_volume_spike(klines, vol_sma, spike_mult=7.0, lookback=5):
    """
    Find the strongest recent candle whose volume is >= spike_mult * its SMA.

    `vol_sma` is indexed in parallel with `klines`. The scan covers candle
    indices ``n - lookback - 2`` through ``n - 2`` inclusive, i.e.
    ``lookback + 1`` candles with only the final candle left out.
    NOTE(review): that is one candle wider than "the last `lookback` candles
    before the final two" — confirm which window the strategy intends.

    Candles whose SMA is NaN or not positive are ignored. If several candles
    qualify, the one with the highest volume/SMA ratio wins (earliest on a tie).

    No spike is reported when there are fewer than ``lookback + 2`` candles
    or when the latest SMA value is NaN or zero.

    Returns (True, spike_idx, ratio rounded to 1 decimal) or (False, -1, 0).
    """
    volumes = _to_arrays(klines)[4]
    n = len(volumes)
    no_spike = (False, -1, 0)

    if n < lookback + 2:
        return no_spike

    latest_sma = vol_sma[-1]
    if np.isnan(latest_sma) or latest_sma == 0:
        return no_spike

    top_ratio = 0
    top_idx = -1

    # The length guard above guarantees the first scanned index is >= 0.
    for idx in range(n - lookback - 2, n - 1):
        baseline = vol_sma[idx]
        if np.isnan(baseline) or baseline <= 0:
            continue

        candidate = volumes[idx] / baseline
        if candidate >= spike_mult and candidate > top_ratio:
            top_ratio, top_idx = candidate, idx

    if top_idx < 0:
        return no_spike

    return True, top_idx, round(top_ratio, 1)


# ============================================================
# VOLUME EXHAUSTION
# ============================================================

def check_exhaustion(klines, exhaustion_bars=2):
    """
    Report whether volume fell on each of the last `exhaustion_bars` candles.

    Every one of those candles must have strictly lower volume than the
    candle immediately before it. Only the tail of `klines` is inspected;
    the spike itself is located separately by the caller.

    Returns False when there are not at least ``exhaustion_bars + 1`` candles.
    """
    volumes = _to_arrays(klines)[4]
    n = len(volumes)

    if n <= exhaustion_bars:
        return False

    # One extra leading candle so each checked bar has a predecessor.
    tail = volumes[n - exhaustion_bars - 1:]

    # Fail as soon as any bar is not strictly below the bar before it.
    return not np.any(tail[1:] >= tail[:-1])


# ============================================================
# FULL SIGNAL CHECK
# ============================================================

def check_zatochki_signal(klines, rsi_period=14, rsi_os=25, rsi_ob=80,
                          vwap_period=50, vwap_ext=0.02,
                          vol_sma_period=20, vol_spike_mult=7.0,
                          spike_lookback=5, exhaustion_bars=2,
                          sl_buffer=0.003, sl_cap=0.012, sl_min=0.003):
    """
    Run the full Zatochki entry check on klines.

    All of the following gates must pass, evaluated in this order:
      1. enough history: max(rsi_period, vwap_period, vol_sma_period)
         + spike_lookback + exhaustion_bars + 5 candles;
      2. a recent volume spike (see detect_volume_spike);
      3. declining volume on the last `exhaustion_bars` candles;
      4. last close at least `vwap_ext` (as a fraction) away from the rolling VWAP;
      5. RSI below `rsi_os` with price under VWAP -> "LONG", or RSI above
         `rsi_ob` with price over VWAP -> "SHORT";
      6. stop-loss distance, after capping at `sl_cap`, not below `sl_min`.

    The stop is placed `sl_buffer` beyond the lowest low (LONG) or highest
    high (SHORT) seen from the spike candle to the last candle. If that is
    further than `sl_cap` from the entry, it is pulled in to exactly `sl_cap`.

    Returns None when any gate fails, otherwise a dict with keys
    direction, entry_price, sl_price, sl_pct (percent), vol_spike_ratio,
    vwap_dist_pct (absolute, percent), rsi, exhaustion_bars, spike_idx.
    """
    warmup = max(rsi_period, vwap_period, vol_sma_period)
    if len(klines) < warmup + spike_lookback + exhaustion_bars + 5:
        return None

    _, highs, lows, closes, _ = _to_arrays(klines)
    entry = closes[-1]

    # Gate: volume spike measured against the volume SMA.
    spike_found, spike_idx, spike_ratio = detect_volume_spike(
        klines,
        calc_volume_sma(klines, vol_sma_period),
        vol_spike_mult,
        spike_lookback,
    )
    if not spike_found:
        return None

    # Gate: volume drying up on the most recent candles.
    if not check_exhaustion(klines, exhaustion_bars):
        return None

    # Gate: price stretched far enough from the rolling VWAP.
    anchor = calc_rolling_vwap(klines, vwap_period)[-1]
    if np.isnan(anchor) or anchor == 0:
        return None

    vwap_dist = (entry - anchor) / anchor
    if abs(vwap_dist) < vwap_ext:
        return None

    # Gate: RSI extreme that agrees with the side of VWAP price is on.
    rsi_val = calc_rsi(klines, rsi_period)[-1]
    is_long = rsi_val < rsi_os and vwap_dist < 0
    is_short = rsi_val > rsi_ob and vwap_dist > 0
    if not (is_long or is_short):
        return None
    direction = "LONG" if is_long else "SHORT"

    # Stop goes just beyond the extreme printed since the spike candle.
    if is_long:
        sl_price = np.min(lows[spike_idx:]) * (1 - sl_buffer)
        sl_pct = (entry - sl_price) / entry
    else:
        sl_price = np.max(highs[spike_idx:]) * (1 + sl_buffer)
        sl_pct = (sl_price - entry) / entry

    # Too wide: clamp the stop to the maximum allowed distance from entry.
    if sl_pct > sl_cap:
        sl_pct = sl_cap
        sl_price = entry * (1 - sl_cap) if is_long else entry * (1 + sl_cap)

    # Too tight: no signal.
    if sl_pct < sl_min:
        return None

    return {
        "direction": direction,
        "entry_price": float(entry),
        "sl_price": float(sl_price),
        "sl_pct": round(sl_pct * 100, 2),
        "vol_spike_ratio": spike_ratio,
        "vwap_dist_pct": round(abs(vwap_dist) * 100, 2),
        "rsi": round(rsi_val, 1),
        "exhaustion_bars": exhaustion_bars,
        "spike_idx": spike_idx,
    }

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...