#!/usr/bin/env python3
"""
ΠΠ°ΡΠΎΡΠΊΠΈ (Knife Catcher) β Backtest
====================================
Volume exhaustion reversal on 1m Bybit Futures.
Downloads klines from Bybit, runs strategy, reports stats.
Usage:
python3 backtest_zatochki.py # all symbols
python3 backtest_zatochki.py SIRENUSDT # single symbol
"""
import sys
import json
import time
import math
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from pathlib import Path
# ============================================================
# CONFIG
# ============================================================
# Symbols to test — mid-cap alts from our screener that had trades
SYMBOLS = [
    "SIRENUSDT", "STOUSDT", "JCTUSDT", "PIPPINUSDT", "ONGUSDT",
    "PUFFERUSDT", "NOMUSDT", "EDGEUSDT", "DUSDT", "ARIAUSDT",
    "MMTUSDT", "BERAUSDT", "AIOTUSDT", "KOMAUSDT",
]
TIMEFRAME = "1"             # 1 minute (Bybit interval format)
DAYS_BACK = 7               # 7 days of history by default
COMMISSION = 0.00055        # 0.055% Bybit taker fee (per side)
SLIPPAGE = 0.0001           # 0.01% assumed slippage (per side)

# Strategy params — v2 optimized (from v1 analysis)
VOL_SPIKE_MULT = 7.0        # was 5x -> 7x (8x+ bucket WR 69%)
VOL_SMA_PERIOD = 20         # lookback for volume average
SPIKE_LOOKBACK = 5          # spike must appear in last N candles
VWAP_PERIOD = 50            # rolling VWAP lookback
VWAP_EXT_PCT = 0.02         # price extension >= 2% from VWAP
RSI_PERIOD = 14
RSI_OVERSOLD = 25
RSI_OVERBOUGHT = 80
EXHAUSTION_BARS = 2         # N declining volume bars after spike
SL_CAP_PCT = 0.012          # cap SL at 1.2%

# Risk
SL_BUFFER_PCT = 0.003       # 0.3% buffer beyond spike wick
TP1_PCT = 0.007             # was 0.5% -> 0.7% (47% of trades reach MFE 0.7%, covers commission)
TP1_CLOSE_RATIO = 0.5       # close 50% of the position at TP1
TRAIL_CALLBACK_PCT = 0.005  # 0.5% trailing callback after TP1
TRADE_SIZE_USD = 5.0
LEVERAGE = 10
NOTIONAL = TRADE_SIZE_USD * LEVERAGE  # $50 notional per trade
# ============================================================
# DATA DOWNLOAD β Bybit
# ============================================================
def get_bybit_klines(symbol, interval, days_back):
    """Download klines for a symbol from Bybit linear futures.

    Pages backwards from "now" in chunks of up to 1000 candles until
    `days_back` days are covered or the API stops returning rows.

    Returns a DataFrame with columns
    [timestamp, open, high, low, close, volume, turnover]
    sorted oldest-first (duplicates dropped), or None if nothing was
    downloaded.
    """
    url = "https://api.bybit.com/v5/market/kline"
    end_ms = int(time.time() * 1000)
    start_ms = end_ms - days_back * 24 * 3600 * 1000
    all_klines = []
    current_end = end_ms
    errors = 0
    while current_end > start_ms:
        params = {
            "category": "linear",
            "symbol": symbol,
            "interval": str(interval),
            "end": current_end,
            "limit": 1000,
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
            rows = data.get("result", {}).get("list", [])
            if not rows:
                break
            all_klines.extend(rows)
            # Bybit returns newest first, so the last item is the oldest;
            # next page ends just before it.
            oldest_ts = int(rows[-1][0])
            current_end = oldest_ts - 1
            time.sleep(0.1)  # be polite to the API
        except Exception as e:
            # BUGFIX: previously retried forever on a persistent failure
            # (`current_end` never advanced in this branch). Cap retries
            # so the loop always terminates.
            errors += 1
            print(f" Error fetching {symbol}: {e}")
            if errors >= 5:
                break
            time.sleep(1)
            continue
    if not all_klines:
        return None
    # Bybit format: [startTime, open, high, low, close, volume, turnover]
    df = pd.DataFrame(all_klines, columns=[
        "timestamp", "open", "high", "low", "close", "volume", "turnover"
    ])
    df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms")
    for col in ["open", "high", "low", "close", "volume", "turnover"]:
        df[col] = df[col].astype(float)
    df = df.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True)
    return df
# ============================================================
# INDICATORS
# ============================================================
def calc_rsi(closes, period=14):
    """Wilder-smoothed RSI over a closing-price series.

    Returns an array the same length as `closes`. Positions before the
    first complete `period` — and the entire array when there is not
    enough data — hold the neutral value 50.0.
    """
    n = len(closes)
    out = np.full(n, 50.0)
    deltas = np.diff(closes)
    up_moves = np.where(deltas > 0, deltas, 0)
    down_moves = np.where(deltas < 0, -deltas, 0)
    # Not enough history for even one full averaging window.
    if len(deltas) < period:
        return out
    smooth_up = np.zeros(n)
    smooth_down = np.zeros(n)
    # Seed with simple means, then apply Wilder's recursive smoothing.
    smooth_up[period] = up_moves[:period].mean()
    smooth_down[period] = down_moves[:period].mean()
    for k in range(period + 1, n):
        smooth_up[k] = (smooth_up[k - 1] * (period - 1) + up_moves[k - 1]) / period
        smooth_down[k] = (smooth_down[k - 1] * (period - 1) + down_moves[k - 1]) / period
    for k in range(period, n):
        if smooth_down[k] == 0:
            out[k] = 100.0  # no losses in the window -> max RSI
        else:
            out[k] = 100.0 - 100.0 / (1.0 + smooth_up[k] / smooth_down[k])
    return out
def calc_rolling_vwap(df, period=50):
    """Rolling VWAP: typical price averaged with bar-volume weights.

    NaN until `period` bars have accumulated.
    """
    typical_price = df[["high", "low", "close"]].sum(axis=1) / 3.0
    weighted_sum = (typical_price * df["volume"]).rolling(period).sum()
    volume_sum = df["volume"].rolling(period).sum()
    return weighted_sum / volume_sum
def calc_volume_sma(volumes, period=20):
    """Rolling mean of volume over `period` bars (NaN until filled)."""
    series = pd.Series(volumes)
    return series.rolling(window=period).mean()
# ============================================================
# STRATEGY β Zatochki (Knife Catcher)
# ============================================================
def find_signals(df):
    """
    Scan dataframe for knife-catch signals.

    A signal fires at candle i when all of the following hold:
      1. a volume spike >= VOL_SPIKE_MULT x the volume SMA occurred
         within the last SPIKE_LOOKBACK candles (just before the
         exhaustion window);
      2. the last EXHAUSTION_BARS bars (ending at i) show strictly
         declining volume — the spike is exhausting;
      3. price is extended >= VWAP_EXT_PCT from the rolling VWAP;
      4. RSI is at an extreme that agrees with the extension direction
         (oversold+below VWAP -> LONG, overbought+above VWAP -> SHORT).

    Returns list of signal dicts (entry index/price, stop-loss,
    diagnostic fields for reporting).
    """
    closes = df["close"].values
    highs = df["high"].values
    lows = df["low"].values
    volumes = df["volume"].values
    rsi = calc_rsi(closes, RSI_PERIOD)
    vwap = calc_rolling_vwap(df, VWAP_PERIOD).values
    vol_sma = calc_volume_sma(volumes, VOL_SMA_PERIOD).values
    signals = []
    # Earliest index where every indicator and lookback window is valid.
    min_idx = max(VOL_SMA_PERIOD, VWAP_PERIOD, RSI_PERIOD) + SPIKE_LOOKBACK + EXHAUSTION_BARS
    for i in range(min_idx, len(df)):
        # Skip if VWAP or vol_sma is NaN (warm-up) or volume average is zero.
        if np.isnan(vwap[i]) or np.isnan(vol_sma[i]) or vol_sma[i] == 0:
            continue
        # 1. Volume spike in last SPIKE_LOOKBACK candles.
        #    Inclusive slice of SPIKE_LOOKBACK + 1 candles ending just
        #    before/at the start of the exhaustion window.
        spike_window = volumes[i - SPIKE_LOOKBACK - EXHAUSTION_BARS : i - EXHAUSTION_BARS + 1]
        max_vol_ratio = np.max(spike_window) / vol_sma[i] if vol_sma[i] > 0 else 0
        if max_vol_ratio < VOL_SPIKE_MULT:
            continue
        # Find the spike candle's absolute index in the dataframe.
        spike_rel_idx = np.argmax(spike_window)
        spike_abs_idx = i - SPIKE_LOOKBACK - EXHAUSTION_BARS + spike_rel_idx
        # 2. Volume exhaustion — EXHAUSTION_BARS strictly declining
        #    volume bars ending at candle i.
        exhaustion = True
        for j in range(1, EXHAUSTION_BARS + 1):
            check_idx = i - EXHAUSTION_BARS + j
            if check_idx <= 0 or volumes[check_idx] >= volumes[check_idx - 1]:
                exhaustion = False
                break
        if not exhaustion:
            continue
        # 3. Price extension from VWAP (signed: negative = below VWAP).
        vwap_dist = (closes[i] - vwap[i]) / vwap[i]
        if abs(vwap_dist) < VWAP_EXT_PCT:
            continue
        # 4. RSI extreme; RSI and VWAP extension must agree on direction.
        rsi_val = rsi[i]
        if rsi_val < RSI_OVERSOLD and vwap_dist < 0:
            direction = "LONG"  # oversold + below VWAP -> buy the dip
        elif rsi_val > RSI_OVERBOUGHT and vwap_dist > 0:
            direction = "SHORT"  # overbought + above VWAP -> sell the top
        else:
            continue
        # Calculate dynamic SL from the spike wick.
        if direction == "LONG":
            # SL below the lowest low between the spike candle and entry.
            spike_low = np.min(lows[spike_abs_idx : i + 1])
            sl_price = spike_low * (1 - SL_BUFFER_PCT)
            sl_pct = (closes[i] - sl_price) / closes[i]
        else:
            spike_high = np.max(highs[spike_abs_idx : i + 1])
            sl_price = spike_high * (1 + SL_BUFFER_PCT)
            sl_pct = (sl_price - closes[i]) / closes[i]
        # Cap SL at SL_CAP_PCT (was unlimited — wide SLs killed PnL).
        if sl_pct > SL_CAP_PCT:
            if direction == "LONG":
                sl_price = closes[i] * (1 - SL_CAP_PCT)
            else:
                sl_price = closes[i] * (1 + SL_CAP_PCT)
            sl_pct = SL_CAP_PCT
        # Skip if SL too tight (<0.3%) — would be stopped out by noise.
        if sl_pct < 0.003:
            continue
        signals.append({
            "idx": i,
            "timestamp": df["timestamp"].iloc[i],
            "direction": direction,
            "entry_price": closes[i],
            "sl_price": sl_price,
            "sl_pct": sl_pct,
            "vol_spike_ratio": round(max_vol_ratio, 1),
            "vwap_dist_pct": round(abs(vwap_dist) * 100, 2),
            "rsi": round(rsi_val, 1),
            "exhaustion_bars": EXHAUSTION_BARS,
        })
    return signals
# ============================================================
# BACKTEST ENGINE
# ============================================================
def run_backtest(df, signals):
    """
    Simulate trades bar-by-bar with a TP1 partial close + trailing stop.

    For each signal (skipped while the 30-bar cooldown from the previous
    exit is active):
      * stop-loss at sig["sl_price"] until TP1 is reached;
      * at TP1 (entry +/- TP1_PCT) close TP1_CLOSE_RATIO of the position
        and move the stop to break-even;
      * after TP1, trail the remainder by TRAIL_CALLBACK_PCT from the
        best price seen;
      * force-close at market after at most 300 bars (~5h on 1m).

    PnL is net of commission on both sides (plus the extra leg for the
    partial close) and slippage. Returns a list of per-trade dicts.
    """
    trades = []
    cooldown_until = -1
    # NOTE: the original kept an `in_trade` flag, but it was reset to
    # False at the end of every iteration, so its guard could never
    # fire; overlap is actually prevented by the cooldown. Removed.
    for sig in signals:
        entry_idx = sig["idx"]
        if entry_idx <= cooldown_until:
            continue
        entry_price = sig["entry_price"]
        sl_price = sig["sl_price"]
        direction = sig["direction"]
        # TP1 target price.
        if direction == "LONG":
            tp1_price = entry_price * (1 + TP1_PCT)
        else:
            tp1_price = entry_price * (1 - TP1_PCT)
        tp1_hit = False
        trail_high = 0   # best price since TP1 (lowest low for SHORT)
        mfe = 0          # max favorable excursion (fraction of entry)
        mae = 0          # max adverse excursion (fraction of entry)
        exit_price = None
        exit_reason = None
        exit_idx = None
        # Last bar we are allowed to simulate: max 300 bars (5h on 1m).
        last_bar = min(entry_idx + 300, len(df)) - 1
        for j in range(entry_idx + 1, last_bar + 1):
            high = df["high"].iloc[j]
            low = df["low"].iloc[j]
            # Bar-level best/worst excursion relative to entry.
            if direction == "LONG":
                bar_best = (high - entry_price) / entry_price
                bar_worst = (low - entry_price) / entry_price
            else:
                bar_best = (entry_price - low) / entry_price
                bar_worst = (entry_price - high) / entry_price
            mfe = max(mfe, bar_best)
            mae = min(mae, bar_worst)
            # Check SL (before TP1).
            if not tp1_hit:
                if direction == "LONG" and low <= sl_price:
                    exit_price = sl_price
                    exit_reason = "SL"
                    exit_idx = j
                    break
                elif direction == "SHORT" and high >= sl_price:
                    exit_price = sl_price
                    exit_reason = "SL"
                    exit_idx = j
                    break
            # Check TP1: partial close, stop moves to break-even.
            if not tp1_hit:
                if direction == "LONG" and high >= tp1_price:
                    tp1_hit = True
                    trail_high = high
                    sl_price = entry_price  # SL moves to BE
                    continue
                elif direction == "SHORT" and low <= tp1_price:
                    tp1_hit = True
                    trail_high = low  # for SHORT, track the lowest low
                    sl_price = entry_price
                    continue
            # Trailing stop (after TP1).
            if tp1_hit:
                if direction == "LONG":
                    if high > trail_high:
                        trail_high = high
                    trail_sl = trail_high * (1 - TRAIL_CALLBACK_PCT)
                    if low <= trail_sl:
                        exit_price = trail_sl
                        exit_reason = "TRAIL"
                        exit_idx = j
                        break
                    # Break-even stop on the remainder.
                    if low <= sl_price:
                        exit_price = sl_price
                        exit_reason = "TP1+BE"
                        exit_idx = j
                        break
                else:  # SHORT
                    if low < trail_high or trail_high == 0:
                        trail_high = low
                    trail_sl = trail_high * (1 + TRAIL_CALLBACK_PCT)
                    if high >= trail_sl:
                        exit_price = trail_sl
                        exit_reason = "TRAIL"
                        exit_idx = j
                        break
                    if high >= sl_price:
                        exit_price = sl_price
                        exit_reason = "TP1+BE"
                        exit_idx = j
                        break
        # No exit within the window: close at market on the LAST
        # SIMULATED bar.
        # BUGFIX: previously priced the timeout exit at bar
        # entry_idx + 300, one bar past the last bar the loop actually
        # simulated (a small lookahead / index inconsistency).
        if exit_price is None:
            exit_price = df["close"].iloc[last_bar]
            exit_reason = "TIMEOUT"
            exit_idx = last_bar
        # Raw PnL of the full position at the exit price.
        if direction == "LONG":
            raw_pnl_pct = (exit_price - entry_price) / entry_price
        else:
            raw_pnl_pct = (entry_price - exit_price) / entry_price
        # Blend TP1 partial with the remainder's exit when TP1 was hit.
        if tp1_hit and exit_reason in ("TRAIL", "TP1+BE"):
            # TP1 portion: TP1_CLOSE_RATIO of the size at +TP1_PCT.
            tp1_pnl = TP1_PCT * TP1_CLOSE_RATIO
            # Remaining portion: closed at the exit price.
            if direction == "LONG":
                rest_pnl_pct = (exit_price - entry_price) / entry_price
            else:
                rest_pnl_pct = (entry_price - exit_price) / entry_price
            rest_pnl = rest_pnl_pct * (1 - TP1_CLOSE_RATIO)
            total_pnl_pct = tp1_pnl + rest_pnl
        else:
            total_pnl_pct = raw_pnl_pct
        # Commission (both sides).
        total_pnl_pct -= COMMISSION * 2
        if tp1_hit:
            total_pnl_pct -= COMMISSION * 2 * 0.5  # extra leg for the partial close
        # Slippage.
        total_pnl_pct -= SLIPPAGE * 2
        pnl_usd = NOTIONAL * total_pnl_pct
        bars_in_trade = exit_idx - entry_idx
        trades.append({
            "timestamp": str(sig["timestamp"]),
            "direction": direction,
            "entry_price": entry_price,
            "exit_price": exit_price,
            "sl_pct": round(sig["sl_pct"] * 100, 2),
            "result": exit_reason,
            "pnl_pct": round(total_pnl_pct * 100, 3),
            "pnl_usd": round(pnl_usd, 3),
            "tp1_hit": tp1_hit,
            "trail_high": trail_high,
            "mfe_pct": round(mfe * 100, 3),
            "mae_pct": round(mae * 100, 3),
            "bars": bars_in_trade,
            "vol_spike_ratio": sig["vol_spike_ratio"],
            "vwap_dist_pct": sig["vwap_dist_pct"],
            "rsi": sig["rsi"],
        })
        # Cooldown: 30 candles (30 min on 1m) after every exit.
        cooldown_until = exit_idx + 30
    return trades
# ============================================================
# REPORTING
# ============================================================
def print_report(symbol, trades):
    """Print backtest results for a symbol and return a summary dict.

    Returns {} when there are no trades, otherwise a dict with
    trades/wr/pnl/pf/tp1_rate/avg_mfe/avg_mae/avg_bars used by the
    cross-symbol summary table in main().
    """
    if not trades:
        print(f"\n{symbol}: 0 signals found")
        return {}
    n = len(trades)
    wins = [t for t in trades if t["pnl_usd"] > 0]
    losses = [t for t in trades if t["pnl_usd"] <= 0]
    wr = len(wins) / n * 100
    total_pnl = sum(t["pnl_usd"] for t in trades)
    avg_win = np.mean([t["pnl_usd"] for t in wins]) if wins else 0
    avg_loss = np.mean([t["pnl_usd"] for t in losses]) if losses else 0
    # Profit factor: gross wins / gross losses (0 when undefined).
    loss_sum = sum(t["pnl_usd"] for t in losses)
    pf = abs(sum(t["pnl_usd"] for t in wins) / loss_sum) if losses and loss_sum != 0 else 0
    tp1_hits = [t for t in trades if t["tp1_hit"]]
    tp1_rate = len(tp1_hits) / n * 100
    avg_mfe = np.mean([t["mfe_pct"] for t in trades])
    avg_mae = np.mean([t["mae_pct"] for t in trades])
    avg_bars = np.mean([t["bars"] for t in trades])
    # Group PnLs by exit reason (SL / TRAIL / TP1+BE / TIMEOUT).
    by_res = {}
    for t in trades:
        r = t["result"]
        by_res.setdefault(r, []).append(t["pnl_usd"])
    print(f"\n{'='*55}")
    print(f"🔪 {symbol} — {n} trades ({DAYS_BACK} days, 1m)")
    print(f"{'='*55}")
    print(f"WR: {wr:.0f}% ({len(wins)}W/{len(losses)}L) | PnL: ${total_pnl:+.2f}")
    print(f"Avg win: ${avg_win:+.3f} | Avg loss: ${avg_loss:+.3f} | PF: {pf:.2f}")
    print(f"TP1 hit: {len(tp1_hits)}/{n} ({tp1_rate:.0f}%)")
    print(f"Avg MFE: {avg_mfe:+.2f}% | Avg MAE: {avg_mae:+.2f}% | Avg bars: {avg_bars:.0f}")
    for r, pnls in sorted(by_res.items()):
        w = sum(1 for p in pnls if p > 0)
        print(f" {r:10}: {len(pnls):3} trades, {w}W/{len(pnls)-w}L, ${sum(pnls):+.2f}")
    # Show individual trades (only for small sets, to keep output readable).
    if n <= 30:
        print(f"\nTrades:")
        for t in trades:
            # BUGFIX: the win/loss marker literal was corrupted by a
            # mojibake round-trip and split across two source lines
            # (a syntax error); restored to single-line emoji literals.
            emoji = "✅" if t["pnl_usd"] > 0 else "❌"
            tp1f = "🎯" if t["tp1_hit"] else " "
            print(f" {emoji}{tp1f} {t['direction']:5} {t['result']:8} "
                  f"pnl:{t['pnl_pct']:+6.2f}% ${t['pnl_usd']:+.3f} "
                  f"MFE:{t['mfe_pct']:+5.2f}% MAE:{t['mae_pct']:+5.2f}% "
                  f"bars:{t['bars']:3} vol:{t['vol_spike_ratio']}x "
                  f"vwap:{t['vwap_dist_pct']}% rsi:{t['rsi']}")
    return {
        "symbol": symbol, "trades": n, "wr": round(wr, 1),
        "pnl": round(total_pnl, 2), "pf": round(pf, 2),
        "tp1_rate": round(tp1_rate, 1), "avg_mfe": round(avg_mfe, 2),
        "avg_mae": round(avg_mae, 2), "avg_bars": round(avg_bars, 1),
    }
# ============================================================
# MAIN
# ============================================================
def main():
    """CLI entry point: download data, backtest each symbol, report.

    With no argument, runs every symbol in SYMBOLS; with one argument,
    runs only that symbol. Saves config + results + raw trades to
    results_zatochki.json next to this script.
    """
    symbols = SYMBOLS
    if len(sys.argv) > 1:
        # Single-symbol mode: python3 backtest_zatochki.py SIRENUSDT
        symbols = [sys.argv[1].upper()]
    # NOTE: the banner strings below were mojibake-corrupted in the
    # original source; restored to readable UTF-8.
    print(f"🔪 Zatochki Backtest — {TIMEFRAME}m, {DAYS_BACK} days")
    print(f"Vol spike: {VOL_SPIKE_MULT}x | VWAP ext: {VWAP_EXT_PCT*100}%")
    print(f"RSI: <{RSI_OVERSOLD} / >{RSI_OVERBOUGHT}")
    print(f"TP1: {TP1_PCT*100}% | Trail: {TRAIL_CALLBACK_PCT*100}% | SL cap: {SL_CAP_PCT*100}%")
    print(f"Commission: {COMMISSION*100}% | Slippage: {SLIPPAGE*100}%")
    print(f"Symbols: {len(symbols)}")
    all_results = []
    all_trades = []
    for symbol in symbols:
        print(f"\n📥 Downloading {symbol} 1m ({DAYS_BACK}d)...", end=" ", flush=True)
        df = get_bybit_klines(symbol, TIMEFRAME, DAYS_BACK)
        if df is None or len(df) < 200:
            # Too few candles for indicator warm-up — skip this symbol.
            print(f"SKIP (no data)")
            continue
        print(f"{len(df)} candles")
        signals = find_signals(df)
        print(f" Signals found: {len(signals)}")
        trades = run_backtest(df, signals)
        result = print_report(symbol, trades)
        if result:
            all_results.append(result)
            all_trades.extend(trades)
    # Cross-symbol summary table, sorted by PnL.
    if all_results:
        print(f"\n{'='*60}")
        print(f"📊 SUMMARY — {len(all_results)} symbols, {sum(r['trades'] for r in all_results)} trades")
        print(f"{'='*60}")
        print(f"\n{'Symbol':14} {'#':>4} {'WR':>5} {'PnL':>8} {'PF':>5} {'TP1%':>5} {'MFE':>6} {'MAE':>6}")
        print("-" * 55)
        for r in sorted(all_results, key=lambda x: x["pnl"], reverse=True):
            print(f"{r['symbol']:14} {r['trades']:4} {r['wr']:4.0f}% ${r['pnl']:+6.2f} {r['pf']:5.2f} {r['tp1_rate']:4.0f}% {r['avg_mfe']:+5.2f}% {r['avg_mae']:+5.2f}%")
        total_trades = sum(r["trades"] for r in all_results)
        total_pnl = sum(r["pnl"] for r in all_results)
        total_wins = sum(1 for t in all_trades if t["pnl_usd"] > 0)
        total_wr = total_wins / total_trades * 100 if total_trades else 0
        print(f"\n{'TOTAL':14} {total_trades:4} {total_wr:4.0f}% ${total_pnl:+6.2f}")
        print(f"Per day: ~{total_trades/DAYS_BACK:.0f} trades, ~${total_pnl/DAYS_BACK:+.2f}/day")
    # Persist config + per-symbol results + raw trades as JSON.
    results_file = Path(__file__).parent / "results_zatochki.json"
    with open(results_file, "w") as f:
        json.dump({"config": {
            "timeframe": TIMEFRAME, "days": DAYS_BACK,
            "vol_spike": VOL_SPIKE_MULT, "vwap_ext": VWAP_EXT_PCT,
            "rsi_os": RSI_OVERSOLD, "rsi_ob": RSI_OVERBOUGHT,
            "tp1": TP1_PCT, "trail": TRAIL_CALLBACK_PCT,
            "commission": COMMISSION, "slippage": SLIPPAGE,
        }, "results": all_results, "trades": all_trades}, f, indent=2, default=str)
    print(f"\nResults saved to {results_file}")


if __name__ == "__main__":
    main()