โ† ะะฐะทะฐะด
"""
Backtest: DCA Bot with Z-VWAP Entry Signal
=============================================
Compares 1m vs 5m timeframes over 7 days.

Strategy:
- Screener calculates Z = (close - VWAP) / std(deviations)
- Z < -THRESHOLD → LONG DCA deal
- Z > +THRESHOLD → SHORT DCA deal
- Safety Orders placed at increasing distances (Martingale)
- TP: X% above avg entry OR Z crosses back to 0
- SL: Y% from avg entry

Usage:
    python backtests/backtest_dca_zvwap.py
"""
import json
import os
import sys
from collections import Counter
from datetime import datetime

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import numpy as np

try:
    from pybit.unified_trading import HTTP
except ImportError:
    # Only fetch_klines() needs the exchange client; the simulator itself
    # (calc_zvwap / DCADeal / run_backtest) stays importable without it.
    HTTP = None

# ============================================================
# DCA PARAMETERS
# ============================================================
DCA_CONFIG = {
    "base_order_usd": 5.0,
    "safety_order_usd": 10.0,
    "max_safety_orders": 6,
    "price_deviation_pct": 1.5,   # % drop for first SO
    "step_scale": 1.5,            # distance multiplier between SOs
    "volume_scale": 1.3,          # size multiplier for each SO
    "take_profit_pct": 1.5,       # TP from average entry
    "stop_loss_pct": 10.0,        # SL from average entry
    "z_entry_threshold": 1.8,     # |Z| > 1.8 to enter
    "z_tp_threshold": 0.3,        # |Z| < 0.3 = fair value exit
    "cooldown_bars": 12,          # bars to wait between deals (1h on 5m, 12m on 1m)
    "leverage": 3,
}

VWAP_PERIOD = 50
MAKER_FEE = 0.0002    # 0.02%
TAKER_FEE = 0.00055   # 0.055%

# Coins to test
SYMBOLS = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"]


# ============================================================
# DATA FETCHING
# ============================================================
def fetch_klines(symbol: str, interval: str, days: int = 7) -> list[dict]:
    """Fetch klines from Bybit (no auth needed for market data).

    Pages backwards from "now" in chunks of up to 1000 candles until
    ``days`` worth of bars have been collected, the API runs out of data,
    or a request fails (in which case whatever was collected so far is
    returned).

    Args:
        symbol: Instrument name passed to the API, e.g. ``"BTCUSDT"``.
        interval: Candle size in minutes as a numeric string (``"1"``, ``"5"``).
        days: Length of history to fetch.

    Returns:
        Chronologically ordered (oldest first) list of dicts with keys
        ``ts``, ``open``, ``high``, ``low``, ``close``, ``volume``.

    Raises:
        ImportError: If the ``pybit`` package is not installed.
    """
    if HTTP is None:
        raise ImportError("pybit is required to fetch klines (pip install pybit)")

    session = HTTP(testnet=False)
    all_klines = []

    # Bybit returns max 1000 candles per request
    # 7 days: 1m = 10080, 5m = 2016
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)

    while len(all_klines) < bars_needed:
        # Deliberately best-effort: any failure stops paging and keeps the
        # candles collected so far instead of aborting the whole run.
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=1000,
                end=end_time,
            )
            if resp["retCode"] != 0:
                print(f" API error: {resp['retMsg']}")
                break

            items = resp["result"]["list"]
            if not items:
                break

            all_klines.extend(
                {
                    "ts": int(item[0]),
                    "open": float(item[1]),
                    "high": float(item[2]),
                    "low": float(item[3]),
                    "close": float(item[4]),
                    "volume": float(item[5]),
                }
                for item in items
            )

            # Move end_time to oldest candle - 1ms
            end_time = int(items[-1][0]) - 1

            if len(items) < 1000:
                break
        except Exception as e:
            print(f" Fetch error: {e}")
            break

    # Pages arrive newest-first, so walk the list backwards to get
    # chronological order; setdefault keeps the first candle per timestamp.
    by_ts = {}
    for kline in reversed(all_klines):
        by_ts.setdefault(kline["ts"], kline)
    unique = list(by_ts.values())

    return unique[-bars_needed:] if len(unique) > bars_needed else unique


# ============================================================
# Z-VWAP CALCULATION
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
    """Calculate Z-Score from VWAP. Returns array of Z-scores.

    For bar ``i`` the VWAP is accumulated over the ``period`` bars *before*
    ``i`` (the current bar is excluded from the window), and the Z-score is
    ``(closes[i] - vwap) / std(window_close - running_vwap)``.

    Args:
        highs, lows, closes, volumes: Equal-length sequences (lists or
            NumPy arrays) of per-bar values.
        period: Number of preceding bars in the rolling window.

    Returns:
        ``np.ndarray`` of length ``len(closes)``; the first ``period``
        entries, and any bar whose window has zero deviation, are ``0.0``.
    """
    # Accept plain lists as well as arrays.
    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    closes = np.asarray(closes, dtype=float)
    volumes = np.asarray(volumes, dtype=float)

    n = len(closes)
    z_scores = np.zeros(n)

    for i in range(period, n):
        window = slice(i - period, i)
        c = closes[window]
        v = volumes[window]
        typical = (highs[window] + lows[window] + c) / 3

        cum_vol = np.cumsum(v)
        # Avoid division by zero while no volume has traded in the window.
        cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
        vwap_arr = np.cumsum(typical * v) / cum_vol_safe
        vwap = vwap_arr[-1]

        std = np.std(c - vwap_arr)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std

    return z_scores


# ============================================================
# DCA DEAL SIMULATOR
# ============================================================
class DCADeal:
    """Simulates one DCA deal lifecycle.

    A deal opens with a base order (BO) at ``entry_price`` and pre-computes
    a ladder of safety orders (SO) at increasing distance and size. Each
    call to :meth:`tick` processes one bar: fills any SOs the bar reached,
    then checks the exits in priority order SL > TP% > Z-reversion.
    """

    def __init__(self, symbol, side, entry_price, entry_bar, config):
        """Open the deal with its base order and build the SO ladder.

        Args:
            symbol: Instrument name (stored for reporting only).
            side: ``"LONG"`` or ``"SHORT"``.
            entry_price: Fill price of the base order.
            entry_bar: Index of the bar on which the deal was opened.
            config: Dict with the keys of ``DCA_CONFIG``.
        """
        self.symbol = symbol
        self.side = side  # "LONG" or "SHORT"
        self.config = config
        self.entry_bar = entry_bar

        # Base order
        leverage = config["leverage"]
        self.orders = [{
            "price": entry_price,
            "usd": config["base_order_usd"],
            "qty": (config["base_order_usd"] * leverage) / entry_price,
            "type": "BO",
        }]

        # Pre-calculate safety order trigger prices. Distances accumulate:
        # SO k sits at the sum of the first k scaled deviations from entry.
        self.so_triggers = []
        cumul_deviation = 0
        for i in range(config["max_safety_orders"]):
            cumul_deviation += config["price_deviation_pct"] * (config["step_scale"] ** i)
            so_size = config["safety_order_usd"] * (config["volume_scale"] ** i)

            if side == "LONG":
                trigger_price = entry_price * (1 - cumul_deviation / 100)
            else:
                trigger_price = entry_price * (1 + cumul_deviation / 100)

            self.so_triggers.append({
                "price": trigger_price,
                "usd": so_size,
                "deviation_pct": cumul_deviation,
                "filled": False,
            })

        self.so_filled = 0
        self.closed = False
        self.close_price = 0
        self.close_bar = 0
        self.close_reason = ""
        self.pnl = 0
        self.fees = 0
        self.max_drawdown_pct = 0

    @property
    def avg_entry(self):
        """Quantity-weighted average fill price (0 if nothing is filled)."""
        total_qty = sum(o["qty"] for o in self.orders)
        total_cost = sum(o["qty"] * o["price"] for o in self.orders)
        return total_cost / total_qty if total_qty > 0 else 0

    @property
    def total_qty(self):
        """Total position size in base-asset units."""
        return sum(o["qty"] for o in self.orders)

    @property
    def total_invested(self):
        """Total margin committed in USD (before leverage)."""
        return sum(o["usd"] for o in self.orders)

    def _fill_safety_orders(self, high, low):
        """Fill every unfilled safety order whose trigger this bar reached."""
        leverage = self.config["leverage"]
        is_long = self.side == "LONG"

        for i, so in enumerate(self.so_triggers):
            if so["filled"]:
                continue
            reached = low <= so["price"] if is_long else high >= so["price"]
            if not reached:
                continue

            self.orders.append({
                "price": so["price"],
                "usd": so["usd"],
                "qty": (so["usd"] * leverage) / so["price"],
                "type": f"SO{i+1}",
            })
            so["filled"] = True
            self.so_filled += 1
            self.fees += so["usd"] * leverage * MAKER_FEE  # SO = limit = maker

    def tick(self, bar_idx, high, low, close, z_score):
        """Process one bar. Returns True if deal closed.

        Args:
            bar_idx: Index of the bar (recorded as ``close_bar`` on exit).
            high, low, close: Prices of the bar.
            z_score: Z-VWAP value of the bar.

        Returns:
            ``True`` if the deal is closed after this bar (or already was),
            ``False`` if it remains open.
        """
        if self.closed:
            return True

        cfg = self.config
        is_long = self.side == "LONG"

        # Check safety order fills
        self._fill_safety_orders(high, low)

        avg = self.avg_entry

        # Track max drawdown (worst adverse excursion vs. average entry)
        adverse = (avg - low) if is_long else (high - avg)
        self.max_drawdown_pct = max(self.max_drawdown_pct, adverse / avg * 100)

        tp_frac = cfg["take_profit_pct"] / 100
        sl_frac = cfg["stop_loss_pct"] / 100
        z_limit = cfg["z_tp_threshold"]

        if is_long:
            tp_price = avg * (1 + tp_frac)
            sl_price = avg * (1 - sl_frac)
            sl_hit = low <= sl_price
            tp_hit = high >= tp_price
            # Z-reversion exit only counts when the deal is in profit.
            z_tp = z_score >= -z_limit and close > avg
        else:
            tp_price = avg * (1 - tp_frac)
            sl_price = avg * (1 + sl_frac)
            sl_hit = high >= sl_price
            tp_hit = low <= tp_price
            z_tp = z_score <= z_limit and close < avg

        # Priority: SL > TP > Z-reversion. The exit price is chosen together
        # with the reason so a lower-priority signal firing on the same bar
        # can never overwrite the price of the exit that is actually taken.
        if sl_hit:
            exit_price, reason = sl_price, "SL"
        elif tp_hit:
            exit_price, reason = tp_price, "TP%"
        elif z_tp:
            exit_price, reason = close, "Z-TP"
        else:
            return False

        self.close_price = exit_price
        self._close(bar_idx, reason)
        return True

    def _close(self, bar_idx, reason):
        """Mark the deal closed at ``self.close_price`` and settle PnL.

        ``self.close_price`` must be set by the caller beforehand. PnL is
        net of all fees: SO maker fees accrued so far plus taker fees for
        the base-order entry and the exit.
        """
        self.closed = True
        self.close_bar = bar_idx
        self.close_reason = reason

        # Calculate PnL
        total_qty = self.total_qty
        avg = self.avg_entry

        if self.side == "LONG":
            self.pnl = total_qty * (self.close_price - avg)
        else:
            self.pnl = total_qty * (avg - self.close_price)

        # Add exit fee (taker for TP/SL market exit)
        exit_fee = total_qty * self.close_price * TAKER_FEE
        # Add entry fee (taker for BO market entry)
        base_order = self.orders[0]
        entry_fee = base_order["qty"] * base_order["price"] * TAKER_FEE

        self.fees += exit_fee + entry_fee
        self.pnl -= self.fees


# ============================================================
# BACKTEST ENGINE
# ============================================================
def _win_rate_pct(deals):
    """Percentage of ``deals`` with positive PnL (0 for an empty list)."""
    if not deals:
        return 0
    return sum(1 for d in deals if d.pnl > 0) / len(deals) * 100


def _summarize_deals(symbol, tf_label, deals):
    """Aggregate closed deals into the result dict (same keys when empty)."""
    count = len(deals)

    def mean(values):
        return sum(values) / count if count else 0

    total_pnl = sum(d.pnl for d in deals)
    reasons = Counter(d.close_reason for d in deals)

    # Long vs Short breakdown
    longs = [d for d in deals if d.side == "LONG"]
    shorts = [d for d in deals if d.side == "SHORT"]

    return {
        "symbol": symbol,
        "tf": tf_label,
        "deals": count,
        "total_pnl": round(total_pnl, 4),
        "win_rate": round(_win_rate_pct(deals), 1),
        "avg_pnl": round(total_pnl / count, 4) if count else 0,
        "avg_sos": round(mean([d.so_filled for d in deals]), 1),
        "avg_duration_bars": round(mean([d.close_bar - d.entry_bar for d in deals]), 0),
        "avg_invested": round(mean([d.total_invested for d in deals]), 2),
        "max_dd_pct": round(max((d.max_drawdown_pct for d in deals), default=0), 2),
        # NOTE: despite its name this is a count of TP% exits; the key is
        # kept because existing consumers read it.
        "tp_pct": reasons["TP%"],
        "ztp_count": reasons["Z-TP"],
        "sl_count": reasons["SL"],
        "end_count": reasons["END"],
        "longs": len(longs),
        "shorts": len(shorts),
        "long_pnl": round(sum(d.pnl for d in longs), 4),
        "short_pnl": round(sum(d.pnl for d in shorts), 4),
        "long_wr": round(_win_rate_pct(longs), 1),
        "short_wr": round(_win_rate_pct(shorts), 1),
        "total_fees": round(sum(d.fees for d in deals), 4),
        # Per-deal breakdown
        "deals_detail": [
            {
                "side": d.side,
                "entry": round(d.orders[0]["price"], 6),
                "avg": round(d.avg_entry, 6),
                "exit": round(d.close_price, 6),
                "pnl": round(d.pnl, 4),
                "sos": d.so_filled,
                "reason": d.close_reason,
                "bars": d.close_bar - d.entry_bar,
                "invested": round(d.total_invested, 2),
                "dd": round(d.max_drawdown_pct, 2),
            }
            for d in deals
        ],
    }


def run_backtest(symbol, klines, config, tf_label):
    """Run DCA backtest on kline data.

    At most one deal is open at a time. A deal opens at the close of a bar
    whose Z-score breaches ``config["z_entry_threshold"]`` and is first
    processed on the following bar; after it closes, new entries are
    blocked for ``config["cooldown_bars"]`` bars. A deal still open on the
    last bar is force-closed there with reason ``"END"``.

    Args:
        symbol: Instrument name (reporting only).
        klines: Chronological list of dicts with ``high``, ``low``,
            ``close`` and ``volume`` keys.
        config: Dict with the keys of ``DCA_CONFIG``.
        tf_label: Timeframe label stored in the result (e.g. ``"1m"``).

    Returns:
        ``None`` if there are fewer than ``VWAP_PERIOD + 10`` bars,
        otherwise a result dict; the dict has the same keys whether or not
        any deal was opened.
    """
    n = len(klines)
    if n < VWAP_PERIOD + 10:
        return None

    closes = np.array([k["close"] for k in klines])
    highs = np.array([k["high"] for k in klines])
    lows = np.array([k["low"] for k in klines])
    volumes = np.array([k["volume"] for k in klines])

    # Calculate Z-scores
    z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD)

    threshold = config["z_entry_threshold"]
    deals = []
    active_deal = None
    cooldown_until = 0

    for i in range(VWAP_PERIOD, n):
        z = z_scores[i]

        # Process active deal
        if active_deal:
            if active_deal.tick(i, highs[i], lows[i], closes[i], z):
                deals.append(active_deal)
                cooldown_until = i + config["cooldown_bars"]
                active_deal = None
            continue

        # Check for new entry (no active deal, past cooldown)
        if i < cooldown_until:
            continue

        if z < -threshold:
            active_deal = DCADeal(symbol, "LONG", closes[i], i, config)
        elif z > threshold:
            active_deal = DCADeal(symbol, "SHORT", closes[i], i, config)

    # Force-close any open deal at end
    if active_deal and not active_deal.closed:
        active_deal.close_price = closes[-1]
        active_deal._close(n - 1, "END")
        deals.append(active_deal)

    return _summarize_deals(symbol, tf_label, deals)


# ============================================================
# MAIN
# ============================================================
def main():
    """Fetch 7 days of 1m and 5m data per symbol, backtest, print and save.

    Results are written to ``results_dca_zvwap.json`` next to this file.
    """
    print("=" * 70)
    print(" DCA + Z-VWAP Backtest — 1m vs 5m, 7 days")
    print("=" * 70)
    print(f"\nConfig: BO=${DCA_CONFIG['base_order_usd']}, SO=${DCA_CONFIG['safety_order_usd']}, "
          f"MaxSO={DCA_CONFIG['max_safety_orders']}, "
          f"StepScale={DCA_CONFIG['step_scale']}x, VolScale={DCA_CONFIG['volume_scale']}x")
    print(f"TP={DCA_CONFIG['take_profit_pct']}%, SL={DCA_CONFIG['stop_loss_pct']}%, "
          f"Z-entry={DCA_CONFIG['z_entry_threshold']}, Z-TP={DCA_CONFIG['z_tp_threshold']}, "
          f"Leverage={DCA_CONFIG['leverage']}x")
    print(f"Cooldown={DCA_CONFIG['cooldown_bars']} bars, VWAP period={VWAP_PERIOD}")
    print()

    all_results = []

    for symbol in SYMBOLS:
        print(f"━━━ {symbol} ━━━")

        for interval, tf_label in [("1", "1m"), ("5", "5m")]:
            print(f" Fetching {tf_label} data...", end=" ", flush=True)
            klines = fetch_klines(symbol, interval, days=7)
            print(f"{len(klines)} bars")

            if len(klines) < 100:
                print(" ⚠️ Not enough data, skipping")
                continue

            result = run_backtest(symbol, klines, DCA_CONFIG, tf_label)
            if not result:
                continue
            all_results.append(result)

            wr = result["win_rate"]
            wr_emoji = "🟢" if wr >= 60 else "🟡" if wr >= 45 else "🔴"
            pnl_emoji = "🟢" if result["total_pnl"] > 0 else "🔴"

            print(f" {tf_label}: {pnl_emoji} PnL=${result['total_pnl']:+.2f} | "
                  f"{wr_emoji} WR={wr}% | "
                  f"Deals={result['deals']} (L:{result['longs']}/S:{result['shorts']}) | "
                  f"AvgSO={result['avg_sos']} | "
                  f"MaxDD={result['max_dd_pct']:.1f}%")
            print(f" TP%={result['tp_pct']} Z-TP={result['ztp_count']} "
                  f"SL={result['sl_count']} END={result['end_count']} | "
                  f"Fees=${result['total_fees']:.2f}")
            print(f" Long: PnL=${result['long_pnl']:+.2f} WR={result['long_wr']}% | "
                  f"Short: PnL=${result['short_pnl']:+.2f} WR={result['short_wr']}%")

        print()

    # Summary comparison
    print("\n" + "=" * 70)
    print(" SUMMARY: 1m vs 5m")
    print("=" * 70)

    for tf in ["1m", "5m"]:
        tf_results = [r for r in all_results if r["tf"] == tf]
        if not tf_results:
            continue

        total_pnl = sum(r["total_pnl"] for r in tf_results)
        total_deals = sum(r["deals"] for r in tf_results)
        # Win counts are reconstructed from the per-symbol rounded win rates.
        total_wins = sum(r["deals"] * r["win_rate"] / 100 for r in tf_results)
        avg_wr = total_wins / total_deals * 100 if total_deals > 0 else 0
        max_dd = max(r["max_dd_pct"] for r in tf_results)
        total_fees = sum(r["total_fees"] for r in tf_results)

        print(f"\n 📊 {tf}:")
        print(f" Total PnL: ${total_pnl:+.2f} (after fees ${total_fees:.2f})")
        print(f" Deals: {total_deals} | Avg WR: {avg_wr:.1f}%")
        print(f" Max DD: {max_dd:.1f}%")

        # Per-symbol summary
        for r in tf_results:
            emoji = "🟢" if r["total_pnl"] > 0 else "🔴"
            print(f" {emoji} {r['symbol']}: ${r['total_pnl']:+.2f} ({r['deals']} deals, WR={r['win_rate']}%)")

    # Save results
    output_path = os.path.join(os.path.dirname(__file__), "results_dca_zvwap.json")
    save_data = {
        "config": DCA_CONFIG,
        "vwap_period": VWAP_PERIOD,
        "symbols": SYMBOLS,
        "results": [{k: v for k, v in r.items() if k != "deals_detail"} for r in all_results],
        "details": {f"{r['symbol']}_{r['tf']}": r.get("deals_detail", []) for r in all_results},
    }
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(save_data, f, indent=2)
    print(f"\n💾 Results saved to {output_path}")


if __name__ == "__main__":
    main()