# (web-viewer navigation residue, not part of the script: "← Back")
"""
NATR-Focused Sweep — DCA Z-VWAP (NO Safety Orders)
====================================================
Focus: deep NATR exploration with best params from prior sweeps.

Grid:
  - NATR min: 0.3, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0
  - NATR max: 0(off), 1.5, 2.0, 2.5, 3.0, 3.5, 5.0
  - Z Entry: 1.8, 2.0, 2.5, 3.0
  - Z Max: 0(off), 2.5, 3.5
  - TP%: 1.0, 1.5, 2.0, 3.0
  - SL%: 3.0, 5.0, 8.0
  - CHOP min: 0(off), 45, 50, 55
  - Cooldown: 6, 12

Data: 30d, 5m candles, top50 by vol (>$20M)

Usage: python3 backtests/backtest_natr_sweep.py
"""

import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json, time
import numpy as np
from datetime import datetime
from itertools import product
from pybit.unified_trading import HTTP

# --- Fixed backtest settings -------------------------------------------------
# Base order size in USD; sim() sizes each deal as ORDER_USD * LEVERAGE / entry price.
ORDER_USD = 7.0
# Multiplier applied to ORDER_USD when computing position quantity.
LEVERAGE = 3
# Number of preceding bars in the rolling VWAP / z-score window; also the first
# bar index that sim() evaluates.
VWAP_PERIOD = 50
# Fee rate charged on both the entry and the exit notional of every deal.
TAKER_FEE = 0.00055
# Kline interval passed to the exchange; fetch_klines() also int()s it as
# minutes-per-candle when computing how many candles are needed.
TIMEFRAME = "5"
# Length of history to download, in days.
DAYS = 30
# Minimum 24h turnover (compared against the ticker's "turnover24h") for a symbol to be used.
MIN_VOLUME_24H = 20_000_000
# Symbols that get_symbols() always skips.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

# --- Sweep grid ----------------------------------------------------------------
# Every combination of these values is simulated (minus combos main() rejects as
# contradictory). For z_max, natr_max and chop_min a value of 0 disables the
# filter, because sim() only applies them when they are > 0. Key order matters:
# main() zips the keys with itertools.product over the values.
GRID = {
    "z_entry":  [1.8, 2.0, 2.5, 3.0],
    "z_max":    [0, 2.5, 3.5],
    "tp_pct":   [1.0, 1.5, 2.0, 3.0],
    "sl_pct":   [3.0, 5.0, 8.0],
    "natr_min": [0.3, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0],
    "natr_max": [0, 1.5, 2.0, 2.5, 3.0, 3.5, 5.0],
    "chop_min": [0, 45, 50, 55],
    "cooldown": [6, 12],
}

# Shared pybit HTTP client (testnet disabled), created at import time and used
# by get_symbols() and fetch_klines().
session = HTTP(testnet=False)


def get_symbols():
    """Return up to 50 linear USDT symbols ranked by 24h turnover.

    Queries the ticker list through the module-level ``session``. A symbol is
    kept when its name ends with ``USDT``, it is not in ``BLACKLIST`` and its
    ``turnover24h`` is at least ``MIN_VOLUME_24H``.

    Returns:
        A list of ``{"symbol": str, "volume_24h": float}`` dicts sorted by
        turnover, highest first, truncated to 50 entries. An empty list is
        returned when the API answers with a non-zero ``retCode``.
    """
    response = session.get_tickers(category="linear")
    if response["retCode"] != 0:
        return []

    candidates = []
    for ticker in response["result"]["list"]:
        name = ticker["symbol"]
        if name.endswith("USDT") and name not in BLACKLIST:
            turnover = float(ticker.get("turnover24h", 0))
            if turnover >= MIN_VOLUME_24H:
                candidates.append({"symbol": name, "volume_24h": turnover})

    ranked = sorted(candidates, key=lambda row: row["volume_24h"], reverse=True)
    return ranked[:50]


def fetch_klines(symbol, interval, days):
    """Download about ``days`` of candles for ``symbol`` by paging backwards in time.

    Requests pages of up to 1000 candles through the module-level ``session``,
    moving the ``end`` cursor to just before the last item of each page, until
    enough candles are collected, a page comes back short/empty, the API
    reports a non-zero ``retCode``, or a request/parsing error occurs.

    This is deliberately best-effort: on any failure the candles gathered so
    far are returned (the caller skips symbols with too little history). The
    failure is reported on stderr instead of being swallowed silently, and
    only ``Exception`` is caught so Ctrl-C / ``SystemExit`` still abort the run.

    Args:
        symbol: Instrument name passed to the kline endpoint.
        interval: Candle interval as a string of minutes (e.g. ``"5"``); it is
            converted with ``int()`` to compute the number of candles needed.
        days: Length of history wanted, in days.

    Returns:
        A list of dicts with keys ``ts``, ``h``, ``l``, ``c``, ``v`` in
        ascending request order (the collected pages reversed), de-duplicated
        by ``ts`` and trimmed to the most recent ``need`` entries.
        NOTE(review): this assumes each page is ordered newest-first, since the
        last item is used as the oldest timestamp — confirm against the API.
    """
    need = days * 24 * 60 // int(interval)
    end = int(datetime.now().timestamp() * 1000)

    kls = []
    while len(kls) < need:
        try:
            r = session.get_kline(category="linear", symbol=symbol,
                                  interval=interval, limit=1000, end=end)
            if r["retCode"] != 0:
                break
            items = r["result"]["list"]
            if not items:
                break
            for item in items:
                kls.append({"ts": int(item[0]), "h": float(item[2]),
                            "l": float(item[3]), "c": float(item[4]),
                            "v": float(item[5])})
            # Next page ends one millisecond before the last candle of this page.
            end = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
        except Exception as exc:
            # Keep whatever was collected, but make the failure visible.
            print(f"[fetch_klines] {symbol}: stopped after {len(kls)} candles "
                  f"({exc!r})", file=sys.stderr)
            break

    kls.reverse()

    # Drop repeated timestamps, keeping the first occurrence.
    seen = set()
    unique = []
    for k in kls:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)
    return unique[-need:] if len(unique) > need else unique


def calc_indicators(c, h, l, v, vwap_period=None):
    """Compute the z-score-vs-VWAP, NATR(14) and Choppiness(14) series.

    Args:
        c, h, l, v: Equal-length sequences of close, high, low and volume.
            They are converted to float arrays, so plain lists are accepted.
        vwap_period: Number of preceding bars in the rolling VWAP window.
            Defaults to the module-level ``VWAP_PERIOD`` when omitted.

    Returns:
        A tuple ``(z, natr, chop)`` of arrays with the same length as ``c``:

        * ``z[i]`` – for ``i >= vwap_period``: the current close minus the last
          value of the cumulative VWAP over bars ``[i - vwap_period, i)``,
          divided by the standard deviation of (close - cumulative VWAP) over
          that window. Left at 0 where the deviation is 0 or the window is
          incomplete.
        * ``natr[i]`` – for ``i >= 14``: mean true range of the last 14 bars
          (including ``i``) as a percentage of ``c[i]``; 0 otherwise or when
          ``c[i] <= 0``.
        * ``chop[i]`` – for ``i >= 14``: ``100 * log10(sum(TR) / (max high -
          min low)) / log10(14)`` over the same 14 bars; stays at the neutral
          default 50 when the range is 0 or the window is incomplete.

    Raises:
        ValueError: If ``vwap_period`` is smaller than 1.
    """
    if vwap_period is None:
        vwap_period = VWAP_PERIOD
    if vwap_period < 1:
        raise ValueError("vwap_period must be >= 1")

    c = np.asarray(c, dtype=float)
    h = np.asarray(h, dtype=float)
    l = np.asarray(l, dtype=float)
    v = np.asarray(v, dtype=float)
    n = len(c)

    # --- z-score of close against the rolling cumulative VWAP ---------------
    z = np.zeros(n)
    for i in range(vwap_period, n):
        start = i - vwap_period
        closes = c[start:i]
        volumes = v[start:i]
        typical = (h[start:i] + l[start:i] + closes) / 3
        cum_pv = np.cumsum(typical * volumes)
        cum_vol = np.cumsum(volumes)
        # Guard against division by zero while no volume has traded yet.
        running_vwap = cum_pv / np.where(cum_vol == 0, 1, cum_vol)
        spread = np.std(closes - running_vwap)
        if spread > 0:
            z[i] = (c[i] - running_vwap[-1]) / spread

    # --- true range, computed once and shared by NATR and CHOP --------------
    # tr[0] has no previous close and is never read (windows start at bar 1).
    tr = np.zeros(n)
    prev_close = c[:-1]
    tr[1:] = np.maximum(
        h[1:] - l[1:],
        np.maximum(np.abs(h[1:] - prev_close), np.abs(l[1:] - prev_close)),
    )

    window = 14
    log_window = np.log10(window)
    natr = np.zeros(n)
    chop = np.full(n, 50.0)
    for i in range(window, n):
        lo, hi = i - window + 1, i + 1
        tr_win = tr[lo:hi]
        if c[i] > 0:
            natr[i] = (np.mean(tr_win) / c[i]) * 100
        rng = np.max(h[lo:hi]) - np.min(l[lo:hi])
        if rng > 0:
            # Sequential sum keeps the result identical to a plain running total.
            chop[i] = 100 * np.log10(sum(tr_win.tolist()) / rng) / log_window
    return z, natr, chop


def sim(z_arr, natr_arr, chop_arr, c, h, l, p):
    """Simulate single-entry (no safety order) z-score reversion deals on one symbol.

    Walks the bars from ``VWAP_PERIOD`` onward. While flat, a deal is opened at
    the bar's close when ``|z|`` exceeds ``z_entry`` (and, if enabled, does not
    exceed ``z_max``), NATR lies inside ``[natr_min, natr_max]`` and CHOP is at
    least ``chop_min``; negative z opens a LONG, positive z a SHORT. While in a
    deal, each bar is checked in this order: stop-loss, take-profit, z-score
    exit (z back inside +/-0.3 on the profitable side), then a 36-bar time
    limit. After a close, new entries are blocked for ``cooldown`` bars. A deal
    still open at the end is closed at the last close with reason ``"END"``.

    Args:
        z_arr, natr_arr, chop_arr: Indicator series aligned with the prices.
        c, h, l: Close, high and low series.
        p: Parameter dict with keys ``z_entry``, ``z_max``, ``tp_pct``,
            ``sl_pct``, ``natr_min``, ``natr_max``, ``chop_min``, ``cooldown``.
            A value of 0 for ``z_max``, ``natr_max`` or ``chop_min`` turns
            that filter off.

    Returns:
        A list of ``{"pnl", "reason", "natr"}`` dicts, one per closed deal,
        where ``pnl`` is net of taker fees on entry and exit and ``natr`` is
        the NATR value at the entry bar.
    """
    z_entry, z_cap = p["z_entry"], p["z_max"]
    tp_pct, sl_pct = p["tp_pct"], p["sl_pct"]
    natr_lo, natr_hi = p["natr_min"], p["natr_max"]
    chop_floor, cooldown = p["chop_min"], p["cooldown"]

    def net_pnl(direction, entry_price, exit_price):
        # Position size from the leveraged order, then fees on both legs.
        qty = (ORDER_USD * LEVERAGE) / entry_price
        if direction == "LONG":
            pnl = qty * (exit_price - entry_price)
        else:
            pnl = qty * (entry_price - exit_price)
        pnl -= qty * entry_price * TAKER_FEE + qty * exit_price * TAKER_FEE
        return pnl

    trades = []
    holding = False
    direction = None
    entry_price = 0
    entry_bar = 0
    next_allowed = 0

    for bar in range(VWAP_PERIOD, len(c)):
        if holding:
            zscore = z_arr[bar]
            exit_price = 0
            why = None
            if direction == "LONG":
                target = entry_price * (1 + tp_pct / 100)
                stop = entry_price * (1 - sl_pct / 100)
                if l[bar] <= stop:
                    exit_price, why = stop, "SL"
                elif h[bar] >= target:
                    exit_price, why = target, "TP"
                elif zscore >= -0.3 and c[bar] > entry_price:
                    exit_price, why = c[bar], "Z-TP"
            else:
                target = entry_price * (1 - tp_pct / 100)
                stop = entry_price * (1 + sl_pct / 100)
                if h[bar] >= stop:
                    exit_price, why = stop, "SL"
                elif l[bar] <= target:
                    exit_price, why = target, "TP"
                elif zscore <= 0.3 and c[bar] < entry_price:
                    exit_price, why = c[bar], "Z-TP"

            # Time stop applies only when no price/z exit fired on this bar.
            if why is None and (bar - entry_bar) >= 36:
                exit_price, why = c[bar], "TIME"

            if why is not None:
                trades.append({
                    "pnl": net_pnl(direction, entry_price, exit_price),
                    "reason": why,
                    "natr": natr_arr[entry_bar],
                })
                holding = False
                next_allowed = bar + cooldown
            # Never open a new deal on a bar that was spent managing one.
            continue

        if bar < next_allowed:
            continue

        zscore = z_arr[bar]
        if abs(zscore) <= z_entry:
            continue
        if z_cap > 0 and abs(zscore) > z_cap:
            continue
        if natr_arr[bar] < natr_lo:
            continue
        if natr_hi > 0 and natr_arr[bar] > natr_hi:
            continue
        if chop_floor > 0 and chop_arr[bar] < chop_floor:
            continue

        direction = "LONG" if zscore < -z_entry else "SHORT"
        entry_price = c[bar]
        entry_bar = bar
        holding = True

    if holding:
        # Force-close whatever is still open at the final close.
        trades.append({
            "pnl": net_pnl(direction, entry_price, c[-1]),
            "reason": "END",
            "natr": natr_arr[entry_bar],
        })
    return trades


def _print_banner(title):
    """Print a 110-wide '=' banner with an indented title line."""
    print(f"\n{'='*110}")
    print(f"  {title}")
    print(f"{'='*110}")


def _format_result_row(r):
    """Render one sweep result as the fixed-width row shared by all ranking tables."""
    return (
        f"{r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['tp_pct']:>4.1f} {r['sl_pct']:>4.1f} "
        f"{r['natr_min']:>5.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | "
        f"{r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} "
        f"${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} {r['avg_natr']:>4.1f} | "
        f"{r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}"
    )


def _print_ranked_table(title, rows, hdr, sep):
    """Print a banner, the column header and ``rows`` numbered from 1."""
    _print_banner(title)
    print(hdr)
    print(sep)
    for rank, r in enumerate(rows, 1):
        print(f"{rank:>3} {_format_result_row(r)}")


def _print_impact_table(results, key, title, column, labelled_values):
    """Print per-value averages of profitable results (>=15 deals, PF>1) for one grid key."""
    print(f"\n  {title} impact (averaged across all other params, min 15 deals, PF>1):")
    print(f"  {column:>8} {'Combos':>6} {'AvgPF':>6} {'AvgPnL':>8} {'AvgDeals':>8} {'BestPF':>6}")
    print(f"  {'-'*50}")
    for value, label in labelled_values:
        rs = [r for r in results if r[key] == value and r["deals"] >= 15 and r["pf"] > 1]
        if not rs:
            print(f"  {label:>8} {0:>6} {'n/a':>6} {'n/a':>8} {'n/a':>8} {'n/a':>6}")
            continue
        avg_pf = np.mean([r["pf"] for r in rs])
        avg_pnl = np.mean([r["pnl"] for r in rs])
        avg_deals = np.mean([r["deals"] for r in rs])
        best_pf = max(r["pf"] for r in rs)
        print(f"  {label:>8} {len(rs):>6} {avg_pf:>6.2f} ${avg_pnl:>+7.2f} {avg_deals:>8.0f} {best_pf:>6.2f}")


def _summarize_combo(params, all_deals):
    """Aggregate the deals of one parameter combo into a flat result record."""
    pnl = sum(d["pnl"] for d in all_deals)
    wins = [d for d in all_deals if d["pnl"] > 0]
    losses = [d for d in all_deals if d["pnl"] <= 0]
    wr = len(wins) / len(all_deals) * 100
    gp = sum(d["pnl"] for d in wins) if wins else 0
    gl = abs(sum(d["pnl"] for d in losses)) if losses else 0
    # Use a tiny denominator when there is no loss at all (no losing deals, or
    # losing deals that sum to exactly zero) so the ratio never divides by 0.
    pf = gp / (gl if gl > 0 else 0.001)

    reasons = {}
    for d in all_deals:
        reasons[d["reason"]] = reasons.get(d["reason"], 0) + 1

    # Average NATR at entry, ignoring deals without a positive NATR reading.
    natrs = [d["natr"] for d in all_deals if d.get("natr", 0) > 0]
    avg_natr = np.mean(natrs) if natrs else 0

    return {
        **params,
        "deals": len(all_deals),
        "pnl": round(pnl, 2),
        "wr": round(wr, 1),
        "pf": round(pf, 2),
        "avg_w": round(gp / len(wins), 3) if wins else 0,
        "avg_l": round(gl / len(losses), 3) if losses else 0,
        "avg_natr": round(avg_natr, 2),
        "tp_cnt": reasons.get("TP", 0),
        "ztp_cnt": reasons.get("Z-TP", 0),
        "sl_cnt": reasons.get("SL", 0),
        "time_cnt": reasons.get("TIME", 0),
    }


def _natr_summary(results, key, values):
    """Build the JSON summary (count / profitable / averages) per value of ``key``."""
    summary = {}
    for value in values:
        rs = [r for r in results if r[key] == value and r["deals"] >= 15]
        if rs:
            summary[str(value)] = {
                "count": len(rs),
                "profitable": len([r for r in rs if r["pf"] > 1]),
                "avg_pf": round(np.mean([r["pf"] for r in rs]), 2),
                "avg_pnl": round(np.mean([r["pnl"] for r in rs]), 2),
            }
    return summary


def main():
    """Run the NATR-focused sweep: download data, simulate every combo, report, save.

    Steps:
      1. Pick symbols via ``get_symbols()`` and download ``DAYS`` of candles
         each; symbols with fewer than ``VWAP_PERIOD + 100`` candles are skipped.
      2. Build all ``GRID`` combinations, dropping contradictory ones
         (``z_max <= z_entry`` or ``natr_max <= natr_min`` when the max is enabled).
      3. Run ``sim()`` for every combo over every symbol and aggregate the
         deals; combos with fewer than 10 deals are discarded.
      4. Print the NATR band / min / max analysis and the ranking tables.
      5. Write ``results_natr_sweep.json`` next to this script.
    """
    print("=" * 80)
    print("  NATR-FOCUSED SWEEP — Z-VWAP, 30 days, NO SO")
    print("=" * 80)

    syms = get_symbols()
    print(f"\n  {len(syms)} symbols, downloading {DAYS}d klines...")

    data = {}
    for idx, sd in enumerate(syms):
        s = sd["symbol"]
        print(f"  [{idx+1}/{len(syms)}] {s}...", end=" ", flush=True)
        time.sleep(0.15)  # small pause between symbols
        kl = fetch_klines(s, TIMEFRAME, days=DAYS)
        if len(kl) < VWAP_PERIOD + 100:
            print("skip")
            continue
        c = np.array([k["c"] for k in kl])
        h = np.array([k["h"] for k in kl])
        l = np.array([k["l"] for k in kl])
        v = np.array([k["v"] for k in kl])
        zz, natr, chop = calc_indicators(c, h, l, v)
        data[s] = {"c": c, "h": h, "l": l, "z": zz, "natr": natr, "chop": chop}
        print("OK")

    print(f"\n  {len(data)} symbols loaded.")

    # Skip invalid combos: natr_max <= natr_min, z_max <= z_entry
    keys = list(GRID.keys())
    all_combos = list(product(*GRID.values()))
    combos = []
    for combo in all_combos:
        p = dict(zip(keys, combo))
        if p["z_max"] > 0 and p["z_max"] <= p["z_entry"]:
            continue
        if p["natr_max"] > 0 and p["natr_max"] <= p["natr_min"]:
            continue
        combos.append(p)

    print(f"  {len(combos)} valid combos (from {len(all_combos)} total)\n")
    print("  Running sweep...")

    results = []
    t0 = time.time()

    for ci, params in enumerate(combos):
        all_deals = []
        for d in data.values():
            all_deals.extend(
                sim(d["z"], d["natr"], d["chop"], d["c"], d["h"], d["l"], params)
            )

        if (ci + 1) % 2000 == 0:
            elapsed = time.time() - t0
            eta = elapsed / (ci + 1) * (len(combos) - ci - 1)
            print(f"  ... {ci+1}/{len(combos)} ({elapsed:.0f}s elapsed, ~{eta:.0f}s remaining)")

        # Too few deals to say anything about this combo.
        if len(all_deals) < 10:
            continue

        results.append(_summarize_combo(params, all_deals))

    elapsed = time.time() - t0
    print(f"\n  Done! {len(results)} valid combos in {elapsed:.0f}s")

    # Primary ordering: profit factor, ties broken by PnL.
    results.sort(key=lambda x: (x["pf"], x["pnl"]), reverse=True)
    results_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True)

    hdr = f"{'#':>3} {'Z':>4} {'Zm':>4} {'TP':>4} {'SL':>4} {'NRm':>5} {'NRx':>4} {'CHP':>3} {'CD':>3} | {'Dls':>4} {'PnL':>8} {'WR%':>5} {'PF':>5} {'AvW':>6} {'AvL':>6} {'aNR':>4} | {'TP':>3} {'ZTP':>3} {'SL':>3} {'TIM':>3}"
    sep = "-" * 110

    # ========== NATR ANALYSIS ==========
    # Group profitable results by NATR range to show which bands work best
    _print_banner("NATR BAND ANALYSIS (min deals 15, PF > 1)")

    natr_bands = {}
    for r in results:
        if r["deals"] < 15 or r["pf"] <= 1.0:
            continue
        band = f"{r['natr_min']:.2f}-{r['natr_max'] if r['natr_max'] > 0 else 'inf'}"
        natr_bands.setdefault(band, []).append(r)

    print(f"\n  {'NATR Band':<15} {'Combos':>6} {'BestPF':>6} {'BestPnL':>8} {'AvgPF':>6} {'AvgPnL':>8} {'AvgDeals':>8}")
    print(f"  {'-'*65}")
    band_stats = []
    for band, rs in sorted(natr_bands.items()):
        avg_pf = np.mean([r["pf"] for r in rs])
        avg_pnl = np.mean([r["pnl"] for r in rs])
        avg_deals = np.mean([r["deals"] for r in rs])
        best_pf = max(r["pf"] for r in rs)
        best_pnl = max(r["pnl"] for r in rs)
        band_stats.append((band, len(rs), best_pf, best_pnl, avg_pf, avg_pnl, avg_deals))
    # Best average PF first; the sort is stable so ties stay in band-name order.
    band_stats.sort(key=lambda x: x[4], reverse=True)
    for bs in band_stats:
        print(f"  {bs[0]:<15} {bs[1]:>6} {bs[2]:>6.2f} ${bs[3]:>+7.2f} {bs[4]:>6.2f} ${bs[5]:>+7.2f} {bs[6]:>8.0f}")

    # NATR MIN / NATR MAX impact
    _print_impact_table(
        results, "natr_min", "NATR MIN", "NATR_min",
        [(nm, f"{nm:.2f}") for nm in sorted(GRID["natr_min"])],
    )
    _print_impact_table(
        results, "natr_max", "NATR MAX", "NATR_max",
        [(nx, "off" if nx == 0 else f"{nx:.1f}") for nx in sorted(GRID["natr_max"])],
    )

    # ========== TOP TABLES ==========
    enough_deals_pf = [r for r in results if r["deals"] >= 15]
    _print_ranked_table("TOP 25 by Profit Factor (min 15 deals)",
                        enough_deals_pf[:25], hdr, sep)

    enough_deals_pnl = [r for r in results_pnl if r["deals"] >= 15]
    _print_ranked_table("TOP 25 by PnL (min 15 deals)",
                        enough_deals_pnl[:25], hdr, sep)

    # BALANCED RECOMMENDED (PF>=1.3, deals>=20, best PnL)
    bal = [r for r in results_pnl if r["pf"] >= 1.3 and r["deals"] >= 20]
    if bal:
        _print_ranked_table("RECOMMENDED (PF>=1.3, >=20 deals, by PnL)",
                            bal[:15], hdr, sep)

    # BEST PER NATR_MIN (top 1 by PF, then PnL, per natr_min value)
    _print_banner("BEST COMBO PER NATR_MIN (top 1 by PF, min 15 deals)")
    print(hdr)
    print(sep)
    for nm in sorted(GRID["natr_min"]):
        rs = [r for r in results if r["natr_min"] == nm and r["deals"] >= 15]
        if rs:
            best = max(rs, key=lambda x: (x["pf"], x["pnl"]))
            print(f"  * {_format_result_row(best)}")

    # Save
    out = os.path.join(os.path.dirname(__file__), "results_natr_sweep.json")
    save_data = {
        "grid": GRID,
        "data_days": DAYS,
        "symbols_count": len(data),
        "combos_tested": len(combos),
        "valid_results": len(results),
        "top100_pf": results[:100],
        "top100_pnl": results_pnl[:100],
        "natr_analysis": {
            "by_natr_min": _natr_summary(results, "natr_min", GRID["natr_min"]),
            "by_natr_max": _natr_summary(results, "natr_max", GRID["natr_max"]),
        }
    }

    with open(out, "w") as f:
        json.dump(save_data, f, indent=2)
    print(f"\n  Saved to {out}")


if __name__ == "__main__":
    main()

# --- Web-viewer residue below (not part of the script) ---
# Git History
# dd32dfd  chore: initial commit — version control setup (5 weeks ago)