← Back
ā˜†
"""
Backtest: DCA Bot with Z-VWAP Entry Signal
=============================================
Compares 1m vs 5m timeframes over 7 days.

Strategy:
  - Screener calculates Z = (close - VWAP) / std(deviations)
  - Z < -THRESHOLD → LONG DCA deal
  - Z > +THRESHOLD → SHORT DCA deal
  - Safety Orders placed at increasing distances (Martingale)
  - TP: X% above avg entry OR Z crosses back to 0
  - SL: Y% from avg entry

Usage:
  python backtests/backtest_dca_zvwap.py
"""

import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP

# ============================================================
# DCA PARAMETERS
# ============================================================
DCA_CONFIG = {
    "base_order_usd": 5.0,
    "safety_order_usd": 10.0,
    "max_safety_orders": 6,
    "price_deviation_pct": 1.5,     # % drop for first SO
    "step_scale": 1.5,              # distance multiplier between SOs
    "volume_scale": 1.3,            # size multiplier for each SO
    "take_profit_pct": 1.5,         # TP from average entry
    "stop_loss_pct": 10.0,          # SL from average entry
    "z_entry_threshold": 1.8,       # |Z| > 1.8 to enter
    "z_tp_threshold": 0.3,          # |Z| < 0.3 = fair value exit
    "cooldown_bars": 12,            # bars to wait between deals (1h on 5m, 12m on 1m)
    "leverage": 3,
}

VWAP_PERIOD = 50
MAKER_FEE = 0.0002   # 0.02%
TAKER_FEE = 0.00055  # 0.055%

# Coins to test
SYMBOLS = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"]


# ============================================================
# DATA FETCHING
# ============================================================
def fetch_klines(symbol: str, interval: str, days: int = 7) -> list[dict]:
    """Fetch klines from Bybit (no auth needed for market data)."""
    session = HTTP(testnet=False)
    all_klines = []

    # Bybit returns max 1000 candles per request
    # 7 days: 1m = 10080, 5m = 2016
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)

    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=1000,
                end=end_time,
            )
            if resp["retCode"] != 0:
                print(f"  API error: {resp['retMsg']}")
                break

            items = resp["result"]["list"]
            if not items:
                break

            for item in items:
                all_klines.append({
                    "ts": int(item[0]),
                    "open": float(item[1]),
                    "high": float(item[2]),
                    "low": float(item[3]),
                    "close": float(item[4]),
                    "volume": float(item[5]),
                })

            # Move end_time to oldest candle - 1ms
            end_time = int(items[-1][0]) - 1

            if len(items) < 1000:
                break

        except Exception as e:
            print(f"  Fetch error: {e}")
            break

    # Reverse to chronological order (oldest first)
    all_klines.reverse()

    # Deduplicate by timestamp
    seen = set()
    unique = []
    for k in all_klines:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)

    return unique[-bars_needed:] if len(unique) > bars_needed else unique


# ============================================================
# Z-VWAP CALCULATION
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
    """Calculate Z-Score from VWAP. Returns array of Z-scores."""
    n = len(closes)
    z_scores = np.full(n, 0.0)

    for i in range(period, n):
        h = highs[i-period:i]
        l = lows[i-period:i]
        c = closes[i-period:i]
        v = volumes[i-period:i]

        tp = (h + l + c) / 3
        cum_tp_vol = np.cumsum(tp * v)
        cum_vol = np.cumsum(v)
        cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
        vwap_arr = cum_tp_vol / cum_vol_safe

        vwap = vwap_arr[-1]
        deviations = c - vwap_arr
        std = np.std(deviations)

        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std

    return z_scores


# ============================================================
# DCA DEAL SIMULATOR
# ============================================================
class DCADeal:
    """Simulates one DCA deal lifecycle."""

    def __init__(self, symbol, side, entry_price, entry_bar, config):
        self.symbol = symbol
        self.side = side  # "LONG" or "SHORT"
        self.config = config
        self.entry_bar = entry_bar

        # Base order
        leverage = config["leverage"]
        self.orders = [{
            "price": entry_price,
            "usd": config["base_order_usd"],
            "qty": (config["base_order_usd"] * leverage) / entry_price,
            "type": "BO",
        }]

        # Pre-calculate safety order trigger prices
        self.so_triggers = []
        cumul_deviation = 0
        for i in range(config["max_safety_orders"]):
            deviation = config["price_deviation_pct"] * (config["step_scale"] ** i)
            cumul_deviation += deviation
            so_size = config["safety_order_usd"] * (config["volume_scale"] ** i)

            if side == "LONG":
                trigger_price = entry_price * (1 - cumul_deviation / 100)
            else:
                trigger_price = entry_price * (1 + cumul_deviation / 100)

            self.so_triggers.append({
                "price": trigger_price,
                "usd": so_size,
                "deviation_pct": cumul_deviation,
                "filled": False,
            })

        self.so_filled = 0
        self.closed = False
        self.close_price = 0
        self.close_bar = 0
        self.close_reason = ""
        self.pnl = 0
        self.fees = 0
        self.max_drawdown_pct = 0

    @property
    def avg_entry(self):
        total_qty = sum(o["qty"] for o in self.orders)
        total_cost = sum(o["qty"] * o["price"] for o in self.orders)
        return total_cost / total_qty if total_qty > 0 else 0

    @property
    def total_qty(self):
        return sum(o["qty"] for o in self.orders)

    @property
    def total_invested(self):
        return sum(o["usd"] for o in self.orders)

    def tick(self, bar_idx, high, low, close, z_score):
        """Process one bar. Returns True if deal closed."""
        if self.closed:
            return True

        leverage = self.config["leverage"]

        # Check safety order fills
        for i, so in enumerate(self.so_triggers):
            if so["filled"]:
                continue

            filled = False
            if self.side == "LONG" and low <= so["price"]:
                filled = True
            elif self.side == "SHORT" and high >= so["price"]:
                filled = True

            if filled:
                qty = (so["usd"] * leverage) / so["price"]
                self.orders.append({
                    "price": so["price"],
                    "usd": so["usd"],
                    "qty": qty,
                    "type": f"SO{i+1}",
                })
                so["filled"] = True
                self.so_filled += 1
                self.fees += so["usd"] * leverage * MAKER_FEE  # SO = limit = maker

        avg = self.avg_entry

        # Track max drawdown
        if self.side == "LONG":
            dd_pct = (avg - low) / avg * 100
        else:
            dd_pct = (high - avg) / avg * 100
        self.max_drawdown_pct = max(self.max_drawdown_pct, dd_pct)

        # Check TP (from average entry)
        tp_price = avg * (1 + self.config["take_profit_pct"] / 100) if self.side == "LONG" else \
                   avg * (1 - self.config["take_profit_pct"] / 100)

        tp_hit = False
        if self.side == "LONG" and high >= tp_price:
            tp_hit = True
            self.close_price = tp_price
        elif self.side == "SHORT" and low <= tp_price:
            tp_hit = True
            self.close_price = tp_price

        # Check Z-reversion TP (Z crosses back past threshold)
        z_tp = False
        if self.side == "LONG" and z_score >= -self.config["z_tp_threshold"]:
            # Only if we're in profit
            if close > avg:
                z_tp = True
                self.close_price = close
        elif self.side == "SHORT" and z_score <= self.config["z_tp_threshold"]:
            if close < avg:
                z_tp = True
                self.close_price = close

        # Check SL
        sl_price_long = avg * (1 - self.config["stop_loss_pct"] / 100)
        sl_price_short = avg * (1 + self.config["stop_loss_pct"] / 100)

        sl_hit = False
        if self.side == "LONG" and low <= sl_price_long:
            sl_hit = True
            self.close_price = sl_price_long
        elif self.side == "SHORT" and high >= sl_price_short:
            sl_hit = True
            self.close_price = sl_price_short

        # Priority: SL > TP > Z-reversion
        if sl_hit:
            self._close(bar_idx, "SL")
            return True
        elif tp_hit:
            self._close(bar_idx, "TP%")
            return True
        elif z_tp:
            self._close(bar_idx, "Z-TP")
            return True

        return False

    def _close(self, bar_idx, reason):
        self.closed = True
        self.close_bar = bar_idx
        self.close_reason = reason

        # Calculate PnL
        total_qty = self.total_qty
        avg = self.avg_entry
        leverage = self.config["leverage"]

        if self.side == "LONG":
            self.pnl = total_qty * (self.close_price - avg)
        else:
            self.pnl = total_qty * (avg - self.close_price)

        # Add exit fee (taker for TP/SL market exit)
        exit_fee = total_qty * self.close_price * TAKER_FEE
        # Add entry fee (taker for BO market entry)
        entry_fee = self.orders[0]["qty"] * self.orders[0]["price"] * TAKER_FEE
        self.fees += exit_fee + entry_fee
        self.pnl -= self.fees


# ============================================================
# BACKTEST ENGINE
# ============================================================
def run_backtest(symbol, klines, config, tf_label):
    """Run DCA backtest on kline data."""
    n = len(klines)
    if n < VWAP_PERIOD + 10:
        return None

    closes = np.array([k["close"] for k in klines])
    highs = np.array([k["high"] for k in klines])
    lows = np.array([k["low"] for k in klines])
    volumes = np.array([k["volume"] for k in klines])

    # Calculate Z-scores
    z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD)

    deals = []
    active_deal = None
    cooldown_until = 0

    for i in range(VWAP_PERIOD, n):
        z = z_scores[i]

        # Process active deal
        if active_deal:
            closed = active_deal.tick(i, highs[i], lows[i], closes[i], z)
            if closed:
                deals.append(active_deal)
                cooldown_until = i + config["cooldown_bars"]
                active_deal = None
            continue

        # Check for new entry (no active deal, past cooldown)
        if i < cooldown_until:
            continue

        threshold = config["z_entry_threshold"]

        if z < -threshold:
            active_deal = DCADeal(symbol, "LONG", closes[i], i, config)
        elif z > threshold:
            active_deal = DCADeal(symbol, "SHORT", closes[i], i, config)

    # Force-close any open deal at end
    if active_deal and not active_deal.closed:
        active_deal.close_price = closes[-1]
        active_deal._close(n - 1, "END")
        deals.append(active_deal)

    # Aggregate results
    if not deals:
        return {
            "symbol": symbol, "tf": tf_label, "deals": 0,
            "total_pnl": 0, "win_rate": 0, "avg_pnl": 0,
            "avg_sos": 0, "avg_duration_bars": 0, "max_dd": 0,
            "tp_pct": 0, "ztp_pct": 0, "sl_pct": 0,
        }

    total_pnl = sum(d.pnl for d in deals)
    wins = sum(1 for d in deals if d.pnl > 0)
    avg_sos = sum(d.so_filled for d in deals) / len(deals)
    avg_duration = sum(d.close_bar - d.entry_bar for d in deals) / len(deals)
    max_dd = max(d.max_drawdown_pct for d in deals)
    avg_invested = sum(d.total_invested for d in deals) / len(deals)

    tp_count = sum(1 for d in deals if d.close_reason == "TP%")
    ztp_count = sum(1 for d in deals if d.close_reason == "Z-TP")
    sl_count = sum(1 for d in deals if d.close_reason == "SL")
    end_count = sum(1 for d in deals if d.close_reason == "END")

    # Long vs Short breakdown
    longs = [d for d in deals if d.side == "LONG"]
    shorts = [d for d in deals if d.side == "SHORT"]
    long_pnl = sum(d.pnl for d in longs)
    short_pnl = sum(d.pnl for d in shorts)
    long_wr = sum(1 for d in longs if d.pnl > 0) / len(longs) * 100 if longs else 0
    short_wr = sum(1 for d in shorts if d.pnl > 0) / len(shorts) * 100 if shorts else 0

    return {
        "symbol": symbol,
        "tf": tf_label,
        "deals": len(deals),
        "total_pnl": round(total_pnl, 4),
        "win_rate": round(wins / len(deals) * 100, 1),
        "avg_pnl": round(total_pnl / len(deals), 4),
        "avg_sos": round(avg_sos, 1),
        "avg_duration_bars": round(avg_duration, 0),
        "avg_invested": round(avg_invested, 2),
        "max_dd_pct": round(max_dd, 2),
        "tp_pct": tp_count,
        "ztp_count": ztp_count,
        "sl_count": sl_count,
        "end_count": end_count,
        "longs": len(longs),
        "shorts": len(shorts),
        "long_pnl": round(long_pnl, 4),
        "short_pnl": round(short_pnl, 4),
        "long_wr": round(long_wr, 1),
        "short_wr": round(short_wr, 1),
        "total_fees": round(sum(d.fees for d in deals), 4),
        # Per-deal breakdown
        "deals_detail": [
            {
                "side": d.side,
                "entry": round(d.orders[0]["price"], 6),
                "avg": round(d.avg_entry, 6),
                "exit": round(d.close_price, 6),
                "pnl": round(d.pnl, 4),
                "sos": d.so_filled,
                "reason": d.close_reason,
                "bars": d.close_bar - d.entry_bar,
                "invested": round(d.total_invested, 2),
                "dd": round(d.max_drawdown_pct, 2),
            }
            for d in deals
        ],
    }


# ============================================================
# MAIN
# ============================================================
def main():
    print("=" * 70)
    print("  DCA + Z-VWAP Backtest — 1m vs 5m, 7 days")
    print("=" * 70)
    print(f"\nConfig: BO=${DCA_CONFIG['base_order_usd']}, SO=${DCA_CONFIG['safety_order_usd']}, "
          f"MaxSO={DCA_CONFIG['max_safety_orders']}, "
          f"StepScale={DCA_CONFIG['step_scale']}x, VolScale={DCA_CONFIG['volume_scale']}x")
    print(f"TP={DCA_CONFIG['take_profit_pct']}%, SL={DCA_CONFIG['stop_loss_pct']}%, "
          f"Z-entry={DCA_CONFIG['z_entry_threshold']}, Z-TP={DCA_CONFIG['z_tp_threshold']}, "
          f"Leverage={DCA_CONFIG['leverage']}x")
    print(f"Cooldown={DCA_CONFIG['cooldown_bars']} bars, VWAP period={VWAP_PERIOD}")
    print()

    all_results = []

    for symbol in SYMBOLS:
        print(f"━━━ {symbol} ━━━")

        for interval, tf_label in [("1", "1m"), ("5", "5m")]:
            print(f"  Fetching {tf_label} data...", end=" ", flush=True)
            klines = fetch_klines(symbol, interval, days=7)
            print(f"{len(klines)} bars")

            if len(klines) < 100:
                print(f"  āš ļø Not enough data, skipping")
                continue

            result = run_backtest(symbol, klines, DCA_CONFIG, tf_label)
            if result:
                all_results.append(result)

                wr = result["win_rate"]
                wr_emoji = "🟢" if wr >= 60 else "🟔" if wr >= 45 else "šŸ”“"
                pnl_emoji = "🟢" if result["total_pnl"] > 0 else "šŸ”“"

                print(f"  {tf_label}: {pnl_emoji} PnL=${result['total_pnl']:+.2f} | "
                      f"{wr_emoji} WR={wr}% | "
                      f"Deals={result['deals']} (L:{result['longs']}/S:{result['shorts']}) | "
                      f"AvgSO={result['avg_sos']} | "
                      f"MaxDD={result['max_dd_pct']:.1f}%")
                print(f"        TP%={result['tp_pct']} Z-TP={result['ztp_count']} "
                      f"SL={result['sl_count']} END={result['end_count']} | "
                      f"Fees=${result['total_fees']:.2f}")
                print(f"        Long: PnL=${result['long_pnl']:+.2f} WR={result['long_wr']}% | "
                      f"Short: PnL=${result['short_pnl']:+.2f} WR={result['short_wr']}%")
        print()

    # Summary comparison
    print("\n" + "=" * 70)
    print("  SUMMARY: 1m vs 5m")
    print("=" * 70)

    for tf in ["1m", "5m"]:
        tf_results = [r for r in all_results if r["tf"] == tf]
        if not tf_results:
            continue

        total_pnl = sum(r["total_pnl"] for r in tf_results)
        total_deals = sum(r["deals"] for r in tf_results)
        total_wins = sum(r["deals"] * r["win_rate"] / 100 for r in tf_results)
        avg_wr = total_wins / total_deals * 100 if total_deals > 0 else 0
        max_dd = max(r["max_dd_pct"] for r in tf_results) if tf_results else 0
        total_fees = sum(r["total_fees"] for r in tf_results)

        print(f"\n  šŸ“Š {tf}:")
        print(f"     Total PnL: ${total_pnl:+.2f} (after fees ${total_fees:.2f})")
        print(f"     Deals: {total_deals} | Avg WR: {avg_wr:.1f}%")
        print(f"     Max DD: {max_dd:.1f}%")

        # Per-symbol summary
        for r in tf_results:
            emoji = "🟢" if r["total_pnl"] > 0 else "šŸ”“"
            print(f"     {emoji} {r['symbol']}: ${r['total_pnl']:+.2f} ({r['deals']} deals, WR={r['win_rate']}%)")

    # Save results
    output_path = os.path.join(os.path.dirname(__file__), "results_dca_zvwap.json")
    save_data = {
        "config": DCA_CONFIG,
        "vwap_period": VWAP_PERIOD,
        "symbols": SYMBOLS,
        "results": [{k: v for k, v in r.items() if k != "deals_detail"} for r in all_results],
        "details": {f"{r['symbol']}_{r['tf']}": r.get("deals_detail", []) for r in all_results},
    }
    with open(output_path, "w") as f:
        json.dump(save_data, f, indent=2)
    print(f"\nšŸ’¾ Results saved to {output_path}")


if __name__ == "__main__":
    main()

šŸ“œ Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...