← Back
ā˜†
"""
Backtest: Z-VWAP Clean Mean Reversion (NO DCA / NO Safety Orders)
=================================================================
Clean entry → TP 3% / SL 1.5% / Z-TP → R:R 2:1

Filters: NATR >= 0.75%, Volume >= $20M
Blacklist: BTC, ETH, USDC only (heavyweights)

Usage:
  python backtests/backtest_zvwap_clean.py
  python backtests/backtest_zvwap_clean.py --days 14
  python backtests/backtest_zvwap_clean.py --top 30
"""

import sys, os, argparse
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json
import time
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP

# ============================================================
# STRATEGY PARAMETERS
# ============================================================
CONFIG = {
    "order_usd": 5.0,           # single entry, no SOs
    "take_profit_pct": 3.0,     # TP from entry
    "stop_loss_pct": 1.5,       # SL from entry  →  R:R = 2:1
    "z_entry_threshold": 1.8,   # |Z| > 1.8 to enter
    "z_max_threshold": 2.5,     # |Z| > 2.5 = breakout, skip
    "z_tp_threshold": 0.3,      # |Z| < 0.3 = fair value exit (only if in profit)
    "cooldown_bars": 12,        # 12 bars Ɨ 5m = 1 hour
    "leverage": 3,
    "min_natr_pct": 0.75,       # NATR >= 0.75%
    "min_volume_24h": 20_000_000,
}

VWAP_PERIOD = 50
NATR_PERIOD = 14
MAKER_FEE = 0.0002   # 0.02% (TP limit)
TAKER_FEE = 0.00055  # 0.055% (entry market, SL market)

BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}


# ============================================================
# DATA FETCHING
# ============================================================
def fetch_klines(session, symbol: str, interval: str = "5", days: int = 7) -> list[dict]:
    """Fetch klines from Bybit."""
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)

    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear", symbol=symbol,
                interval=interval, limit=1000, end=end_time,
            )
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append({
                    "ts": int(item[0]),
                    "open": float(item[1]),
                    "high": float(item[2]),
                    "low": float(item[3]),
                    "close": float(item[4]),
                    "volume": float(item[5]),
                    "turnover": float(item[6]) if len(item) > 6 else 0,
                })
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
        except Exception as e:
            print(f"  Fetch error {symbol}: {e}")
            break

    all_klines.reverse()
    seen = set()
    unique = []
    for k in all_klines:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique


def get_top_symbols(session, top_n=20, min_volume=20_000_000) -> list[tuple[str, float]]:
    """Get top USDT perps by 24h volume, excluding blacklist."""
    try:
        tickers = session.get_tickers(category="linear")
        if not tickers or "result" not in tickers:
            return []
        candidates = []
        for t in tickers["result"]["list"]:
            sym = t["symbol"]
            if not sym.endswith("USDT") or sym in BLACKLIST:
                continue
            vol = float(t.get("turnover24h", 0))
            if vol >= min_volume:
                candidates.append((sym, vol))
        candidates.sort(key=lambda x: x[1], reverse=True)
        return candidates[:top_n]
    except Exception as e:
        print(f"Error fetching tickers: {e}")
        return []


# ============================================================
# INDICATORS
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
    """Calculate rolling Z-Score from VWAP."""
    n = len(closes)
    z_scores = np.full(n, 0.0)
    for i in range(period, n):
        h = highs[i-period:i]
        l = lows[i-period:i]
        c = closes[i-period:i]
        v = volumes[i-period:i]
        tp = (h + l + c) / 3
        cum_tp_vol = np.cumsum(tp * v)
        cum_vol = np.cumsum(v)
        cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
        vwap_arr = cum_tp_vol / cum_vol_safe
        vwap = vwap_arr[-1]
        deviations = c - vwap_arr
        std = np.std(deviations)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std
    return z_scores


def calc_natr(highs, lows, closes, period=NATR_PERIOD):
    """Calculate NATR (Normalized ATR) array. Returns % values."""
    n = len(closes)
    natr = np.full(n, 0.0)
    for i in range(1, n):
        tr = max(highs[i] - lows[i],
                 abs(highs[i] - closes[i-1]),
                 abs(lows[i] - closes[i-1]))
        if i >= period:
            trs = []
            for j in range(i - period + 1, i + 1):
                trs.append(max(highs[j] - lows[j],
                               abs(highs[j] - closes[j-1]),
                               abs(lows[j] - closes[j-1])))
            atr = np.mean(trs)
            natr[i] = (atr / closes[i]) * 100 if closes[i] > 0 else 0
    return natr


# ============================================================
# TRADE SIMULATOR (clean, no DCA)
# ============================================================
class Trade:
    """One clean trade: entry → TP/SL/Z-TP → exit."""

    def __init__(self, symbol, side, entry_price, entry_bar, z_score, natr, config):
        self.symbol = symbol
        self.side = side          # "LONG" or "SHORT"
        self.entry_price = entry_price
        self.entry_bar = entry_bar
        self.z_entry = z_score
        self.natr_entry = natr
        self.config = config

        lev = config["leverage"]
        self.qty = (config["order_usd"] * lev) / entry_price
        self.invested = config["order_usd"]

        # TP / SL prices
        if side == "LONG":
            self.tp_price = entry_price * (1 + config["take_profit_pct"] / 100)
            self.sl_price = entry_price * (1 - config["stop_loss_pct"] / 100)
        else:
            self.tp_price = entry_price * (1 - config["take_profit_pct"] / 100)
            self.sl_price = entry_price * (1 + config["stop_loss_pct"] / 100)

        self.closed = False
        self.close_price = 0
        self.close_bar = 0
        self.close_reason = ""
        self.pnl = 0
        self.max_dd_pct = 0
        self.z_exit = 0

    def tick(self, bar_idx, high, low, close, z_score):
        """Process one bar. Returns True if trade closed."""
        if self.closed:
            return True

        # Track max drawdown
        if self.side == "LONG":
            dd = (self.entry_price - low) / self.entry_price * 100
        else:
            dd = (high - self.entry_price) / self.entry_price * 100
        self.max_dd_pct = max(self.max_dd_pct, dd)

        # Check SL first (worst case)
        sl_hit = False
        if self.side == "LONG" and low <= self.sl_price:
            sl_hit = True
            self.close_price = self.sl_price
        elif self.side == "SHORT" and high >= self.sl_price:
            sl_hit = True
            self.close_price = self.sl_price

        # Check TP
        tp_hit = False
        if self.side == "LONG" and high >= self.tp_price:
            tp_hit = True
            self.close_price = self.tp_price
        elif self.side == "SHORT" and low <= self.tp_price:
            tp_hit = True
            self.close_price = self.tp_price

        # Check Z-TP (fair value reversion, only if in profit)
        z_tp = False
        if self.side == "LONG" and z_score >= -self.config["z_tp_threshold"]:
            if close > self.entry_price:
                z_tp = True
                self.close_price = close
        elif self.side == "SHORT" and z_score <= self.config["z_tp_threshold"]:
            if close < self.entry_price:
                z_tp = True
                self.close_price = close

        # Priority: SL > TP% > Z-TP
        # On same bar: if both SL and TP could hit, SL wins (conservative)
        if sl_hit:
            self._close(bar_idx, "SL", z_score)
            return True
        elif tp_hit:
            self._close(bar_idx, "TP%", z_score)
            return True
        elif z_tp:
            self._close(bar_idx, "Z-TP", z_score)
            return True

        return False

    def _close(self, bar_idx, reason, z_score):
        self.closed = True
        self.close_bar = bar_idx
        self.close_reason = reason
        self.z_exit = z_score

        if self.side == "LONG":
            self.pnl = self.qty * (self.close_price - self.entry_price)
        else:
            self.pnl = self.qty * (self.entry_price - self.close_price)

        # Fees: entry = taker (market), exit = maker if TP% (limit), taker otherwise
        entry_fee = self.qty * self.entry_price * TAKER_FEE
        if reason == "TP%":
            exit_fee = self.qty * self.close_price * MAKER_FEE
        else:
            exit_fee = self.qty * self.close_price * TAKER_FEE
        self.pnl -= (entry_fee + exit_fee)


# ============================================================
# BACKTEST ENGINE
# ============================================================
def run_backtest(symbol, klines, config):
    """Run clean Z-VWAP backtest on one symbol."""
    n = len(klines)
    if n < VWAP_PERIOD + 20:
        return None

    closes = np.array([k["close"] for k in klines])
    highs = np.array([k["high"] for k in klines])
    lows = np.array([k["low"] for k in klines])
    volumes = np.array([k["volume"] for k in klines])

    z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD)
    natrs = calc_natr(highs, lows, closes, NATR_PERIOD)

    trades = []
    active_trade = None
    cooldown_until = 0

    for i in range(VWAP_PERIOD, n):
        z = z_scores[i]
        natr = natrs[i]

        # Process active trade
        if active_trade:
            closed = active_trade.tick(i, highs[i], lows[i], closes[i], z)
            if closed:
                trades.append(active_trade)
                cooldown_until = i + config["cooldown_bars"]
                active_trade = None
            continue

        # Check for new entry
        if i < cooldown_until:
            continue

        # Filter: NATR
        if natr < config["min_natr_pct"]:
            continue

        # Filter: Z-cap (breakout skip)
        if abs(z) > config["z_max_threshold"]:
            continue

        # Entry signal
        threshold = config["z_entry_threshold"]
        if z < -threshold:
            active_trade = Trade(symbol, "LONG", closes[i], i, z, natr, config)
        elif z > threshold:
            active_trade = Trade(symbol, "SHORT", closes[i], i, z, natr, config)

    # Force-close open trade at end
    if active_trade and not active_trade.closed:
        active_trade.close_price = closes[-1]
        active_trade._close(n - 1, "END", z_scores[-1])
        trades.append(active_trade)

    if not trades:
        return None

    # Aggregate
    wins = [t for t in trades if t.pnl > 0]
    losses = [t for t in trades if t.pnl <= 0]
    longs = [t for t in trades if t.side == "LONG"]
    shorts = [t for t in trades if t.side == "SHORT"]

    total_pnl = sum(t.pnl for t in trades)
    gross_profit = sum(t.pnl for t in wins)
    gross_loss = abs(sum(t.pnl for t in losses))
    pf = gross_profit / gross_loss if gross_loss > 0 else 999

    tp_trades = [t for t in trades if t.close_reason == "TP%"]
    ztp_trades = [t for t in trades if t.close_reason == "Z-TP"]
    sl_trades = [t for t in trades if t.close_reason == "SL"]

    return {
        "symbol": symbol,
        "trades": len(trades),
        "wins": len(wins),
        "losses": len(losses),
        "win_rate": round(len(wins) / len(trades) * 100, 1),
        "total_pnl": round(total_pnl, 4),
        "profit_factor": round(pf, 2),
        "avg_win": round(sum(t.pnl for t in wins) / len(wins), 4) if wins else 0,
        "avg_loss": round(sum(t.pnl for t in losses) / len(losses), 4) if losses else 0,
        "max_dd_pct": round(max(t.max_dd_pct for t in trades), 2),
        "avg_duration_bars": round(sum(t.close_bar - t.entry_bar for t in trades) / len(trades), 1),
        # Exit reasons
        "tp_count": len(tp_trades),
        "ztp_count": len(ztp_trades),
        "sl_count": len(sl_trades),
        "end_count": sum(1 for t in trades if t.close_reason == "END"),
        "tp_pnl": round(sum(t.pnl for t in tp_trades), 4),
        "ztp_pnl": round(sum(t.pnl for t in ztp_trades), 4),
        "sl_pnl": round(sum(t.pnl for t in sl_trades), 4),
        # Long vs Short
        "longs": len(longs),
        "shorts": len(shorts),
        "long_pnl": round(sum(t.pnl for t in longs), 4),
        "short_pnl": round(sum(t.pnl for t in shorts), 4),
        "long_wr": round(sum(1 for t in longs if t.pnl > 0) / len(longs) * 100, 1) if longs else 0,
        "short_wr": round(sum(1 for t in shorts if t.pnl > 0) / len(shorts) * 100, 1) if shorts else 0,
        # NATR analysis
        "avg_natr": round(sum(t.natr_entry for t in trades) / len(trades), 2),
        "avg_z_entry": round(sum(abs(t.z_entry) for t in trades) / len(trades), 2),
        # Per-trade detail
        "details": [
            {
                "side": t.side[0],  # L or S
                "entry": round(t.entry_price, 6),
                "exit": round(t.close_price, 6),
                "pnl": round(t.pnl, 4),
                "reason": t.close_reason,
                "bars": t.close_bar - t.entry_bar,
                "z_in": round(t.z_entry, 2),
                "z_out": round(t.z_exit, 2),
                "natr": round(t.natr_entry, 2),
                "dd": round(t.max_dd_pct, 2),
            }
            for t in trades
        ],
    }


# ============================================================
# MAIN
# ============================================================
def main():
    parser = argparse.ArgumentParser(description="Z-VWAP Clean Backtest")
    parser.add_argument("--days", type=int, default=7, help="Days of data (default 7)")
    parser.add_argument("--top", type=int, default=20, help="Top N symbols by volume (default 20)")
    parser.add_argument("--tp", type=float, default=None, help="Override TP %")
    parser.add_argument("--sl", type=float, default=None, help="Override SL %")
    parser.add_argument("--z-entry", type=float, default=None, help="Override Z entry threshold")
    parser.add_argument("--symbols", type=str, default=None, help="Comma-separated symbols (skip screener)")
    args = parser.parse_args()

    config = CONFIG.copy()
    if args.tp:
        config["take_profit_pct"] = args.tp
    if args.sl:
        config["stop_loss_pct"] = args.sl
    if args.z_entry:
        config["z_entry_threshold"] = args.z_entry

    rr = config["take_profit_pct"] / config["stop_loss_pct"]

    print("=" * 70)
    print("  Z-VWAP Clean Mean Reversion Backtest")
    print("=" * 70)
    print(f"\n  Entry: ${config['order_usd']} Ɨ {config['leverage']}x leverage")
    print(f"  TP: {config['take_profit_pct']}% | SL: {config['stop_loss_pct']}% | R:R = {rr:.1f}:1")
    print(f"  Z-entry: |Z| > {config['z_entry_threshold']} | Z-max: |Z| > {config['z_max_threshold']} | Z-TP: |Z| < {config['z_tp_threshold']}")
    print(f"  Filters: NATR >= {config['min_natr_pct']}% | Vol >= ${config['min_volume_24h']/1e6:.0f}M")
    print(f"  Blacklist: {', '.join(sorted(BLACKLIST))}")
    print(f"  Period: {args.days} days | Cooldown: {config['cooldown_bars']} bars (5m)")
    print(f"  Breakeven WR needed: {1/(1+rr)*100:.0f}%")
    print()

    session = HTTP(testnet=False)

    # Get symbols
    if args.symbols:
        symbols = [(s.strip().upper(), 0) for s in args.symbols.split(",")]
        print(f"  Using {len(symbols)} specified symbols\n")
    else:
        print(f"  Fetching top {args.top} symbols by volume...", end=" ", flush=True)
        symbols = get_top_symbols(session, top_n=args.top, min_volume=config["min_volume_24h"])
        print(f"{len(symbols)} found\n")

    all_results = []
    total_start = time.time()

    for idx, (symbol, vol_24h) in enumerate(symbols):
        vol_str = f"${vol_24h/1e6:.0f}M" if vol_24h > 0 else ""
        print(f"[{idx+1}/{len(symbols)}] {symbol} {vol_str}", end=" ", flush=True)

        klines = fetch_klines(session, symbol, "5", days=args.days)
        if len(klines) < VWAP_PERIOD + 50:
            print(f"— skip ({len(klines)} bars)")
            continue

        result = run_backtest(symbol, klines, config)
        if not result or result["trades"] == 0:
            print(f"— no trades ({len(klines)} bars)")
            continue

        all_results.append(result)

        # Compact output
        pnl_e = "🟢" if result["total_pnl"] > 0 else "šŸ”“"
        wr_e = "🟢" if result["win_rate"] >= 50 else "🟔" if result["win_rate"] >= 34 else "šŸ”“"
        print(f"— {pnl_e} ${result['total_pnl']:+.2f} | {wr_e} WR {result['win_rate']}% | "
              f"PF {result['profit_factor']} | {result['trades']}t "
              f"(TP:{result['tp_count']} Z:{result['ztp_count']} SL:{result['sl_count']}) | "
              f"L:{result['longs']}/S:{result['shorts']}")

        time.sleep(0.3)  # rate limit

    elapsed = time.time() - total_start

    if not all_results:
        print("\nāŒ No results!")
        return

    # ============================================================
    # SUMMARY
    # ============================================================
    print("\n" + "=" * 70)
    print("  SUMMARY")
    print("=" * 70)

    total_trades = sum(r["trades"] for r in all_results)
    total_wins = sum(r["wins"] for r in all_results)
    total_losses = sum(r["losses"] for r in all_results)
    total_pnl = sum(r["total_pnl"] for r in all_results)
    gross_profit = sum(max(0, r["total_pnl"]) for r in all_results)
    gross_loss_syms = sum(abs(min(0, r["total_pnl"])) for r in all_results)

    all_wins_pnl = sum(r["avg_win"] * r["wins"] for r in all_results)
    all_losses_pnl = sum(r["avg_loss"] * r["losses"] for r in all_results)
    avg_win = all_wins_pnl / total_wins if total_wins > 0 else 0
    avg_loss = all_losses_pnl / total_losses if total_losses > 0 else 0

    total_tp = sum(r["tp_count"] for r in all_results)
    total_ztp = sum(r["ztp_count"] for r in all_results)
    total_sl = sum(r["sl_count"] for r in all_results)
    total_end = sum(r["end_count"] for r in all_results)

    tp_pnl = sum(r["tp_pnl"] for r in all_results)
    ztp_pnl = sum(r["ztp_pnl"] for r in all_results)
    sl_pnl = sum(r["sl_pnl"] for r in all_results)

    wr = total_wins / total_trades * 100 if total_trades > 0 else 0
    pf = abs(all_wins_pnl / all_losses_pnl) if all_losses_pnl != 0 else 999

    pnl_emoji = "🟢" if total_pnl > 0 else "šŸ”“"

    print(f"\n  {pnl_emoji} Total PnL: ${total_pnl:+.2f}")
    print(f"  šŸ“Š Trades: {total_trades} ({total_wins}W / {total_losses}L)")
    print(f"  šŸŽÆ Win Rate: {wr:.1f}% (breakeven: {1/(1+rr)*100:.0f}%)")
    print(f"  šŸ“ˆ Profit Factor: {pf:.2f}")
    print(f"  šŸ’° Avg Win: ${avg_win:+.4f} | Avg Loss: ${avg_loss:+.4f} | Ratio: 1:{abs(avg_win/avg_loss):.1f}" if avg_loss != 0 else "")
    print(f"  šŸ“‰ Max DD: {max(r['max_dd_pct'] for r in all_results):.2f}%")
    print(f"  ā±  Avg Duration: {sum(r['avg_duration_bars']*r['trades'] for r in all_results)/total_trades:.0f} bars (~{sum(r['avg_duration_bars']*r['trades'] for r in all_results)/total_trades*5:.0f} min)")

    print(f"\n  Exit Breakdown:")
    print(f"    TP%:  {total_tp:>4}  (${tp_pnl:+.2f})")
    print(f"    Z-TP: {total_ztp:>4}  (${ztp_pnl:+.2f})")
    print(f"    SL:   {total_sl:>4}  (${sl_pnl:+.2f})")
    if total_end:
        print(f"    END:  {total_end:>4}")

    # Top / Bottom symbols
    sorted_by_pnl = sorted(all_results, key=lambda r: r["total_pnl"], reverse=True)
    print(f"\n  šŸ† Top 5 Symbols:")
    for r in sorted_by_pnl[:5]:
        print(f"    🟢 {r['symbol']:>14} ${r['total_pnl']:+.2f} | {r['trades']}t WR {r['win_rate']}% PF {r['profit_factor']}")
    print(f"\n  šŸ’€ Bottom 5 Symbols:")
    for r in sorted_by_pnl[-5:]:
        print(f"    šŸ”“ {r['symbol']:>14} ${r['total_pnl']:+.2f} | {r['trades']}t WR {r['win_rate']}% PF {r['profit_factor']}")

    # Profitable vs unprofitable symbols
    profitable = [r for r in all_results if r["total_pnl"] > 0]
    unprofitable = [r for r in all_results if r["total_pnl"] <= 0]
    print(f"\n  Symbols: {len(profitable)} profitable / {len(unprofitable)} unprofitable out of {len(all_results)}")

    # NATR analysis
    print(f"\n  NATR Distribution of trades:")
    all_details = []
    for r in all_results:
        all_details.extend(r["details"])

    natr_buckets = {
        "0.75-1.0%": [d for d in all_details if 0.75 <= d["natr"] < 1.0],
        "1.0-1.5%":  [d for d in all_details if 1.0 <= d["natr"] < 1.5],
        "1.5-2.0%":  [d for d in all_details if 1.5 <= d["natr"] < 2.0],
        "2.0-3.0%":  [d for d in all_details if 2.0 <= d["natr"] < 3.0],
        "3.0%+":     [d for d in all_details if d["natr"] >= 3.0],
    }
    for label, bucket in natr_buckets.items():
        if not bucket:
            continue
        bw = sum(1 for d in bucket if d["pnl"] > 0)
        bpnl = sum(d["pnl"] for d in bucket)
        bwr = bw / len(bucket) * 100
        e = "🟢" if bpnl > 0 else "šŸ”“"
        print(f"    {e} NATR {label:>8}: {len(bucket):>3}t  WR {bwr:.0f}%  PnL ${bpnl:+.2f}")

    # Z-score entry analysis
    print(f"\n  Z-Score Entry Distribution:")
    z_buckets = {
        "1.8-2.0": [d for d in all_details if 1.8 <= abs(d["z_in"]) < 2.0],
        "2.0-2.2": [d for d in all_details if 2.0 <= abs(d["z_in"]) < 2.2],
        "2.2-2.5": [d for d in all_details if 2.2 <= abs(d["z_in"]) <= 2.5],
    }
    for label, bucket in z_buckets.items():
        if not bucket:
            continue
        bw = sum(1 for d in bucket if d["pnl"] > 0)
        bpnl = sum(d["pnl"] for d in bucket)
        bwr = bw / len(bucket) * 100
        e = "🟢" if bpnl > 0 else "šŸ”“"
        print(f"    {e} Z {label}: {len(bucket):>3}t  WR {bwr:.0f}%  PnL ${bpnl:+.2f}")

    print(f"\n  ā± Backtest completed in {elapsed:.0f}s")

    # Save results
    output_path = os.path.join(os.path.dirname(__file__), "results_zvwap_clean.json")
    save_data = {
        "config": config,
        "days": args.days,
        "top_n": args.top,
        "run_time": datetime.now().isoformat(),
        "summary": {
            "total_pnl": round(total_pnl, 4),
            "trades": total_trades,
            "win_rate": round(wr, 1),
            "profit_factor": round(pf, 2),
            "avg_win": round(avg_win, 4),
            "avg_loss": round(avg_loss, 4),
        },
        "results": [{k: v for k, v in r.items() if k != "details"} for r in all_results],
        "all_trades": all_details,
    }
    with open(output_path, "w") as f:
        json.dump(save_data, f, indent=2)
    print(f"  šŸ’¾ Saved to {output_path}")


if __name__ == "__main__":
    main()

šŸ“œ Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...