# ← Back  (page-navigation text captured with the file; not part of the script)
"""
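Backtest: DCA Z-VWAP with NATR filter — WIFUSDT 5m 7 days
=========================================================
Symbol, interval and look-back come from SYMBOL / INTERVAL / DAYS below.
Compares: no NATR filter vs NATR >= 0.8% / 1.0% / 1.2% / 1.5%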

Usage:
  cd /home/app/trading-bot-bybit && source venv/bin/activate
  python backtests/backtest_dca_natr.py
"""

import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP

# ============================================================
# CONFIG (matches live bot)
# ============================================================
CONFIG = {
    # Margin (USD) of the initial "BO" order; DCADeal multiplies it by
    # `leverage` and divides by the entry price to get the quantity.
    "base_order_usd": 5.0,
    # Margin (USD) of the first safety order (SO); later SOs are scaled by
    # `volume_scale`.
    "safety_order_usd": 7.0,
    # Number of safety-order levels DCADeal builds for every deal.
    "max_safety_orders": 4,
    # Distance (%) of the first SO from the entry price.
    "price_deviation_pct": 1.5,
    # Each further SO step is the previous step times this factor; the steps
    # are accumulated, so SO n sits at the sum of the first n steps.
    "step_scale": 1.5,
    # Each further SO margin is the previous one times this factor.
    "volume_scale": 1.3,
    # Take-profit / stop-loss distance (%) measured from the average entry.
    "take_profit_pct": 1.5,
    "stop_loss_pct": 10.0,
    # A deal opens when |z| exceeds this; z below -threshold opens a LONG,
    # otherwise a SHORT (see run_backtest).
    "z_entry_threshold": 1.8,
    # Z-reversion exit: a LONG closes once z >= -threshold while the close is
    # above the average entry; a SHORT once z <= threshold while below it.
    "z_tp_threshold": 0.3,
    # Bars to wait after a deal closes before a new one may open.
    "cooldown_bars": 12,
    # Multiplier applied to every order's USD margin to get its notional.
    "leverage": 3,
}

# Look-back window (bars) of the Z-VWAP indicator; also the first bar index
# that run_backtest simulates.
VWAP_PERIOD = 50
# Maximum number of true-range values averaged by calc_natr.
ATR_PERIOD = 14
# Fee rate charged on safety-order fills (fraction of notional).
MAKER_FEE = 0.0002
# Fee rate charged on the base-order entry and on the closing trade.
TAKER_FEE = 0.00055

# Instrument requested from the exchange and recorded on every deal.
SYMBOL = "WIFUSDT"
# Candle interval; passed to the API as-is and parsed with int() as minutes
# when sizing the download.
INTERVAL = "5"
# Look-back window of the download, in days.
DAYS = 7


# ============================================================
# DATA
# ============================================================
def fetch_klines(symbol, interval, days=7):
    """Download recent linear-contract candles and return them oldest first.

    Pages backwards from the current time in requests of up to 1000 candles.
    Paging stops once at least ``days`` worth of bars has been collected, the
    API answers with a non-zero ``retCode``, or a page is empty or shorter
    than 1000 rows.

    NOTE(review): assumes every page is ordered newest-first — the cursor is
    moved to just before the timestamp of the page's last row, and the
    combined list is reversed at the end. Confirm against the API reference.

    Args:
        symbol: Instrument name forwarded to ``get_kline``.
        interval: Candle interval as a string of minutes; parsed with
            ``int`` to size the download and forwarded unchanged to the API.
        days: Look-back window in days.

    Returns:
        A list of dicts with keys ``ts``, ``open``, ``high``, ``low``,
        ``close`` and ``volume``, de-duplicated by ``ts`` and trimmed to the
        last ``days * 24 * 60 // int(interval)`` entries. The list is shorter
        (possibly empty) when paging stopped early.
    """
    client = HTTP(testnet=False)
    target_bars = days * 24 * 60 // int(interval)
    cursor_ms = int(datetime.now().timestamp() * 1000)
    collected = []

    while len(collected) < target_bars:
        response = client.get_kline(
            category="linear",
            symbol=symbol,
            interval=interval,
            limit=1000,
            end=cursor_ms,
        )
        if response["retCode"] != 0:
            break
        page = response["result"]["list"]
        if not page:
            break

        collected.extend(
            {
                "ts": int(row[0]),
                "open": float(row[1]),
                "high": float(row[2]),
                "low": float(row[3]),
                "close": float(row[4]),
                "volume": float(row[5]),
            }
            for row in page
        )

        # Next request ends one millisecond before this page's last candle.
        cursor_ms = int(page[-1][0]) - 1
        if len(page) < 1000:
            break

    collected.reverse()

    # Keep the first candle seen for each timestamp; dicts preserve the
    # insertion order, so the result stays in the reversed order.
    by_timestamp = {}
    for candle in collected:
        by_timestamp.setdefault(candle["ts"], candle)

    return list(by_timestamp.values())[-target_bars:]


# ============================================================
# INDICATORS
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
    """Z-score of every close against the VWAP of the preceding bars.

    For bar ``i`` the window is the ``period`` bars *before* it (bar ``i``
    itself is excluded). Inside the window a running VWAP of the typical
    price ``(high + low + close) / 3`` is built; the z-score is the distance
    of ``closes[i]`` from the window's final VWAP, divided by the standard
    deviation of ``close - running VWAP`` over the window.

    Args:
        highs, lows, closes, volumes: NumPy arrays, one value per bar.
        period: Window length in bars.

    Returns:
        Float array of ``len(closes)``. The first ``period`` entries, and any
        bar whose window deviation is not positive, are 0.0.
    """
    total = len(closes)
    z_scores = np.full(total, 0.0)

    for idx in range(period, total):
        window = slice(idx - period, idx)
        win_close = closes[window]
        win_vol = volumes[window]
        typical = (highs[window] + lows[window] + win_close) / 3

        running_vol = np.cumsum(win_vol)
        # A zero running volume would divide by zero; use 1 there instead.
        safe_vol = np.where(running_vol == 0, 1, running_vol)
        running_vwap = np.cumsum(typical * win_vol) / safe_vol

        latest_vwap = running_vwap[-1]
        spread = np.std(win_close - running_vwap)
        if spread > 0:
            z_scores[idx] = (closes[idx] - latest_vwap) / spread

    return z_scores


def calc_natr(highs, lows, closes, period=ATR_PERIOD):
    """Normalised ATR in percent: mean true range / close * 100.

    The true range of bar ``j`` is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``. For bar ``i``
    the plain (unsmoothed) mean of the last ``period`` true ranges ending at
    ``i`` is used; during warm-up fewer values are averaged.

    Args:
        highs, lows, closes: NumPy arrays, one value per bar.
        period: Maximum number of true-range values averaged per bar.

    Returns:
        Float array of ``len(closes)``. Bar 0, and any bar whose close is not
        positive, is 0.0.
    """
    total = len(closes)
    natr = np.full(total, 0.0)

    # true_range[k] belongs to bar k + 1 (bar 0 has no previous close).
    prev_close = closes[:total - 1]
    bar_range = highs[1:total] - lows[1:total]
    gap_from_high = np.abs(highs[1:total] - prev_close)
    gap_from_low = np.abs(lows[1:total] - prev_close)
    true_range = np.maximum(bar_range, np.maximum(gap_from_high, gap_from_low))

    for idx in range(1, total):
        first = max(0, idx - period)
        recent = true_range[first:idx]
        if len(recent) > 0 and closes[idx] > 0:
            natr[idx] = (np.mean(recent) / closes[idx]) * 100

    return natr


# ============================================================
# DCA DEAL
# ============================================================
class DCADeal:
    """One simulated DCA position: a base order plus a ladder of safety orders.

    Order sizes in ``config`` are margin in USD; the quantity of every order
    is ``usd * leverage / price``. The safety-order (SO) ladder is fixed at
    construction time from the entry price. Any ``side`` other than "LONG"
    gets its ladder placed above the entry, but only "LONG" and "SHORT"
    deals ever fill or close inside ``tick``.

    Attributes set by the constructor:
        orders: Filled orders as dicts (``price``, ``usd``, ``qty``; the base
            order additionally carries ``type`` = "BO").
        so_triggers: Pending/filled SO levels (``price``, ``usd``, ``filled``).
        so_filled: Number of SO levels filled so far.
        closed, close_price, close_bar, close_reason: Exit state.
        pnl: Net result after fees, set when the deal closes.
        fees: Fees accumulated so far.
    """

    def __init__(self, symbol, side, entry_price, entry_bar, config):
        self.symbol = symbol
        self.side = side
        self.config = config
        self.entry_bar = entry_bar

        leverage = config["leverage"]
        base_usd = config["base_order_usd"]
        self.orders = [{
            "price": entry_price,
            "usd": base_usd,
            "qty": (base_usd * leverage) / entry_price,
            "type": "BO",
        }]

        # Build the SO ladder: the distance of each level is the running sum
        # of geometrically growing steps, the size grows geometrically too.
        self.so_triggers = []
        offset_pct = 0
        for level in range(config["max_safety_orders"]):
            offset_pct += config["price_deviation_pct"] * (config["step_scale"] ** level)
            size_usd = config["safety_order_usd"] * (config["volume_scale"] ** level)
            if side == "LONG":
                factor = 1 - offset_pct / 100
            else:
                factor = 1 + offset_pct / 100
            self.so_triggers.append({
                "price": entry_price * factor,
                "usd": size_usd,
                "filled": False,
            })

        self.so_filled = 0
        self.closed = False
        self.close_price = 0
        self.close_bar = 0
        self.close_reason = ""
        self.pnl = 0
        self.fees = 0

    @property
    def avg_entry(self):
        """Quantity-weighted average fill price (0 when nothing is filled)."""
        filled_qty = self.total_qty
        if filled_qty > 0:
            cost = sum(order["qty"] * order["price"] for order in self.orders)
            return cost / filled_qty
        return 0

    @property
    def total_qty(self):
        """Combined quantity of all filled orders."""
        return sum(order["qty"] for order in self.orders)

    @property
    def total_invested(self):
        """Combined USD margin of all filled orders."""
        return sum(order["usd"] for order in self.orders)

    def tick(self, bar_idx, high, low, close, z_score):
        """Advance the deal by one bar.

        Safety orders touched by the bar are filled first (at their trigger
        price, charged the maker fee), so the exit levels checked afterwards
        already use the average entry including this bar's fills. Exits are
        then tested in priority order: stop-loss, take-profit, z-reversion.

        Args:
            bar_idx: Index of the bar, stored as ``close_bar`` on exit.
            high, low, close: Prices of the bar.
            z_score: Z-VWAP value of the bar.

        Returns:
            True if the deal is closed (already, or by this bar), else False.
        """
        if self.closed:
            return True

        is_long = self.side == "LONG"
        is_short = self.side == "SHORT"
        leverage = self.config["leverage"]

        # --- safety-order fills ---
        for level in self.so_triggers:
            if level["filled"]:
                continue
            reached = (is_long and low <= level["price"]) or \
                      (is_short and high >= level["price"])
            if not reached:
                continue
            self.orders.append({
                "price": level["price"],
                "usd": level["usd"],
                "qty": (level["usd"] * leverage) / level["price"],
            })
            level["filled"] = True
            self.so_filled += 1
            self.fees += level["usd"] * leverage * MAKER_FEE

        # --- exit levels around the (possibly updated) average entry ---
        avg = self.avg_entry
        tp_frac = self.config["take_profit_pct"] / 100
        sl_frac = self.config["stop_loss_pct"] / 100
        if is_long:
            tp_price = avg * (1 + tp_frac)
            sl_price = avg * (1 - sl_frac)
        else:
            tp_price = avg * (1 - tp_frac)
            sl_price = avg * (1 + sl_frac)

        # Stop-loss wins when a bar touches both levels.
        if (is_long and low <= sl_price) or (is_short and high >= sl_price):
            self.close_price = sl_price
            self._close(bar_idx, "SL")
            return True

        if (is_long and high >= tp_price) or (is_short and low <= tp_price):
            self.close_price = tp_price
            self._close(bar_idx, "TP%")
            return True

        # Z-reversion: z has come back inside the band and the bar closed in
        # profit relative to the average entry.
        z_band = self.config["z_tp_threshold"]
        reverted = (is_long and z_score >= -z_band and close > avg) or \
                   (is_short and z_score <= z_band and close < avg)
        if reverted:
            self.close_price = close
            self._close(bar_idx, "Z-TP")
            return True

        return False

    def _close(self, bar_idx, reason):
        """Mark the deal closed at ``self.close_price`` and settle PnL.

        The caller must set ``close_price`` first. Adds the taker fee for the
        closing trade and for the base-order entry to ``fees`` (safety-order
        fees were added when they filled), then stores the net ``pnl``.
        """
        self.closed = True
        self.close_bar = bar_idx
        self.close_reason = reason

        qty = self.total_qty
        avg = self.avg_entry
        if self.side == "LONG":
            self.pnl = qty * (self.close_price - avg)
        else:
            self.pnl = qty * (avg - self.close_price)

        base_order = self.orders[0]
        self.fees += qty * self.close_price * TAKER_FEE
        self.fees += base_order["qty"] * base_order["price"] * TAKER_FEE
        self.pnl -= self.fees


# ============================================================
# BACKTEST
# ============================================================
def run_backtest(klines, natr_min=0.0, label=""):
    """Simulate the DCA Z-VWAP strategy over ``klines`` and summarise it.

    One deal is open at a time. On a bar with no open deal (and past the
    cooldown) a deal is opened at the close when ``|z|`` exceeds
    ``CONFIG["z_entry_threshold"]`` and the bar's NATR is at least
    ``natr_min``; z below the negative threshold opens a LONG, otherwise a
    SHORT. An open deal is advanced with ``DCADeal.tick`` on every later bar
    and force-closed at the last close ("END") if still open when data ends.

    Args:
        klines: Candle dicts with ``high``, ``low``, ``close`` and ``volume``.
        natr_min: Minimum NATR (%) required to open a deal.
        label: Name of the variant, copied into the result.

    Returns:
        A dict of summary statistics. With no deals it only contains
        ``label``, ``deals``, ``pnl``, ``wr`` and ``skipped``; otherwise it
        also carries averages, exit-reason counts, per-side figures and a
        ``detail`` list with one entry per deal.
    """
    highs, lows, closes, volumes = (
        np.array([bar[field] for bar in klines])
        for field in ("high", "low", "close", "volume")
    )

    z_scores = calc_zvwap(highs, lows, closes, volumes)
    natr_arr = calc_natr(highs, lows, closes)

    total_bars = len(closes)
    entry_z = CONFIG["z_entry_threshold"]
    finished = []
    open_deal = None
    next_entry_bar = 0
    natr_rejections = 0

    for bar in range(VWAP_PERIOD, total_bars):
        z_now = z_scores[bar]

        # A deal is running: advance it and do nothing else on this bar.
        if open_deal is not None:
            if open_deal.tick(bar, highs[bar], lows[bar], closes[bar], z_now):
                finished.append(open_deal)
                next_entry_bar = bar + CONFIG["cooldown_bars"]
                open_deal = None
            continue

        if bar < next_entry_bar:
            continue
        if not (abs(z_now) > entry_z):
            continue

        # NATR filter: count every signal bar rejected for low volatility.
        if natr_arr[bar] < natr_min:
            natr_rejections += 1
            continue

        direction = "LONG" if z_now < -entry_z else "SHORT"
        open_deal = DCADeal(SYMBOL, direction, closes[bar], bar, CONFIG)

    # Settle a deal that is still open when the data runs out.
    if open_deal is not None and not open_deal.closed:
        open_deal.close_price = closes[-1]
        open_deal._close(total_bars - 1, "END")
        finished.append(open_deal)

    if not finished:
        return {"label": label, "deals": 0, "pnl": 0, "wr": 0, "skipped": natr_rejections}

    def win_rate(group):
        """Share of profitable deals in percent, 0 for an empty group."""
        if not group:
            return 0
        return round(sum(1 for deal in group if deal.pnl > 0) / len(group) * 100, 1)

    deal_count = len(finished)
    total_pnl = sum(deal.pnl for deal in finished)

    long_deals = [deal for deal in finished if deal.side == "LONG"]
    short_deals = [deal for deal in finished if deal.side == "SHORT"]

    exits = {"TP%": 0, "Z-TP": 0, "SL": 0, "END": 0}
    for deal in finished:
        if deal.close_reason in exits:
            exits[deal.close_reason] += 1

    mean_sos = sum(deal.so_filled for deal in finished) / deal_count
    mean_bars = sum(deal.close_bar - deal.entry_bar for deal in finished) / deal_count
    mean_invested = sum(deal.total_invested for deal in finished) / deal_count
    fee_sum = sum(deal.fees for deal in finished)

    per_deal = []
    for deal in finished:
        per_deal.append({
            "side": deal.side[0],
            "pnl": f"${deal.pnl:+.4f}",
            "sos": deal.so_filled,
            "reason": deal.close_reason,
            "bars": deal.close_bar - deal.entry_bar,
            "natr": f"{natr_arr[deal.entry_bar]:.2f}%",
        })

    return {
        "label": label,
        "deals": deal_count,
        "pnl": round(total_pnl, 4),
        "wr": win_rate(finished),
        "avg_pnl": round(total_pnl / deal_count, 4),
        "avg_sos": round(mean_sos, 1),
        "avg_dur_bars": round(mean_bars, 0),
        "avg_invested": round(mean_invested, 2),
        "fees": round(fee_sum, 4),
        "tp": exits["TP%"],
        "ztp": exits["Z-TP"],
        "sl": exits["SL"],
        "end": exits["END"],
        "longs": len(long_deals),
        "shorts": len(short_deals),
        "long_pnl": round(sum(deal.pnl for deal in long_deals), 4),
        "short_pnl": round(sum(deal.pnl for deal in short_deals), 4),
        "long_wr": win_rate(long_deals),
        "short_wr": win_rate(short_deals),
        "skipped": natr_rejections,
        "detail": per_deal,
    }


def print_result(r):
    """Print the summary of one backtest variant to stdout.

    Always prints the label with a green/red marker and the headline PnL
    line. When the variant produced at least one deal it also prints the
    averages, exit-reason counts, per-side figures, fees, the number of
    NATR-skipped signals and one line per deal.

    Args:
        r: Result dict as returned by ``run_backtest``.
    """
    marker = "🟢" if r["pnl"] > 0 else "🔴"
    print(f"\n  {marker} {r['label']}")
    print(f"     PnL: ${r['pnl']:+.4f} | Deals: {r['deals']} | WR: {r['wr']}%")

    if r["deals"] <= 0:
        return

    summary = [
        f"     Avg PnL: ${r['avg_pnl']:+.4f} | Avg SOs: {r['avg_sos']} | Avg Dur: {r['avg_dur_bars']:.0f} bars",
        f"     TP%: {r['tp']} | Z-TP: {r['ztp']} | SL: {r['sl']} | END: {r['end']}",
        f"     Long: {r['longs']} (${r['long_pnl']:+.4f} WR={r['long_wr']}%) | "
        f"Short: {r['shorts']} (${r['short_pnl']:+.4f} WR={r['short_wr']}%)",
        f"     Fees: ${r['fees']:.4f} | Avg Invested: ${r['avg_invested']:.2f}",
        f"     Skipped (NATR): {r['skipped']}",
        f"     --- Per deal ---",
    ]
    for line in summary:
        print(line)

    for deal in r["detail"]:
        # The PnL text is pre-formatted with an explicit sign; "$+..." covers
        # both positive and zero results.
        status = "✅" if deal["pnl"].startswith("$+") else "❌"
        print(f"       {status} {deal['side']} {deal['pnl']} | SO:{deal['sos']} | {deal['reason']} | "
              f"{deal['bars']}bars | NATR:{deal['natr']}")


def main():
    """Fetch candles once, run every NATR variant on them and print results.

    Prints the active configuration, downloads the candles for ``SYMBOL``,
    runs the backtest without a NATR filter and with minimum NATR of 0.8,
    1.0, 1.2 and 1.5 percent, prints each variant in detail and finishes
    with a side-by-side comparison table.
    """
    rule = "=" * 60

    print(rule)
    print(f"  DCA Z-VWAP + NATR Filter — {SYMBOL} 5m 7d")
    print(rule)
    print(f"  Config: BO=${CONFIG['base_order_usd']}, SO=${CONFIG['safety_order_usd']}, "
          f"MaxSO={CONFIG['max_safety_orders']}, Lev={CONFIG['leverage']}x")
    print(f"  Z-entry={CONFIG['z_entry_threshold']}, Z-TP={CONFIG['z_tp_threshold']}, "
          f"TP={CONFIG['take_profit_pct']}%, SL={CONFIG['stop_loss_pct']}%")

    print(f"\n  Fetching {SYMBOL} 5m data ({DAYS}d)...", end=" ", flush=True)
    klines = fetch_klines(SYMBOL, INTERVAL, DAYS)
    print(f"{len(klines)} bars")

    # (minimum NATR in percent, label) for every variant, in report order.
    variants = [
        (0.0, "No NATR filter"),
        (0.8, "NATR >= 0.8%"),
        (1.0, "NATR >= 1.0%"),
        (1.2, "NATR >= 1.2%"),
        (1.5, "NATR >= 1.5%"),
    ]
    results = [
        run_backtest(klines, natr_min=floor, label=name)
        for floor, name in variants
    ]

    for result in results:
        print_result(result)

    print("\n" + rule)
    print("  COMPARISON")
    print(rule)
    print(f"  {'Filter':<18} {'Deals':>5} {'PnL':>10} {'WR':>6} {'Skipped':>8}")
    print(f"  {'-'*50}")
    for result in results:
        print(f"  {result['label']:<18} {result['deals']:>5} ${result['pnl']:>+8.4f} "
              f"{result['wr']:>5.1f}% {result['skipped']:>8}")


# Run the comparison only when this file is executed as a script.
if __name__ == "__main__":
    main()

# --- Page footer captured with the file (not part of the script) ---
# Git history: dd32dfd "chore: initial commit — version control setup" (5 weeks ago)