โ† ะะฐะทะฐะด
""" Backtest: DCA Z-VWAP with NATR filter โ€” ENA 5m 7 days ======================================================= Compares: no NATR filter vs NATR >= 1.0% Usage: cd /home/app/trading-bot-bybit && source venv/bin/activate python backtests/backtest_dca_natr.py """ import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import numpy as np from datetime import datetime from pybit.unified_trading import HTTP # ============================================================ # CONFIG (matches live bot) # ============================================================ CONFIG = { "base_order_usd": 5.0, "safety_order_usd": 7.0, "max_safety_orders": 4, "price_deviation_pct": 1.5, "step_scale": 1.5, "volume_scale": 1.3, "take_profit_pct": 1.5, "stop_loss_pct": 10.0, "z_entry_threshold": 1.8, "z_tp_threshold": 0.3, "cooldown_bars": 12, "leverage": 3, } VWAP_PERIOD = 50 ATR_PERIOD = 14 MAKER_FEE = 0.0002 TAKER_FEE = 0.00055 SYMBOL = "WIFUSDT" INTERVAL = "5" DAYS = 7 # ============================================================ # DATA # ============================================================ def fetch_klines(symbol, interval, days=7): session = HTTP(testnet=False) all_klines = [] bars_needed = days * 24 * 60 // int(interval) end_time = int(datetime.now().timestamp() * 1000) while len(all_klines) < bars_needed: resp = session.get_kline( category="linear", symbol=symbol, interval=interval, limit=1000, end=end_time, ) if resp["retCode"] != 0: break items = resp["result"]["list"] if not items: break for item in items: all_klines.append({ "ts": int(item[0]), "open": float(item[1]), "high": float(item[2]), "low": float(item[3]), "close": float(item[4]), "volume": float(item[5]), }) end_time = int(items[-1][0]) - 1 if len(items) < 1000: break all_klines.reverse() seen = set() unique = [k for k in all_klines if k["ts"] not in seen and not seen.add(k["ts"])] return unique[-bars_needed:] # ============================================================ # INDICATORS # ============================================================ def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD): n = len(closes) z_scores = np.full(n, 0.0) for i in range(period, n): h, l, c, v = highs[i-period:i], lows[i-period:i], closes[i-period:i], volumes[i-period:i] tp = (h + l + c) / 3 cum_tp_vol = np.cumsum(tp * v) cum_vol = np.where(np.cumsum(v) == 0, 1, np.cumsum(v)) vwap_arr = cum_tp_vol / cum_vol vwap = vwap_arr[-1] std = np.std(c - vwap_arr) if std > 0: z_scores[i] = (closes[i] - vwap) / std return z_scores def calc_natr(highs, lows, closes, period=ATR_PERIOD): """NATR = ATR / close * 100. Returns array.""" n = len(closes) natr = np.full(n, 0.0) for i in range(1, n): start = max(0, i - period) tr = np.maximum( highs[start+1:i+1] - lows[start+1:i+1], np.maximum( np.abs(highs[start+1:i+1] - closes[start:i]), np.abs(lows[start+1:i+1] - closes[start:i]) ) ) if len(tr) > 0 and closes[i] > 0: natr[i] = (np.mean(tr) / closes[i]) * 100 return natr # ============================================================ # DCA DEAL # ============================================================ class DCADeal: def __init__(self, symbol, side, entry_price, entry_bar, config): self.symbol = symbol self.side = side self.config = config self.entry_bar = entry_bar lev = config["leverage"] self.orders = [{ "price": entry_price, "usd": config["base_order_usd"], "qty": (config["base_order_usd"] * lev) / entry_price, "type": "BO", }] self.so_triggers = [] cumul = 0 for i in range(config["max_safety_orders"]): dev = config["price_deviation_pct"] * (config["step_scale"] ** i) cumul += dev so_size = config["safety_order_usd"] * (config["volume_scale"] ** i) if side == "LONG": tp = entry_price * (1 - cumul / 100) else: tp = entry_price * (1 + cumul / 100) self.so_triggers.append({"price": tp, "usd": so_size, "filled": False}) self.so_filled = 0 self.closed = False self.close_price = 0 self.close_bar = 0 self.close_reason = "" self.pnl = 0 self.fees = 0 @property def avg_entry(self): tq = sum(o["qty"] for o in self.orders) tc = sum(o["qty"] * o["price"] for o in self.orders) return tc / tq if tq > 0 else 0 @property def total_qty(self): return sum(o["qty"] for o in self.orders) @property def total_invested(self): return sum(o["usd"] for o in self.orders) def tick(self, bar_idx, high, low, close, z_score): if self.closed: return True lev = self.config["leverage"] # SO fills for i, so in enumerate(self.so_triggers): if so["filled"]: continue filled = (self.side == "LONG" and low <= so["price"]) or \ (self.side == "SHORT" and high >= so["price"]) if filled: qty = (so["usd"] * lev) / so["price"] self.orders.append({"price": so["price"], "usd": so["usd"], "qty": qty}) so["filled"] = True self.so_filled += 1 self.fees += so["usd"] * lev * MAKER_FEE avg = self.avg_entry tp_pct = self.config["take_profit_pct"] sl_pct = self.config["stop_loss_pct"] # TP% if self.side == "LONG": tp_price = avg * (1 + tp_pct / 100) sl_price = avg * (1 - sl_pct / 100) else: tp_price = avg * (1 - tp_pct / 100) sl_price = avg * (1 + sl_pct / 100) tp_hit = (self.side == "LONG" and high >= tp_price) or \ (self.side == "SHORT" and low <= tp_price) sl_hit = (self.side == "LONG" and low <= sl_price) or \ (self.side == "SHORT" and high >= sl_price) # Z-reversion z_tp = False if self.side == "LONG" and z_score >= -self.config["z_tp_threshold"] and close > avg: z_tp = True elif self.side == "SHORT" and z_score <= self.config["z_tp_threshold"] and close < avg: z_tp = True if sl_hit: self.close_price = sl_price self._close(bar_idx, "SL") return True elif tp_hit: self.close_price = tp_price self._close(bar_idx, "TP%") return True elif z_tp: self.close_price = close self._close(bar_idx, "Z-TP") return True return False def _close(self, bar_idx, reason): self.closed = True self.close_bar = bar_idx self.close_reason = reason tq = self.total_qty avg = self.avg_entry if self.side == "LONG": self.pnl = tq * (self.close_price - avg) else: self.pnl = tq * (avg - self.close_price) self.fees += tq * self.close_price * TAKER_FEE self.fees += self.orders[0]["qty"] * self.orders[0]["price"] * TAKER_FEE self.pnl -= self.fees # ============================================================ # BACKTEST # ============================================================ def run_backtest(klines, natr_min=0.0, label=""): closes = np.array([k["close"] for k in klines]) highs = np.array([k["high"] for k in klines]) lows = np.array([k["low"] for k in klines]) volumes = np.array([k["volume"] for k in klines]) z_scores = calc_zvwap(highs, lows, closes, volumes) natr_arr = calc_natr(highs, lows, closes) n = len(closes) deals = [] active = None cooldown = 0 skipped_natr = 0 for i in range(VWAP_PERIOD, n): z = z_scores[i] if active: if active.tick(i, highs[i], lows[i], closes[i], z): deals.append(active) cooldown = i + CONFIG["cooldown_bars"] active = None continue if i < cooldown: continue threshold = CONFIG["z_entry_threshold"] if abs(z) > threshold: # NATR filter if natr_arr[i] < natr_min: skipped_natr += 1 continue side = "LONG" if z < -threshold else "SHORT" active = DCADeal(SYMBOL, side, closes[i], i, CONFIG) if active and not active.closed: active.close_price = closes[-1] active._close(n - 1, "END") deals.append(active) # Results if not deals: return {"label": label, "deals": 0, "pnl": 0, "wr": 0, "skipped": skipped_natr} total_pnl = sum(d.pnl for d in deals) wins = sum(1 for d in deals if d.pnl > 0) wr = wins / len(deals) * 100 longs = [d for d in deals if d.side == "LONG"] shorts = [d for d in deals if d.side == "SHORT"] tp_cnt = sum(1 for d in deals if d.close_reason == "TP%") ztp_cnt = sum(1 for d in deals if d.close_reason == "Z-TP") sl_cnt = sum(1 for d in deals if d.close_reason == "SL") end_cnt = sum(1 for d in deals if d.close_reason == "END") avg_sos = sum(d.so_filled for d in deals) / len(deals) avg_dur = sum(d.close_bar - d.entry_bar for d in deals) / len(deals) avg_inv = sum(d.total_invested for d in deals) / len(deals) total_fees = sum(d.fees for d in deals) return { "label": label, "deals": len(deals), "pnl": round(total_pnl, 4), "wr": round(wr, 1), "avg_pnl": round(total_pnl / len(deals), 4), "avg_sos": round(avg_sos, 1), "avg_dur_bars": round(avg_dur, 0), "avg_invested": round(avg_inv, 2), "fees": round(total_fees, 4), "tp": tp_cnt, "ztp": ztp_cnt, "sl": sl_cnt, "end": end_cnt, "longs": len(longs), "shorts": len(shorts), "long_pnl": round(sum(d.pnl for d in longs), 4), "short_pnl": round(sum(d.pnl for d in shorts), 4), "long_wr": round(sum(1 for d in longs if d.pnl > 0) / len(longs) * 100, 1) if longs else 0, "short_wr": round(sum(1 for d in shorts if d.pnl > 0) / len(shorts) * 100, 1) if shorts else 0, "skipped": skipped_natr, "detail": [ {"side": d.side[0], "pnl": f"${d.pnl:+.4f}", "sos": d.so_filled, "reason": d.close_reason, "bars": d.close_bar - d.entry_bar, "natr": f"{natr_arr[d.entry_bar]:.2f}%"} for d in deals ], } def print_result(r): emoji = "๐ŸŸข" if r["pnl"] > 0 else "๐Ÿ”ด" print(f"\n {emoji} {r['label']}") print(f" PnL: ${r['pnl']:+.4f} | Deals: {r['deals']} | WR: {r['wr']}%") if r["deals"] > 0: print(f" Avg PnL: ${r['avg_pnl']:+.4f} | Avg SOs: {r['avg_sos']} | Avg Dur: {r['avg_dur_bars']:.0f} bars") print(f" TP%: {r['tp']} | Z-TP: {r['ztp']} | SL: {r['sl']} | END: {r['end']}") print(f" Long: {r['longs']} (${r['long_pnl']:+.4f} WR={r['long_wr']}%) | " f"Short: {r['shorts']} (${r['short_pnl']:+.4f} WR={r['short_wr']}%)") print(f" Fees: ${r['fees']:.4f} | Avg Invested: ${r['avg_invested']:.2f}") print(f" Skipped (NATR): {r['skipped']}") print(f" --- Per deal ---") for d in r["detail"]: e = "โœ…" if d["pnl"].startswith("$+") or d["pnl"] == "$+0.0000" else "โŒ" print(f" {e} {d['side']} {d['pnl']} | SO:{d['sos']} | {d['reason']} | " f"{d['bars']}bars | NATR:{d['natr']}") def main(): print("=" * 60) print(f" DCA Z-VWAP + NATR Filter โ€” {SYMBOL} 5m 7d") print("=" * 60) print(f" Config: BO=${CONFIG['base_order_usd']}, SO=${CONFIG['safety_order_usd']}, " f"MaxSO={CONFIG['max_safety_orders']}, Lev={CONFIG['leverage']}x") print(f" Z-entry={CONFIG['z_entry_threshold']}, Z-TP={CONFIG['z_tp_threshold']}, " f"TP={CONFIG['take_profit_pct']}%, SL={CONFIG['stop_loss_pct']}%") print(f"\n Fetching {SYMBOL} 5m data ({DAYS}d)...", end=" ", flush=True) klines = fetch_klines(SYMBOL, INTERVAL, DAYS) print(f"{len(klines)} bars") # Run variants r_none = run_backtest(klines, natr_min=0.0, label="No NATR filter") r_08 = run_backtest(klines, natr_min=0.8, label="NATR >= 0.8%") r_10 = run_backtest(klines, natr_min=1.0, label="NATR >= 1.0%") r_12 = run_backtest(klines, natr_min=1.2, label="NATR >= 1.2%") r_15 = run_backtest(klines, natr_min=1.5, label="NATR >= 1.5%") for r in [r_none, r_08, r_10, r_12, r_15]: print_result(r) print("\n" + "=" * 60) print(" COMPARISON") print("=" * 60) print(f" {'Filter':<18} {'Deals':>5} {'PnL':>10} {'WR':>6} {'Skipped':>8}") print(f" {'-'*50}") for r in [r_none, r_08, r_10, r_12, r_15]: print(f" {r['label']:<18} {r['deals']:>5} ${r['pnl']:>+8.4f} {r['wr']:>5.1f}% {r['skipped']:>8}") if __name__ == "__main__": main()