"""
NATR-Focused Sweep — DCA Z-VWAP (NO Safety Orders)
====================================================
Focus: deep NATR exploration with best params from prior sweeps.
Grid:
- NATR min: 0.3, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0
- NATR max: 0(off), 1.5, 2.0, 2.5, 3.0, 3.5, 5.0
- Z Entry: 1.8, 2.0, 2.5, 3.0
- Z Max: 0(off), 2.5, 3.5
- TP%: 1.0, 1.5, 2.0, 3.0
- SL%: 3.0, 5.0, 8.0
- CHOP min: 0(off), 45, 50, 55
- Cooldown: 6, 12
Data: 30d, 5m candles, top50 by vol (>$20M)
Usage: python3 backtests/backtest_natr_sweep.py
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json, time
import numpy as np
from datetime import datetime
from itertools import product
from pybit.unified_trading import HTTP
# --- Fixed strategy/backtest constants (not part of the sweep grid) ---
ORDER_USD = 7.0        # margin committed per position, USD
LEVERAGE = 3           # position notional = ORDER_USD * LEVERAGE
VWAP_PERIOD = 50       # rolling VWAP window length, in bars
TAKER_FEE = 0.00055    # taker fee per fill side (0.055%)
TIMEFRAME = "5"        # candle interval in minutes (Bybit kline format)
DAYS = 30              # history depth to download per symbol
MIN_VOLUME_24H = 20_000_000  # minimum 24h turnover to include a symbol, USD
# Symbols excluded from the universe (majors, stables, perp aliases).
BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}
# Sweep grid; a value of 0 means "filter disabled" for z_max, natr_max
# and chop_min (see module docstring).
GRID = {
    "z_entry": [1.8, 2.0, 2.5, 3.0],
    "z_max": [0, 2.5, 3.5],
    "tp_pct": [1.0, 1.5, 2.0, 3.0],
    "sl_pct": [3.0, 5.0, 8.0],
    "natr_min": [0.3, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0],
    "natr_max": [0, 1.5, 2.0, 2.5, 3.0, 3.5, 5.0],
    "chop_min": [0, 45, 50, 55],
    "cooldown": [6, 12],
}
# Shared Bybit HTTP client (mainnet; only public market-data endpoints used).
session = HTTP(testnet=False)
def get_symbols():
    """Return up to 50 linear USDT-perp symbols, sorted by 24h turnover desc.

    Only symbols with 24h turnover >= MIN_VOLUME_24H are kept; anything in
    BLACKLIST or not ending in "USDT" is skipped. Returns [] on API error.
    Each entry is {"symbol": str, "volume_24h": float}.
    """
    resp = session.get_tickers(category="linear")
    if resp["retCode"] != 0:
        return []
    candidates = []
    for ticker in resp["result"]["list"]:
        sym = ticker["symbol"]
        if sym.endswith("USDT") and sym not in BLACKLIST:
            turnover = float(ticker.get("turnover24h", 0))
            if turnover >= MIN_VOLUME_24H:
                candidates.append({"symbol": sym, "volume_24h": turnover})
    candidates.sort(key=lambda item: item["volume_24h"], reverse=True)
    return candidates[:50]
def fetch_klines(symbol, interval, days):
    """Download roughly `days` of klines for `symbol` from Bybit, oldest-first.

    Pages backwards through the v5 kline endpoint 1000 candles at a time,
    then deduplicates by timestamp and trims to the requested length.

    Args:
        symbol: e.g. "SOLUSDT".
        interval: Bybit interval string in minutes, e.g. "5".
        days: how many days of history to fetch.

    Returns:
        List of dicts with keys ts/h/l/c/v in chronological order
        (open price is not used by this backtest). Best-effort: returns
        whatever was downloaded so far if the API errors mid-way.
    """
    kls = []
    need = days * 24 * 60 // int(interval)
    end = int(datetime.now().timestamp() * 1000)
    while len(kls) < need:
        try:
            r = session.get_kline(category="linear", symbol=symbol,
                                  interval=interval, limit=1000, end=end)
            if r["retCode"] != 0:
                break
            items = r["result"]["list"]
            if not items:
                break
            for i in items:
                kls.append({"ts": int(i[0]), "h": float(i[2]),
                            "l": float(i[3]), "c": float(i[4]), "v": float(i[5])})
            # API returns newest-first; move the cursor to just before the
            # oldest candle received so the next page continues backwards.
            end = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
        except Exception:
            # Bugfix: was a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit. Keep the best-effort behavior
            # (return partial data) but only for genuine errors.
            break
    kls.reverse()  # chronological order for the simulator
    seen = set()
    u = []
    for k in kls:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            u.append(k)
    return u[-need:] if len(u) > need else u
def calc_indicators(c, h, l, v):
    """Compute per-bar indicator arrays from close/high/low/volume arrays.

    Returns a tuple (z, natr, chop):
      z    -- z-score of the current close against a rolling VWAP built from
              the previous VWAP_PERIOD bars (current bar excluded); 0 until
              the window is warm or when deviation std is 0.
      natr -- normalized ATR(14) as a percentage of the current close.
      chop -- 14-bar Choppiness Index; pre-filled with the neutral value 50
              for the warm-up bars.
    """
    n = len(c)
    z = np.zeros(n)
    for i in range(VWAP_PERIOD, n):
        # Window of the previous VWAP_PERIOD bars, excluding bar i itself.
        hh, ll, cc, vv = h[i-VWAP_PERIOD:i], l[i-VWAP_PERIOD:i], c[i-VWAP_PERIOD:i], v[i-VWAP_PERIOD:i]
        tp = (hh + ll + cc) / 3  # typical price per bar
        ctv = np.cumsum(tp * vv)
        cv = np.cumsum(vv)
        cv_s = np.where(cv == 0, 1, cv)  # avoid division by zero on zero-volume runs
        va = ctv / cv_s  # running VWAP within the window
        dev = cc - va
        std = np.std(dev)
        if std > 0:
            # z-score of the current close vs the latest in-window VWAP value.
            z[i] = (c[i] - va[-1]) / std
    natr = np.zeros(n)
    for i in range(14, n):
        # True range over the last 14 bars (includes bar i), normalized by close.
        trs = [max(h[j]-l[j], abs(h[j]-c[j-1]), abs(l[j]-c[j-1])) for j in range(i-13, i+1)]
        natr[i] = (np.mean(trs) / c[i]) * 100 if c[i] > 0 else 0
    chop = np.full(n, 50.0)  # neutral default until warmed up
    for i in range(14, n):
        atr_s = sum(max(h[j]-l[j], abs(h[j]-c[j-1]), abs(l[j]-c[j-1])) for j in range(i-13, i+1))
        hi, lo = np.max(h[i-13:i+1]), np.min(l[i-13:i+1])
        rng = hi - lo
        if rng > 0:
            # Choppiness Index: 100 * log10(sum(TR) / range) / log10(period).
            chop[i] = 100 * np.log10(atr_s / rng) / np.log10(14)
    return z, natr, chop
def sim(z_arr, natr_arr, chop_arr, c, h, l, p):
    """Simulate the Z-VWAP mean-reversion strategy on one symbol (no safety orders).

    Holds at most one position at a time. Entry when |z| > p["z_entry"]
    (LONG on deep negative z, SHORT on deep positive z) and the NATR,
    optional z-max and optional CHOP filters all pass. Exit checks per bar,
    in priority order: hard SL, fixed TP, z-reversion take-profit ("Z-TP"),
    then a forced time stop. Any position still open at the end of data is
    marked to the last close ("END"). Fees are charged on both fills.

    Args:
        z_arr, natr_arr, chop_arr: per-bar indicator arrays (see calc_indicators).
        c, h, l: close/high/low price arrays of equal length.
        p: parameter dict with keys z_entry, z_max, tp_pct, sl_pct, natr_min,
           natr_max, chop_min, cooldown. Optionally "max_bars" — bars to hold
           before the time stop fires (default 36, i.e. 3h on 5m candles;
           previously hard-coded).

    Returns:
        List of deal dicts: {"pnl": float (net of fees), "reason": str,
        "natr": NATR at the entry bar}.
    """
    n = len(c)
    deals = []
    in_t = False   # currently holding a position?
    side = None    # "LONG" or "SHORT"
    ep = 0         # entry price
    eb = 0         # entry bar index
    cu = 0         # cooldown: no new entries before this bar index
    ze, zm = p["z_entry"], p["z_max"]
    tp, sl = p["tp_pct"], p["sl_pct"]
    nm, nx, cm, cd = p["natr_min"], p["natr_max"], p["chop_min"], p["cooldown"]
    # Time-stop length, generalized from the previously hard-coded 36 bars.
    max_bars = p.get("max_bars", 36)
    for i in range(VWAP_PERIOD, n):
        if in_t:
            z = z_arr[i]
            closed = False
            cp = 0        # close (exit) price
            reason = ""
            if side == "LONG":
                tp_p, sl_p = ep * (1 + tp / 100), ep * (1 - sl / 100)
                # SL checked before TP: pessimistic fill when a single bar
                # spans both levels.
                if l[i] <= sl_p:
                    cp, reason, closed = sl_p, "SL", True
                elif h[i] >= tp_p:
                    cp, reason, closed = tp_p, "TP", True
                elif z >= -0.3 and c[i] > ep:
                    # z reverted close to VWAP while in profit: take it.
                    cp, reason, closed = c[i], "Z-TP", True
            else:
                tp_p, sl_p = ep * (1 - tp / 100), ep * (1 + sl / 100)
                if h[i] >= sl_p:
                    cp, reason, closed = sl_p, "SL", True
                elif l[i] <= tp_p:
                    cp, reason, closed = tp_p, "TP", True
                elif z <= 0.3 and c[i] < ep:
                    cp, reason, closed = c[i], "Z-TP", True
            if not closed and (i - eb) >= max_bars:
                cp, reason, closed = c[i], "TIME", True
            if closed:
                qty = (ORDER_USD * LEVERAGE) / ep
                pnl = qty * (cp - ep) if side == "LONG" else qty * (ep - cp)
                # Taker fee on both entry and exit notional.
                pnl -= qty * ep * TAKER_FEE + qty * cp * TAKER_FEE
                deals.append({"pnl": pnl, "reason": reason, "natr": natr_arr[eb]})
                in_t = False
                cu = i + cd  # start post-exit cooldown
            continue
        if i < cu:
            continue  # cooling down after the last exit
        z = z_arr[i]
        if abs(z) <= ze:
            continue  # not stretched enough from VWAP
        if zm > 0 and abs(z) > zm:
            continue  # too stretched — skip likely runaway moves
        if natr_arr[i] < nm:
            continue  # volatility below the minimum band
        if nx > 0 and natr_arr[i] > nx:
            continue  # volatility above the maximum band
        if cm > 0 and chop_arr[i] < cm:
            continue  # market trending (not choppy enough)
        side = "LONG" if z < -ze else "SHORT"
        ep = c[i]
        eb = i
        in_t = True
    # Mark any still-open position to the final close.
    if in_t:
        qty = (ORDER_USD * LEVERAGE) / ep
        cp = c[-1]
        pnl = qty * (cp - ep) if side == "LONG" else qty * (ep - cp)
        pnl -= qty * ep * TAKER_FEE + qty * cp * TAKER_FEE
        deals.append({"pnl": pnl, "reason": "END", "natr": natr_arr[eb]})
    return deals
def main():
    """Run the NATR-focused parameter sweep end to end.

    Steps:
      1. Build the symbol universe and download DAYS of TIMEFRAME klines.
      2. Precompute Z-VWAP / NATR / CHOP indicator arrays once per symbol.
      3. Simulate every valid grid combo across all symbols.
      4. Print NATR band/min/max analyses and top-N result tables.
      5. Save the grid, top results and NATR aggregates to a JSON file
         next to this script.
    """
    print("=" * 80)
    print(" NATR-FOCUSED SWEEP — Z-VWAP, 30 days, NO SO")
    print("=" * 80)
    syms = get_symbols()
    print(f"\n {len(syms)} symbols, downloading {DAYS}d klines...")
    data = {}
    # --- 1+2: download klines and precompute indicators per symbol ---
    for idx, sd in enumerate(syms):
        s = sd["symbol"]
        print(f" [{idx+1}/{len(syms)}] {s}...", end=" ", flush=True)
        time.sleep(0.15)  # throttle API requests
        kl = fetch_klines(s, TIMEFRAME, days=DAYS)
        if len(kl) < VWAP_PERIOD + 100:  # not enough history to warm indicators up
            print("skip")
            continue
        c = np.array([k["c"] for k in kl])
        h = np.array([k["h"] for k in kl])
        l = np.array([k["l"] for k in kl])
        v = np.array([k["v"] for k in kl])
        zz, natr, chop = calc_indicators(c, h, l, v)
        data[s] = {"c": c, "h": h, "l": l, "z": zz, "natr": natr, "chop": chop}
        print("OK")
    print(f"\n {len(data)} symbols loaded.")
    # Skip invalid combos: natr_max <= natr_min, z_max <= z_entry
    keys = list(GRID.keys())
    vals = list(GRID.values())
    all_combos = list(product(*vals))
    combos = []
    for combo in all_combos:
        p = dict(zip(keys, combo))
        if p["z_max"] > 0 and p["z_max"] <= p["z_entry"]:
            continue
        if p["natr_max"] > 0 and p["natr_max"] <= p["natr_min"]:
            continue
        combos.append(p)
    print(f" {len(combos)} valid combos (from {len(all_combos)} total)\n")
    print(" Running sweep...")
    results = []
    t0 = time.time()
    # --- 3: simulate every combo over every symbol and aggregate stats ---
    for ci, params in enumerate(combos):
        all_deals = []
        for s, d in data.items():
            deals = sim(d["z"], d["natr"], d["chop"], d["c"], d["h"], d["l"], params)
            all_deals.extend(deals)
        if (ci + 1) % 2000 == 0:  # periodic progress / ETA report
            elapsed = time.time() - t0
            eta = elapsed / (ci + 1) * (len(combos) - ci - 1)
            print(f" ... {ci+1}/{len(combos)} ({elapsed:.0f}s elapsed, ~{eta:.0f}s remaining)")
        if len(all_deals) < 10:  # too few trades to be meaningful
            continue
        pnl = sum(d["pnl"] for d in all_deals)
        wins = [d for d in all_deals if d["pnl"] > 0]
        losses = [d for d in all_deals if d["pnl"] <= 0]
        wr = len(wins) / len(all_deals) * 100
        gp = sum(d["pnl"] for d in wins) if wins else 0
        gl = abs(sum(d["pnl"] for d in losses)) if losses else 0.001
        pf = gp / gl  # profit factor; 0.001 fallback avoids division by zero
        reasons = {}
        for d in all_deals:
            reasons[d["reason"]] = reasons.get(d["reason"], 0) + 1
        # NATR distribution of entries
        natrs = [d["natr"] for d in all_deals if d.get("natr", 0) > 0]
        avg_natr = np.mean(natrs) if natrs else 0
        results.append({
            **params,
            "deals": len(all_deals),
            "pnl": round(pnl, 2),
            "wr": round(wr, 1),
            "pf": round(pf, 2),
            "avg_w": round(gp / len(wins), 3) if wins else 0,
            "avg_l": round(gl / len(losses), 3) if losses else 0,
            "avg_natr": round(avg_natr, 2),
            "tp_cnt": reasons.get("TP", 0),
            "ztp_cnt": reasons.get("Z-TP", 0),
            "sl_cnt": reasons.get("SL", 0),
            "time_cnt": reasons.get("TIME", 0),
        })
    elapsed = time.time() - t0
    print(f"\n Done! {len(results)} valid combos in {elapsed:.0f}s")
    # Primary ordering: profit factor, ties broken by PnL.
    results.sort(key=lambda x: (x["pf"], x["pnl"]), reverse=True)
    # Shared table header / separator for all result tables below.
    hdr = f"{'#':>3} {'Z':>4} {'Zm':>4} {'TP':>4} {'SL':>4} {'NRm':>5} {'NRx':>4} {'CHP':>3} {'CD':>3} | {'Dls':>4} {'PnL':>8} {'WR%':>5} {'PF':>5} {'AvW':>6} {'AvL':>6} {'aNR':>4} | {'TP':>3} {'ZTP':>3} {'SL':>3} {'TIM':>3}"
    sep = "-" * 110
    # ========== NATR ANALYSIS ==========
    # Group results by NATR range to show which bands work best
    print(f"\n{'='*110}")
    print(f" NATR BAND ANALYSIS (min deals 15, PF > 1)")
    print(f"{'='*110}")
    natr_bands = {}
    for r in results:
        if r["deals"] < 15 or r["pf"] <= 1.0:
            continue
        band = f"{r['natr_min']:.2f}-{r['natr_max'] if r['natr_max'] > 0 else 'inf'}"
        if band not in natr_bands:
            natr_bands[band] = []
        natr_bands[band].append(r)
    print(f"\n {'NATR Band':<15} {'Combos':>6} {'BestPF':>6} {'BestPnL':>8} {'AvgPF':>6} {'AvgPnL':>8} {'AvgDeals':>8}")
    print(f" {'-'*65}")
    band_stats = []
    for band, rs in sorted(natr_bands.items()):
        avg_pf = np.mean([r["pf"] for r in rs])
        avg_pnl = np.mean([r["pnl"] for r in rs])
        avg_deals = np.mean([r["deals"] for r in rs])
        best_pf = max(r["pf"] for r in rs)
        best_pnl = max(r["pnl"] for r in rs)
        band_stats.append((band, len(rs), best_pf, best_pnl, avg_pf, avg_pnl, avg_deals))
    band_stats.sort(key=lambda x: x[4], reverse=True)  # order bands by average PF
    for bs in band_stats:
        print(f" {bs[0]:<15} {bs[1]:>6} {bs[2]:>6.2f} ${bs[3]:>+7.2f} {bs[4]:>6.2f} ${bs[5]:>+7.2f} {bs[6]:>8.0f}")
    # NATR MIN analysis
    print(f"\n NATR MIN impact (averaged across all other params, min 15 deals, PF>1):")
    print(f" {'NATR_min':>8} {'Combos':>6} {'AvgPF':>6} {'AvgPnL':>8} {'AvgDeals':>8} {'BestPF':>6}")
    print(f" {'-'*50}")
    for nm in sorted(GRID["natr_min"]):
        rs = [r for r in results if r["natr_min"] == nm and r["deals"] >= 15 and r["pf"] > 1]
        if not rs:
            print(f" {nm:>8.2f} {0:>6} {'n/a':>6} {'n/a':>8} {'n/a':>8} {'n/a':>6}")
            continue
        print(f" {nm:>8.2f} {len(rs):>6} {np.mean([r['pf'] for r in rs]):>6.2f} ${np.mean([r['pnl'] for r in rs]):>+7.2f} {np.mean([r['deals'] for r in rs]):>8.0f} {max(r['pf'] for r in rs):>6.2f}")
    # NATR MAX analysis
    print(f"\n NATR MAX impact (averaged across all other params, min 15 deals, PF>1):")
    print(f" {'NATR_max':>8} {'Combos':>6} {'AvgPF':>6} {'AvgPnL':>8} {'AvgDeals':>8} {'BestPF':>6}")
    print(f" {'-'*50}")
    for nx in sorted(GRID["natr_max"]):
        label = "off" if nx == 0 else f"{nx:.1f}"  # 0 means the filter is disabled
        rs = [r for r in results if r["natr_max"] == nx and r["deals"] >= 15 and r["pf"] > 1]
        if not rs:
            print(f" {label:>8} {0:>6} {'n/a':>6} {'n/a':>8} {'n/a':>8} {'n/a':>6}")
            continue
        print(f" {label:>8} {len(rs):>6} {np.mean([r['pf'] for r in rs]):>6.2f} ${np.mean([r['pnl'] for r in rs]):>+7.2f} {np.mean([r['deals'] for r in rs]):>8.0f} {max(r['pf'] for r in rs):>6.2f}")
    # ========== TOP TABLES ==========
    # TOP 25 by PF (min 15 deals)
    print(f"\n{'='*110}")
    print(f" TOP 25 by Profit Factor (min 15 deals)")
    print(f"{'='*110}")
    print(hdr)
    print(sep)
    shown = 0
    for r in results:
        if r["deals"] < 15:
            continue
        shown += 1
        if shown > 25:
            break
        print(f"{shown:>3} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} {r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}")
    # TOP 25 by PnL
    results_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True)
    print(f"\n{'='*110}")
    print(f" TOP 25 by PnL (min 15 deals)")
    print(f"{'='*110}")
    print(hdr)
    print(sep)
    shown = 0
    for r in results_pnl:
        if r["deals"] < 15:
            continue
        shown += 1
        if shown > 25:
            break
        print(f"{shown:>3} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} {r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}")
    # BALANCED RECOMMENDED (PF>=1.3, deals>=20, best PnL)
    bal = sorted([r for r in results if r["pf"] >= 1.3 and r["deals"] >= 20],
                 key=lambda x: x["pnl"], reverse=True)
    if bal:
        print(f"\n{'='*110}")
        print(f" RECOMMENDED (PF>=1.3, >=20 deals, by PnL)")
        print(f"{'='*110}")
        print(hdr)
        print(sep)
        for i, r in enumerate(bal[:15]):
            print(f"{i+1:>3} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} {r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}")
    # BEST PER NATR BAND (top 3 per unique natr_min)
    print(f"\n{'='*110}")
    print(f" BEST COMBO PER NATR_MIN (top 1 by PF, min 15 deals)")
    print(f"{'='*110}")
    print(hdr)
    print(sep)
    for nm in sorted(GRID["natr_min"]):
        rs = sorted([r for r in results if r["natr_min"] == nm and r["deals"] >= 15],
                    key=lambda x: (x["pf"], x["pnl"]), reverse=True)
        if rs:
            r = rs[0]
            print(f" * {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} {r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}")
    # Save
    out = os.path.join(os.path.dirname(__file__), "results_natr_sweep.json")
    save_data = {
        "grid": GRID,
        "data_days": DAYS,
        "symbols_count": len(data),
        "combos_tested": len(combos),
        "valid_results": len(results),
        "top100_pf": results[:100],
        "top100_pnl": sorted(results, key=lambda x: x["pnl"], reverse=True)[:100],
        "natr_analysis": {
            "by_natr_min": {},
            "by_natr_max": {},
        }
    }
    # Save NATR analysis
    for nm in GRID["natr_min"]:
        rs = [r for r in results if r["natr_min"] == nm and r["deals"] >= 15]
        if rs:
            save_data["natr_analysis"]["by_natr_min"][str(nm)] = {
                "count": len(rs),
                "profitable": len([r for r in rs if r["pf"] > 1]),
                "avg_pf": round(np.mean([r["pf"] for r in rs]), 2),
                "avg_pnl": round(np.mean([r["pnl"] for r in rs]), 2),
            }
    for nx in GRID["natr_max"]:
        rs = [r for r in results if r["natr_max"] == nx and r["deals"] >= 15]
        if rs:
            save_data["natr_analysis"]["by_natr_max"][str(nx)] = {
                "count": len(rs),
                "profitable": len([r for r in rs if r["pf"] > 1]),
                "avg_pf": round(np.mean([r["pf"] for r in rs]), 2),
                "avg_pnl": round(np.mean([r["pnl"] for r in rs]), 2),
            }
    with open(out, "w") as f:
        json.dump(save_data, f, indent=2)
    print(f"\n Saved to {out}")
# Script entry point: run the sweep when executed directly.
if __name__ == "__main__":
    main()