← Назад
""" Zatochki (Knife Catcher) — Indicators ======================================= Pure functions for volume exhaustion reversal detection. Works with Bybit klines in Binance-compat format from exchange.py. Kline format: [timestamp, open, high, low, close, volume, ...] """ import numpy as np import logging logger = logging.getLogger("zatochki.indicators") # ============================================================ # HELPERS # ============================================================ def _to_arrays(klines): """Convert klines to numpy arrays.""" opens = np.array([float(k[1]) for k in klines]) highs = np.array([float(k[2]) for k in klines]) lows = np.array([float(k[3]) for k in klines]) closes = np.array([float(k[4]) for k in klines]) volumes = np.array([float(k[5]) for k in klines]) return opens, highs, lows, closes, volumes # ============================================================ # RSI # ============================================================ def calc_rsi(klines, period=14): """RSI calculation. Returns array same length as klines.""" _, _, _, closes, _ = _to_arrays(klines) if len(closes) < period + 1: return np.full(len(closes), 50.0) deltas = np.diff(closes) gains = np.where(deltas > 0, deltas, 0) losses = np.where(deltas < 0, -deltas, 0) avg_gain = np.zeros(len(closes)) avg_loss = np.zeros(len(closes)) avg_gain[period] = np.mean(gains[:period]) avg_loss[period] = np.mean(losses[:period]) for i in range(period + 1, len(closes)): avg_gain[i] = (avg_gain[i-1] * (period - 1) + gains[i-1]) / period avg_loss[i] = (avg_loss[i-1] * (period - 1) + losses[i-1]) / period rsi = np.full(len(closes), 50.0) for i in range(period, len(closes)): if avg_loss[i] == 0: rsi[i] = 100.0 else: rs = avg_gain[i] / avg_loss[i] rsi[i] = 100 - (100 / (1 + rs)) return rsi # ============================================================ # ROLLING VWAP # ============================================================ def calc_rolling_vwap(klines, period=50): """Rolling VWAP (volume-weighted average price). Returns array.""" _, highs, lows, closes, volumes = _to_arrays(klines) typical = (highs + lows + closes) / 3 tp_vol = typical * volumes vwap = np.full(len(closes), np.nan) for i in range(period - 1, len(closes)): vol_sum = np.sum(volumes[i - period + 1:i + 1]) if vol_sum > 0: vwap[i] = np.sum(tp_vol[i - period + 1:i + 1]) / vol_sum return vwap # ============================================================ # VOLUME SMA # ============================================================ def calc_volume_sma(klines, period=20): """SMA of volume. Returns array.""" _, _, _, _, volumes = _to_arrays(klines) sma = np.full(len(volumes), np.nan) for i in range(period - 1, len(volumes)): sma[i] = np.mean(volumes[i - period + 1:i + 1]) return sma # ============================================================ # VOLUME SPIKE DETECTION # ============================================================ def detect_volume_spike(klines, vol_sma, spike_mult=7.0, lookback=5): """ Check if there was a volume spike >= spike_mult * SMA in last `lookback` candles. Returns (spike_found, spike_idx, spike_ratio) or (False, -1, 0). """ _, _, _, _, volumes = _to_arrays(klines) n = len(volumes) if n < lookback + 2 or np.isnan(vol_sma[-1]) or vol_sma[-1] == 0: return False, -1, 0 # Look for spike in last `lookback` candles (excluding last 2 — need exhaustion after) best_ratio = 0 best_idx = -1 for i in range(n - lookback - 2, n - 2 + 1): if i < 0: continue ratio = volumes[i] / vol_sma[i] if not np.isnan(vol_sma[i]) and vol_sma[i] > 0 else 0 if ratio >= spike_mult and ratio > best_ratio: best_ratio = ratio best_idx = i if best_idx >= 0: return True, best_idx, round(best_ratio, 1) return False, -1, 0 # ============================================================ # VOLUME EXHAUSTION # ============================================================ def check_exhaustion(klines, exhaustion_bars=2): """ Check if last `exhaustion_bars` candles have declining volume. Must be called AFTER spike detection — checks bars at end of klines. """ _, _, _, _, volumes = _to_arrays(klines) n = len(volumes) if n < exhaustion_bars + 1: return False # Check last exhaustion_bars have declining volume for j in range(1, exhaustion_bars + 1): idx = n - exhaustion_bars + j - 1 if idx <= 0 or volumes[idx] >= volumes[idx - 1]: return False return True # ============================================================ # FULL SIGNAL CHECK # ============================================================ def check_zatochki_signal(klines, rsi_period=14, rsi_os=25, rsi_ob=80, vwap_period=50, vwap_ext=0.02, vol_sma_period=20, vol_spike_mult=7.0, spike_lookback=5, exhaustion_bars=2, sl_buffer=0.003, sl_cap=0.012, sl_min=0.003): """ Full Zatochki signal check on klines. Returns dict with signal info or None. """ if len(klines) < max(rsi_period, vwap_period, vol_sma_period) + spike_lookback + exhaustion_bars + 5: return None _, highs, lows, closes, volumes = _to_arrays(klines) # 1. Volume SMA vol_sma = calc_volume_sma(klines, vol_sma_period) # 2. Volume spike detection spike_found, spike_idx, spike_ratio = detect_volume_spike( klines, vol_sma, vol_spike_mult, spike_lookback ) if not spike_found: return None # 3. Volume exhaustion if not check_exhaustion(klines, exhaustion_bars): return None # 4. VWAP extension vwap = calc_rolling_vwap(klines, vwap_period) last_close = closes[-1] last_vwap = vwap[-1] if np.isnan(last_vwap) or last_vwap == 0: return None vwap_dist = (last_close - last_vwap) / last_vwap if abs(vwap_dist) < vwap_ext: return None # 5. RSI extreme rsi = calc_rsi(klines, rsi_period) rsi_val = rsi[-1] # Direction if rsi_val < rsi_os and vwap_dist < 0: direction = "LONG" elif rsi_val > rsi_ob and vwap_dist > 0: direction = "SHORT" else: return None # 6. Calculate dynamic SL if direction == "LONG": spike_low = np.min(lows[spike_idx:]) sl_price = spike_low * (1 - sl_buffer) sl_pct = (last_close - sl_price) / last_close else: spike_high = np.max(highs[spike_idx:]) sl_price = spike_high * (1 + sl_buffer) sl_pct = (sl_price - last_close) / last_close # Cap SL if sl_pct > sl_cap: if direction == "LONG": sl_price = last_close * (1 - sl_cap) else: sl_price = last_close * (1 + sl_cap) sl_pct = sl_cap # Skip if SL too tight if sl_pct < sl_min: return None return { "direction": direction, "entry_price": float(last_close), "sl_price": float(sl_price), "sl_pct": round(sl_pct * 100, 2), "vol_spike_ratio": spike_ratio, "vwap_dist_pct": round(abs(vwap_dist) * 100, 2), "rsi": round(rsi_val, 1), "exhaustion_bars": exhaustion_bars, "spike_idx": spike_idx, }