← Back
"""
Parameter Sweep: DCA Z-VWAP (NO Safety Orders)
================================================
Tests combinations of:
  - Z Entry: 1.5, 1.8, 2.0, 2.5, 3.0
  - TP%: 0.5, 1.0, 1.5, 2.0, 3.0
  - SL%: 2, 3, 5, 8
  - NATR min: 0.5, 0.75, 1.0
  - CHOP min: 0 (off), 40, 45, 50, 55
  - Cooldown: 6, 12, 24 bars

Finds top combos by Profit Factor and PnL.

Usage:
  python3 backtests/backtest_param_sweep.py
"""

import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json
import time
import numpy as np
from datetime import datetime
from itertools import product
from pybit.unified_trading import HTTP

# ============================================================
# FIXED CONFIG
# ============================================================
ORDER_USD = 7.0
LEVERAGE = 3
VWAP_PERIOD = 50
MAKER_FEE = 0.0002
TAKER_FEE = 0.00055
TIMEFRAME = "5"
DAYS = 7
MIN_VOLUME_24H = 20_000_000
MAX_DEALS_CONCURRENT = 6

BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

# ============================================================
# PARAMETER GRID
# ============================================================
PARAM_GRID = {
    "z_entry":    [1.5, 1.8, 2.0, 2.5, 3.0],
    "tp_pct":     [0.5, 1.0, 1.5, 2.0, 3.0],
    "sl_pct":     [2.0, 3.0, 5.0, 8.0],
    "natr_min":   [0.5, 0.75, 1.0],
    "chop_min":   [0, 40, 50, 55],
    "cooldown":   [6, 12],
}

session = HTTP(testnet=False)


# ============================================================
# DATA
# ============================================================
def get_symbols():
    resp = session.get_tickers(category="linear")
    if resp["retCode"] != 0:
        return []
    symbols = []
    for t in resp["result"]["list"]:
        sym = t["symbol"]
        if not sym.endswith("USDT") or sym in BLACKLIST:
            continue
        vol = float(t.get("turnover24h", 0))
        if vol >= MIN_VOLUME_24H:
            symbols.append({"symbol": sym, "volume_24h": vol})
    symbols.sort(key=lambda x: x["volume_24h"], reverse=True)
    return symbols[:50]


def fetch_klines(symbol, interval, days=7):
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)
    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(category="linear", symbol=symbol,
                                     interval=interval, limit=1000, end=end_time)
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append({
                    "ts": int(item[0]),
                    "o": float(item[1]), "h": float(item[2]),
                    "l": float(item[3]), "c": float(item[4]),
                    "v": float(item[5]),
                })
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
        except:
            break
    all_klines.reverse()
    seen = set()
    unique = []
    for k in all_klines:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique


# ============================================================
# INDICATORS (vectorized)
# ============================================================
def calc_all_indicators(closes, highs, lows, volumes):
    n = len(closes)

    # Z-VWAP
    z_scores = np.zeros(n)
    for i in range(VWAP_PERIOD, n):
        h = highs[i-VWAP_PERIOD:i]
        l = lows[i-VWAP_PERIOD:i]
        c = closes[i-VWAP_PERIOD:i]
        v = volumes[i-VWAP_PERIOD:i]
        tp = (h + l + c) / 3
        ctv = np.cumsum(tp * v)
        cv = np.cumsum(v)
        cv_safe = np.where(cv == 0, 1, cv)
        vwap_arr = ctv / cv_safe
        vwap = vwap_arr[-1]
        dev = c - vwap_arr
        std = np.std(dev)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std

    # NATR (rolling 14)
    natr = np.zeros(n)
    for i in range(14, n):
        trs = []
        for j in range(i-13, i+1):
            tr = max(highs[j] - lows[j],
                     abs(highs[j] - closes[j-1]),
                     abs(lows[j] - closes[j-1]))
            trs.append(tr)
        atr = np.mean(trs)
        natr[i] = (atr / closes[i]) * 100 if closes[i] > 0 else 0

    # CHOP Index (14)
    chop = np.full(n, 50.0)
    for i in range(14, n):
        atr_sum = 0
        for j in range(i-13, i+1):
            tr = max(highs[j] - lows[j],
                     abs(highs[j] - closes[j-1]),
                     abs(lows[j] - closes[j-1]))
            atr_sum += tr
        hi = np.max(highs[i-13:i+1])
        lo = np.min(lows[i-13:i+1])
        rng = hi - lo
        if rng > 0:
            chop[i] = 100 * np.log10(atr_sum / rng) / np.log10(14)

    return z_scores, natr, chop


# ============================================================
# SIMPLE DEAL (no SO)
# ============================================================
def simulate_deals(z_scores, natr, chop, closes, highs, lows, timestamps, params):
    n = len(closes)
    z_entry = params["z_entry"]
    tp_pct = params["tp_pct"]
    sl_pct = params["sl_pct"]
    natr_min = params["natr_min"]
    chop_min = params["chop_min"]
    cooldown = params["cooldown"]

    deals = []
    in_trade = False
    side = None
    entry_price = 0
    entry_bar = 0
    cooldown_until = 0

    for i in range(VWAP_PERIOD, n):
        if in_trade:
            # Check exits
            z = z_scores[i]
            closed = False
            close_price = 0
            reason = ""

            if side == "LONG":
                tp_price = entry_price * (1 + tp_pct / 100)
                sl_price = entry_price * (1 - sl_pct / 100)
                # SL first
                if lows[i] <= sl_price:
                    close_price = sl_price
                    reason = "SL"
                    closed = True
                elif highs[i] >= tp_price:
                    close_price = tp_price
                    reason = "TP"
                    closed = True
                elif z >= -0.3 and closes[i] > entry_price:
                    close_price = closes[i]
                    reason = "Z-TP"
                    closed = True
            else:  # SHORT
                tp_price = entry_price * (1 - tp_pct / 100)
                sl_price = entry_price * (1 + sl_pct / 100)
                if highs[i] >= sl_price:
                    close_price = sl_price
                    reason = "SL"
                    closed = True
                elif lows[i] <= tp_price:
                    close_price = tp_price
                    reason = "TP"
                    closed = True
                elif z <= 0.3 and closes[i] < entry_price:
                    close_price = closes[i]
                    reason = "Z-TP"
                    closed = True

            # Time stop: 36 bars (3h)
            if not closed and (i - entry_bar) >= 36:
                close_price = closes[i]
                reason = "TIME"
                closed = True

            if closed:
                qty = (ORDER_USD * LEVERAGE) / entry_price
                if side == "LONG":
                    pnl = qty * (close_price - entry_price)
                else:
                    pnl = qty * (entry_price - close_price)
                fees = qty * entry_price * TAKER_FEE + qty * close_price * TAKER_FEE
                pnl -= fees

                deals.append({
                    "side": side,
                    "pnl": pnl,
                    "reason": reason,
                    "bars": i - entry_bar,
                })
                in_trade = False
                cooldown_until = i + cooldown
            continue

        # Check entry
        if i < cooldown_until:
            continue

        z = z_scores[i]
        if abs(z) <= z_entry:
            continue

        # NATR filter
        if natr[i] < natr_min:
            continue

        # CHOP filter
        if chop_min > 0 and chop[i] < chop_min:
            continue

        side = "LONG" if z < -z_entry else "SHORT"
        entry_price = closes[i]
        entry_bar = i
        in_trade = True

    # Force close open
    if in_trade:
        qty = (ORDER_USD * LEVERAGE) / entry_price
        cp = closes[-1]
        if side == "LONG":
            pnl = qty * (cp - entry_price)
        else:
            pnl = qty * (entry_price - cp)
        fees = qty * entry_price * TAKER_FEE + qty * cp * TAKER_FEE
        pnl -= fees
        deals.append({"side": side, "pnl": pnl, "reason": "END", "bars": n - 1 - entry_bar})

    return deals


# ============================================================
# MAIN
# ============================================================
def main():
    print("=" * 70)
    print("  PARAMETER SWEEP — Z-VWAP NO SO")
    print("  Finding optimal params for live")
    print("=" * 70)

    # Fetch symbols
    print("\n  Fetching symbols...")
    symbols_data = get_symbols()
    print(f"  {len(symbols_data)} symbols with vol >= ${MIN_VOLUME_24H/1e6:.0f}M")

    # Fetch all kline data
    print("\n  Downloading klines...")
    all_data = {}
    for idx, sd in enumerate(symbols_data):
        sym = sd["symbol"]
        print(f"  [{idx+1}/{len(symbols_data)}] {sym}...", end=" ", flush=True)
        time.sleep(0.12)
        klines = fetch_klines(sym, TIMEFRAME, days=DAYS)
        if len(klines) < VWAP_PERIOD + 50:
            print("skip (few bars)")
            continue
        closes = np.array([k["c"] for k in klines])
        highs = np.array([k["h"] for k in klines])
        lows = np.array([k["l"] for k in klines])
        volumes = np.array([k["v"] for k in klines])
        timestamps = [k["ts"] for k in klines]

        z_scores, natr, chop = calc_all_indicators(closes, highs, lows, volumes)
        all_data[sym] = {
            "closes": closes, "highs": highs, "lows": lows,
            "z": z_scores, "natr": natr, "chop": chop, "ts": timestamps,
            "vol": sd["volume_24h"],
        }
        print(f"OK ({len(klines)} bars)")

    print(f"\n  {len(all_data)} symbols loaded")

    # Generate param combos
    keys = list(PARAM_GRID.keys())
    values = list(PARAM_GRID.values())
    combos = list(product(*values))
    print(f"  {len(combos)} parameter combinations to test\n")

    results = []

    for ci, combo in enumerate(combos):
        params = dict(zip(keys, combo))

        all_deals = []
        for sym, data in all_data.items():
            deals = simulate_deals(
                data["z"], data["natr"], data["chop"],
                data["closes"], data["highs"], data["lows"],
                data["ts"], params
            )
            for d in deals:
                d["symbol"] = sym
            all_deals.extend(deals)

        if len(all_deals) < 5:
            continue

        total_pnl = sum(d["pnl"] for d in all_deals)
        wins = [d for d in all_deals if d["pnl"] > 0]
        losses = [d for d in all_deals if d["pnl"] <= 0]
        wr = len(wins) / len(all_deals) * 100
        gp = sum(d["pnl"] for d in wins) if wins else 0
        gl = abs(sum(d["pnl"] for d in losses)) if losses else 0.001
        pf = gp / gl if gl > 0 else 999

        avg_bars = sum(d["bars"] for d in all_deals) / len(all_deals)

        # Close reason breakdown
        tp_cnt = sum(1 for d in all_deals if d["reason"] == "TP")
        ztp_cnt = sum(1 for d in all_deals if d["reason"] == "Z-TP")
        sl_cnt = sum(1 for d in all_deals if d["reason"] == "SL")
        time_cnt = sum(1 for d in all_deals if d["reason"] == "TIME")

        results.append({
            **params,
            "deals": len(all_deals),
            "pnl": round(total_pnl, 2),
            "wr": round(wr, 1),
            "pf": round(pf, 2),
            "avg_win": round(gp / len(wins), 3) if wins else 0,
            "avg_loss": round(gl / len(losses), 3) if losses else 0,
            "avg_bars": round(avg_bars, 1),
            "tp": tp_cnt,
            "ztp": ztp_cnt,
            "sl": sl_cnt,
            "time": time_cnt,
        })

        if (ci + 1) % 100 == 0:
            print(f"  ... {ci+1}/{len(combos)} combos tested")

    print(f"\n  Done! {len(results)} valid combos (>= 5 deals)")

    # Sort by PF then PnL
    results.sort(key=lambda x: (x["pf"], x["pnl"]), reverse=True)

    # TOP 20 by PF (min 10 deals)
    print(f"\n{'='*100}")
    print(f"  TOP 20 by Profit Factor (min 10 deals)")
    print(f"{'='*100}")
    header = f"{'#':>3} {'Z':>4} {'TP%':>5} {'SL%':>5} {'NATR':>5} {'CHOP':>4} {'CD':>3} | {'Deals':>5} {'PnL':>8} {'WR%':>5} {'PF':>6} {'AvgW':>7} {'AvgL':>7} {'Bars':>5} | {'TP':>4} {'ZTP':>4} {'SL':>4} {'TIM':>4}"
    print(header)
    print("-" * 100)

    shown = 0
    for r in results:
        if r["deals"] < 10:
            continue
        shown += 1
        if shown > 20:
            break
        emoji = "+" if r["pnl"] > 0 else "-"
        print(f"{shown:>3} {r['z_entry']:>4.1f} {r['tp_pct']:>5.1f} {r['sl_pct']:>5.1f} {r['natr_min']:>5.2f} {r['chop_min']:>4.0f} {r['cooldown']:>3} | {r['deals']:>5} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>6.2f} ${r['avg_win']:>6.3f} ${r['avg_loss']:>6.3f} {r['avg_bars']:>5.1f} | {r['tp']:>4} {r['ztp']:>4} {r['sl']:>4} {r['time']:>4}")

    # TOP 20 by PnL (min 10 deals)
    results_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True)
    print(f"\n{'='*100}")
    print(f"  TOP 20 by Total PnL (min 10 deals)")
    print(f"{'='*100}")
    print(header)
    print("-" * 100)

    shown = 0
    for r in results_pnl:
        if r["deals"] < 10:
            continue
        shown += 1
        if shown > 20:
            break
        print(f"{shown:>3} {r['z_entry']:>4.1f} {r['tp_pct']:>5.1f} {r['sl_pct']:>5.1f} {r['natr_min']:>5.2f} {r['chop_min']:>4.0f} {r['cooldown']:>3} | {r['deals']:>5} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>6.2f} ${r['avg_win']:>6.3f} ${r['avg_loss']:>6.3f} {r['avg_bars']:>5.1f} | {r['tp']:>4} {r['ztp']:>4} {r['sl']:>4} {r['time']:>4}")

    # TOP 20 by WR (min 10 deals, PF > 1)
    results_wr = sorted([r for r in results if r["pf"] > 1.0], key=lambda x: x["wr"], reverse=True)
    print(f"\n{'='*100}")
    print(f"  TOP 20 by Win Rate (min 10 deals, PF > 1)")
    print(f"{'='*100}")
    print(header)
    print("-" * 100)

    shown = 0
    for r in results_wr:
        if r["deals"] < 10:
            continue
        shown += 1
        if shown > 20:
            break
        print(f"{shown:>3} {r['z_entry']:>4.1f} {r['tp_pct']:>5.1f} {r['sl_pct']:>5.1f} {r['natr_min']:>5.2f} {r['chop_min']:>4.0f} {r['cooldown']:>3} | {r['deals']:>5} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>6.2f} ${r['avg_win']:>6.3f} ${r['avg_loss']:>6.3f} {r['avg_bars']:>5.1f} | {r['tp']:>4} {r['ztp']:>4} {r['sl']:>4} {r['time']:>4}")

    # BALANCED PICK (PF > 1, >= 15 deals, best PnL)
    balanced = [r for r in results if r["pf"] > 1.0 and r["deals"] >= 15]
    balanced.sort(key=lambda x: x["pnl"], reverse=True)
    if balanced:
        print(f"\n{'='*100}")
        print(f"  RECOMMENDED (PF > 1, >= 15 deals, sorted by PnL)")
        print(f"{'='*100}")
        print(header)
        print("-" * 100)
        for i, r in enumerate(balanced[:10]):
            print(f"{i+1:>3} {r['z_entry']:>4.1f} {r['tp_pct']:>5.1f} {r['sl_pct']:>5.1f} {r['natr_min']:>5.2f} {r['chop_min']:>4.0f} {r['cooldown']:>3} | {r['deals']:>5} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>6.2f} ${r['avg_win']:>6.3f} ${r['avg_loss']:>6.3f} {r['avg_bars']:>5.1f} | {r['tp']:>4} {r['ztp']:>4} {r['sl']:>4} {r['time']:>4}")

    # Save
    output_path = os.path.join(os.path.dirname(__file__), "results_param_sweep.json")
    with open(output_path, "w") as f:
        json.dump({"grid": PARAM_GRID, "results": results[:100]}, f, indent=2)
    print(f"\n  Saved top 100 to {output_path}")


if __name__ == "__main__":
    main()

📜 Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...