← Назад"""
Zatochki (Knife Catcher) — Indicators
=======================================
Pure functions for volume exhaustion reversal detection.
Works with Bybit klines in Binance-compat format from exchange.py.
Kline format: [timestamp, open, high, low, close, volume, ...]
"""
import numpy as np
import logging
logger = logging.getLogger("zatochki.indicators")
# ============================================================
# HELPERS
# ============================================================
def _to_arrays(klines):
"""Convert klines to numpy arrays."""
opens = np.array([float(k[1]) for k in klines])
highs = np.array([float(k[2]) for k in klines])
lows = np.array([float(k[3]) for k in klines])
closes = np.array([float(k[4]) for k in klines])
volumes = np.array([float(k[5]) for k in klines])
return opens, highs, lows, closes, volumes
# ============================================================
# RSI
# ============================================================
def calc_rsi(klines, period=14):
"""RSI calculation. Returns array same length as klines."""
_, _, _, closes, _ = _to_arrays(klines)
if len(closes) < period + 1:
return np.full(len(closes), 50.0)
deltas = np.diff(closes)
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gain = np.zeros(len(closes))
avg_loss = np.zeros(len(closes))
avg_gain[period] = np.mean(gains[:period])
avg_loss[period] = np.mean(losses[:period])
for i in range(period + 1, len(closes)):
avg_gain[i] = (avg_gain[i-1] * (period - 1) + gains[i-1]) / period
avg_loss[i] = (avg_loss[i-1] * (period - 1) + losses[i-1]) / period
rsi = np.full(len(closes), 50.0)
for i in range(period, len(closes)):
if avg_loss[i] == 0:
rsi[i] = 100.0
else:
rs = avg_gain[i] / avg_loss[i]
rsi[i] = 100 - (100 / (1 + rs))
return rsi
# ============================================================
# ROLLING VWAP
# ============================================================
def calc_rolling_vwap(klines, period=50):
"""Rolling VWAP (volume-weighted average price). Returns array."""
_, highs, lows, closes, volumes = _to_arrays(klines)
typical = (highs + lows + closes) / 3
tp_vol = typical * volumes
vwap = np.full(len(closes), np.nan)
for i in range(period - 1, len(closes)):
vol_sum = np.sum(volumes[i - period + 1:i + 1])
if vol_sum > 0:
vwap[i] = np.sum(tp_vol[i - period + 1:i + 1]) / vol_sum
return vwap
# ============================================================
# VOLUME SMA
# ============================================================
def calc_volume_sma(klines, period=20):
"""SMA of volume. Returns array."""
_, _, _, _, volumes = _to_arrays(klines)
sma = np.full(len(volumes), np.nan)
for i in range(period - 1, len(volumes)):
sma[i] = np.mean(volumes[i - period + 1:i + 1])
return sma
# ============================================================
# VOLUME SPIKE DETECTION
# ============================================================
def detect_volume_spike(klines, vol_sma, spike_mult=7.0, lookback=5):
"""
Check if there was a volume spike >= spike_mult * SMA in last `lookback` candles.
Returns (spike_found, spike_idx, spike_ratio) or (False, -1, 0).
"""
_, _, _, _, volumes = _to_arrays(klines)
n = len(volumes)
if n < lookback + 2 or np.isnan(vol_sma[-1]) or vol_sma[-1] == 0:
return False, -1, 0
# Look for spike in last `lookback` candles (excluding last 2 — need exhaustion after)
best_ratio = 0
best_idx = -1
for i in range(n - lookback - 2, n - 2 + 1):
if i < 0:
continue
ratio = volumes[i] / vol_sma[i] if not np.isnan(vol_sma[i]) and vol_sma[i] > 0 else 0
if ratio >= spike_mult and ratio > best_ratio:
best_ratio = ratio
best_idx = i
if best_idx >= 0:
return True, best_idx, round(best_ratio, 1)
return False, -1, 0
# ============================================================
# VOLUME EXHAUSTION
# ============================================================
def check_exhaustion(klines, exhaustion_bars=2):
"""
Check if last `exhaustion_bars` candles have declining volume.
Must be called AFTER spike detection — checks bars at end of klines.
"""
_, _, _, _, volumes = _to_arrays(klines)
n = len(volumes)
if n < exhaustion_bars + 1:
return False
# Check last exhaustion_bars have declining volume
for j in range(1, exhaustion_bars + 1):
idx = n - exhaustion_bars + j - 1
if idx <= 0 or volumes[idx] >= volumes[idx - 1]:
return False
return True
# ============================================================
# FULL SIGNAL CHECK
# ============================================================
def check_zatochki_signal(klines, rsi_period=14, rsi_os=25, rsi_ob=80,
vwap_period=50, vwap_ext=0.02,
vol_sma_period=20, vol_spike_mult=7.0,
spike_lookback=5, exhaustion_bars=2,
sl_buffer=0.003, sl_cap=0.012, sl_min=0.003):
"""
Full Zatochki signal check on klines.
Returns dict with signal info or None.
"""
if len(klines) < max(rsi_period, vwap_period, vol_sma_period) + spike_lookback + exhaustion_bars + 5:
return None
_, highs, lows, closes, volumes = _to_arrays(klines)
# 1. Volume SMA
vol_sma = calc_volume_sma(klines, vol_sma_period)
# 2. Volume spike detection
spike_found, spike_idx, spike_ratio = detect_volume_spike(
klines, vol_sma, vol_spike_mult, spike_lookback
)
if not spike_found:
return None
# 3. Volume exhaustion
if not check_exhaustion(klines, exhaustion_bars):
return None
# 4. VWAP extension
vwap = calc_rolling_vwap(klines, vwap_period)
last_close = closes[-1]
last_vwap = vwap[-1]
if np.isnan(last_vwap) or last_vwap == 0:
return None
vwap_dist = (last_close - last_vwap) / last_vwap
if abs(vwap_dist) < vwap_ext:
return None
# 5. RSI extreme
rsi = calc_rsi(klines, rsi_period)
rsi_val = rsi[-1]
# Direction
if rsi_val < rsi_os and vwap_dist < 0:
direction = "LONG"
elif rsi_val > rsi_ob and vwap_dist > 0:
direction = "SHORT"
else:
return None
# 6. Calculate dynamic SL
if direction == "LONG":
spike_low = np.min(lows[spike_idx:])
sl_price = spike_low * (1 - sl_buffer)
sl_pct = (last_close - sl_price) / last_close
else:
spike_high = np.max(highs[spike_idx:])
sl_price = spike_high * (1 + sl_buffer)
sl_pct = (sl_price - last_close) / last_close
# Cap SL
if sl_pct > sl_cap:
if direction == "LONG":
sl_price = last_close * (1 - sl_cap)
else:
sl_price = last_close * (1 + sl_cap)
sl_pct = sl_cap
# Skip if SL too tight
if sl_pct < sl_min:
return None
return {
"direction": direction,
"entry_price": float(last_close),
"sl_price": float(sl_price),
"sl_pct": round(sl_pct * 100, 2),
"vol_spike_ratio": spike_ratio,
"vwap_dist_pct": round(abs(vwap_dist) * 100, 2),
"rsi": round(rsi_val, 1),
"exhaustion_bars": exhaustion_bars,
"spike_idx": spike_idx,
}