← Back
β˜†
#!/usr/bin/env python3
"""
Zatochki (Knife Catcher) — Backtest
====================================
Volume exhaustion reversal on 1m Bybit Futures.
Downloads klines from Bybit, runs strategy, reports stats.

Usage:
  python3 backtest_zatochki.py                  # all symbols
  python3 backtest_zatochki.py SIRENUSDT        # single symbol
"""

import sys
import json
import time
import math
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from pathlib import Path

# ============================================================
# CONFIG
# ============================================================

# Symbols to test — mid-cap alts from our screener that had trades
SYMBOLS = [
    "SIRENUSDT", "STOUSDT", "JCTUSDT", "PIPPINUSDT", "ONGUSDT",
    "PUFFERUSDT", "NOMUSDT", "EDGEUSDT", "DUSDT", "ARIAUSDT",
    "MMTUSDT", "BERAUSDT", "AIOTUSDT", "KOMAUSDT",
]

TIMEFRAME = "1"  # 1 minute (Bybit format)
DAYS_BACK = 7    # 7 days default
COMMISSION = 0.00055  # 0.055% Bybit taker (per side)
SLIPPAGE = 0.0001    # 0.01%

# Strategy params — v2 optimized (from v1 analysis)
VOL_SPIKE_MULT = 7.0     # was 5x → 7x (8x+ bucket WR 69%)
VOL_SMA_PERIOD = 20      # lookback for volume average
SPIKE_LOOKBACK = 5       # spike in last N candles
VWAP_PERIOD = 50         # rolling VWAP lookback
VWAP_EXT_PCT = 0.02      # price extension >= 2% from VWAP
RSI_PERIOD = 14
RSI_OVERSOLD = 25
RSI_OVERBOUGHT = 80
EXHAUSTION_BARS = 2      # N declining volume bars after spike
SL_CAP_PCT = 0.012       # cap SL at 1.2%

# Risk
SL_BUFFER_PCT = 0.003    # 0.3% buffer beyond spike wick
TP1_PCT = 0.007          # was 0.5% → 0.7% (47% trades reach MFE 0.7%, covers commission)
TP1_CLOSE_RATIO = 0.5    # close 50% at TP1
TRAIL_CALLBACK_PCT = 0.005  # 0.5% trailing after TP1
TRADE_SIZE_USD = 5.0
LEVERAGE = 10
NOTIONAL = TRADE_SIZE_USD * LEVERAGE  # $50

# ============================================================
# DATA DOWNLOAD — Bybit
# ============================================================

def get_bybit_klines(symbol, interval, days_back):
    """Download klines from Bybit linear futures.

    Pages backwards from the current time in batches of up to 1000 candles
    until the requested window is covered, the API returns no more rows, or
    too many consecutive requests fail.

    Args:
        symbol: Bybit symbol, e.g. "SIRENUSDT".
        interval: Bybit interval value, e.g. "1" for one minute.
        days_back: size of the window to download, in days.

    Returns:
        DataFrame with columns timestamp, open, high, low, close, volume,
        turnover (timestamp as datetime, the rest as float), de-duplicated
        and sorted ascending by timestamp; None when no candle inside the
        requested window could be downloaded.
    """
    url = "https://api.bybit.com/v5/market/kline"
    max_failures = 5  # consecutive failed requests before giving up

    end_ms = int(time.time() * 1000)
    start_ms = end_ms - days_back * 24 * 3600 * 1000

    all_klines = []
    current_end = end_ms
    failures = 0

    while current_end > start_ms:
        params = {
            "category": "linear",
            "symbol": symbol,
            "interval": str(interval),
            "end": current_end,
            "limit": 1000,
        }

        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
            rows = data.get("result", {}).get("list", [])
            # Bybit returns newest first, last item = oldest
            oldest_ts = int(rows[-1][0]) if rows else None
        except Exception as e:
            # Best-effort download: report and retry, but never spin forever
            # on a persistent failure (network down, malformed payload, ...).
            failures += 1
            print(f"  Error fetching {symbol}: {e}")
            if failures >= max_failures:
                print(f"  Giving up on {symbol} after {failures} failed requests")
                break
            time.sleep(1)
            continue

        failures = 0
        if not rows:
            break

        all_klines.extend(rows)
        current_end = oldest_ts - 1
        time.sleep(0.1)

    if not all_klines:
        return None

    # Bybit format: [startTime, open, high, low, close, volume, turnover]
    df = pd.DataFrame(all_klines, columns=[
        "timestamp", "open", "high", "low", "close", "volume", "turnover"
    ])

    # The last page usually reaches past the start of the window; drop the
    # surplus so the result really covers `days_back` days and no more.
    ts_ms = df["timestamp"].astype("int64")
    df = df[ts_ms >= start_ms].copy()
    if df.empty:
        return None

    df["timestamp"] = pd.to_datetime(df["timestamp"].astype("int64"), unit="ms")
    for col in ["open", "high", "low", "close", "volume", "turnover"]:
        df[col] = df[col].astype(float)

    df = df.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True)
    return df


# ============================================================
# INDICATORS
# ============================================================

def calc_rsi(closes, period=14):
    """Return an RSI array aligned with `closes`.

    The first average gain/loss is the plain mean of the first `period`
    price changes; later values are smoothed as
    (previous * (period - 1) + current) / period. Positions before `period`
    (or every position, when there are fewer than `period` price changes)
    hold the neutral value 50. A zero average loss yields 100.
    """
    size = len(closes)
    result = np.full(size, 50.0)

    change = np.diff(closes)
    if len(change) < period:
        return result

    ups = np.where(change > 0, change, 0)
    downs = np.where(change < 0, -change, 0)

    # Seed both running averages from the first `period` changes.
    mean_up = np.mean(ups[:period])
    mean_down = np.mean(downs[:period])

    for pos in range(period, size):
        if pos > period:
            # change[pos - 1] is the move from closes[pos - 1] to closes[pos]
            mean_up = (mean_up * (period - 1) + ups[pos - 1]) / period
            mean_down = (mean_down * (period - 1) + downs[pos - 1]) / period

        if mean_down == 0:
            result[pos] = 100.0
            continue

        strength = mean_up / mean_down
        result[pos] = 100 - (100 / (1 + strength))

    return result


def calc_rolling_vwap(df, period=50):
    """Rolling volume-weighted average of the typical price (H+L+C)/3.

    Reads the "high", "low", "close" and "volume" columns of `df` and
    returns a Series aligned with it; positions without a full `period`
    window are NaN.
    """
    volume = df["volume"]
    typical_price = (df["high"] + df["low"] + df["close"]) / 3

    weighted_total = (typical_price * volume).rolling(period).sum()
    volume_total = volume.rolling(period).sum()
    return weighted_total / volume_total


def calc_volume_sma(volumes, period=20):
    """Return the `period`-bar simple moving average of `volumes` as a Series.

    Positions without a full window are NaN.
    """
    series = pd.Series(volumes)
    window = series.rolling(window=period)
    return window.mean()


# ============================================================
# STRATEGY — Zatochki (Knife Catcher)
# ============================================================

def find_signals(df):
    """
    Walk the candles and collect every bar that matches the knife-catch setup.

    A bar qualifies when, checked in this order:
      1. the largest volume in the spike window is at least VOL_SPIKE_MULT
         times the volume SMA at the bar,
      2. volume fell on each of the last EXHAUSTION_BARS bars,
      3. the close is at least VWAP_EXT_PCT away from the rolling VWAP,
      4. RSI is extreme on the same side as the extension
         (oversold + below VWAP -> LONG, overbought + above VWAP -> SHORT),
      5. the resulting stop distance (capped at SL_CAP_PCT) is not below 0.3%.

    Returns a list of signal dicts (idx, timestamp, direction, entry_price,
    sl_price, sl_pct, vol_spike_ratio, vwap_dist_pct, rsi, exhaustion_bars).
    """
    close_arr = df["close"].values
    high_arr = df["high"].values
    low_arr = df["low"].values
    vol_arr = df["volume"].values
    stamps = df["timestamp"]

    rsi = calc_rsi(close_arr, RSI_PERIOD)
    vwap = calc_rolling_vwap(df, VWAP_PERIOD).values
    vol_sma = calc_volume_sma(vol_arr, VOL_SMA_PERIOD).values

    # Distance from the current bar back to the start of the spike window.
    window_back = SPIKE_LOOKBACK + EXHAUSTION_BARS
    first_bar = max(VOL_SMA_PERIOD, VWAP_PERIOD, RSI_PERIOD) + window_back

    found = []
    for bar in range(first_bar, len(df)):
        fair_price = vwap[bar]
        avg_volume = vol_sma[bar]

        # Indicators not available yet (or a dead market): nothing to compare to
        if np.isnan(fair_price) or np.isnan(avg_volume) or avg_volume == 0:
            continue

        # 1. Volume spike somewhere in the window that ends just before the
        #    exhaustion bars
        window_start = bar - window_back
        window = vol_arr[window_start : bar - EXHAUSTION_BARS + 1]
        if avg_volume > 0:
            spike_ratio = np.max(window) / avg_volume
        else:
            spike_ratio = 0
        if spike_ratio < VOL_SPIKE_MULT:
            continue

        # Absolute index of the spike candle
        spike_bar = window_start + np.argmax(window)

        # 2. Volume exhaustion: each of the last EXHAUSTION_BARS bars must
        #    print less volume than the bar before it
        if any(
            k <= 0 or vol_arr[k] >= vol_arr[k - 1]
            for k in range(bar - EXHAUSTION_BARS + 1, bar + 1)
        ):
            continue

        # 3. Price extension from VWAP
        price = close_arr[bar]
        extension = (price - fair_price) / fair_price
        if abs(extension) < VWAP_EXT_PCT:
            continue

        # 4. RSI extreme decides the direction
        momentum = rsi[bar]
        if momentum < RSI_OVERSOLD and extension < 0:
            side = "LONG"  # oversold + below VWAP: buy the dip
        elif momentum > RSI_OVERBOUGHT and extension > 0:
            side = "SHORT"  # overbought + above VWAP: sell the top
        else:
            continue
        going_long = side == "LONG"

        # Dynamic stop beyond the extreme wick between the spike and this bar
        if going_long:
            stop = np.min(low_arr[spike_bar : bar + 1]) * (1 - SL_BUFFER_PCT)
            risk = (price - stop) / price
        else:
            stop = np.max(high_arr[spike_bar : bar + 1]) * (1 + SL_BUFFER_PCT)
            risk = (stop - price) / price

        # Cap the stop distance at SL_CAP_PCT
        if risk > SL_CAP_PCT:
            if going_long:
                stop = price * (1 - SL_CAP_PCT)
            else:
                stop = price * (1 + SL_CAP_PCT)
            risk = SL_CAP_PCT

        # Skip if SL too tight (<0.3%)
        if risk < 0.003:
            continue

        found.append({
            "idx": bar,
            "timestamp": stamps.iloc[bar],
            "direction": side,
            "entry_price": price,
            "sl_price": stop,
            "sl_pct": risk,
            "vol_spike_ratio": round(spike_ratio, 1),
            "vwap_dist_pct": round(abs(extension) * 100, 2),
            "rsi": round(momentum, 1),
            "exhaustion_bars": EXHAUSTION_BARS,
        })

    return found


# ============================================================
# BACKTEST ENGINE
# ============================================================

def run_backtest(df, signals):
    """
    Simulate trades with TP1 partial close + trailing stop.

    Signals are processed in order; a signal is skipped while the cooldown
    from the previous trade's exit is still running. Each trade is walked
    bar by bar for at most 300 bars:
      * before TP1: the stop loss is checked first, then TP1;
      * on TP1: TP1_CLOSE_RATIO of the position is closed at TP1_PCT, the
        stop moves to break-even and a trailing stop starts on the next bar;
      * if nothing triggers, the trade is closed at the close of the last
        simulated bar ("TIMEOUT").

    Args:
        df: candles with "high", "low" and "close" columns.
        signals: signal dicts as produced by find_signals.

    Returns:
        List of trade dicts (prices, exit reason, net PnL in % and USD,
        MFE/MAE, bars held, and the signal's diagnostics).
    """
    max_hold_bars = 300   # max 300 bars in a trade (5h on 1m candles)
    cooldown_bars = 30    # 30 candles (30 min on 1m)

    # Plain arrays: positional access without per-bar pandas overhead.
    highs = df["high"].values
    lows = df["low"].values
    closes = df["close"].values
    last_bar = len(df) - 1

    trades = []
    cooldown_until = -1

    for sig in signals:
        entry_idx = sig["idx"]

        if entry_idx <= cooldown_until:
            continue

        entry_price = sig["entry_price"]
        sl_price = sig["sl_price"]
        direction = sig["direction"]

        # TP1 price
        if direction == "LONG":
            tp1_price = entry_price * (1 + TP1_PCT)
        else:
            tp1_price = entry_price * (1 - TP1_PCT)

        # Simulate forward
        tp1_hit = False
        trail_high = 0  # best price since TP1 (highest for LONG, lowest for SHORT)
        mfe = 0  # max favorable excursion
        mae = 0  # max adverse excursion
        exit_price = None
        exit_reason = None
        exit_idx = None

        # The timeout bar itself is simulated too, so the trade is never
        # carried through a bar whose SL/TP/trailing levels were not checked.
        timeout_idx = min(entry_idx + max_hold_bars, last_bar)

        for j in range(entry_idx + 1, timeout_idx + 1):
            high = highs[j]
            low = lows[j]

            if direction == "LONG":
                bar_best = (high - entry_price) / entry_price
                bar_worst = (low - entry_price) / entry_price
            else:
                bar_best = (entry_price - low) / entry_price
                bar_worst = (entry_price - high) / entry_price

            mfe = max(mfe, bar_best)
            mae = min(mae, bar_worst)

            if not tp1_hit:
                # Check SL first: when SL and TP1 are both inside one bar the
                # loss is assumed.
                if direction == "LONG" and low <= sl_price:
                    exit_price = sl_price
                    exit_reason = "SL"
                    exit_idx = j
                    break
                elif direction == "SHORT" and high >= sl_price:
                    exit_price = sl_price
                    exit_reason = "SL"
                    exit_idx = j
                    break

                # Check TP1; trailing only starts on the following bar
                if direction == "LONG" and high >= tp1_price:
                    tp1_hit = True
                    trail_high = high
                    sl_price = entry_price  # SL moves to BE
                    continue
                elif direction == "SHORT" and low <= tp1_price:
                    tp1_hit = True
                    trail_high = low  # for SHORT, track lowest
                    sl_price = entry_price
                    continue

            # Trailing stop (after TP1)
            if tp1_hit:
                if direction == "LONG":
                    if high > trail_high:
                        trail_high = high
                    trail_sl = trail_high * (1 - TRAIL_CALLBACK_PCT)
                    if low <= trail_sl:
                        exit_price = trail_sl
                        exit_reason = "TRAIL"
                        exit_idx = j
                        break
                    # SL at BE
                    if low <= sl_price:
                        exit_price = sl_price
                        exit_reason = "TP1+BE"
                        exit_idx = j
                        break
                else:  # SHORT
                    if low < trail_high or trail_high == 0:
                        trail_high = low
                    trail_sl = trail_high * (1 + TRAIL_CALLBACK_PCT)
                    if high >= trail_sl:
                        exit_price = trail_sl
                        exit_reason = "TRAIL"
                        exit_idx = j
                        break
                    if high >= sl_price:
                        exit_price = sl_price
                        exit_reason = "TP1+BE"
                        exit_idx = j
                        break

        # If no exit found within the holding limit, close at market
        if exit_price is None:
            exit_price = closes[timeout_idx]
            exit_reason = "TIMEOUT"
            exit_idx = timeout_idx

        # PnL of whatever was still open at the exit
        if direction == "LONG":
            exit_pnl_pct = (exit_price - entry_price) / entry_price
        else:
            exit_pnl_pct = (entry_price - exit_price) / entry_price

        if tp1_hit:
            # Part of the position was already closed at +TP1_PCT; only the
            # remainder earns the exit PnL. This holds for every exit after
            # TP1, including TIMEOUT.
            tp1_pnl = TP1_PCT * TP1_CLOSE_RATIO
            rest_pnl = exit_pnl_pct * (1 - TP1_CLOSE_RATIO)
            total_pnl_pct = tp1_pnl + rest_pnl
        else:
            total_pnl_pct = exit_pnl_pct

        # Commission (both sides)
        total_pnl_pct -= COMMISSION * 2
        if tp1_hit:
            # NOTE(review): the two partial exits together trade the same
            # notional as one full exit, so this looks like a surcharge on
            # top of the percentage fee; confirm it is intended.
            total_pnl_pct -= COMMISSION * 2 * 0.5  # extra commission for partial close

        # Slippage
        total_pnl_pct -= SLIPPAGE * 2

        pnl_usd = NOTIONAL * total_pnl_pct

        bars_in_trade = exit_idx - entry_idx

        trade = {
            "timestamp": str(sig["timestamp"]),
            "direction": direction,
            "entry_price": entry_price,
            "exit_price": exit_price,
            "sl_pct": round(sig["sl_pct"] * 100, 2),
            "result": exit_reason,
            "pnl_pct": round(total_pnl_pct * 100, 3),
            "pnl_usd": round(pnl_usd, 3),
            "tp1_hit": tp1_hit,
            "trail_high": trail_high,
            "mfe_pct": round(mfe * 100, 3),
            "mae_pct": round(mae * 100, 3),
            "bars": bars_in_trade,
            "vol_spike_ratio": sig["vol_spike_ratio"],
            "vwap_dist_pct": sig["vwap_dist_pct"],
            "rsi": sig["rsi"],
        }
        trades.append(trade)

        # Cooldown after the exit before the next signal may be taken
        cooldown_until = exit_idx + cooldown_bars

    return trades


# ============================================================
# REPORTING
# ============================================================

def print_report(symbol, trades):
    """Print backtest results for a symbol and return its summary.

    Args:
        symbol: symbol name used in the printed header and the summary.
        trades: trade dicts as produced by run_backtest.

    Returns:
        {} when `trades` is empty; otherwise a dict with symbol, trades, wr,
        pnl, pf, tp1_rate, avg_mfe, avg_mae and avg_bars. A trade counts as
        a win only when its pnl_usd is strictly positive. The profit factor
        is reported as 0 when the losing trades sum to zero.
    """
    if not trades:
        print(f"\n{symbol}: 0 signals found")
        return {}

    n = len(trades)
    wins = [t for t in trades if t["pnl_usd"] > 0]
    losses = [t for t in trades if t["pnl_usd"] <= 0]
    wr = len(wins) / n * 100
    total_pnl = sum(t["pnl_usd"] for t in trades)

    gross_win = sum(t["pnl_usd"] for t in wins)
    gross_loss = sum(t["pnl_usd"] for t in losses)
    avg_win = np.mean([t["pnl_usd"] for t in wins]) if wins else 0
    avg_loss = np.mean([t["pnl_usd"] for t in losses]) if losses else 0
    # 0 (not infinity) when nothing was lost, so the value stays JSON-friendly
    pf = abs(gross_win / gross_loss) if gross_loss != 0 else 0

    tp1_hits = [t for t in trades if t["tp1_hit"]]
    tp1_rate = len(tp1_hits) / n * 100

    avg_mfe = np.mean([t["mfe_pct"] for t in trades])
    avg_mae = np.mean([t["mae_pct"] for t in trades])
    avg_bars = np.mean([t["bars"] for t in trades])

    # By result type
    by_res = {}
    for t in trades:
        by_res.setdefault(t["result"], []).append(t["pnl_usd"])

    print(f"\n{'='*55}")
    print(f"🔪 {symbol} — {n} trades ({DAYS_BACK} days, 1m)")
    print(f"{'='*55}")
    print(f"WR: {wr:.0f}% ({len(wins)}W/{len(losses)}L) | PnL: ${total_pnl:+.2f}")
    print(f"Avg win: ${avg_win:+.3f} | Avg loss: ${avg_loss:+.3f} | PF: {pf:.2f}")
    print(f"TP1 hit: {len(tp1_hits)}/{n} ({tp1_rate:.0f}%)")
    print(f"Avg MFE: {avg_mfe:+.2f}% | Avg MAE: {avg_mae:+.2f}% | Avg bars: {avg_bars:.0f}")

    for r, pnls in sorted(by_res.items()):
        w = sum(1 for p in pnls if p > 0)
        print(f"  {r:10}: {len(pnls):3} trades, {w}W/{len(pnls)-w}L, ${sum(pnls):+.2f}")

    # Show individual trades (only for short lists, to keep output readable)
    if n <= 30:
        print("\nTrades:")
        for t in trades:
            emoji = "✅" if t["pnl_usd"] > 0 else "❌"
            tp1f = "🎯" if t["tp1_hit"] else "  "
            print(f"  {emoji}{tp1f} {t['direction']:5} {t['result']:8} "
                  f"pnl:{t['pnl_pct']:+6.2f}% ${t['pnl_usd']:+.3f} "
                  f"MFE:{t['mfe_pct']:+5.2f}% MAE:{t['mae_pct']:+5.2f}% "
                  f"bars:{t['bars']:3} vol:{t['vol_spike_ratio']}x "
                  f"vwap:{t['vwap_dist_pct']}% rsi:{t['rsi']}")

    return {
        "symbol": symbol, "trades": n, "wr": round(wr, 1),
        "pnl": round(total_pnl, 2), "pf": round(pf, 2),
        "tp1_rate": round(tp1_rate, 1), "avg_mfe": round(avg_mfe, 2),
        "avg_mae": round(avg_mae, 2), "avg_bars": round(avg_bars, 1),
    }


# ============================================================
# MAIN
# ============================================================

def main():
    """Run the backtest for all configured symbols, or for one given on the CLI.

    For each symbol: download candles, scan for signals, simulate trades and
    print the per-symbol report. When at least one symbol produced trades, a
    summary table is printed and the config, per-symbol results and all
    trades are written to results_zatochki.json next to this script.
    """
    symbols = SYMBOLS
    if len(sys.argv) > 1:
        symbols = [sys.argv[1].upper()]

    print(f"🔪 Заточки Backtest — {TIMEFRAME}m, {DAYS_BACK} days")
    print(f"Vol spike: {VOL_SPIKE_MULT}x | VWAP ext: {VWAP_EXT_PCT*100}%")
    print(f"RSI: <{RSI_OVERSOLD} / >{RSI_OVERBOUGHT}")
    print(f"TP1: {TP1_PCT*100}% | Trail: {TRAIL_CALLBACK_PCT*100}% | SL cap: {SL_CAP_PCT*100}%")
    print(f"Commission: {COMMISSION*100}% | Slippage: {SLIPPAGE*100}%")
    print(f"Symbols: {len(symbols)}")

    all_results = []
    all_trades = []

    for symbol in symbols:
        print(f"\n📥 Downloading {symbol} 1m ({DAYS_BACK}d)...", end=" ", flush=True)
        df = get_bybit_klines(symbol, TIMEFRAME, DAYS_BACK)

        # Fewer than 200 candles is treated the same as no data at all
        if df is None or len(df) < 200:
            print("SKIP (no data)")
            continue

        print(f"{len(df)} candles")

        signals = find_signals(df)
        print(f"   Signals found: {len(signals)}")

        trades = run_backtest(df, signals)
        result = print_report(symbol, trades)

        # print_report returns {} for symbols without trades
        if result:
            all_results.append(result)
            all_trades.extend(trades)

    # Summary
    if all_results:
        print(f"\n{'='*60}")
        print(f"📊 SUMMARY — {len(all_results)} symbols, {sum(r['trades'] for r in all_results)} trades")
        print(f"{'='*60}")

        print(f"\n{'Symbol':14} {'#':>4} {'WR':>5} {'PnL':>8} {'PF':>5} {'TP1%':>5} {'MFE':>6} {'MAE':>6}")
        print("-" * 55)
        for r in sorted(all_results, key=lambda x: x["pnl"], reverse=True):
            print(f"{r['symbol']:14} {r['trades']:4} {r['wr']:4.0f}% ${r['pnl']:+6.2f} {r['pf']:5.2f} {r['tp1_rate']:4.0f}% {r['avg_mfe']:+5.2f}% {r['avg_mae']:+5.2f}%")

        total_trades = sum(r["trades"] for r in all_results)
        total_pnl = sum(r["pnl"] for r in all_results)
        total_wins = sum(1 for t in all_trades if t["pnl_usd"] > 0)
        total_wr = total_wins / total_trades * 100 if total_trades else 0

        print(f"\n{'TOTAL':14} {total_trades:4} {total_wr:4.0f}% ${total_pnl:+6.2f}")
        print(f"Per day: ~{total_trades/DAYS_BACK:.0f} trades, ~${total_pnl/DAYS_BACK:+.2f}/day")

        # Save results
        results_file = Path(__file__).parent / "results_zatochki.json"
        with open(results_file, "w", encoding="utf-8") as f:
            # default=str stringifies anything json cannot encode natively
            json.dump({"config": {
                "timeframe": TIMEFRAME, "days": DAYS_BACK,
                "vol_spike": VOL_SPIKE_MULT, "vwap_ext": VWAP_EXT_PCT,
                "rsi_os": RSI_OVERSOLD, "rsi_ob": RSI_OVERBOUGHT,
                "tp1": TP1_PCT, "trail": TRAIL_CALLBACK_PCT,
                "commission": COMMISSION, "slippage": SLIPPAGE,
            }, "results": all_results, "trades": all_trades}, f, indent=2, default=str)
        print(f"\nResults saved to {results_file}")

if __name__ == "__main__":
    main()

πŸ“œ Git History

c6f6bd5chore: initial commit β€” version control setup5 weeks ago
Show last diff
Loading...