โ ะะฐะทะฐะด"""
Backtest: Z-VWAP Clean Mean Reversion (NO DCA / NO Safety Orders)
=================================================================
Clean entry โ TP 3% / SL 1.5% / Z-TP โ R:R 2:1
Filters: NATR >= 0.75%, Volume >= $20M
Blacklist: BTC, ETH, USDC only (heavyweights)
Usage:
python backtests/backtest_zvwap_clean.py
python backtests/backtest_zvwap_clean.py --days 14
python backtests/backtest_zvwap_clean.py --top 30
"""
import sys, os, argparse
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json
import time
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP
# ============================================================
# STRATEGY PARAMETERS
# ============================================================
CONFIG = {
"order_usd": 5.0, # single entry, no SOs
"take_profit_pct": 3.0, # TP from entry
"stop_loss_pct": 1.5, # SL from entry โ R:R = 2:1
"z_entry_threshold": 1.8, # |Z| > 1.8 to enter
"z_max_threshold": 2.5, # |Z| > 2.5 = breakout, skip
"z_tp_threshold": 0.3, # |Z| < 0.3 = fair value exit (only if in profit)
"cooldown_bars": 12, # 12 bars ร 5m = 1 hour
"leverage": 3,
"min_natr_pct": 0.75, # NATR >= 0.75%
"min_volume_24h": 20_000_000,
}
VWAP_PERIOD = 50
NATR_PERIOD = 14
MAKER_FEE = 0.0002 # 0.02% (TP limit)
TAKER_FEE = 0.00055 # 0.055% (entry market, SL market)
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}
# ============================================================
# DATA FETCHING
# ============================================================
def fetch_klines(session, symbol: str, interval: str = "5", days: int = 7) -> list[dict]:
"""Fetch klines from Bybit."""
all_klines = []
bars_needed = days * 24 * 60 // int(interval)
end_time = int(datetime.now().timestamp() * 1000)
while len(all_klines) < bars_needed:
try:
resp = session.get_kline(
category="linear", symbol=symbol,
interval=interval, limit=1000, end=end_time,
)
if resp["retCode"] != 0:
break
items = resp["result"]["list"]
if not items:
break
for item in items:
all_klines.append({
"ts": int(item[0]),
"open": float(item[1]),
"high": float(item[2]),
"low": float(item[3]),
"close": float(item[4]),
"volume": float(item[5]),
"turnover": float(item[6]) if len(item) > 6 else 0,
})
end_time = int(items[-1][0]) - 1
if len(items) < 1000:
break
except Exception as e:
print(f" Fetch error {symbol}: {e}")
break
all_klines.reverse()
seen = set()
unique = []
for k in all_klines:
if k["ts"] not in seen:
seen.add(k["ts"])
unique.append(k)
return unique[-bars_needed:] if len(unique) > bars_needed else unique
def get_top_symbols(session, top_n=20, min_volume=20_000_000) -> list[tuple[str, float]]:
"""Get top USDT perps by 24h volume, excluding blacklist."""
try:
tickers = session.get_tickers(category="linear")
if not tickers or "result" not in tickers:
return []
candidates = []
for t in tickers["result"]["list"]:
sym = t["symbol"]
if not sym.endswith("USDT") or sym in BLACKLIST:
continue
vol = float(t.get("turnover24h", 0))
if vol >= min_volume:
candidates.append((sym, vol))
candidates.sort(key=lambda x: x[1], reverse=True)
return candidates[:top_n]
except Exception as e:
print(f"Error fetching tickers: {e}")
return []
# ============================================================
# INDICATORS
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
"""Calculate rolling Z-Score from VWAP."""
n = len(closes)
z_scores = np.full(n, 0.0)
for i in range(period, n):
h = highs[i-period:i]
l = lows[i-period:i]
c = closes[i-period:i]
v = volumes[i-period:i]
tp = (h + l + c) / 3
cum_tp_vol = np.cumsum(tp * v)
cum_vol = np.cumsum(v)
cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
vwap_arr = cum_tp_vol / cum_vol_safe
vwap = vwap_arr[-1]
deviations = c - vwap_arr
std = np.std(deviations)
if std > 0:
z_scores[i] = (closes[i] - vwap) / std
return z_scores
def calc_natr(highs, lows, closes, period=NATR_PERIOD):
"""Calculate NATR (Normalized ATR) array. Returns % values."""
n = len(closes)
natr = np.full(n, 0.0)
for i in range(1, n):
tr = max(highs[i] - lows[i],
abs(highs[i] - closes[i-1]),
abs(lows[i] - closes[i-1]))
if i >= period:
trs = []
for j in range(i - period + 1, i + 1):
trs.append(max(highs[j] - lows[j],
abs(highs[j] - closes[j-1]),
abs(lows[j] - closes[j-1])))
atr = np.mean(trs)
natr[i] = (atr / closes[i]) * 100 if closes[i] > 0 else 0
return natr
# ============================================================
# TRADE SIMULATOR (clean, no DCA)
# ============================================================
class Trade:
"""One clean trade: entry โ TP/SL/Z-TP โ exit."""
def __init__(self, symbol, side, entry_price, entry_bar, z_score, natr, config):
self.symbol = symbol
self.side = side # "LONG" or "SHORT"
self.entry_price = entry_price
self.entry_bar = entry_bar
self.z_entry = z_score
self.natr_entry = natr
self.config = config
lev = config["leverage"]
self.qty = (config["order_usd"] * lev) / entry_price
self.invested = config["order_usd"]
# TP / SL prices
if side == "LONG":
self.tp_price = entry_price * (1 + config["take_profit_pct"] / 100)
self.sl_price = entry_price * (1 - config["stop_loss_pct"] / 100)
else:
self.tp_price = entry_price * (1 - config["take_profit_pct"] / 100)
self.sl_price = entry_price * (1 + config["stop_loss_pct"] / 100)
self.closed = False
self.close_price = 0
self.close_bar = 0
self.close_reason = ""
self.pnl = 0
self.max_dd_pct = 0
self.z_exit = 0
def tick(self, bar_idx, high, low, close, z_score):
"""Process one bar. Returns True if trade closed."""
if self.closed:
return True
# Track max drawdown
if self.side == "LONG":
dd = (self.entry_price - low) / self.entry_price * 100
else:
dd = (high - self.entry_price) / self.entry_price * 100
self.max_dd_pct = max(self.max_dd_pct, dd)
# Check SL first (worst case)
sl_hit = False
if self.side == "LONG" and low <= self.sl_price:
sl_hit = True
self.close_price = self.sl_price
elif self.side == "SHORT" and high >= self.sl_price:
sl_hit = True
self.close_price = self.sl_price
# Check TP
tp_hit = False
if self.side == "LONG" and high >= self.tp_price:
tp_hit = True
self.close_price = self.tp_price
elif self.side == "SHORT" and low <= self.tp_price:
tp_hit = True
self.close_price = self.tp_price
# Check Z-TP (fair value reversion, only if in profit)
z_tp = False
if self.side == "LONG" and z_score >= -self.config["z_tp_threshold"]:
if close > self.entry_price:
z_tp = True
self.close_price = close
elif self.side == "SHORT" and z_score <= self.config["z_tp_threshold"]:
if close < self.entry_price:
z_tp = True
self.close_price = close
# Priority: SL > TP% > Z-TP
# On same bar: if both SL and TP could hit, SL wins (conservative)
if sl_hit:
self._close(bar_idx, "SL", z_score)
return True
elif tp_hit:
self._close(bar_idx, "TP%", z_score)
return True
elif z_tp:
self._close(bar_idx, "Z-TP", z_score)
return True
return False
def _close(self, bar_idx, reason, z_score):
self.closed = True
self.close_bar = bar_idx
self.close_reason = reason
self.z_exit = z_score
if self.side == "LONG":
self.pnl = self.qty * (self.close_price - self.entry_price)
else:
self.pnl = self.qty * (self.entry_price - self.close_price)
# Fees: entry = taker (market), exit = maker if TP% (limit), taker otherwise
entry_fee = self.qty * self.entry_price * TAKER_FEE
if reason == "TP%":
exit_fee = self.qty * self.close_price * MAKER_FEE
else:
exit_fee = self.qty * self.close_price * TAKER_FEE
self.pnl -= (entry_fee + exit_fee)
# ============================================================
# BACKTEST ENGINE
# ============================================================
def run_backtest(symbol, klines, config):
"""Run clean Z-VWAP backtest on one symbol."""
n = len(klines)
if n < VWAP_PERIOD + 20:
return None
closes = np.array([k["close"] for k in klines])
highs = np.array([k["high"] for k in klines])
lows = np.array([k["low"] for k in klines])
volumes = np.array([k["volume"] for k in klines])
z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD)
natrs = calc_natr(highs, lows, closes, NATR_PERIOD)
trades = []
active_trade = None
cooldown_until = 0
for i in range(VWAP_PERIOD, n):
z = z_scores[i]
natr = natrs[i]
# Process active trade
if active_trade:
closed = active_trade.tick(i, highs[i], lows[i], closes[i], z)
if closed:
trades.append(active_trade)
cooldown_until = i + config["cooldown_bars"]
active_trade = None
continue
# Check for new entry
if i < cooldown_until:
continue
# Filter: NATR
if natr < config["min_natr_pct"]:
continue
# Filter: Z-cap (breakout skip)
if abs(z) > config["z_max_threshold"]:
continue
# Entry signal
threshold = config["z_entry_threshold"]
if z < -threshold:
active_trade = Trade(symbol, "LONG", closes[i], i, z, natr, config)
elif z > threshold:
active_trade = Trade(symbol, "SHORT", closes[i], i, z, natr, config)
# Force-close open trade at end
if active_trade and not active_trade.closed:
active_trade.close_price = closes[-1]
active_trade._close(n - 1, "END", z_scores[-1])
trades.append(active_trade)
if not trades:
return None
# Aggregate
wins = [t for t in trades if t.pnl > 0]
losses = [t for t in trades if t.pnl <= 0]
longs = [t for t in trades if t.side == "LONG"]
shorts = [t for t in trades if t.side == "SHORT"]
total_pnl = sum(t.pnl for t in trades)
gross_profit = sum(t.pnl for t in wins)
gross_loss = abs(sum(t.pnl for t in losses))
pf = gross_profit / gross_loss if gross_loss > 0 else 999
tp_trades = [t for t in trades if t.close_reason == "TP%"]
ztp_trades = [t for t in trades if t.close_reason == "Z-TP"]
sl_trades = [t for t in trades if t.close_reason == "SL"]
return {
"symbol": symbol,
"trades": len(trades),
"wins": len(wins),
"losses": len(losses),
"win_rate": round(len(wins) / len(trades) * 100, 1),
"total_pnl": round(total_pnl, 4),
"profit_factor": round(pf, 2),
"avg_win": round(sum(t.pnl for t in wins) / len(wins), 4) if wins else 0,
"avg_loss": round(sum(t.pnl for t in losses) / len(losses), 4) if losses else 0,
"max_dd_pct": round(max(t.max_dd_pct for t in trades), 2),
"avg_duration_bars": round(sum(t.close_bar - t.entry_bar for t in trades) / len(trades), 1),
# Exit reasons
"tp_count": len(tp_trades),
"ztp_count": len(ztp_trades),
"sl_count": len(sl_trades),
"end_count": sum(1 for t in trades if t.close_reason == "END"),
"tp_pnl": round(sum(t.pnl for t in tp_trades), 4),
"ztp_pnl": round(sum(t.pnl for t in ztp_trades), 4),
"sl_pnl": round(sum(t.pnl for t in sl_trades), 4),
# Long vs Short
"longs": len(longs),
"shorts": len(shorts),
"long_pnl": round(sum(t.pnl for t in longs), 4),
"short_pnl": round(sum(t.pnl for t in shorts), 4),
"long_wr": round(sum(1 for t in longs if t.pnl > 0) / len(longs) * 100, 1) if longs else 0,
"short_wr": round(sum(1 for t in shorts if t.pnl > 0) / len(shorts) * 100, 1) if shorts else 0,
# NATR analysis
"avg_natr": round(sum(t.natr_entry for t in trades) / len(trades), 2),
"avg_z_entry": round(sum(abs(t.z_entry) for t in trades) / len(trades), 2),
# Per-trade detail
"details": [
{
"side": t.side[0], # L or S
"entry": round(t.entry_price, 6),
"exit": round(t.close_price, 6),
"pnl": round(t.pnl, 4),
"reason": t.close_reason,
"bars": t.close_bar - t.entry_bar,
"z_in": round(t.z_entry, 2),
"z_out": round(t.z_exit, 2),
"natr": round(t.natr_entry, 2),
"dd": round(t.max_dd_pct, 2),
}
for t in trades
],
}
# ============================================================
# MAIN
# ============================================================
def main():
parser = argparse.ArgumentParser(description="Z-VWAP Clean Backtest")
parser.add_argument("--days", type=int, default=7, help="Days of data (default 7)")
parser.add_argument("--top", type=int, default=20, help="Top N symbols by volume (default 20)")
parser.add_argument("--tp", type=float, default=None, help="Override TP %")
parser.add_argument("--sl", type=float, default=None, help="Override SL %")
parser.add_argument("--z-entry", type=float, default=None, help="Override Z entry threshold")
parser.add_argument("--symbols", type=str, default=None, help="Comma-separated symbols (skip screener)")
args = parser.parse_args()
config = CONFIG.copy()
if args.tp:
config["take_profit_pct"] = args.tp
if args.sl:
config["stop_loss_pct"] = args.sl
if args.z_entry:
config["z_entry_threshold"] = args.z_entry
rr = config["take_profit_pct"] / config["stop_loss_pct"]
print("=" * 70)
print(" Z-VWAP Clean Mean Reversion Backtest")
print("=" * 70)
print(f"\n Entry: ${config['order_usd']} ร {config['leverage']}x leverage")
print(f" TP: {config['take_profit_pct']}% | SL: {config['stop_loss_pct']}% | R:R = {rr:.1f}:1")
print(f" Z-entry: |Z| > {config['z_entry_threshold']} | Z-max: |Z| > {config['z_max_threshold']} | Z-TP: |Z| < {config['z_tp_threshold']}")
print(f" Filters: NATR >= {config['min_natr_pct']}% | Vol >= ${config['min_volume_24h']/1e6:.0f}M")
print(f" Blacklist: {', '.join(sorted(BLACKLIST))}")
print(f" Period: {args.days} days | Cooldown: {config['cooldown_bars']} bars (5m)")
print(f" Breakeven WR needed: {1/(1+rr)*100:.0f}%")
print()
session = HTTP(testnet=False)
# Get symbols
if args.symbols:
symbols = [(s.strip().upper(), 0) for s in args.symbols.split(",")]
print(f" Using {len(symbols)} specified symbols\n")
else:
print(f" Fetching top {args.top} symbols by volume...", end=" ", flush=True)
symbols = get_top_symbols(session, top_n=args.top, min_volume=config["min_volume_24h"])
print(f"{len(symbols)} found\n")
all_results = []
total_start = time.time()
for idx, (symbol, vol_24h) in enumerate(symbols):
vol_str = f"${vol_24h/1e6:.0f}M" if vol_24h > 0 else ""
print(f"[{idx+1}/{len(symbols)}] {symbol} {vol_str}", end=" ", flush=True)
klines = fetch_klines(session, symbol, "5", days=args.days)
if len(klines) < VWAP_PERIOD + 50:
print(f"โ skip ({len(klines)} bars)")
continue
result = run_backtest(symbol, klines, config)
if not result or result["trades"] == 0:
print(f"โ no trades ({len(klines)} bars)")
continue
all_results.append(result)
# Compact output
pnl_e = "๐ข" if result["total_pnl"] > 0 else "๐ด"
wr_e = "๐ข" if result["win_rate"] >= 50 else "๐ก" if result["win_rate"] >= 34 else "๐ด"
print(f"โ {pnl_e} ${result['total_pnl']:+.2f} | {wr_e} WR {result['win_rate']}% | "
f"PF {result['profit_factor']} | {result['trades']}t "
f"(TP:{result['tp_count']} Z:{result['ztp_count']} SL:{result['sl_count']}) | "
f"L:{result['longs']}/S:{result['shorts']}")
time.sleep(0.3) # rate limit
elapsed = time.time() - total_start
if not all_results:
print("\nโ No results!")
return
# ============================================================
# SUMMARY
# ============================================================
print("\n" + "=" * 70)
print(" SUMMARY")
print("=" * 70)
total_trades = sum(r["trades"] for r in all_results)
total_wins = sum(r["wins"] for r in all_results)
total_losses = sum(r["losses"] for r in all_results)
total_pnl = sum(r["total_pnl"] for r in all_results)
gross_profit = sum(max(0, r["total_pnl"]) for r in all_results)
gross_loss_syms = sum(abs(min(0, r["total_pnl"])) for r in all_results)
all_wins_pnl = sum(r["avg_win"] * r["wins"] for r in all_results)
all_losses_pnl = sum(r["avg_loss"] * r["losses"] for r in all_results)
avg_win = all_wins_pnl / total_wins if total_wins > 0 else 0
avg_loss = all_losses_pnl / total_losses if total_losses > 0 else 0
total_tp = sum(r["tp_count"] for r in all_results)
total_ztp = sum(r["ztp_count"] for r in all_results)
total_sl = sum(r["sl_count"] for r in all_results)
total_end = sum(r["end_count"] for r in all_results)
tp_pnl = sum(r["tp_pnl"] for r in all_results)
ztp_pnl = sum(r["ztp_pnl"] for r in all_results)
sl_pnl = sum(r["sl_pnl"] for r in all_results)
wr = total_wins / total_trades * 100 if total_trades > 0 else 0
pf = abs(all_wins_pnl / all_losses_pnl) if all_losses_pnl != 0 else 999
pnl_emoji = "๐ข" if total_pnl > 0 else "๐ด"
print(f"\n {pnl_emoji} Total PnL: ${total_pnl:+.2f}")
print(f" ๐ Trades: {total_trades} ({total_wins}W / {total_losses}L)")
print(f" ๐ฏ Win Rate: {wr:.1f}% (breakeven: {1/(1+rr)*100:.0f}%)")
print(f" ๐ Profit Factor: {pf:.2f}")
print(f" ๐ฐ Avg Win: ${avg_win:+.4f} | Avg Loss: ${avg_loss:+.4f} | Ratio: 1:{abs(avg_win/avg_loss):.1f}" if avg_loss != 0 else "")
print(f" ๐ Max DD: {max(r['max_dd_pct'] for r in all_results):.2f}%")
print(f" โฑ Avg Duration: {sum(r['avg_duration_bars']*r['trades'] for r in all_results)/total_trades:.0f} bars (~{sum(r['avg_duration_bars']*r['trades'] for r in all_results)/total_trades*5:.0f} min)")
print(f"\n Exit Breakdown:")
print(f" TP%: {total_tp:>4} (${tp_pnl:+.2f})")
print(f" Z-TP: {total_ztp:>4} (${ztp_pnl:+.2f})")
print(f" SL: {total_sl:>4} (${sl_pnl:+.2f})")
if total_end:
print(f" END: {total_end:>4}")
# Top / Bottom symbols
sorted_by_pnl = sorted(all_results, key=lambda r: r["total_pnl"], reverse=True)
print(f"\n ๐ Top 5 Symbols:")
for r in sorted_by_pnl[:5]:
print(f" ๐ข {r['symbol']:>14} ${r['total_pnl']:+.2f} | {r['trades']}t WR {r['win_rate']}% PF {r['profit_factor']}")
print(f"\n ๐ Bottom 5 Symbols:")
for r in sorted_by_pnl[-5:]:
print(f" ๐ด {r['symbol']:>14} ${r['total_pnl']:+.2f} | {r['trades']}t WR {r['win_rate']}% PF {r['profit_factor']}")
# Profitable vs unprofitable symbols
profitable = [r for r in all_results if r["total_pnl"] > 0]
unprofitable = [r for r in all_results if r["total_pnl"] <= 0]
print(f"\n Symbols: {len(profitable)} profitable / {len(unprofitable)} unprofitable out of {len(all_results)}")
# NATR analysis
print(f"\n NATR Distribution of trades:")
all_details = []
for r in all_results:
all_details.extend(r["details"])
natr_buckets = {
"0.75-1.0%": [d for d in all_details if 0.75 <= d["natr"] < 1.0],
"1.0-1.5%": [d for d in all_details if 1.0 <= d["natr"] < 1.5],
"1.5-2.0%": [d for d in all_details if 1.5 <= d["natr"] < 2.0],
"2.0-3.0%": [d for d in all_details if 2.0 <= d["natr"] < 3.0],
"3.0%+": [d for d in all_details if d["natr"] >= 3.0],
}
for label, bucket in natr_buckets.items():
if not bucket:
continue
bw = sum(1 for d in bucket if d["pnl"] > 0)
bpnl = sum(d["pnl"] for d in bucket)
bwr = bw / len(bucket) * 100
e = "๐ข" if bpnl > 0 else "๐ด"
print(f" {e} NATR {label:>8}: {len(bucket):>3}t WR {bwr:.0f}% PnL ${bpnl:+.2f}")
# Z-score entry analysis
print(f"\n Z-Score Entry Distribution:")
z_buckets = {
"1.8-2.0": [d for d in all_details if 1.8 <= abs(d["z_in"]) < 2.0],
"2.0-2.2": [d for d in all_details if 2.0 <= abs(d["z_in"]) < 2.2],
"2.2-2.5": [d for d in all_details if 2.2 <= abs(d["z_in"]) <= 2.5],
}
for label, bucket in z_buckets.items():
if not bucket:
continue
bw = sum(1 for d in bucket if d["pnl"] > 0)
bpnl = sum(d["pnl"] for d in bucket)
bwr = bw / len(bucket) * 100
e = "๐ข" if bpnl > 0 else "๐ด"
print(f" {e} Z {label}: {len(bucket):>3}t WR {bwr:.0f}% PnL ${bpnl:+.2f}")
print(f"\n โฑ Backtest completed in {elapsed:.0f}s")
# Save results
output_path = os.path.join(os.path.dirname(__file__), "results_zvwap_clean.json")
save_data = {
"config": config,
"days": args.days,
"top_n": args.top,
"run_time": datetime.now().isoformat(),
"summary": {
"total_pnl": round(total_pnl, 4),
"trades": total_trades,
"win_rate": round(wr, 1),
"profit_factor": round(pf, 2),
"avg_win": round(avg_win, 4),
"avg_loss": round(avg_loss, 4),
},
"results": [{k: v for k, v in r.items() if k != "details"} for r in all_results],
"all_trades": all_details,
}
with open(output_path, "w") as f:
json.dump(save_data, f, indent=2)
print(f" ๐พ Saved to {output_path}")
if __name__ == "__main__":
main()