"""
Backtest: DCA Bot with Z-VWAP Entry Signal
=============================================
Compares 1m vs 5m timeframes over 7 days.
Strategy:
- Screener calculates Z = (close - VWAP) / std(deviations)
- Z < -THRESHOLD → LONG DCA deal
- Z > +THRESHOLD → SHORT DCA deal
- Safety Orders placed at increasing distances (Martingale)
- TP: X% above avg entry OR Z crosses back to 0
- SL: Y% from avg entry
Usage:
python backtests/backtest_dca_zvwap.py
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP
# ============================================================
# DCA PARAMETERS
# ============================================================
# Strategy settings passed as `config` to DCADeal and run_backtest.
DCA_CONFIG = {
    "base_order_usd": 5.0,  # USD size of the base order (multiplied by leverage for qty)
    "safety_order_usd": 10.0,  # USD size of the first safety order (SO)
    "max_safety_orders": 6,  # number of SO levels pre-computed per deal
    "price_deviation_pct": 1.5,  # % adverse move from entry to the first SO
    "step_scale": 1.5,  # distance multiplier between SOs
    "volume_scale": 1.3,  # size multiplier for each SO
    "take_profit_pct": 1.5,  # TP from average entry
    "stop_loss_pct": 10.0,  # SL from average entry
    "z_entry_threshold": 1.8,  # |Z| > 1.8 to enter
    "z_tp_threshold": 0.3,  # |Z| < 0.3 = fair value exit
    "cooldown_bars": 12,  # bars to wait between deals (1h on 5m, 12m on 1m)
    "leverage": 3,  # multiplier applied to each order's USD size when computing qty
}
# Number of bars in the rolling window used by calc_zvwap.
VWAP_PERIOD = 50
# Fee rates as fractions of notional: maker is charged on SO fills,
# taker on the base-order entry and on the exit.
MAKER_FEE = 0.0002  # 0.02%
TAKER_FEE = 0.00055  # 0.055%
# Coins to test
SYMBOLS = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"]
# ============================================================
# DATA FETCHING
# ============================================================
def fetch_klines(symbol: str, interval: str, days: int = 7) -> list[dict]:
    """Download recent candles for ``symbol`` from Bybit's public kline endpoint.

    Pages backwards from "now" in requests of up to 1000 candles until
    ``days`` worth of bars at ``interval`` minutes have been collected.

    Args:
        symbol: Instrument name passed to the API (e.g. ``"BTCUSDT"``).
        interval: Candle size in minutes as a numeric string (``"1"``, ``"5"``).
        days: How many days of history to request.

    Returns:
        Candles as dicts with keys ``ts``, ``open``, ``high``, ``low``,
        ``close`` and ``volume``, oldest first, de-duplicated by timestamp
        and trimmed to at most the requested number of bars. On an API or
        fetch error the candles collected so far are returned.
    """
    client = HTTP(testnet=False)
    # 7 days: 1m = 10080 bars, 5m = 2016 bars; the API caps a page at 1000.
    target = days * 24 * 60 // int(interval)
    cursor_ms = int(datetime.now().timestamp() * 1000)
    price_fields = ("open", "high", "low", "close", "volume")
    collected = []

    while len(collected) < target:
        try:
            resp = client.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=1000,
                end=cursor_ms,
            )
            if resp["retCode"] != 0:
                print(f" API error: {resp['retMsg']}")
                break
            page = resp["result"]["list"]
            if not page:
                break
            for row in page:
                candle = {"ts": int(row[0])}
                for pos, field in enumerate(price_fields, start=1):
                    candle[field] = float(row[pos])
                collected.append(candle)
            # Next request ends 1 ms before the last candle of this page.
            cursor_ms = int(page[-1][0]) - 1
            if len(page) < 1000:
                break
        except Exception as e:
            print(f" Fetch error: {e}")
            break

    # Walk the collected candles backwards (oldest first, per the paging
    # above), keeping the first candle seen for each timestamp.
    by_ts = {}
    for candle in reversed(collected):
        by_ts.setdefault(candle["ts"], candle)
    unique = list(by_ts.values())

    if len(unique) > target:
        return unique[-target:]
    return unique
# ============================================================
# Z-VWAP CALCULATION
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
    """Compute a rolling Z-score of the close against a windowed VWAP.

    For bar ``i`` the window is the ``period`` bars before it (bar ``i``
    itself is excluded). Inside the window a cumulative VWAP of the typical
    price ``(high + low + close) / 3`` is built; the Z-score is the distance
    of ``closes[i]`` from the window's final VWAP, divided by the standard
    deviation of the window's closes around that cumulative VWAP.

    Args:
        highs, lows, closes, volumes: Equal-length NumPy arrays of bar data.
        period: Window length in bars.

    Returns:
        Array of Z-scores, one per bar. The first ``period`` entries and any
        bar whose window has zero deviation spread are left at 0.0.
    """
    count = len(closes)
    result = np.zeros(count)
    for idx in range(period, count):
        window = slice(idx - period, idx)
        win_close = closes[window]
        win_vol = volumes[window]
        typical = (highs[window] + lows[window] + win_close) / 3

        running_vol = np.cumsum(win_vol)
        # Guard against division by zero while cumulative volume is still 0.
        running_vwap = np.cumsum(typical * win_vol) / np.where(running_vol == 0, 1, running_vol)
        latest_vwap = running_vwap[-1]

        spread = np.std(win_close - running_vwap)
        if spread > 0:
            result[idx] = (closes[idx] - latest_vwap) / spread
    return result
# ============================================================
# DCA DEAL SIMULATOR
# ============================================================
class DCADeal:
    """Simulate the lifecycle of one DCA deal: base order, safety orders, exit.

    The deal opens with a base order at ``entry_price`` and pre-computes a
    ladder of safety-order (SO) trigger prices. ``tick`` is then fed one bar
    at a time; it fills any SOs the bar reached and closes the deal on stop
    loss, percentage take profit or Z-score reversion.

    Args:
        symbol: Instrument name (stored for reporting only).
        side: ``"LONG"`` or ``"SHORT"``.
        entry_price: Fill price of the base order.
        entry_bar: Index of the bar on which the deal was opened.
        config: Mapping with the keys ``base_order_usd``, ``safety_order_usd``,
            ``max_safety_orders``, ``price_deviation_pct``, ``step_scale``,
            ``volume_scale``, ``take_profit_pct``, ``stop_loss_pct``,
            ``z_tp_threshold`` and ``leverage``.
    """

    def __init__(self, symbol, side, entry_price, entry_bar, config):
        self.symbol = symbol
        self.side = side  # "LONG" or "SHORT"
        self.config = config
        self.entry_bar = entry_bar

        # Base order: quantity is the leveraged USD size divided by price.
        leverage = config["leverage"]
        self.orders = [{
            "price": entry_price,
            "usd": config["base_order_usd"],
            "qty": (config["base_order_usd"] * leverage) / entry_price,
            "type": "BO",
        }]

        # Pre-calculate safety order trigger prices. Each step is
        # price_deviation_pct * step_scale**i and the steps accumulate, so
        # the ladder widens; each order's size grows by volume_scale**i.
        self.so_triggers = []
        cumul_deviation = 0
        for i in range(config["max_safety_orders"]):
            deviation = config["price_deviation_pct"] * (config["step_scale"] ** i)
            cumul_deviation += deviation
            so_size = config["safety_order_usd"] * (config["volume_scale"] ** i)
            if side == "LONG":
                trigger_price = entry_price * (1 - cumul_deviation / 100)
            else:
                trigger_price = entry_price * (1 + cumul_deviation / 100)
            self.so_triggers.append({
                "price": trigger_price,
                "usd": so_size,
                "deviation_pct": cumul_deviation,
                "filled": False,
            })

        self.so_filled = 0
        self.closed = False
        self.close_price = 0
        self.close_bar = 0
        self.close_reason = ""
        self.pnl = 0
        self.fees = 0
        self.max_drawdown_pct = 0

    @property
    def avg_entry(self):
        """Quantity-weighted average fill price of all orders (0 if no quantity)."""
        total_qty = sum(o["qty"] for o in self.orders)
        total_cost = sum(o["qty"] * o["price"] for o in self.orders)
        return total_cost / total_qty if total_qty > 0 else 0

    @property
    def total_qty(self):
        """Total position quantity across the base order and filled SOs."""
        return sum(o["qty"] for o in self.orders)

    @property
    def total_invested(self):
        """Sum of the (unleveraged) USD sizes of all filled orders."""
        return sum(o["usd"] for o in self.orders)

    def tick(self, bar_idx, high, low, close, z_score):
        """Process one bar and report whether the deal is (now) closed.

        Args:
            bar_idx: Index of the bar being processed.
            high, low, close: Prices of the bar.
            z_score: Z-VWAP value for the bar.

        Returns:
            True if the deal was already closed or closes on this bar,
            otherwise False.
        """
        if self.closed:
            return True

        leverage = self.config["leverage"]

        # Fill every safety order whose trigger price the bar reached.
        for i, so in enumerate(self.so_triggers):
            if so["filled"]:
                continue
            filled = False
            if self.side == "LONG" and low <= so["price"]:
                filled = True
            elif self.side == "SHORT" and high >= so["price"]:
                filled = True
            if filled:
                qty = (so["usd"] * leverage) / so["price"]
                self.orders.append({
                    "price": so["price"],
                    "usd": so["usd"],
                    "qty": qty,
                    "type": f"SO{i+1}",
                })
                so["filled"] = True
                self.so_filled += 1
                self.fees += so["usd"] * leverage * MAKER_FEE  # SO = limit = maker

        # Average entry after this bar's fills; all exits are relative to it.
        avg = self.avg_entry

        # Track the worst adverse excursion from the average entry.
        if self.side == "LONG":
            dd_pct = (avg - low) / avg * 100
        else:
            dd_pct = (high - avg) / avg * 100
        self.max_drawdown_pct = max(self.max_drawdown_pct, dd_pct)

        tp_frac = self.config["take_profit_pct"] / 100
        sl_frac = self.config["stop_loss_pct"] / 100
        z_band = self.config["z_tp_threshold"]

        if self.side == "LONG":
            tp_price = avg * (1 + tp_frac)
            sl_price = avg * (1 - sl_frac)
            sl_hit = low <= sl_price
            tp_hit = high >= tp_price
            # Z-reversion exit only counts while the position is in profit.
            z_tp = z_score >= -z_band and close > avg
        else:
            tp_price = avg * (1 - tp_frac)
            sl_price = avg * (1 + sl_frac)
            is_short = self.side == "SHORT"
            sl_hit = is_short and high >= sl_price
            tp_hit = is_short and low <= tp_price
            z_tp = is_short and z_score <= z_band and close < avg

        # Priority: SL > TP > Z-reversion. The exit price is chosen together
        # with the reason so that a bar triggering several exits cannot close
        # at one exit's price under another exit's label.
        if sl_hit:
            self.close_price = sl_price
            self._close(bar_idx, "SL")
            return True
        if tp_hit:
            self.close_price = tp_price
            self._close(bar_idx, "TP%")
            return True
        if z_tp:
            self.close_price = close
            self._close(bar_idx, "Z-TP")
            return True
        return False

    def _close(self, bar_idx, reason):
        """Mark the deal closed at ``self.close_price`` and compute net PnL.

        The caller must set ``self.close_price`` before calling. Fees for the
        exit and the base-order entry are added to the SO fees accumulated in
        ``tick`` and the total is subtracted from PnL.
        """
        self.closed = True
        self.close_bar = bar_idx
        self.close_reason = reason

        total_qty = self.total_qty
        avg = self.avg_entry
        if self.side == "LONG":
            self.pnl = total_qty * (self.close_price - avg)
        else:
            self.pnl = total_qty * (avg - self.close_price)

        # Exit fee (taker for TP/SL market exit).
        exit_fee = total_qty * self.close_price * TAKER_FEE
        # Entry fee (taker for BO market entry).
        entry_fee = self.orders[0]["qty"] * self.orders[0]["price"] * TAKER_FEE
        self.fees += exit_fee + entry_fee
        self.pnl -= self.fees
# ============================================================
# BACKTEST ENGINE
# ============================================================
def run_backtest(symbol, klines, config, tf_label):
    """Run the DCA + Z-VWAP backtest over one symbol's candles.

    At most one deal is open at a time. A deal opens at a bar's close when
    the Z-score is beyond ``config["z_entry_threshold"]`` (below the negative
    threshold for LONG, above the positive one for SHORT) and the cooldown
    after the previous deal has elapsed. A deal still open on the last bar is
    force-closed at the final close with reason ``"END"``.

    Args:
        symbol: Instrument name, copied into the result.
        klines: Chronological list of candle dicts with ``high``, ``low``,
            ``close`` and ``volume`` keys.
        config: Strategy settings (see ``DCA_CONFIG``).
        tf_label: Timeframe label, copied into the result as ``"tf"``.

    Returns:
        ``None`` if there are fewer than ``VWAP_PERIOD + 10`` candles.
        Otherwise a dict of aggregate statistics plus a per-deal
        ``"deals_detail"`` list. The same keys are present whether or not
        any deal was opened, so callers can read them unconditionally.
    """
    n = len(klines)
    if n < VWAP_PERIOD + 10:
        return None

    closes = np.array([k["close"] for k in klines])
    highs = np.array([k["high"] for k in klines])
    lows = np.array([k["low"] for k in klines])
    volumes = np.array([k["volume"] for k in klines])

    z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD)

    deals = []
    active_deal = None
    cooldown_until = 0
    threshold = config["z_entry_threshold"]

    for i in range(VWAP_PERIOD, n):
        z = z_scores[i]

        # While a deal is open this bar only advances it; a new deal is
        # never opened on the bar that closed the previous one.
        if active_deal:
            if active_deal.tick(i, highs[i], lows[i], closes[i], z):
                deals.append(active_deal)
                cooldown_until = i + config["cooldown_bars"]
                active_deal = None
            continue

        if i < cooldown_until:
            continue

        if z < -threshold:
            active_deal = DCADeal(symbol, "LONG", closes[i], i, config)
        elif z > threshold:
            active_deal = DCADeal(symbol, "SHORT", closes[i], i, config)

    # Force-close any deal still open at the end of the data.
    if active_deal and not active_deal.closed:
        active_deal.close_price = closes[-1]
        active_deal._close(n - 1, "END")
        deals.append(active_deal)

    if not deals:
        # Same schema as the populated result below, so that consumers
        # reading e.g. "longs" or "max_dd_pct" do not hit a KeyError.
        return {
            "symbol": symbol,
            "tf": tf_label,
            "deals": 0,
            "total_pnl": 0,
            "win_rate": 0,
            "avg_pnl": 0,
            "avg_sos": 0,
            "avg_duration_bars": 0,
            "avg_invested": 0,
            "max_dd_pct": 0,
            "tp_pct": 0,
            "ztp_count": 0,
            "sl_count": 0,
            "end_count": 0,
            "longs": 0,
            "shorts": 0,
            "long_pnl": 0,
            "short_pnl": 0,
            "long_wr": 0,
            "short_wr": 0,
            "total_fees": 0,
            "deals_detail": [],
            # Legacy keys previously returned only for the zero-deal case;
            # kept so existing readers of them keep working.
            "max_dd": 0,
            "ztp_pct": 0,
            "sl_pct": 0,
        }

    total_pnl = sum(d.pnl for d in deals)
    wins = sum(1 for d in deals if d.pnl > 0)
    avg_sos = sum(d.so_filled for d in deals) / len(deals)
    avg_duration = sum(d.close_bar - d.entry_bar for d in deals) / len(deals)
    max_dd = max(d.max_drawdown_pct for d in deals)
    avg_invested = sum(d.total_invested for d in deals) / len(deals)

    tp_count = sum(1 for d in deals if d.close_reason == "TP%")
    ztp_count = sum(1 for d in deals if d.close_reason == "Z-TP")
    sl_count = sum(1 for d in deals if d.close_reason == "SL")
    end_count = sum(1 for d in deals if d.close_reason == "END")

    # Long vs Short breakdown
    longs = [d for d in deals if d.side == "LONG"]
    shorts = [d for d in deals if d.side == "SHORT"]
    long_pnl = sum(d.pnl for d in longs)
    short_pnl = sum(d.pnl for d in shorts)
    long_wr = sum(1 for d in longs if d.pnl > 0) / len(longs) * 100 if longs else 0
    short_wr = sum(1 for d in shorts if d.pnl > 0) / len(shorts) * 100 if shorts else 0

    return {
        "symbol": symbol,
        "tf": tf_label,
        "deals": len(deals),
        "total_pnl": round(total_pnl, 4),
        "win_rate": round(wins / len(deals) * 100, 1),
        "avg_pnl": round(total_pnl / len(deals), 4),
        "avg_sos": round(avg_sos, 1),
        "avg_duration_bars": round(avg_duration, 0),
        "avg_invested": round(avg_invested, 2),
        "max_dd_pct": round(max_dd, 2),
        # NOTE: despite the name this is the COUNT of "TP%" exits.
        "tp_pct": tp_count,
        "ztp_count": ztp_count,
        "sl_count": sl_count,
        "end_count": end_count,
        "longs": len(longs),
        "shorts": len(shorts),
        "long_pnl": round(long_pnl, 4),
        "short_pnl": round(short_pnl, 4),
        "long_wr": round(long_wr, 1),
        "short_wr": round(short_wr, 1),
        "total_fees": round(sum(d.fees for d in deals), 4),
        # Per-deal breakdown
        "deals_detail": [
            {
                "side": d.side,
                "entry": round(d.orders[0]["price"], 6),
                "avg": round(d.avg_entry, 6),
                "exit": round(d.close_price, 6),
                "pnl": round(d.pnl, 4),
                "sos": d.so_filled,
                "reason": d.close_reason,
                "bars": d.close_bar - d.entry_bar,
                "invested": round(d.total_invested, 2),
                "dd": round(d.max_drawdown_pct, 2),
            }
            for d in deals
        ],
    }
# ============================================================
# MAIN
# ============================================================
def main():
    """Run the 1m and 5m backtests for every symbol, print a report, save JSON.

    For each symbol in ``SYMBOLS`` and each of the 1-minute and 5-minute
    timeframes this fetches 7 days of candles, runs ``run_backtest`` with
    ``DCA_CONFIG`` and prints a per-run line. It then prints a per-timeframe
    summary and writes all results to ``results_dca_zvwap.json`` next to
    this script.
    """
    print("=" * 70)
    print(" DCA + Z-VWAP Backtest — 1m vs 5m, 7 days")
    print("=" * 70)
    print(f"\nConfig: BO=${DCA_CONFIG['base_order_usd']}, SO=${DCA_CONFIG['safety_order_usd']}, "
          f"MaxSO={DCA_CONFIG['max_safety_orders']}, "
          f"StepScale={DCA_CONFIG['step_scale']}x, VolScale={DCA_CONFIG['volume_scale']}x")
    print(f"TP={DCA_CONFIG['take_profit_pct']}%, SL={DCA_CONFIG['stop_loss_pct']}%, "
          f"Z-entry={DCA_CONFIG['z_entry_threshold']}, Z-TP={DCA_CONFIG['z_tp_threshold']}, "
          f"Leverage={DCA_CONFIG['leverage']}x")
    print(f"Cooldown={DCA_CONFIG['cooldown_bars']} bars, VWAP period={VWAP_PERIOD}")
    print()

    all_results = []
    for symbol in SYMBOLS:
        print(f"━━━ {symbol} ━━━")
        for interval, tf_label in [("1", "1m"), ("5", "5m")]:
            print(f" Fetching {tf_label} data...", end=" ", flush=True)
            klines = fetch_klines(symbol, interval, days=7)
            print(f"{len(klines)} bars")
            if len(klines) < 100:
                print(" ⚠️ Not enough data, skipping")
                continue

            result = run_backtest(symbol, klines, DCA_CONFIG, tf_label)
            if not result:
                continue
            all_results.append(result)

            # A run without deals has nothing to break down; report it
            # without reading the per-deal statistics.
            if result["deals"] == 0:
                print(f" {tf_label}: no deals")
                continue

            wr = result["win_rate"]
            wr_emoji = "🟢" if wr >= 60 else "🟡" if wr >= 45 else "🔴"
            pnl_emoji = "🟢" if result["total_pnl"] > 0 else "🔴"
            print(f" {tf_label}: {pnl_emoji} PnL=${result['total_pnl']:+.2f} | "
                  f"{wr_emoji} WR={wr}% | "
                  f"Deals={result['deals']} (L:{result['longs']}/S:{result['shorts']}) | "
                  f"AvgSO={result['avg_sos']} | "
                  f"MaxDD={result['max_dd_pct']:.1f}%")
            print(f" TP%={result['tp_pct']} Z-TP={result['ztp_count']} "
                  f"SL={result['sl_count']} END={result['end_count']} | "
                  f"Fees=${result['total_fees']:.2f}")
            print(f" Long: PnL=${result['long_pnl']:+.2f} WR={result['long_wr']}% | "
                  f"Short: PnL=${result['short_pnl']:+.2f} WR={result['short_wr']}%")
        print()

    # Summary comparison
    print("\n" + "=" * 70)
    print(" SUMMARY: 1m vs 5m")
    print("=" * 70)
    for tf in ["1m", "5m"]:
        tf_results = [r for r in all_results if r["tf"] == tf]
        if not tf_results:
            continue
        total_pnl = sum(r["total_pnl"] for r in tf_results)
        total_deals = sum(r["deals"] for r in tf_results)
        # Recover win counts from each run's win rate to weight by deal count.
        total_wins = sum(r["deals"] * r["win_rate"] / 100 for r in tf_results)
        avg_wr = total_wins / total_deals * 100 if total_deals > 0 else 0
        # .get(): zero-deal results may not carry these keys.
        max_dd = max(r.get("max_dd_pct", 0) for r in tf_results)
        total_fees = sum(r.get("total_fees", 0) for r in tf_results)
        print(f"\n 📊 {tf}:")
        print(f" Total PnL: ${total_pnl:+.2f} (after fees ${total_fees:.2f})")
        print(f" Deals: {total_deals} | Avg WR: {avg_wr:.1f}%")
        print(f" Max DD: {max_dd:.1f}%")
        # Per-symbol summary
        for r in tf_results:
            emoji = "🟢" if r["total_pnl"] > 0 else "🔴"
            print(f" {emoji} {r['symbol']}: ${r['total_pnl']:+.2f} ({r['deals']} deals, WR={r['win_rate']}%)")

    # Save results: aggregates under "results", per-deal rows under "details".
    output_path = os.path.join(os.path.dirname(__file__), "results_dca_zvwap.json")
    save_data = {
        "config": DCA_CONFIG,
        "vwap_period": VWAP_PERIOD,
        "symbols": SYMBOLS,
        "results": [{k: v for k, v in r.items() if k != "deals_detail"} for r in all_results],
        "details": {f"{r['symbol']}_{r['tf']}": r.get("deals_detail", []) for r in all_results},
    }
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(save_data, f, indent=2)
    print(f"\n💾 Results saved to {output_path}")
if __name__ == "__main__":
main()