← Назад
#!/usr/bin/env python3 """ Π—Π°Ρ‚ΠΎΡ‡ΠΊΠΈ (Knife Catcher) β€” Backtest ==================================== Volume exhaustion reversal on 1m Bybit Futures. Downloads klines from Bybit, runs strategy, reports stats. Usage: python3 backtest_zatochki.py # all symbols python3 backtest_zatochki.py SIRENUSDT # single symbol """ import sys import json import time import math import requests import numpy as np import pandas as pd from datetime import datetime, timedelta from pathlib import Path # ============================================================ # CONFIG # ============================================================ # Symbols to test β€” mid-cap alts from our screener that had trades SYMBOLS = [ "SIRENUSDT", "STOUSDT", "JCTUSDT", "PIPPINUSDT", "ONGUSDT", "PUFFERUSDT", "NOMUSDT", "EDGEUSDT", "DUSDT", "ARIAUSDT", "MMTUSDT", "BERAUSDT", "AIOTUSDT", "KOMAUSDT", ] TIMEFRAME = "1" # 1 minute (Bybit format) DAYS_BACK = 7 # 7 days default COMMISSION = 0.00055 # 0.055% Bybit taker (per side) SLIPPAGE = 0.0001 # 0.01% # Strategy params β€” v2 optimized (from v1 analysis) VOL_SPIKE_MULT = 7.0 # was 5x β†’ 7x (8x+ bucket WR 69%) VOL_SMA_PERIOD = 20 # lookback for volume average SPIKE_LOOKBACK = 5 # spike in last N candles VWAP_PERIOD = 50 # rolling VWAP lookback VWAP_EXT_PCT = 0.02 # price extension >= 2% from VWAP RSI_PERIOD = 14 RSI_OVERSOLD = 25 RSI_OVERBOUGHT = 80 EXHAUSTION_BARS = 2 # N declining volume bars after spike SL_CAP_PCT = 0.012 # cap SL at 1.2% # Risk SL_BUFFER_PCT = 0.003 # 0.3% buffer beyond spike wick TP1_PCT = 0.007 # was 0.5% β†’ 0.7% (47% trades reach MFE 0.7%, covers commission) TP1_CLOSE_RATIO = 0.5 # close 50% at TP1 TRAIL_CALLBACK_PCT = 0.005 # 0.5% trailing after TP1 TRADE_SIZE_USD = 5.0 LEVERAGE = 10 NOTIONAL = TRADE_SIZE_USD * LEVERAGE # $50 # ============================================================ # DATA DOWNLOAD β€” Bybit # ============================================================ def get_bybit_klines(symbol, interval, days_back): """Download klines from Bybit linear futures.""" url = "https://api.bybit.com/v5/market/kline" interval_sec = {"1": 60, "5": 300, "15": 900, "60": 3600} sec = interval_sec.get(str(interval), 60) end_ms = int(time.time() * 1000) start_ms = end_ms - days_back * 24 * 3600 * 1000 all_klines = [] current_end = end_ms while current_end > start_ms: params = { "category": "linear", "symbol": symbol, "interval": str(interval), "end": current_end, "limit": 1000, } try: resp = requests.get(url, params=params, timeout=10) data = resp.json() rows = data.get("result", {}).get("list", []) if not rows: break all_klines.extend(rows) # Bybit returns newest first, last item = oldest oldest_ts = int(rows[-1][0]) current_end = oldest_ts - 1 time.sleep(0.1) except Exception as e: print(f" Error fetching {symbol}: {e}") time.sleep(1) continue if not all_klines: return None # Bybit format: [startTime, open, high, low, close, volume, turnover] df = pd.DataFrame(all_klines, columns=[ "timestamp", "open", "high", "low", "close", "volume", "turnover" ]) df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms") for col in ["open", "high", "low", "close", "volume", "turnover"]: df[col] = df[col].astype(float) df = df.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True) return df # ============================================================ # INDICATORS # ============================================================ def calc_rsi(closes, period=14): """RSI calculation.""" deltas = np.diff(closes) gains = np.where(deltas > 0, deltas, 0) losses = np.where(deltas < 0, -deltas, 0) avg_gain = np.zeros(len(closes)) avg_loss = np.zeros(len(closes)) if len(gains) < period: return np.full(len(closes), 50.0) avg_gain[period] = np.mean(gains[:period]) avg_loss[period] = np.mean(losses[:period]) for i in range(period + 1, len(closes)): avg_gain[i] = (avg_gain[i-1] * (period - 1) + gains[i-1]) / period avg_loss[i] = (avg_loss[i-1] * (period - 1) + losses[i-1]) / period rsi = np.full(len(closes), 50.0) for i in range(period, len(closes)): if avg_loss[i] == 0: rsi[i] = 100.0 else: rs = avg_gain[i] / avg_loss[i] rsi[i] = 100 - (100 / (1 + rs)) return rsi def calc_rolling_vwap(df, period=50): """Rolling VWAP (volume-weighted average price).""" typical = (df["high"] + df["low"] + df["close"]) / 3 tp_vol = typical * df["volume"] vwap = tp_vol.rolling(period).sum() / df["volume"].rolling(period).sum() return vwap def calc_volume_sma(volumes, period=20): """Simple moving average of volume.""" return pd.Series(volumes).rolling(period).mean() # ============================================================ # STRATEGY β€” Zatochki (Knife Catcher) # ============================================================ def find_signals(df): """ Scan dataframe for knife-catch signals. Returns list of signal dicts. """ closes = df["close"].values highs = df["high"].values lows = df["low"].values volumes = df["volume"].values rsi = calc_rsi(closes, RSI_PERIOD) vwap = calc_rolling_vwap(df, VWAP_PERIOD).values vol_sma = calc_volume_sma(volumes, VOL_SMA_PERIOD).values signals = [] min_idx = max(VOL_SMA_PERIOD, VWAP_PERIOD, RSI_PERIOD) + SPIKE_LOOKBACK + EXHAUSTION_BARS for i in range(min_idx, len(df)): # Skip if VWAP or vol_sma is NaN if np.isnan(vwap[i]) or np.isnan(vol_sma[i]) or vol_sma[i] == 0: continue # 1. Volume spike in last SPIKE_LOOKBACK candles spike_window = volumes[i - SPIKE_LOOKBACK - EXHAUSTION_BARS : i - EXHAUSTION_BARS + 1] max_vol_ratio = np.max(spike_window) / vol_sma[i] if vol_sma[i] > 0 else 0 if max_vol_ratio < VOL_SPIKE_MULT: continue # Find the spike candle index spike_rel_idx = np.argmax(spike_window) spike_abs_idx = i - SPIKE_LOOKBACK - EXHAUSTION_BARS + spike_rel_idx # 2. Volume exhaustion β€” EXHAUSTION_BARS declining after spike area exhaustion = True for j in range(1, EXHAUSTION_BARS + 1): check_idx = i - EXHAUSTION_BARS + j if check_idx <= 0 or volumes[check_idx] >= volumes[check_idx - 1]: exhaustion = False break if not exhaustion: continue # 3. Price extension from VWAP vwap_dist = (closes[i] - vwap[i]) / vwap[i] if abs(vwap_dist) < VWAP_EXT_PCT: continue # 4. RSI extreme rsi_val = rsi[i] # Determine direction if rsi_val < RSI_OVERSOLD and vwap_dist < 0: direction = "LONG" # oversold + below VWAP β†’ buy the dip elif rsi_val > RSI_OVERBOUGHT and vwap_dist > 0: direction = "SHORT" # overbought + above VWAP β†’ sell the top else: continue # Calculate dynamic SL from spike wick if direction == "LONG": # SL below the lowest low in spike window spike_low = np.min(lows[spike_abs_idx : i + 1]) sl_price = spike_low * (1 - SL_BUFFER_PCT) sl_pct = (closes[i] - sl_price) / closes[i] else: spike_high = np.max(highs[spike_abs_idx : i + 1]) sl_price = spike_high * (1 + SL_BUFFER_PCT) sl_pct = (sl_price - closes[i]) / closes[i] # Cap SL at SL_CAP_PCT (was unlimited β€” wide SLs killed PnL) if sl_pct > SL_CAP_PCT: if direction == "LONG": sl_price = closes[i] * (1 - SL_CAP_PCT) else: sl_price = closes[i] * (1 + SL_CAP_PCT) sl_pct = SL_CAP_PCT # Skip if SL too tight (<0.3%) if sl_pct < 0.003: continue signals.append({ "idx": i, "timestamp": df["timestamp"].iloc[i], "direction": direction, "entry_price": closes[i], "sl_price": sl_price, "sl_pct": sl_pct, "vol_spike_ratio": round(max_vol_ratio, 1), "vwap_dist_pct": round(abs(vwap_dist) * 100, 2), "rsi": round(rsi_val, 1), "exhaustion_bars": EXHAUSTION_BARS, }) return signals # ============================================================ # BACKTEST ENGINE # ============================================================ def run_backtest(df, signals): """ Simulate trades with TP1 partial + trailing. Returns list of trade results. """ trades = [] in_trade = False cooldown_until = -1 for sig in signals: entry_idx = sig["idx"] if entry_idx <= cooldown_until: continue if in_trade: continue entry_price = sig["entry_price"] sl_price = sig["sl_price"] direction = sig["direction"] # TP1 price if direction == "LONG": tp1_price = entry_price * (1 + TP1_PCT) else: tp1_price = entry_price * (1 - TP1_PCT) # Simulate forward in_trade = True tp1_hit = False trail_high = 0 mfe = 0 # max favorable excursion mae = 0 # max adverse excursion exit_price = None exit_reason = None exit_idx = None for j in range(entry_idx + 1, min(entry_idx + 300, len(df))): # max 300 bars (5h) high = df["high"].iloc[j] low = df["low"].iloc[j] close = df["close"].iloc[j] if direction == "LONG": current_pnl_pct = (close - entry_price) / entry_price bar_best = (high - entry_price) / entry_price bar_worst = (low - entry_price) / entry_price else: current_pnl_pct = (entry_price - close) / entry_price bar_best = (entry_price - low) / entry_price bar_worst = (entry_price - high) / entry_price mfe = max(mfe, bar_best) mae = min(mae, bar_worst) # Check SL (before TP1) if not tp1_hit: if direction == "LONG" and low <= sl_price: exit_price = sl_price exit_reason = "SL" exit_idx = j break elif direction == "SHORT" and high >= sl_price: exit_price = sl_price exit_reason = "SL" exit_idx = j break # Check TP1 if not tp1_hit: if direction == "LONG" and high >= tp1_price: tp1_hit = True trail_high = high # SL moves to BE sl_price = entry_price continue elif direction == "SHORT" and low <= tp1_price: tp1_hit = True trail_high = low # for SHORT, track lowest sl_price = entry_price continue # Trailing stop (after TP1) if tp1_hit: if direction == "LONG": if high > trail_high: trail_high = high trail_sl = trail_high * (1 - TRAIL_CALLBACK_PCT) if low <= trail_sl: exit_price = trail_sl exit_reason = "TRAIL" exit_idx = j break # SL at BE if low <= sl_price: exit_price = sl_price exit_reason = "TP1+BE" exit_idx = j break else: # SHORT if low < trail_high or trail_high == 0: trail_high = low trail_sl = trail_high * (1 + TRAIL_CALLBACK_PCT) if high >= trail_sl: exit_price = trail_sl exit_reason = "TRAIL" exit_idx = j break if high >= sl_price: exit_price = sl_price exit_reason = "TP1+BE" exit_idx = j break # If no exit found in 300 bars, close at market if exit_price is None: exit_price = df["close"].iloc[min(entry_idx + 300, len(df) - 1)] exit_reason = "TIMEOUT" exit_idx = min(entry_idx + 300, len(df) - 1) # Calculate PnL if direction == "LONG": raw_pnl_pct = (exit_price - entry_price) / entry_price else: raw_pnl_pct = (entry_price - exit_price) / entry_price # Adjust for TP1 partial if hit if tp1_hit and exit_reason in ("TRAIL", "TP1+BE"): # TP1 portion: 50% at +TP1_PCT tp1_pnl = TP1_PCT * TP1_CLOSE_RATIO # Remaining 50%: at exit if direction == "LONG": rest_pnl_pct = (exit_price - entry_price) / entry_price else: rest_pnl_pct = (entry_price - exit_price) / entry_price rest_pnl = rest_pnl_pct * (1 - TP1_CLOSE_RATIO) total_pnl_pct = tp1_pnl + rest_pnl else: total_pnl_pct = raw_pnl_pct # Commission (both sides) total_pnl_pct -= COMMISSION * 2 if tp1_hit: total_pnl_pct -= COMMISSION * 2 * 0.5 # extra commission for partial close # Slippage total_pnl_pct -= SLIPPAGE * 2 pnl_usd = NOTIONAL * total_pnl_pct bars_in_trade = exit_idx - entry_idx trade = { "timestamp": str(sig["timestamp"]), "direction": direction, "entry_price": entry_price, "exit_price": exit_price, "sl_pct": round(sig["sl_pct"] * 100, 2), "result": exit_reason, "pnl_pct": round(total_pnl_pct * 100, 3), "pnl_usd": round(pnl_usd, 3), "tp1_hit": tp1_hit, "trail_high": trail_high, "mfe_pct": round(mfe * 100, 3), "mae_pct": round(mae * 100, 3), "bars": bars_in_trade, "vol_spike_ratio": sig["vol_spike_ratio"], "vwap_dist_pct": sig["vwap_dist_pct"], "rsi": sig["rsi"], } trades.append(trade) in_trade = False # Cooldown: 30 candles (30 min on 1m) cooldown_until = exit_idx + 30 return trades # ============================================================ # REPORTING # ============================================================ def print_report(symbol, trades): """Print backtest results for a symbol.""" if not trades: print(f"\n{symbol}: 0 signals found") return {} n = len(trades) wins = [t for t in trades if t["pnl_usd"] > 0] losses = [t for t in trades if t["pnl_usd"] <= 0] wr = len(wins) / n * 100 total_pnl = sum(t["pnl_usd"] for t in trades) avg_pnl = total_pnl / n avg_win = np.mean([t["pnl_usd"] for t in wins]) if wins else 0 avg_loss = np.mean([t["pnl_usd"] for t in losses]) if losses else 0 pf = abs(sum(t["pnl_usd"] for t in wins) / sum(t["pnl_usd"] for t in losses)) if losses and sum(t["pnl_usd"] for t in losses) != 0 else 0 tp1_hits = [t for t in trades if t["tp1_hit"]] tp1_rate = len(tp1_hits) / n * 100 avg_mfe = np.mean([t["mfe_pct"] for t in trades]) avg_mae = np.mean([t["mae_pct"] for t in trades]) avg_bars = np.mean([t["bars"] for t in trades]) # By result type by_res = {} for t in trades: r = t["result"] by_res.setdefault(r, []).append(t["pnl_usd"]) print(f"\n{'='*55}") print(f"πŸ”ͺ {symbol} β€” {n} trades ({DAYS_BACK} days, 1m)") print(f"{'='*55}") print(f"WR: {wr:.0f}% ({len(wins)}W/{len(losses)}L) | PnL: ${total_pnl:+.2f}") print(f"Avg win: ${avg_win:+.3f} | Avg loss: ${avg_loss:+.3f} | PF: {pf:.2f}") print(f"TP1 hit: {len(tp1_hits)}/{n} ({tp1_rate:.0f}%)") print(f"Avg MFE: {avg_mfe:+.2f}% | Avg MAE: {avg_mae:+.2f}% | Avg bars: {avg_bars:.0f}") for r, pnls in sorted(by_res.items()): w = sum(1 for p in pnls if p > 0) print(f" {r:10}: {len(pnls):3} trades, {w}W/{len(pnls)-w}L, ${sum(pnls):+.2f}") # Show individual trades if n <= 30: print(f"\nTrades:") for t in trades: emoji = "βœ…" if t["pnl_usd"] > 0 else "❌" tp1f = "🎯" if t["tp1_hit"] else " " print(f" {emoji}{tp1f} {t['direction']:5} {t['result']:8} " f"pnl:{t['pnl_pct']:+6.2f}% ${t['pnl_usd']:+.3f} " f"MFE:{t['mfe_pct']:+5.2f}% MAE:{t['mae_pct']:+5.2f}% " f"bars:{t['bars']:3} vol:{t['vol_spike_ratio']}x " f"vwap:{t['vwap_dist_pct']}% rsi:{t['rsi']}") return { "symbol": symbol, "trades": n, "wr": round(wr, 1), "pnl": round(total_pnl, 2), "pf": round(pf, 2), "tp1_rate": round(tp1_rate, 1), "avg_mfe": round(avg_mfe, 2), "avg_mae": round(avg_mae, 2), "avg_bars": round(avg_bars, 1), } # ============================================================ # MAIN # ============================================================ def main(): symbols = SYMBOLS if len(sys.argv) > 1: symbols = [sys.argv[1].upper()] print(f"πŸ”ͺ Π—Π°Ρ‚ΠΎΡ‡ΠΊΠΈ Backtest β€” {TIMEFRAME}m, {DAYS_BACK} days") print(f"Vol spike: {VOL_SPIKE_MULT}x | VWAP ext: {VWAP_EXT_PCT*100}%") print(f"RSI: <{RSI_OVERSOLD} / >{RSI_OVERBOUGHT}") print(f"TP1: {TP1_PCT*100}% | Trail: {TRAIL_CALLBACK_PCT*100}% | SL cap: {SL_CAP_PCT*100}%") print(f"Commission: {COMMISSION*100}% | Slippage: {SLIPPAGE*100}%") print(f"Symbols: {len(symbols)}") all_results = [] all_trades = [] for symbol in symbols: print(f"\nπŸ“₯ Downloading {symbol} 1m ({DAYS_BACK}d)...", end=" ", flush=True) df = get_bybit_klines(symbol, TIMEFRAME, DAYS_BACK) if df is None or len(df) < 200: print(f"SKIP (no data)") continue print(f"{len(df)} candles") signals = find_signals(df) print(f" Signals found: {len(signals)}") trades = run_backtest(df, signals) result = print_report(symbol, trades) if result: all_results.append(result) all_trades.extend(trades) # Summary if all_results: print(f"\n{'='*60}") print(f"πŸ“Š SUMMARY β€” {len(all_results)} symbols, {sum(r['trades'] for r in all_results)} trades") print(f"{'='*60}") print(f"\n{'Symbol':14} {'#':>4} {'WR':>5} {'PnL':>8} {'PF':>5} {'TP1%':>5} {'MFE':>6} {'MAE':>6}") print("-" * 55) for r in sorted(all_results, key=lambda x: x["pnl"], reverse=True): print(f"{r['symbol']:14} {r['trades']:4} {r['wr']:4.0f}% ${r['pnl']:+6.2f} {r['pf']:5.2f} {r['tp1_rate']:4.0f}% {r['avg_mfe']:+5.2f}% {r['avg_mae']:+5.2f}%") total_trades = sum(r["trades"] for r in all_results) total_pnl = sum(r["pnl"] for r in all_results) total_wins = sum(1 for t in all_trades if t["pnl_usd"] > 0) total_wr = total_wins / total_trades * 100 if total_trades else 0 print(f"\n{'TOTAL':14} {total_trades:4} {total_wr:4.0f}% ${total_pnl:+6.2f}") print(f"Per day: ~{total_trades/DAYS_BACK:.0f} trades, ~${total_pnl/DAYS_BACK:+.2f}/day") # Save results results_file = Path(__file__).parent / "results_zatochki.json" with open(results_file, "w") as f: json.dump({"config": { "timeframe": TIMEFRAME, "days": DAYS_BACK, "vol_spike": VOL_SPIKE_MULT, "vwap_ext": VWAP_EXT_PCT, "rsi_os": RSI_OVERSOLD, "rsi_ob": RSI_OVERBOUGHT, "tp1": TP1_PCT, "trail": TRAIL_CALLBACK_PCT, "commission": COMMISSION, "slippage": SLIPPAGE, }, "results": all_results, "trades": all_trades}, f, indent=2, default=str) print(f"\nResults saved to {results_file}") if __name__ == "__main__": main()