โ† ะะฐะทะฐะด
"""
Backtest: Z-VWAP Clean Mean Reversion (NO DCA / NO Safety Orders)
=================================================================
Clean entry → TP 3% / SL 1.5% / Z-TP → R:R 2:1
Filters: NATR >= 0.75%, Volume >= $20M
Blacklist: BTC, ETH, USDC only (heavyweights)

Usage:
    python backtests/backtest_zvwap_clean.py
    python backtests/backtest_zvwap_clean.py --days 14
    python backtests/backtest_zvwap_clean.py --top 30
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime

# Make the parent of this script's directory importable before anything else.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import numpy as np

try:
    from pybit.unified_trading import HTTP
except ImportError:
    # Only main() talks to the exchange; the indicators and the simulator
    # stay importable (e.g. for offline analysis) without the client.
    HTTP = None

# ============================================================
# STRATEGY PARAMETERS
# ============================================================
CONFIG = {
    "order_usd": 5.0,              # single entry, no SOs
    "take_profit_pct": 3.0,        # TP from entry
    "stop_loss_pct": 1.5,          # SL from entry → R:R = 2:1
    "z_entry_threshold": 1.8,      # |Z| > 1.8 to enter
    "z_max_threshold": 2.5,        # |Z| > 2.5 = breakout, skip
    "z_tp_threshold": 0.3,         # |Z| < 0.3 = fair value exit (only if in profit)
    "cooldown_bars": 12,           # 12 bars × 5m = 1 hour
    "leverage": 3,
    "min_natr_pct": 0.75,          # NATR >= 0.75%
    "min_volume_24h": 20_000_000,
}

VWAP_PERIOD = 50
NATR_PERIOD = 14
MAKER_FEE = 0.0002   # 0.02% (TP limit)
TAKER_FEE = 0.00055  # 0.055% (entry market, SL market)

BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}


# ============================================================
# DATA FETCHING
# ============================================================
def fetch_klines(session, symbol: str, interval: str = "5", days: int = 7) -> list[dict]:
    """Fetch klines from Bybit, oldest bar first.

    Pages backwards from "now" through ``session.get_kline`` (1000 bars per
    request) until ``days`` worth of bars is collected, the API returns an
    error code / an empty or short page, or a request raises.

    Args:
        session: object exposing ``get_kline(category, symbol, interval,
            limit, end)`` and returning a dict with ``retCode`` and
            ``result.list``.
        symbol: instrument name, e.g. ``"SOLUSDT"``.
        interval: bar size in minutes, as a numeric string.
        days: how many days of history to request.

    Returns:
        A list of dicts with keys ``ts, open, high, low, close, volume,
        turnover``, de-duplicated by ``ts`` and trimmed to the most recent
        ``days`` worth of bars. May be shorter (or empty) on fetch problems.
    """
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)

    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=1000,
                end=end_time,
            )
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append({
                    "ts": int(item[0]),
                    "open": float(item[1]),
                    "high": float(item[2]),
                    "low": float(item[3]),
                    "close": float(item[4]),
                    "volume": float(item[5]),
                    "turnover": float(item[6]) if len(item) > 6 else 0,
                })
            # The last item of a page is treated as its oldest bar: the next
            # page must end just before it.
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
        except Exception as e:
            # Best effort: report and keep whatever was fetched so far.
            print(f" Fetch error {symbol}: {e}")
            break

    # Pages were appended newest-first; flip to chronological order.
    all_klines.reverse()

    seen = set()
    unique = []
    for k in all_klines:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)

    return unique[-bars_needed:] if len(unique) > bars_needed else unique


def get_top_symbols(session, top_n=20, min_volume=20_000_000) -> list[tuple[str, float]]:
    """Get top USDT perps by 24h volume, excluding blacklist.

    Args:
        session: object exposing ``get_tickers(category=...)``.
        top_n: maximum number of symbols to return.
        min_volume: minimum ``turnover24h`` a symbol needs to qualify.

    Returns:
        ``(symbol, turnover24h)`` tuples sorted by turnover, largest first;
        an empty list when the request fails or returns no result.
    """
    try:
        tickers = session.get_tickers(category="linear")
        if not tickers or "result" not in tickers:
            return []

        candidates = []
        for t in tickers["result"]["list"]:
            sym = t["symbol"]
            if not sym.endswith("USDT") or sym in BLACKLIST:
                continue
            # "or 0": a missing/empty turnover field counts as no volume
            # instead of aborting the whole screener with a ValueError.
            vol = float(t.get("turnover24h") or 0)
            if vol >= min_volume:
                candidates.append((sym, vol))

        candidates.sort(key=lambda x: x[1], reverse=True)
        return candidates[:top_n]
    except Exception as e:
        print(f"Error fetching tickers: {e}")
        return []


# ============================================================
# INDICATORS
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
    """Calculate rolling Z-Score from VWAP.

    For every bar ``i >= period`` the window is the ``period`` bars *before*
    ``i`` (bar ``i`` itself is excluded). Within the window a cumulative VWAP
    of the typical price ``(h + l + c) / 3`` is built; the Z-score of bar
    ``i`` is ``(closes[i] - window_vwap) / std(window_close - cumulative_vwap)``.

    Returns:
        A float array the same length as ``closes``; entries before
        ``period`` and entries whose window has zero deviation are 0.0.
    """
    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    closes = np.asarray(closes, dtype=float)
    volumes = np.asarray(volumes, dtype=float)

    n = len(closes)
    z_scores = np.full(n, 0.0)

    # Loop-invariant: typical price and its volume-weighted form.
    typical = (highs + lows + closes) / 3
    typical_vol = typical * volumes

    for i in range(period, n):
        window = slice(i - period, i)
        cum_tp_vol = np.cumsum(typical_vol[window])
        cum_vol = np.cumsum(volumes[window])
        # Avoid dividing by zero while no volume has accumulated yet.
        cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
        vwap_arr = cum_tp_vol / cum_vol_safe
        vwap = vwap_arr[-1]

        std = np.std(closes[window] - vwap_arr)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std

    return z_scores


def calc_natr(highs, lows, closes, period=NATR_PERIOD):
    """Calculate NATR (Normalized ATR) array. Returns % values.

    True range of bar ``j`` is ``max(h - l, |h - prev_close|,
    |l - prev_close|)``; ATR is the simple mean of the last ``period`` true
    ranges, and NATR is ``ATR / close * 100``.

    Returns:
        A float array the same length as ``closes``; entries before
        ``period`` and entries with a non-positive close are 0.0.
    """
    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    closes = np.asarray(closes, dtype=float)

    n = len(closes)
    natr = np.full(n, 0.0)
    if n < 2:
        return natr

    # True range for every bar at once; bar 0 has no previous close, so its
    # slot stays 0 and is never read (windows start at index >= 1).
    prev_close = closes[:-1]
    true_range = np.zeros(n)
    true_range[1:] = np.maximum(
        highs[1:] - lows[1:],
        np.maximum(np.abs(highs[1:] - prev_close), np.abs(lows[1:] - prev_close)),
    )

    for i in range(max(period, 1), n):
        if closes[i] > 0:
            atr = np.mean(true_range[i - period + 1:i + 1])
            natr[i] = (atr / closes[i]) * 100

    return natr


# ============================================================
# TRADE SIMULATOR (clean, no DCA)
# ============================================================
class Trade:
    """One clean trade: entry → TP/SL/Z-TP → exit.

    Position size is ``order_usd * leverage / entry_price``. TP and SL
    prices are fixed at construction from ``take_profit_pct`` and
    ``stop_loss_pct``. Any ``side`` other than ``"LONG"`` is priced as a
    short.
    """

    def __init__(self, symbol, side, entry_price, entry_bar, z_score, natr, config):
        self.symbol = symbol
        self.side = side  # "LONG" or "SHORT"
        self.entry_price = entry_price
        self.entry_bar = entry_bar
        self.z_entry = z_score
        self.natr_entry = natr
        self.config = config

        lev = config["leverage"]
        self.qty = (config["order_usd"] * lev) / entry_price
        self.invested = config["order_usd"]

        # TP / SL prices
        if side == "LONG":
            self.tp_price = entry_price * (1 + config["take_profit_pct"] / 100)
            self.sl_price = entry_price * (1 - config["stop_loss_pct"] / 100)
        else:
            self.tp_price = entry_price * (1 - config["take_profit_pct"] / 100)
            self.sl_price = entry_price * (1 + config["stop_loss_pct"] / 100)

        self.closed = False
        self.close_price = 0
        self.close_bar = 0
        self.close_reason = ""
        self.pnl = 0
        self.max_dd_pct = 0
        self.z_exit = 0

    def tick(self, bar_idx, high, low, close, z_score):
        """Process one bar. Returns True if trade closed.

        Exit priority is SL > TP% > Z-TP. When several exits trigger on the
        same bar the highest-priority one decides BOTH the reason and the
        fill price (SL at the stop price, TP% at the target price, Z-TP at
        the bar close), so a stop-out is never booked at a better price.
        """
        if self.closed:
            return True

        is_long = self.side == "LONG"
        z_tp_threshold = self.config["z_tp_threshold"]

        # Track max adverse excursion as a % of entry.
        if is_long:
            dd = (self.entry_price - low) / self.entry_price * 100
        else:
            dd = (high - self.entry_price) / self.entry_price * 100
        self.max_dd_pct = max(self.max_dd_pct, dd)

        if is_long:
            sl_hit = low <= self.sl_price
            tp_hit = high >= self.tp_price
            # Fair-value reversion, only if in profit.
            z_tp = z_score >= -z_tp_threshold and close > self.entry_price
        else:
            sl_hit = high >= self.sl_price
            tp_hit = low <= self.tp_price
            z_tp = z_score <= z_tp_threshold and close < self.entry_price

        # On the same bar SL wins over TP (conservative: intrabar order of
        # the high and low is unknown).
        if sl_hit:
            reason, price = "SL", self.sl_price
        elif tp_hit:
            reason, price = "TP%", self.tp_price
        elif z_tp:
            reason, price = "Z-TP", close
        else:
            return False

        self.close_price = price
        self._close(bar_idx, reason, z_score)
        return True

    def _close(self, bar_idx, reason, z_score):
        """Mark the trade closed at ``self.close_price`` and compute net PnL.

        ``self.close_price`` must be set by the caller beforehand.
        """
        self.closed = True
        self.close_bar = bar_idx
        self.close_reason = reason
        self.z_exit = z_score

        if self.side == "LONG":
            self.pnl = self.qty * (self.close_price - self.entry_price)
        else:
            self.pnl = self.qty * (self.entry_price - self.close_price)

        # Fees: entry = taker (market), exit = maker if TP% (limit), taker otherwise
        entry_fee = self.qty * self.entry_price * TAKER_FEE
        exit_rate = MAKER_FEE if reason == "TP%" else TAKER_FEE
        exit_fee = self.qty * self.close_price * exit_rate
        self.pnl -= (entry_fee + exit_fee)


# ============================================================
# BACKTEST ENGINE
# ============================================================
def run_backtest(symbol, klines, config):
    """Run clean Z-VWAP backtest on one symbol.

    Walks the bars once with at most one open trade. A bar that closes a
    trade starts the cooldown and never opens a new one. Entries happen at
    the bar close when NATR passes the filter and
    ``z_entry_threshold < |Z| <= z_max_threshold`` (long below, short above).
    A trade still open on the last bar is closed there with reason "END".

    Args:
        symbol: instrument name, copied into the result.
        klines: chronological list of dicts with ``close, high, low, volume``.
        config: strategy parameters (same keys as ``CONFIG``).

    Returns:
        A dict of aggregate statistics plus per-trade ``details``, or None
        when there are fewer than ``VWAP_PERIOD + 20`` bars or no trades.
    """
    n = len(klines)
    if n < VWAP_PERIOD + 20:
        return None

    closes = np.array([k["close"] for k in klines])
    highs = np.array([k["high"] for k in klines])
    lows = np.array([k["low"] for k in klines])
    volumes = np.array([k["volume"] for k in klines])

    z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD)
    natrs = calc_natr(highs, lows, closes, NATR_PERIOD)

    trades = []
    active_trade = None
    cooldown_until = 0
    threshold = config["z_entry_threshold"]

    for i in range(VWAP_PERIOD, n):
        z = z_scores[i]
        natr = natrs[i]

        # Process active trade
        if active_trade:
            if active_trade.tick(i, highs[i], lows[i], closes[i], z):
                trades.append(active_trade)
                cooldown_until = i + config["cooldown_bars"]
                active_trade = None
            continue

        # Check for new entry
        if i < cooldown_until:
            continue
        # Filter: NATR
        if natr < config["min_natr_pct"]:
            continue
        # Filter: Z-cap (breakout skip)
        if abs(z) > config["z_max_threshold"]:
            continue

        # Entry signal
        if z < -threshold:
            active_trade = Trade(symbol, "LONG", closes[i], i, z, natr, config)
        elif z > threshold:
            active_trade = Trade(symbol, "SHORT", closes[i], i, z, natr, config)

    # Force-close open trade at end
    if active_trade and not active_trade.closed:
        active_trade.close_price = closes[-1]
        active_trade._close(n - 1, "END", z_scores[-1])
        trades.append(active_trade)

    if not trades:
        return None

    # Aggregate
    wins = [t for t in trades if t.pnl > 0]
    losses = [t for t in trades if t.pnl <= 0]
    longs = [t for t in trades if t.side == "LONG"]
    shorts = [t for t in trades if t.side == "SHORT"]

    total_pnl = sum(t.pnl for t in trades)
    gross_profit = sum(t.pnl for t in wins)
    gross_loss = abs(sum(t.pnl for t in losses))
    # 999 is the sentinel for "no losing PnL at all".
    pf = gross_profit / gross_loss if gross_loss > 0 else 999

    tp_trades = [t for t in trades if t.close_reason == "TP%"]
    ztp_trades = [t for t in trades if t.close_reason == "Z-TP"]
    sl_trades = [t for t in trades if t.close_reason == "SL"]

    return {
        "symbol": symbol,
        "trades": len(trades),
        "wins": len(wins),
        "losses": len(losses),
        "win_rate": round(len(wins) / len(trades) * 100, 1),
        "total_pnl": round(total_pnl, 4),
        "profit_factor": round(pf, 2),
        "avg_win": round(gross_profit / len(wins), 4) if wins else 0,
        "avg_loss": round(sum(t.pnl for t in losses) / len(losses), 4) if losses else 0,
        "max_dd_pct": round(max(t.max_dd_pct for t in trades), 2),
        "avg_duration_bars": round(sum(t.close_bar - t.entry_bar for t in trades) / len(trades), 1),
        # Exit reasons
        "tp_count": len(tp_trades),
        "ztp_count": len(ztp_trades),
        "sl_count": len(sl_trades),
        "end_count": sum(1 for t in trades if t.close_reason == "END"),
        "tp_pnl": round(sum(t.pnl for t in tp_trades), 4),
        "ztp_pnl": round(sum(t.pnl for t in ztp_trades), 4),
        "sl_pnl": round(sum(t.pnl for t in sl_trades), 4),
        # Long vs Short
        "longs": len(longs),
        "shorts": len(shorts),
        "long_pnl": round(sum(t.pnl for t in longs), 4),
        "short_pnl": round(sum(t.pnl for t in shorts), 4),
        "long_wr": round(sum(1 for t in longs if t.pnl > 0) / len(longs) * 100, 1) if longs else 0,
        "short_wr": round(sum(1 for t in shorts if t.pnl > 0) / len(shorts) * 100, 1) if shorts else 0,
        # NATR analysis
        "avg_natr": round(sum(t.natr_entry for t in trades) / len(trades), 2),
        "avg_z_entry": round(sum(abs(t.z_entry) for t in trades) / len(trades), 2),
        # Per-trade detail
        "details": [
            {
                "side": t.side[0],  # L or S
                "entry": round(t.entry_price, 6),
                "exit": round(t.close_price, 6),
                "pnl": round(t.pnl, 4),
                "reason": t.close_reason,
                "bars": t.close_bar - t.entry_bar,
                "z_in": round(t.z_entry, 2),
                "z_out": round(t.z_exit, 2),
                "natr": round(t.natr_entry, 2),
                "dd": round(t.max_dd_pct, 2),
            }
            for t in trades
        ],
    }


# ============================================================
# MAIN
# ============================================================
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the backtest script."""
    parser = argparse.ArgumentParser(description="Z-VWAP Clean Backtest")
    parser.add_argument("--days", type=int, default=7, help="Days of data (default 7)")
    parser.add_argument("--top", type=int, default=20, help="Top N symbols by volume (default 20)")
    # argparse %-formats help strings, so a literal percent sign must be "%%".
    parser.add_argument("--tp", type=float, default=None, help="Override TP %%")
    parser.add_argument("--sl", type=float, default=None, help="Override SL %%")
    parser.add_argument("--z-entry", type=float, default=None, help="Override Z entry threshold")
    parser.add_argument("--symbols", type=str, default=None, help="Comma-separated symbols (skip screener)")
    return parser


def _print_buckets(name, buckets, label_width):
    """Print trade count, win rate and PnL for every non-empty bucket."""
    for label, bucket in buckets.items():
        if not bucket:
            continue
        bw = sum(1 for d in bucket if d["pnl"] > 0)
        bpnl = sum(d["pnl"] for d in bucket)
        bwr = bw / len(bucket) * 100
        e = "🟢" if bpnl > 0 else "🔴"
        print(f" {e} {name} {label.rjust(label_width)}: {len(bucket):>3}t WR {bwr:.0f}% PnL ${bpnl:+.2f}")


def main():
    """Run the backtest over the selected symbols, print a report, save JSON.

    Results are written to ``results_zvwap_clean.json`` next to this script.
    """
    args = build_parser().parse_args()

    config = CONFIG.copy()
    # "is not None" so that an explicit value is never silently ignored.
    if args.tp is not None:
        config["take_profit_pct"] = args.tp
    if args.sl is not None:
        config["stop_loss_pct"] = args.sl
    if args.z_entry is not None:
        config["z_entry_threshold"] = args.z_entry

    if config["take_profit_pct"] <= 0 or config["stop_loss_pct"] <= 0:
        raise SystemExit("TP and SL percentages must be positive")
    if HTTP is None:
        raise SystemExit("pybit is not installed; it is required to download market data")

    rr = config["take_profit_pct"] / config["stop_loss_pct"]
    breakeven_wr = 1 / (1 + rr) * 100

    print("=" * 70)
    print(" Z-VWAP Clean Mean Reversion Backtest")
    print("=" * 70)
    print(f"\n Entry: ${config['order_usd']} × {config['leverage']}x leverage")
    print(f" TP: {config['take_profit_pct']}% | SL: {config['stop_loss_pct']}% | R:R = {rr:.1f}:1")
    print(f" Z-entry: |Z| > {config['z_entry_threshold']} | Z-max: |Z| > {config['z_max_threshold']} | Z-TP: |Z| < {config['z_tp_threshold']}")
    print(f" Filters: NATR >= {config['min_natr_pct']}% | Vol >= ${config['min_volume_24h']/1e6:.0f}M")
    print(f" Blacklist: {', '.join(sorted(BLACKLIST))}")
    print(f" Period: {args.days} days | Cooldown: {config['cooldown_bars']} bars (5m)")
    print(f" Breakeven WR needed: {breakeven_wr:.0f}%")
    print()

    session = HTTP(testnet=False)

    # Get symbols
    if args.symbols:
        symbols = [(s.strip().upper(), 0) for s in args.symbols.split(",")]
        print(f" Using {len(symbols)} specified symbols\n")
    else:
        print(f" Fetching top {args.top} symbols by volume...", end=" ", flush=True)
        symbols = get_top_symbols(session, top_n=args.top, min_volume=config["min_volume_24h"])
        print(f"{len(symbols)} found\n")

    all_results = []
    total_start = time.time()

    for idx, (symbol, vol_24h) in enumerate(symbols):
        vol_str = f"${vol_24h/1e6:.0f}M" if vol_24h > 0 else ""
        print(f"[{idx+1}/{len(symbols)}] {symbol} {vol_str}", end=" ", flush=True)

        klines = fetch_klines(session, symbol, "5", days=args.days)
        time.sleep(0.3)  # rate limit: pause after every fetch, kept or skipped

        if len(klines) < VWAP_PERIOD + 50:
            print(f"— skip ({len(klines)} bars)")
            continue

        result = run_backtest(symbol, klines, config)
        if not result or result["trades"] == 0:
            print(f"— no trades ({len(klines)} bars)")
            continue

        all_results.append(result)

        # Compact output
        pnl_e = "🟢" if result["total_pnl"] > 0 else "🔴"
        wr_e = "🟢" if result["win_rate"] >= 50 else "🟡" if result["win_rate"] >= 34 else "🔴"
        print(f"— {pnl_e} ${result['total_pnl']:+.2f} | {wr_e} WR {result['win_rate']}% | "
              f"PF {result['profit_factor']} | {result['trades']}t "
              f"(TP:{result['tp_count']} Z:{result['ztp_count']} SL:{result['sl_count']}) | "
              f"L:{result['longs']}/S:{result['shorts']}")

    elapsed = time.time() - total_start

    if not all_results:
        print("\n❌ No results!")
        return

    # ============================================================
    # SUMMARY
    # ============================================================
    print("\n" + "=" * 70)
    print(" SUMMARY")
    print("=" * 70)

    total_trades = sum(r["trades"] for r in all_results)
    total_wins = sum(r["wins"] for r in all_results)
    total_losses = sum(r["losses"] for r in all_results)
    total_pnl = sum(r["total_pnl"] for r in all_results)

    # Rebuilt from the (rounded) per-symbol averages, so slightly approximate.
    all_wins_pnl = sum(r["avg_win"] * r["wins"] for r in all_results)
    all_losses_pnl = sum(r["avg_loss"] * r["losses"] for r in all_results)
    avg_win = all_wins_pnl / total_wins if total_wins > 0 else 0
    avg_loss = all_losses_pnl / total_losses if total_losses > 0 else 0

    total_tp = sum(r["tp_count"] for r in all_results)
    total_ztp = sum(r["ztp_count"] for r in all_results)
    total_sl = sum(r["sl_count"] for r in all_results)
    total_end = sum(r["end_count"] for r in all_results)
    tp_pnl = sum(r["tp_pnl"] for r in all_results)
    ztp_pnl = sum(r["ztp_pnl"] for r in all_results)
    sl_pnl = sum(r["sl_pnl"] for r in all_results)

    wr = total_wins / total_trades * 100 if total_trades > 0 else 0
    pf = abs(all_wins_pnl / all_losses_pnl) if all_losses_pnl != 0 else 999
    avg_bars = sum(r["avg_duration_bars"] * r["trades"] for r in all_results) / total_trades

    pnl_emoji = "🟢" if total_pnl > 0 else "🔴"
    print(f"\n {pnl_emoji} Total PnL: ${total_pnl:+.2f}")
    print(f" 📊 Trades: {total_trades} ({total_wins}W / {total_losses}L)")
    print(f" 🎯 Win Rate: {wr:.1f}% (breakeven: {breakeven_wr:.0f}%)")
    print(f" 📈 Profit Factor: {pf:.2f}")
    avg_line = f" 💰 Avg Win: ${avg_win:+.4f} | Avg Loss: ${avg_loss:+.4f}"
    if avg_loss != 0:
        # The ratio is undefined without losing trades.
        avg_line += f" | Ratio: 1:{abs(avg_win/avg_loss):.1f}"
    print(avg_line)
    print(f" 📉 Max DD: {max(r['max_dd_pct'] for r in all_results):.2f}%")
    print(f" ⏱ Avg Duration: {avg_bars:.0f} bars (~{avg_bars*5:.0f} min)")

    print(f"\n Exit Breakdown:")
    print(f" TP%: {total_tp:>4} (${tp_pnl:+.2f})")
    print(f" Z-TP: {total_ztp:>4} (${ztp_pnl:+.2f})")
    print(f" SL: {total_sl:>4} (${sl_pnl:+.2f})")
    if total_end:
        print(f" END: {total_end:>4}")

    # Top / Bottom symbols
    sorted_by_pnl = sorted(all_results, key=lambda r: r["total_pnl"], reverse=True)
    print(f"\n 🏆 Top 5 Symbols:")
    for r in sorted_by_pnl[:5]:
        print(f" 🟢 {r['symbol']:>14} ${r['total_pnl']:+.2f} | {r['trades']}t WR {r['win_rate']}% PF {r['profit_factor']}")
    print(f"\n 💀 Bottom 5 Symbols:")
    for r in sorted_by_pnl[-5:]:
        print(f" 🔴 {r['symbol']:>14} ${r['total_pnl']:+.2f} | {r['trades']}t WR {r['win_rate']}% PF {r['profit_factor']}")

    # Profitable vs unprofitable symbols
    profitable = [r for r in all_results if r["total_pnl"] > 0]
    unprofitable = [r for r in all_results if r["total_pnl"] <= 0]
    print(f"\n Symbols: {len(profitable)} profitable / {len(unprofitable)} unprofitable out of {len(all_results)}")

    # NATR analysis
    print(f"\n NATR Distribution of trades:")
    all_details = []
    for r in all_results:
        all_details.extend(r["details"])

    natr_buckets = {
        "0.75-1.0%": [d for d in all_details if 0.75 <= d["natr"] < 1.0],
        "1.0-1.5%": [d for d in all_details if 1.0 <= d["natr"] < 1.5],
        "1.5-2.0%": [d for d in all_details if 1.5 <= d["natr"] < 2.0],
        "2.0-3.0%": [d for d in all_details if 2.0 <= d["natr"] < 3.0],
        "3.0%+": [d for d in all_details if d["natr"] >= 3.0],
    }
    _print_buckets("NATR", natr_buckets, 8)

    # Z-score entry analysis
    print(f"\n Z-Score Entry Distribution:")
    z_buckets = {
        "1.8-2.0": [d for d in all_details if 1.8 <= abs(d["z_in"]) < 2.0],
        "2.0-2.2": [d for d in all_details if 2.0 <= abs(d["z_in"]) < 2.2],
        "2.2-2.5": [d for d in all_details if 2.2 <= abs(d["z_in"]) <= 2.5],
    }
    _print_buckets("Z", z_buckets, 0)

    print(f"\n ⏱ Backtest completed in {elapsed:.0f}s")

    # Save results
    output_path = os.path.join(os.path.dirname(__file__), "results_zvwap_clean.json")
    save_data = {
        "config": config,
        "days": args.days,
        "top_n": args.top,
        "run_time": datetime.now().isoformat(),
        "summary": {
            "total_pnl": round(total_pnl, 4),
            "trades": total_trades,
            "win_rate": round(wr, 1),
            "profit_factor": round(pf, 2),
            "avg_win": round(avg_win, 4),
            "avg_loss": round(avg_loss, 4),
        },
        "results": [{k: v for k, v in r.items() if k != "details"} for r in all_results],
        "all_trades": all_details,
    }
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(save_data, f, indent=2)
    print(f" 💾 Saved to {output_path}")


if __name__ == "__main__":
    main()