← Назад
""" NATR-Focused Sweep — DCA Z-VWAP (NO Safety Orders) ==================================================== Focus: deep NATR exploration with best params from prior sweeps. Grid: - NATR min: 0.3, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0 - NATR max: 0(off), 1.5, 2.0, 2.5, 3.0, 3.5, 5.0 - Z Entry: 1.8, 2.0, 2.5, 3.0 - Z Max: 0(off), 2.5, 3.5 - TP%: 1.0, 1.5, 2.0, 3.0 - SL%: 3.0, 5.0, 8.0 - CHOP min: 0(off), 45, 50, 55 - Cooldown: 6, 12 Data: 30d, 5m candles, top50 by vol (>$20M) Usage: python3 backtests/backtest_natr_sweep.py """ import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import json, time import numpy as np from datetime import datetime from itertools import product from pybit.unified_trading import HTTP ORDER_USD = 7.0 LEVERAGE = 3 VWAP_PERIOD = 50 TAKER_FEE = 0.00055 TIMEFRAME = "5" DAYS = 30 MIN_VOLUME_24H = 20_000_000 BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"} GRID = { "z_entry": [1.8, 2.0, 2.5, 3.0], "z_max": [0, 2.5, 3.5], "tp_pct": [1.0, 1.5, 2.0, 3.0], "sl_pct": [3.0, 5.0, 8.0], "natr_min": [0.3, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0], "natr_max": [0, 1.5, 2.0, 2.5, 3.0, 3.5, 5.0], "chop_min": [0, 45, 50, 55], "cooldown": [6, 12], } session = HTTP(testnet=False) def get_symbols(): resp = session.get_tickers(category="linear") if resp["retCode"] != 0: return [] out = [] for t in resp["result"]["list"]: s = t["symbol"] if not s.endswith("USDT") or s in BLACKLIST: continue v = float(t.get("turnover24h", 0)) if v >= MIN_VOLUME_24H: out.append({"symbol": s, "volume_24h": v}) out.sort(key=lambda x: x["volume_24h"], reverse=True) return out[:50] def fetch_klines(symbol, interval, days): kls = [] need = days * 24 * 60 // int(interval) end = int(datetime.now().timestamp() * 1000) while len(kls) < need: try: r = session.get_kline(category="linear", symbol=symbol, interval=interval, limit=1000, end=end) if r["retCode"] != 0: break items = r["result"]["list"] if not items: break for i in items: kls.append({"ts": int(i[0]), "h": float(i[2]), "l": float(i[3]), "c": float(i[4]), "v": float(i[5])}) end = int(items[-1][0]) - 1 if len(items) < 1000: break except: break kls.reverse() seen = set() u = [] for k in kls: if k["ts"] not in seen: seen.add(k["ts"]) u.append(k) return u[-need:] if len(u) > need else u def calc_indicators(c, h, l, v): n = len(c) z = np.zeros(n) for i in range(VWAP_PERIOD, n): hh, ll, cc, vv = h[i-VWAP_PERIOD:i], l[i-VWAP_PERIOD:i], c[i-VWAP_PERIOD:i], v[i-VWAP_PERIOD:i] tp = (hh + ll + cc) / 3 ctv = np.cumsum(tp * vv) cv = np.cumsum(vv) cv_s = np.where(cv == 0, 1, cv) va = ctv / cv_s dev = cc - va std = np.std(dev) if std > 0: z[i] = (c[i] - va[-1]) / std natr = np.zeros(n) for i in range(14, n): trs = [max(h[j]-l[j], abs(h[j]-c[j-1]), abs(l[j]-c[j-1])) for j in range(i-13, i+1)] natr[i] = (np.mean(trs) / c[i]) * 100 if c[i] > 0 else 0 chop = np.full(n, 50.0) for i in range(14, n): atr_s = sum(max(h[j]-l[j], abs(h[j]-c[j-1]), abs(l[j]-c[j-1])) for j in range(i-13, i+1)) hi, lo = np.max(h[i-13:i+1]), np.min(l[i-13:i+1]) rng = hi - lo if rng > 0: chop[i] = 100 * np.log10(atr_s / rng) / np.log10(14) return z, natr, chop def sim(z_arr, natr_arr, chop_arr, c, h, l, p): n = len(c) deals = [] in_t = False side = None ep = 0 eb = 0 cu = 0 ze, zm = p["z_entry"], p["z_max"] tp, sl = p["tp_pct"], p["sl_pct"] nm, nx, cm, cd = p["natr_min"], p["natr_max"], p["chop_min"], p["cooldown"] for i in range(VWAP_PERIOD, n): if in_t: z = z_arr[i] closed = False cp = 0 reason = "" if side == "LONG": tp_p, sl_p = ep * (1 + tp / 100), ep * (1 - sl / 100) if l[i] <= sl_p: cp, reason, closed = sl_p, "SL", True elif h[i] >= tp_p: cp, reason, closed = tp_p, "TP", True elif z >= -0.3 and c[i] > ep: cp, reason, closed = c[i], "Z-TP", True else: tp_p, sl_p = ep * (1 - tp / 100), ep * (1 + sl / 100) if h[i] >= sl_p: cp, reason, closed = sl_p, "SL", True elif l[i] <= tp_p: cp, reason, closed = tp_p, "TP", True elif z <= 0.3 and c[i] < ep: cp, reason, closed = c[i], "Z-TP", True if not closed and (i - eb) >= 36: cp, reason, closed = c[i], "TIME", True if closed: qty = (ORDER_USD * LEVERAGE) / ep pnl = qty * (cp - ep) if side == "LONG" else qty * (ep - cp) pnl -= qty * ep * TAKER_FEE + qty * cp * TAKER_FEE deals.append({"pnl": pnl, "reason": reason, "natr": natr_arr[eb]}) in_t = False cu = i + cd continue if i < cu: continue z = z_arr[i] if abs(z) <= ze: continue if zm > 0 and abs(z) > zm: continue if natr_arr[i] < nm: continue if nx > 0 and natr_arr[i] > nx: continue if cm > 0 and chop_arr[i] < cm: continue side = "LONG" if z < -ze else "SHORT" ep = c[i] eb = i in_t = True if in_t: qty = (ORDER_USD * LEVERAGE) / ep cp = c[-1] pnl = qty * (cp - ep) if side == "LONG" else qty * (ep - cp) pnl -= qty * ep * TAKER_FEE + qty * cp * TAKER_FEE deals.append({"pnl": pnl, "reason": "END", "natr": natr_arr[eb]}) return deals def main(): print("=" * 80) print(" NATR-FOCUSED SWEEP — Z-VWAP, 30 days, NO SO") print("=" * 80) syms = get_symbols() print(f"\n {len(syms)} symbols, downloading {DAYS}d klines...") data = {} for idx, sd in enumerate(syms): s = sd["symbol"] print(f" [{idx+1}/{len(syms)}] {s}...", end=" ", flush=True) time.sleep(0.15) kl = fetch_klines(s, TIMEFRAME, days=DAYS) if len(kl) < VWAP_PERIOD + 100: print("skip") continue c = np.array([k["c"] for k in kl]) h = np.array([k["h"] for k in kl]) l = np.array([k["l"] for k in kl]) v = np.array([k["v"] for k in kl]) zz, natr, chop = calc_indicators(c, h, l, v) data[s] = {"c": c, "h": h, "l": l, "z": zz, "natr": natr, "chop": chop} print("OK") print(f"\n {len(data)} symbols loaded.") # Skip invalid combos: natr_max <= natr_min, z_max <= z_entry keys = list(GRID.keys()) vals = list(GRID.values()) all_combos = list(product(*vals)) combos = [] for combo in all_combos: p = dict(zip(keys, combo)) if p["z_max"] > 0 and p["z_max"] <= p["z_entry"]: continue if p["natr_max"] > 0 and p["natr_max"] <= p["natr_min"]: continue combos.append(p) print(f" {len(combos)} valid combos (from {len(all_combos)} total)\n") print(" Running sweep...") results = [] t0 = time.time() for ci, params in enumerate(combos): all_deals = [] for s, d in data.items(): deals = sim(d["z"], d["natr"], d["chop"], d["c"], d["h"], d["l"], params) all_deals.extend(deals) if (ci + 1) % 2000 == 0: elapsed = time.time() - t0 eta = elapsed / (ci + 1) * (len(combos) - ci - 1) print(f" ... {ci+1}/{len(combos)} ({elapsed:.0f}s elapsed, ~{eta:.0f}s remaining)") if len(all_deals) < 10: continue pnl = sum(d["pnl"] for d in all_deals) wins = [d for d in all_deals if d["pnl"] > 0] losses = [d for d in all_deals if d["pnl"] <= 0] wr = len(wins) / len(all_deals) * 100 gp = sum(d["pnl"] for d in wins) if wins else 0 gl = abs(sum(d["pnl"] for d in losses)) if losses else 0.001 pf = gp / gl reasons = {} for d in all_deals: reasons[d["reason"]] = reasons.get(d["reason"], 0) + 1 # NATR distribution of entries natrs = [d["natr"] for d in all_deals if d.get("natr", 0) > 0] avg_natr = np.mean(natrs) if natrs else 0 results.append({ **params, "deals": len(all_deals), "pnl": round(pnl, 2), "wr": round(wr, 1), "pf": round(pf, 2), "avg_w": round(gp / len(wins), 3) if wins else 0, "avg_l": round(gl / len(losses), 3) if losses else 0, "avg_natr": round(avg_natr, 2), "tp_cnt": reasons.get("TP", 0), "ztp_cnt": reasons.get("Z-TP", 0), "sl_cnt": reasons.get("SL", 0), "time_cnt": reasons.get("TIME", 0), }) elapsed = time.time() - t0 print(f"\n Done! {len(results)} valid combos in {elapsed:.0f}s") results.sort(key=lambda x: (x["pf"], x["pnl"]), reverse=True) hdr = f"{'#':>3} {'Z':>4} {'Zm':>4} {'TP':>4} {'SL':>4} {'NRm':>5} {'NRx':>4} {'CHP':>3} {'CD':>3} | {'Dls':>4} {'PnL':>8} {'WR%':>5} {'PF':>5} {'AvW':>6} {'AvL':>6} {'aNR':>4} | {'TP':>3} {'ZTP':>3} {'SL':>3} {'TIM':>3}" sep = "-" * 110 # ========== NATR ANALYSIS ========== # Group results by NATR range to show which bands work best print(f"\n{'='*110}") print(f" NATR BAND ANALYSIS (min deals 15, PF > 1)") print(f"{'='*110}") natr_bands = {} for r in results: if r["deals"] < 15 or r["pf"] <= 1.0: continue band = f"{r['natr_min']:.2f}-{r['natr_max'] if r['natr_max'] > 0 else 'inf'}" if band not in natr_bands: natr_bands[band] = [] natr_bands[band].append(r) print(f"\n {'NATR Band':<15} {'Combos':>6} {'BestPF':>6} {'BestPnL':>8} {'AvgPF':>6} {'AvgPnL':>8} {'AvgDeals':>8}") print(f" {'-'*65}") band_stats = [] for band, rs in sorted(natr_bands.items()): avg_pf = np.mean([r["pf"] for r in rs]) avg_pnl = np.mean([r["pnl"] for r in rs]) avg_deals = np.mean([r["deals"] for r in rs]) best_pf = max(r["pf"] for r in rs) best_pnl = max(r["pnl"] for r in rs) band_stats.append((band, len(rs), best_pf, best_pnl, avg_pf, avg_pnl, avg_deals)) band_stats.sort(key=lambda x: x[4], reverse=True) for bs in band_stats: print(f" {bs[0]:<15} {bs[1]:>6} {bs[2]:>6.2f} ${bs[3]:>+7.2f} {bs[4]:>6.2f} ${bs[5]:>+7.2f} {bs[6]:>8.0f}") # NATR MIN analysis print(f"\n NATR MIN impact (averaged across all other params, min 15 deals, PF>1):") print(f" {'NATR_min':>8} {'Combos':>6} {'AvgPF':>6} {'AvgPnL':>8} {'AvgDeals':>8} {'BestPF':>6}") print(f" {'-'*50}") for nm in sorted(GRID["natr_min"]): rs = [r for r in results if r["natr_min"] == nm and r["deals"] >= 15 and r["pf"] > 1] if not rs: print(f" {nm:>8.2f} {0:>6} {'n/a':>6} {'n/a':>8} {'n/a':>8} {'n/a':>6}") continue print(f" {nm:>8.2f} {len(rs):>6} {np.mean([r['pf'] for r in rs]):>6.2f} ${np.mean([r['pnl'] for r in rs]):>+7.2f} {np.mean([r['deals'] for r in rs]):>8.0f} {max(r['pf'] for r in rs):>6.2f}") # NATR MAX analysis print(f"\n NATR MAX impact (averaged across all other params, min 15 deals, PF>1):") print(f" {'NATR_max':>8} {'Combos':>6} {'AvgPF':>6} {'AvgPnL':>8} {'AvgDeals':>8} {'BestPF':>6}") print(f" {'-'*50}") for nx in sorted(GRID["natr_max"]): label = "off" if nx == 0 else f"{nx:.1f}" rs = [r for r in results if r["natr_max"] == nx and r["deals"] >= 15 and r["pf"] > 1] if not rs: print(f" {label:>8} {0:>6} {'n/a':>6} {'n/a':>8} {'n/a':>8} {'n/a':>6}") continue print(f" {label:>8} {len(rs):>6} {np.mean([r['pf'] for r in rs]):>6.2f} ${np.mean([r['pnl'] for r in rs]):>+7.2f} {np.mean([r['deals'] for r in rs]):>8.0f} {max(r['pf'] for r in rs):>6.2f}") # ========== TOP TABLES ========== # TOP 25 by PF (min 15 deals) print(f"\n{'='*110}") print(f" TOP 25 by Profit Factor (min 15 deals)") print(f"{'='*110}") print(hdr) print(sep) shown = 0 for r in results: if r["deals"] < 15: continue shown += 1 if shown > 25: break print(f"{shown:>3} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} {r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}") # TOP 25 by PnL results_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True) print(f"\n{'='*110}") print(f" TOP 25 by PnL (min 15 deals)") print(f"{'='*110}") print(hdr) print(sep) shown = 0 for r in results_pnl: if r["deals"] < 15: continue shown += 1 if shown > 25: break print(f"{shown:>3} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} {r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}") # BALANCED RECOMMENDED (PF>=1.3, deals>=20, best PnL) bal = sorted([r for r in results if r["pf"] >= 1.3 and r["deals"] >= 20], key=lambda x: x["pnl"], reverse=True) if bal: print(f"\n{'='*110}") print(f" RECOMMENDED (PF>=1.3, >=20 deals, by PnL)") print(f"{'='*110}") print(hdr) print(sep) for i, r in enumerate(bal[:15]): print(f"{i+1:>3} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} {r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}") # BEST PER NATR BAND (top 3 per unique natr_min) print(f"\n{'='*110}") print(f" BEST COMBO PER NATR_MIN (top 1 by PF, min 15 deals)") print(f"{'='*110}") print(hdr) print(sep) for nm in sorted(GRID["natr_min"]): rs = sorted([r for r in results if r["natr_min"] == nm and r["deals"] >= 15], key=lambda x: (x["pf"], x["pnl"]), reverse=True) if rs: r = rs[0] print(f" * {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} {r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}") # Save out = os.path.join(os.path.dirname(__file__), "results_natr_sweep.json") save_data = { "grid": GRID, "data_days": DAYS, "symbols_count": len(data), "combos_tested": len(combos), "valid_results": len(results), "top100_pf": results[:100], "top100_pnl": sorted(results, key=lambda x: x["pnl"], reverse=True)[:100], "natr_analysis": { "by_natr_min": {}, "by_natr_max": {}, } } # Save NATR analysis for nm in GRID["natr_min"]: rs = [r for r in results if r["natr_min"] == nm and r["deals"] >= 15] if rs: save_data["natr_analysis"]["by_natr_min"][str(nm)] = { "count": len(rs), "profitable": len([r for r in rs if r["pf"] > 1]), "avg_pf": round(np.mean([r["pf"] for r in rs]), 2), "avg_pnl": round(np.mean([r["pnl"] for r in rs]), 2), } for nx in GRID["natr_max"]: rs = [r for r in results if r["natr_max"] == nx and r["deals"] >= 15] if rs: save_data["natr_analysis"]["by_natr_max"][str(nx)] = { "count": len(rs), "profitable": len([r for r in rs if r["pf"] > 1]), "avg_pf": round(np.mean([r["pf"] for r in rs]), 2), "avg_pnl": round(np.mean([r["pnl"] for r in rs]), 2), } with open(out, "w") as f: json.dump(save_data, f, indent=2) print(f"\n Saved to {out}") if __name__ == "__main__": main()