โ ะะฐะทะฐะด"""
Backtest: DCA Z-VWAP with NATR filter โ ENA 5m 7 days
=======================================================
Compares: no NATR filter vs NATR >= 1.0%
Usage:
cd /home/app/trading-bot-bybit && source venv/bin/activate
python backtests/backtest_dca_natr.py
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP
# ============================================================
# CONFIG (matches live bot)
# ============================================================
CONFIG = {
"base_order_usd": 5.0,
"safety_order_usd": 7.0,
"max_safety_orders": 4,
"price_deviation_pct": 1.5,
"step_scale": 1.5,
"volume_scale": 1.3,
"take_profit_pct": 1.5,
"stop_loss_pct": 10.0,
"z_entry_threshold": 1.8,
"z_tp_threshold": 0.3,
"cooldown_bars": 12,
"leverage": 3,
}
VWAP_PERIOD = 50
ATR_PERIOD = 14
MAKER_FEE = 0.0002
TAKER_FEE = 0.00055
SYMBOL = "WIFUSDT"
INTERVAL = "5"
DAYS = 7
# ============================================================
# DATA
# ============================================================
def fetch_klines(symbol, interval, days=7):
session = HTTP(testnet=False)
all_klines = []
bars_needed = days * 24 * 60 // int(interval)
end_time = int(datetime.now().timestamp() * 1000)
while len(all_klines) < bars_needed:
resp = session.get_kline(
category="linear", symbol=symbol,
interval=interval, limit=1000, end=end_time,
)
if resp["retCode"] != 0:
break
items = resp["result"]["list"]
if not items:
break
for item in items:
all_klines.append({
"ts": int(item[0]),
"open": float(item[1]),
"high": float(item[2]),
"low": float(item[3]),
"close": float(item[4]),
"volume": float(item[5]),
})
end_time = int(items[-1][0]) - 1
if len(items) < 1000:
break
all_klines.reverse()
seen = set()
unique = [k for k in all_klines if k["ts"] not in seen and not seen.add(k["ts"])]
return unique[-bars_needed:]
# ============================================================
# INDICATORS
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
n = len(closes)
z_scores = np.full(n, 0.0)
for i in range(period, n):
h, l, c, v = highs[i-period:i], lows[i-period:i], closes[i-period:i], volumes[i-period:i]
tp = (h + l + c) / 3
cum_tp_vol = np.cumsum(tp * v)
cum_vol = np.where(np.cumsum(v) == 0, 1, np.cumsum(v))
vwap_arr = cum_tp_vol / cum_vol
vwap = vwap_arr[-1]
std = np.std(c - vwap_arr)
if std > 0:
z_scores[i] = (closes[i] - vwap) / std
return z_scores
def calc_natr(highs, lows, closes, period=ATR_PERIOD):
"""NATR = ATR / close * 100. Returns array."""
n = len(closes)
natr = np.full(n, 0.0)
for i in range(1, n):
start = max(0, i - period)
tr = np.maximum(
highs[start+1:i+1] - lows[start+1:i+1],
np.maximum(
np.abs(highs[start+1:i+1] - closes[start:i]),
np.abs(lows[start+1:i+1] - closes[start:i])
)
)
if len(tr) > 0 and closes[i] > 0:
natr[i] = (np.mean(tr) / closes[i]) * 100
return natr
# ============================================================
# DCA DEAL
# ============================================================
class DCADeal:
def __init__(self, symbol, side, entry_price, entry_bar, config):
self.symbol = symbol
self.side = side
self.config = config
self.entry_bar = entry_bar
lev = config["leverage"]
self.orders = [{
"price": entry_price,
"usd": config["base_order_usd"],
"qty": (config["base_order_usd"] * lev) / entry_price,
"type": "BO",
}]
self.so_triggers = []
cumul = 0
for i in range(config["max_safety_orders"]):
dev = config["price_deviation_pct"] * (config["step_scale"] ** i)
cumul += dev
so_size = config["safety_order_usd"] * (config["volume_scale"] ** i)
if side == "LONG":
tp = entry_price * (1 - cumul / 100)
else:
tp = entry_price * (1 + cumul / 100)
self.so_triggers.append({"price": tp, "usd": so_size, "filled": False})
self.so_filled = 0
self.closed = False
self.close_price = 0
self.close_bar = 0
self.close_reason = ""
self.pnl = 0
self.fees = 0
@property
def avg_entry(self):
tq = sum(o["qty"] for o in self.orders)
tc = sum(o["qty"] * o["price"] for o in self.orders)
return tc / tq if tq > 0 else 0
@property
def total_qty(self):
return sum(o["qty"] for o in self.orders)
@property
def total_invested(self):
return sum(o["usd"] for o in self.orders)
def tick(self, bar_idx, high, low, close, z_score):
if self.closed:
return True
lev = self.config["leverage"]
# SO fills
for i, so in enumerate(self.so_triggers):
if so["filled"]:
continue
filled = (self.side == "LONG" and low <= so["price"]) or \
(self.side == "SHORT" and high >= so["price"])
if filled:
qty = (so["usd"] * lev) / so["price"]
self.orders.append({"price": so["price"], "usd": so["usd"], "qty": qty})
so["filled"] = True
self.so_filled += 1
self.fees += so["usd"] * lev * MAKER_FEE
avg = self.avg_entry
tp_pct = self.config["take_profit_pct"]
sl_pct = self.config["stop_loss_pct"]
# TP%
if self.side == "LONG":
tp_price = avg * (1 + tp_pct / 100)
sl_price = avg * (1 - sl_pct / 100)
else:
tp_price = avg * (1 - tp_pct / 100)
sl_price = avg * (1 + sl_pct / 100)
tp_hit = (self.side == "LONG" and high >= tp_price) or \
(self.side == "SHORT" and low <= tp_price)
sl_hit = (self.side == "LONG" and low <= sl_price) or \
(self.side == "SHORT" and high >= sl_price)
# Z-reversion
z_tp = False
if self.side == "LONG" and z_score >= -self.config["z_tp_threshold"] and close > avg:
z_tp = True
elif self.side == "SHORT" and z_score <= self.config["z_tp_threshold"] and close < avg:
z_tp = True
if sl_hit:
self.close_price = sl_price
self._close(bar_idx, "SL")
return True
elif tp_hit:
self.close_price = tp_price
self._close(bar_idx, "TP%")
return True
elif z_tp:
self.close_price = close
self._close(bar_idx, "Z-TP")
return True
return False
def _close(self, bar_idx, reason):
self.closed = True
self.close_bar = bar_idx
self.close_reason = reason
tq = self.total_qty
avg = self.avg_entry
if self.side == "LONG":
self.pnl = tq * (self.close_price - avg)
else:
self.pnl = tq * (avg - self.close_price)
self.fees += tq * self.close_price * TAKER_FEE
self.fees += self.orders[0]["qty"] * self.orders[0]["price"] * TAKER_FEE
self.pnl -= self.fees
# ============================================================
# BACKTEST
# ============================================================
def run_backtest(klines, natr_min=0.0, label=""):
closes = np.array([k["close"] for k in klines])
highs = np.array([k["high"] for k in klines])
lows = np.array([k["low"] for k in klines])
volumes = np.array([k["volume"] for k in klines])
z_scores = calc_zvwap(highs, lows, closes, volumes)
natr_arr = calc_natr(highs, lows, closes)
n = len(closes)
deals = []
active = None
cooldown = 0
skipped_natr = 0
for i in range(VWAP_PERIOD, n):
z = z_scores[i]
if active:
if active.tick(i, highs[i], lows[i], closes[i], z):
deals.append(active)
cooldown = i + CONFIG["cooldown_bars"]
active = None
continue
if i < cooldown:
continue
threshold = CONFIG["z_entry_threshold"]
if abs(z) > threshold:
# NATR filter
if natr_arr[i] < natr_min:
skipped_natr += 1
continue
side = "LONG" if z < -threshold else "SHORT"
active = DCADeal(SYMBOL, side, closes[i], i, CONFIG)
if active and not active.closed:
active.close_price = closes[-1]
active._close(n - 1, "END")
deals.append(active)
# Results
if not deals:
return {"label": label, "deals": 0, "pnl": 0, "wr": 0, "skipped": skipped_natr}
total_pnl = sum(d.pnl for d in deals)
wins = sum(1 for d in deals if d.pnl > 0)
wr = wins / len(deals) * 100
longs = [d for d in deals if d.side == "LONG"]
shorts = [d for d in deals if d.side == "SHORT"]
tp_cnt = sum(1 for d in deals if d.close_reason == "TP%")
ztp_cnt = sum(1 for d in deals if d.close_reason == "Z-TP")
sl_cnt = sum(1 for d in deals if d.close_reason == "SL")
end_cnt = sum(1 for d in deals if d.close_reason == "END")
avg_sos = sum(d.so_filled for d in deals) / len(deals)
avg_dur = sum(d.close_bar - d.entry_bar for d in deals) / len(deals)
avg_inv = sum(d.total_invested for d in deals) / len(deals)
total_fees = sum(d.fees for d in deals)
return {
"label": label,
"deals": len(deals),
"pnl": round(total_pnl, 4),
"wr": round(wr, 1),
"avg_pnl": round(total_pnl / len(deals), 4),
"avg_sos": round(avg_sos, 1),
"avg_dur_bars": round(avg_dur, 0),
"avg_invested": round(avg_inv, 2),
"fees": round(total_fees, 4),
"tp": tp_cnt, "ztp": ztp_cnt, "sl": sl_cnt, "end": end_cnt,
"longs": len(longs), "shorts": len(shorts),
"long_pnl": round(sum(d.pnl for d in longs), 4),
"short_pnl": round(sum(d.pnl for d in shorts), 4),
"long_wr": round(sum(1 for d in longs if d.pnl > 0) / len(longs) * 100, 1) if longs else 0,
"short_wr": round(sum(1 for d in shorts if d.pnl > 0) / len(shorts) * 100, 1) if shorts else 0,
"skipped": skipped_natr,
"detail": [
{"side": d.side[0], "pnl": f"${d.pnl:+.4f}", "sos": d.so_filled,
"reason": d.close_reason, "bars": d.close_bar - d.entry_bar,
"natr": f"{natr_arr[d.entry_bar]:.2f}%"}
for d in deals
],
}
def print_result(r):
emoji = "๐ข" if r["pnl"] > 0 else "๐ด"
print(f"\n {emoji} {r['label']}")
print(f" PnL: ${r['pnl']:+.4f} | Deals: {r['deals']} | WR: {r['wr']}%")
if r["deals"] > 0:
print(f" Avg PnL: ${r['avg_pnl']:+.4f} | Avg SOs: {r['avg_sos']} | Avg Dur: {r['avg_dur_bars']:.0f} bars")
print(f" TP%: {r['tp']} | Z-TP: {r['ztp']} | SL: {r['sl']} | END: {r['end']}")
print(f" Long: {r['longs']} (${r['long_pnl']:+.4f} WR={r['long_wr']}%) | "
f"Short: {r['shorts']} (${r['short_pnl']:+.4f} WR={r['short_wr']}%)")
print(f" Fees: ${r['fees']:.4f} | Avg Invested: ${r['avg_invested']:.2f}")
print(f" Skipped (NATR): {r['skipped']}")
print(f" --- Per deal ---")
for d in r["detail"]:
e = "โ
" if d["pnl"].startswith("$+") or d["pnl"] == "$+0.0000" else "โ"
print(f" {e} {d['side']} {d['pnl']} | SO:{d['sos']} | {d['reason']} | "
f"{d['bars']}bars | NATR:{d['natr']}")
def main():
print("=" * 60)
print(f" DCA Z-VWAP + NATR Filter โ {SYMBOL} 5m 7d")
print("=" * 60)
print(f" Config: BO=${CONFIG['base_order_usd']}, SO=${CONFIG['safety_order_usd']}, "
f"MaxSO={CONFIG['max_safety_orders']}, Lev={CONFIG['leverage']}x")
print(f" Z-entry={CONFIG['z_entry_threshold']}, Z-TP={CONFIG['z_tp_threshold']}, "
f"TP={CONFIG['take_profit_pct']}%, SL={CONFIG['stop_loss_pct']}%")
print(f"\n Fetching {SYMBOL} 5m data ({DAYS}d)...", end=" ", flush=True)
klines = fetch_klines(SYMBOL, INTERVAL, DAYS)
print(f"{len(klines)} bars")
# Run variants
r_none = run_backtest(klines, natr_min=0.0, label="No NATR filter")
r_08 = run_backtest(klines, natr_min=0.8, label="NATR >= 0.8%")
r_10 = run_backtest(klines, natr_min=1.0, label="NATR >= 1.0%")
r_12 = run_backtest(klines, natr_min=1.2, label="NATR >= 1.2%")
r_15 = run_backtest(klines, natr_min=1.5, label="NATR >= 1.5%")
for r in [r_none, r_08, r_10, r_12, r_15]:
print_result(r)
print("\n" + "=" * 60)
print(" COMPARISON")
print("=" * 60)
print(f" {'Filter':<18} {'Deals':>5} {'PnL':>10} {'WR':>6} {'Skipped':>8}")
print(f" {'-'*50}")
for r in [r_none, r_08, r_10, r_12, r_15]:
print(f" {r['label']:<18} {r['deals']:>5} ${r['pnl']:>+8.4f} {r['wr']:>5.1f}% {r['skipped']:>8}")
if __name__ == "__main__":
main()