← Back
"""
Parameter Sweep: Z-VWAP Clean (no DCA)
=======================================
Downloads data ONCE, then sweeps parameter combos.
Saves results to sweep_results.json.

Usage:
  python backtests/sweep_zvwap.py --days 7 --top 35
"""

import sys, os, json, time, argparse
import numpy as np
from datetime import datetime
from itertools import product

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pybit.unified_trading import HTTP

# ============================================================
# CONSTANTS
# ============================================================
# Default look-back windows, in bars, for the indicator functions below.
VWAP_PERIOD = 50  # calc_zvwap window; run_one also starts scanning at this bar index
NATR_PERIOD = 14  # calc_natr averaging window
CHOP_PERIOD = 14  # calc_chop window
# Fee rates as a fraction of traded notional (qty * price), applied in _calc_pnl.
MAKER_FEE = 0.0002  # charged on exits whose reason is "TP"
TAKER_FEE = 0.00055  # charged on every entry and on all non-"TP" exits
# Symbols that get_top_symbols always excludes from its result.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

# ============================================================
# PARAMETER GRID
# ============================================================
# Candidate values for each strategy parameter. main() sweeps the Cartesian
# product of these lists and skips combinations where z_entry >= z_max.
GRID = {
    "z_entry":   [1.8, 2.0, 2.5, 3.0],  # go long when z < -z_entry, short when z > z_entry
    "z_max":     [2.5, 3.5, 5.0],       # no entry while |z| exceeds this
    "tp_pct":    [1.0, 1.5, 2.0, 3.0],  # take-profit distance from entry price, in percent
    "sl_pct":    [1.0, 2.0, 4.0, 8.0],  # stop-loss distance from entry price, in percent
    "chop_min":  [0, 45, 50, 55],     # 0 = disabled
    "natr_min":  [0.5, 0.75],           # no entry while NATR is below this
    "natr_max":  [2.5, 3.5],            # no entry while NATR is above this
}

# ============================================================
# DATA FETCHING (once)
# ============================================================
def fetch_klines(session, symbol, interval="5", days=7):
    """Download roughly the last ``days`` days of klines for ``symbol``.

    Pages backwards in time from "now" through ``session.get_kline``
    (category ``"linear"``, 1000 rows per request) until enough bars have been
    collected, a page comes back empty or short, or a request fails.
    Fetching is best-effort: on any failure a message is printed and the bars
    gathered so far are returned.

    Args:
        session: Client exposing ``get_kline(category, symbol, interval,
            limit, end)`` that returns a dict with ``retCode`` and
            ``result["list"]``.
        symbol: Instrument name, passed through to the API unchanged.
        interval: Bar length in minutes as a numeric string; it is parsed with
            ``int`` to work out how many bars cover ``days``.
        days: Amount of history to fetch, in days.

    Returns:
        A list of dicts with keys ``ts, o, h, l, c, v`` parsed from row fields
        0-5, de-duplicated by ``ts`` and trimmed to at most
        ``days * 24 * 60 // int(interval)`` bars.
    """
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)

    # Bars accumulate in the order the API returns them; the paging below
    # assumes each page is newest-first (the last row is treated as the oldest).
    collected = []
    while len(collected) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear", symbol=symbol,
                interval=interval, limit=1000, end=end_time,
            )
            if resp["retCode"] != 0:
                # Surface API-level failures instead of silently returning
                # partial data that the caller would later discard.
                print(f"  ERR {symbol}: retCode={resp['retCode']} {resp.get('retMsg', '')}")
                break
            items = resp["result"]["list"]
            if not items:
                break
            for row in items:
                collected.append({
                    "ts": int(row[0]),
                    "o": float(row[1]), "h": float(row[2]),
                    "l": float(row[3]), "c": float(row[4]),
                    "v": float(row[5]),
                })
            # Next page ends just before the last bar of this page.
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break  # short page: no more history available
        except Exception as e:
            print(f"  ERR {symbol}: {e}")
            break
        time.sleep(0.15)  # pause between consecutive page requests

    # Flip into chronological order, then keep the first bar seen per timestamp.
    collected.reverse()
    seen = set()
    unique = []
    for bar in collected:
        if bar["ts"] not in seen:
            seen.add(bar["ts"])
            unique.append(bar)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique


def get_top_symbols(session, top_n=35, min_vol=20_000_000):
    """Return the ``top_n`` highest-turnover USDT symbols as ``(symbol, turnover)``.

    Reads ``session.get_tickers(category="linear")``, keeps symbols that end
    in ``"USDT"``, are not in ``BLACKLIST`` and whose ``turnover24h`` is at
    least ``min_vol``, then sorts them by turnover, largest first.
    """
    response = session.get_tickers(category="linear")
    ranked = []
    for ticker in response["result"]["list"]:
        name = ticker["symbol"]
        if name.endswith("USDT") and name not in BLACKLIST:
            turnover = float(ticker.get("turnover24h", 0))
            if turnover >= min_vol:
                ranked.append((name, turnover))
    return sorted(ranked, key=lambda pair: pair[1], reverse=True)[:top_n]


# ============================================================
# INDICATORS (vectorized)
# ============================================================
def calc_zvwap(h, l, c, v, period=VWAP_PERIOD):
    """Z-score of each close against the VWAP of the preceding ``period`` bars.

    For every bar ``i >= period`` the window is bars ``i-period .. i-1`` (the
    current bar is excluded). Within the window a running VWAP of the typical
    price ``(h + l + c) / 3`` is built; its final value is the VWAP, and the
    deviation scale is the population standard deviation of the window's
    closes minus that running VWAP. Bars before ``period``, and bars whose
    scale is not positive, keep a score of 0.0.

    Returns an array with one score per element of ``c``.
    """
    size = len(c)
    scores = np.zeros(size)
    for idx in range(period, size):
        window = slice(idx - period, idx)
        closes = c[window]
        volumes = v[window]
        typical = (h[window] + l[window] + closes) / 3

        running_vol = np.cumsum(volumes)
        # Substitute 1 for a zero cumulative volume so the division is defined.
        safe_vol = np.where(running_vol == 0, 1, running_vol)
        running_vwap = np.cumsum(typical * volumes) / safe_vol

        spread = np.std(closes - running_vwap)
        if spread > 0:
            scores[idx] = (c[idx] - running_vwap[-1]) / spread
    return scores


def calc_natr(h, l, c, period=NATR_PERIOD):
    """Normalized ATR: mean true range of the last ``period`` bars as % of close.

    True range of bar ``i`` is the largest of ``h[i]-l[i]``, ``|h[i]-c[i-1]|``
    and ``|l[i]-c[i-1]|``; bar 0 has no previous close and gets 0. Bars before
    ``period``, and bars whose close is not positive, yield 0.

    Returns an array with one value per element of ``c``.
    """
    size = len(c)
    true_range = np.zeros(size)
    true_range[1:] = [
        max(hi - lo, abs(hi - prev_close), abs(lo - prev_close))
        for hi, lo, prev_close in zip(h[1:], l[1:], c[:-1])
    ]

    result = np.zeros(size)
    for idx in range(period, size):
        # Window covers the current bar and the period-1 bars before it.
        atr = np.mean(true_range[idx - period + 1:idx + 1])
        if c[idx] > 0:
            result[idx] = (atr / c[idx]) * 100
        else:
            result[idx] = 0
    return result


def calc_chop(h, l, c, period=CHOP_PERIOD):
    """Choppiness Index: 100 * log10(sum(TR) / range) / log10(period).

    ``sum(TR)`` is the sum of true ranges over the last ``period`` bars
    (current bar included) and ``range`` is the highest high minus the lowest
    low over the same bars. Bars before ``period``, and bars whose range is
    not positive, keep the neutral default of 50.0.

    Returns an array with one value per element of ``c``.
    """
    size = len(c)
    true_range = np.zeros(size)
    true_range[1:] = [
        max(hi - lo, abs(hi - prev_close), abs(lo - prev_close))
        for hi, lo, prev_close in zip(h[1:], l[1:], c[:-1])
    ]

    result = np.full(size, 50.0)
    for idx in range(period, size):
        window = slice(idx - period + 1, idx + 1)
        span = np.max(h[window]) - np.min(l[window])
        if span > 0:
            tr_total = np.sum(true_range[window])
            result[idx] = 100 * np.log10(tr_total / span) / np.log10(period)
    return result


# ============================================================
# BACKTEST (single config, all symbols)
# ============================================================
def run_one(data_cache, cfg):
    """Backtest one parameter set over every cached symbol and summarise it.

    Args:
        data_cache: Mapping ``symbol -> (h, l, c, v, z, natr, chop)`` of
            equally indexed per-bar sequences.
        cfg: Dict with keys ``z_entry, z_max, tp_pct, sl_pct, chop_min,
            natr_min, natr_max``.

    Rules, per symbol, scanning bars from index ``VWAP_PERIOD``:
        * At most one position is open at a time. While one is open the bar
          is only checked for an exit, in priority order: stop-loss (filled
          at the SL price), take-profit (filled at the TP price), then "Z-TP"
          (z back within 0.3 of zero on the trade's side while the close is
          in profit, filled at the close).
        * After any exit, new entries are blocked for 12 bars.
        * Entries need NATR inside ``[natr_min, natr_max]``, chop at least
          ``chop_min`` (when ``chop_min > 0``) and ``|z| <= z_max``; then
          ``z < -z_entry`` opens a LONG and ``z > z_entry`` a SHORT at the
          bar's close, sized so that ``qty * entry_price == 5.0 * 3``.
        * A position still open on the last bar is closed at the final close
          with reason "END".

    Returns:
        ``None`` when no trade was taken, otherwise a dict with ``trades``,
        ``wr`` (win rate in %), ``pnl``, ``pf`` (gross profit / gross loss,
        999 when there is no loss), ``tp_count``, ``sl_count``, ``ztp_count``.
    """
    trades = []

    for sym, (h, l, c, v, z_arr, natr_arr, chop_arr) in data_cache.items():
        position = None  # open trade dict, or None when flat
        resume_at = 0    # first bar index at which a new entry is allowed

        for bar in range(VWAP_PERIOD, len(c)):
            z_now = z_arr[bar]

            if position:
                if position["side"] == "LONG":
                    stopped = l[bar] <= position["sl"]
                    target_hit = h[bar] >= position["tp"]
                    reverted = z_now >= -0.3 and c[bar] > position["ep"]
                else:
                    stopped = h[bar] >= position["sl"]
                    target_hit = l[bar] <= position["tp"]
                    reverted = z_now <= 0.3 and c[bar] < position["ep"]

                # Stop-loss wins when several exits trigger on the same bar.
                if stopped:
                    outcome = ("SL", position["sl"])
                elif target_hit:
                    outcome = ("TP", position["tp"])
                elif reverted:
                    outcome = ("Z-TP", c[bar])
                else:
                    outcome = None

                if outcome is not None:
                    reason, fill = outcome
                    trades.append({
                        "sym": sym,
                        "pnl": _calc_pnl(position, fill, reason),
                        "reason": reason,
                    })
                    position = None
                    resume_at = bar + 12
                # A bar that managed an open position never opens a new one.
                continue

            if bar < resume_at:
                continue

            # Entry filters
            natr_now = natr_arr[bar]
            if natr_now < cfg["natr_min"] or natr_now > cfg["natr_max"]:
                continue
            if cfg["chop_min"] > 0 and chop_arr[bar] < cfg["chop_min"]:
                continue
            if abs(z_now) > cfg["z_max"]:
                continue

            # Entry signal
            if z_now < -cfg["z_entry"]:
                side = "LONG"
            elif z_now > cfg["z_entry"]:
                side = "SHORT"
            else:
                continue

            ep = c[bar]
            tp_frac = cfg["tp_pct"] / 100
            sl_frac = cfg["sl_pct"] / 100
            if side == "LONG":
                tp_price, sl_price = ep * (1 + tp_frac), ep * (1 - sl_frac)
            else:
                tp_price, sl_price = ep * (1 - tp_frac), ep * (1 + sl_frac)
            position = {
                "side": side, "ep": ep, "qty": (5.0 * 3) / ep,
                "tp": tp_price, "sl": sl_price,
            }

        # Force-close whatever is still open at the end of the data.
        if position:
            trades.append({
                "sym": sym,
                "pnl": _calc_pnl(position, c[-1], "END"),
                "reason": "END",
            })

    if not trades:
        return None

    pnls = [t["pnl"] for t in trades]
    reasons = [t["reason"] for t in trades]
    winning = [p for p in pnls if p > 0]
    losing = [p for p in pnls if p <= 0]
    gross_profit = sum(winning)
    gross_loss = abs(sum(losing))
    if gross_loss > 0:
        profit_factor = gross_profit / gross_loss
    else:
        profit_factor = 999

    return {
        "trades": len(trades),
        "wr": round(len(winning) / len(trades) * 100, 1),
        "pnl": round(sum(pnls), 2),
        "pf": round(profit_factor, 2),
        "tp_count": reasons.count("TP"),
        "sl_count": reasons.count("SL"),
        "ztp_count": reasons.count("Z-TP"),
    }


def _calc_pnl(trade, close_price, reason):
    """Net PnL of closing ``trade`` at ``close_price``, after both fees.

    ``trade`` supplies ``side``, ``qty`` and ``ep`` (entry price). The entry
    is always charged ``TAKER_FEE``; the exit is charged ``MAKER_FEE`` when
    ``reason`` is ``"TP"`` and ``TAKER_FEE`` otherwise. Fees are taken on
    notional (``qty * price``).
    """
    qty = trade["qty"]
    entry_price = trade["ep"]

    if trade["side"] == "LONG":
        gross = qty * (close_price - entry_price)
    else:
        gross = qty * (entry_price - close_price)

    exit_rate = MAKER_FEE if reason == "TP" else TAKER_FEE
    fee_in = qty * entry_price * TAKER_FEE
    fee_out = qty * close_price * exit_rate
    return gross - fee_in - fee_out


# ============================================================
# MAIN
# ============================================================
def main():
    """CLI entry point: download klines once, sweep ``GRID``, save and print.

    Steps:
        1. Pick the top ``--top`` symbols by turnover.
        2. Download ``--days`` days of klines per symbol and precompute the
           indicators once; symbols with fewer than ``VWAP_PERIOD + 50`` bars
           are dropped.
        3. Run ``run_one`` for every parameter combination where
           ``z_entry < z_max``, keeping results with at least 5 trades.

    Results are sorted by PnL and written to ``sweep_results.json`` next to
    this script. The top 15 are printed, followed by the result of the
    current production configuration on the same data.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    parser.add_argument("--top", type=int, default=35)
    args = parser.parse_args()

    # Defined once: this config is written to the JSON report and also re-run
    # below for comparison, so the two uses cannot drift apart.
    prod_cfg = {
        "z_entry": 1.8, "z_max": 2.5, "tp_pct": 1.5,
        "sl_pct": 8.0, "chop_min": 45, "natr_min": 0.75, "natr_max": 2.5,
    }
    min_trades = 5  # results with fewer trades are discarded

    session = HTTP(testnet=False)

    # Step 1: get symbols
    print(f"[1/3] Fetching top {args.top} symbols...")
    symbols = get_top_symbols(session, args.top)
    print(f"  Found {len(symbols)} symbols")

    # Step 2: download data ONCE
    print(f"[2/3] Downloading {args.days}d klines...")
    data_cache = {}
    for idx, (sym, _vol) in enumerate(symbols):
        klines = fetch_klines(session, sym, days=args.days)
        if len(klines) >= VWAP_PERIOD + 50:
            h = np.array([k["h"] for k in klines])
            l = np.array([k["l"] for k in klines])
            c = np.array([k["c"] for k in klines])
            v = np.array([k["v"] for k in klines])
            z = calc_zvwap(h, l, c, v)
            natr = calc_natr(h, l, c)
            chop = calc_chop(h, l, c)
            data_cache[sym] = (h, l, c, v, z, natr, chop)
        # Progress and pacing apply to every symbol, including skipped ones.
        if (idx + 1) % 10 == 0:
            print(f"  {idx+1}/{len(symbols)} downloaded")
        time.sleep(0.2)

    print(f"  Cached {len(data_cache)} symbols")

    # Step 3: sweep
    keys = list(GRID.keys())
    combos = list(product(*[GRID[k] for k in keys]))
    print(f"[3/3] Sweeping {len(combos)} parameter combos...")

    results = []
    for idx, vals in enumerate(combos):
        cfg = dict(zip(keys, vals))
        # Only valid combos are run: z_entry must be < z_max
        if cfg["z_entry"] < cfg["z_max"]:
            res = run_one(data_cache, cfg)
            if res and res["trades"] >= min_trades:
                res["config"] = cfg
                results.append(res)
        # Report progress on every 200th combo, whether or not it was run.
        if (idx + 1) % 200 == 0:
            print(f"  {idx+1}/{len(combos)} tested...")

    # Sort by PnL
    results.sort(key=lambda x: x["pnl"], reverse=True)

    # Save
    out_path = os.path.join(os.path.dirname(__file__), "sweep_results.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump({
            "date": datetime.now().isoformat(),
            "days": args.days,
            "symbols": len(data_cache),
            # Counts the configs that passed the minimum-trade filter.
            "combos_tested": len(results),
            "top_20": results[:20],
            "bottom_5": results[-5:] if len(results) >= 5 else results,
            "current_prod": dict(prod_cfg),
        }, f, indent=2)

    # Print top 15
    print(f"\n{'='*80}")
    print(f"SWEEP DONE: {len(results)} valid combos ({args.days}d, {len(data_cache)} coins)")
    print(f"{'='*80}")
    print(f"{'#':>3} {'PnL':>8} {'WR%':>6} {'PF':>6} {'Trd':>5} | Z_ent Z_max TP%  SL%  CHOP NATR")
    print("-" * 80)
    for i, r in enumerate(results[:15]):
        p = r["config"]
        print(f"{i+1:>3} ${r['pnl']:>7.2f} {r['wr']:>5.1f}% {r['pf']:>5.2f} {r['trades']:>5} | "
              f"{p['z_entry']:>4.1f}  {p['z_max']:>4.1f} {p['tp_pct']:>4.1f} {p['sl_pct']:>4.1f} "
              f"{p['chop_min']:>4}  {p['natr_min']:.2f}-{p['natr_max']:.1f}")

    # Run the current production config on the same data for comparison
    prod_res = run_one(data_cache, prod_cfg)
    if prod_res:
        print(f"\nCURRENT PROD: PnL ${prod_res['pnl']:.2f}, WR {prod_res['wr']}%, "
              f"PF {prod_res['pf']}, {prod_res['trades']} trades")

    print(f"\nResults saved to {out_path}")

if __name__ == "__main__":
    main()

📜 Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...