← Back
"""
MEGA Parameter Sweep: DCA Z-VWAP (NO Safety Orders)
=====================================================
30-day backtest, all key parameters:
  - Z Entry: 1.8, 2.0, 2.5, 3.0, 3.5
  - Z Max: 0 (off), 3.0, 4.0, 5.0
  - TP%: 0.5, 1.0, 1.5, 2.0, 3.0, 5.0
  - SL%: 0.5, 1.0, 2.0, 3.0, 5.0, 8.0
  - NATR min: 0.3, 0.5, 0.75, 1.0, 1.5
  - NATR max: 0 (off), 2.0, 2.5, 3.0, 5.0
  - CHOP min: 0 (off), 40, 45, 50, 55, 60
  - Volume min: 20M, 50M, 100M
  - Cooldown: 6, 12, 24 bars

Goal: find ALL profitable combos across 30 days.

Usage:
  cd /home/app/trading-bot-bybit
  python3 backtests/backtest_mega_sweep.py
"""

import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json
import time
import numpy as np
from datetime import datetime
from itertools import product
from pybit.unified_trading import HTTP

# ============================================================
# FIXED CONFIG
# ============================================================
# Values below are held constant for every combination in the sweep.
ORDER_USD = 7.0        # margin per deal in USD; notional is ORDER_USD * LEVERAGE
LEVERAGE = 3           # multiplier applied to ORDER_USD when sizing a position
VWAP_PERIOD = 50       # look-back (bars) of the Z-VWAP window; also the first bar simulated
MAKER_FEE = 0.0002     # NOTE(review): not referenced anywhere in the visible code
TAKER_FEE = 0.00055    # fee rate charged on both the entry and the exit notional
TIMEFRAME = "5"        # kline interval passed to the API; parsed with int() as minutes
DAYS = 30              # history length to download and divisor for the per-day PnL
MAX_SYMBOLS = 60       # cap on the number of symbols kept after sorting by turnover

# Symbols that are never traded, regardless of turnover.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

# ============================================================
# PARAMETER GRID
# ============================================================
# The sweep runs the Cartesian product of these lists.
# For z_max, natr_max and chop_min the value 0 means "filter disabled".
PARAM_GRID = {
    "z_entry":    [1.8, 2.0, 2.5, 3.0, 3.5],       # |z| must exceed this to open a deal
    "z_max":      [0, 3.0, 4.0, 5.0],              # skip entries with |z| above this (0 = off)
    "tp_pct":     [0.5, 1.0, 1.5, 2.0, 3.0, 5.0],  # take-profit distance, percent of entry price
    "sl_pct":     [0.5, 1.0, 2.0, 3.0, 5.0, 8.0],  # stop-loss distance, percent of entry price
    "natr_min":   [0.3, 0.5, 0.75, 1.0, 1.5],      # minimum NATR (percent) required to enter
    "natr_max":   [0, 2.0, 2.5, 3.0, 5.0],         # maximum NATR (percent) allowed (0 = off)
    "chop_min":   [0, 40, 45, 50, 55, 60],         # minimum Choppiness Index required (0 = off)
    "vol_min":    [20_000_000, 50_000_000, 100_000_000],  # minimum 24h turnover per symbol
    "cooldown":   [6, 12, 24],                     # bars to wait after a deal closes
}

# Shared Bybit client, created at import time with no credentials passed.
# NOTE(review): presumably only public market-data endpoints are needed - confirm.
session = HTTP(testnet=False)


# ============================================================
# DATA
# ============================================================
def get_symbols():
    """Return the highest-turnover USDT linear symbols eligible for the sweep.

    Queries the linear tickers endpoint, drops symbols that do not end in
    ``USDT`` or that appear in ``BLACKLIST``, keeps those whose 24h turnover
    is at least 20M, and returns at most ``MAX_SYMBOLS`` of them ordered by
    turnover, largest first.

    Returns:
        A list of ``{"symbol": str, "volume_24h": float}`` dicts; an empty
        list when the API reports a non-zero ``retCode``.
    """
    response = session.get_tickers(category="linear")
    if response["retCode"] != 0:
        return []

    candidates = []
    for ticker in response["result"]["list"]:
        name = ticker["symbol"]
        if name in BLACKLIST or not name.endswith("USDT"):
            continue
        turnover = float(ticker.get("turnover24h", 0))
        # 20M is the loosest volume threshold; stricter ones are applied per combo.
        if turnover >= 20_000_000:
            candidates.append({"symbol": name, "volume_24h": turnover})

    ranked = sorted(candidates, key=lambda entry: entry["volume_24h"], reverse=True)
    return ranked[:MAX_SYMBOLS]


def fetch_klines(symbol, interval, days=30):
    """Download up to ``days`` of klines for ``symbol``, oldest bar first.

    Pages backwards in time from "now" in requests of 1000 bars, moving the
    ``end`` cursor to just before the last row of each page.

    Args:
        symbol: Instrument name passed to the kline endpoint.
        interval: Bar size as a string of minutes (converted with ``int``).
        days: Number of days of history wanted.

    Returns:
        A list of ``{"ts", "o", "h", "l", "c", "v"}`` dicts with unique
        timestamps, trimmed to the most recent ``days`` worth of bars. The
        list may be shorter when the API errors out or runs out of data;
        any exception is printed and ends the download early.
    """
    target = days * 24 * 60 // int(interval)
    cursor_ms = int(datetime.now().timestamp() * 1000)
    collected = []

    while len(collected) < target:
        try:
            reply = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=1000,
                end=cursor_ms,
            )
            if reply["retCode"] != 0:
                break
            page = reply["result"]["list"]
            if not page:
                break
            # Append row by row so bars parsed before a bad row are kept.
            for row in page:
                collected.append({
                    "ts": int(row[0]),
                    "o": float(row[1]), "h": float(row[2]),
                    "l": float(row[3]), "c": float(row[4]),
                    "v": float(row[5]),
                })
            # Next request ends one millisecond before the last row received.
            cursor_ms = int(page[-1][0]) - 1
            if len(page) < 1000:
                break
            time.sleep(0.05)
        except Exception as e:
            print(f"    ERR: {e}")
            break

    # Walk the collected bars in reverse and keep the first bar seen per timestamp.
    by_ts = {}
    for bar in reversed(collected):
        by_ts.setdefault(bar["ts"], bar)
    bars = list(by_ts.values())
    return bars[-target:] if len(bars) > target else bars


# ============================================================
# INDICATORS (vectorized)
# ============================================================
def calc_all_indicators(closes, highs, lows, volumes):
    """Compute the Z-VWAP score, NATR(14) and Choppiness Index(14) per bar.

    Z-VWAP at bar ``i`` uses the ``VWAP_PERIOD`` bars *before* ``i``: it is
    the distance of ``closes[i]`` from the window's VWAP, divided by the
    standard deviation of (close - running VWAP) inside that window.

    The true range is computed once for the whole series and shared by NATR
    and CHOP; the 14-bar sums and extrema are built from shifted slices
    instead of nested Python loops.

    Args:
        closes: Close prices, oldest first.
        highs: High prices, same length as ``closes``.
        lows: Low prices, same length as ``closes``.
        volumes: Bar volumes, same length as ``closes``.

    Returns:
        ``(z_scores, natr, chop)`` - three float arrays with one value per
        bar. Warm-up bars keep their defaults: 0.0 for ``z_scores`` (first
        ``VWAP_PERIOD`` bars, and wherever the deviation std is 0), 0.0 for
        ``natr`` and 50.0 for ``chop`` (first 14 bars, and for ``chop``
        wherever the 14-bar range is 0).
    """
    closes = np.asarray(closes, dtype=float)
    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    volumes = np.asarray(volumes, dtype=float)
    n = len(closes)
    window = 14  # look-back shared by NATR and CHOP

    # ---- Z-VWAP ----
    z_scores = np.zeros(n)
    # Typical price * volume is loop-invariant, so build it once.
    typical_pv = (highs + lows + closes) / 3 * volumes
    for i in range(VWAP_PERIOD, n):
        start = i - VWAP_PERIOD
        cum_pv = np.cumsum(typical_pv[start:i])
        cum_vol = np.cumsum(volumes[start:i])
        # Avoid 0/0 while no volume has traded yet inside the window.
        cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
        vwap_path = cum_pv / cum_vol_safe
        vwap = vwap_path[-1]
        std = np.std(closes[start:i] - vwap_path)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std

    # ---- NATR and CHOP (both need the 14-bar true-range sum) ----
    natr = np.zeros(n)
    chop = np.full(n, 50.0)
    if n > window:
        prev_close = closes[:-1]
        # tr[k] is the true range of bar k + 1 (bar 0 has no previous close).
        tr = np.maximum.reduce([
            highs[1:] - lows[1:],
            np.abs(highs[1:] - prev_close),
            np.abs(lows[1:] - prev_close),
        ])

        m = n - window  # number of bars that have a full window (bars 14..n-1)
        tr_sum = np.zeros(m)
        hi = np.full(m, -np.inf)
        lo = np.full(m, np.inf)
        # Offset k selects bar (i - 13 + k) of the window ending at bar i,
        # so the sum is accumulated oldest bar first.
        for k in range(window):
            tr_sum += tr[k:k + m]
            hi = np.maximum(hi, highs[k + 1:k + 1 + m])
            lo = np.minimum(lo, lows[k + 1:k + 1 + m])

        # NATR: ATR as a percentage of the current close (0 if close <= 0).
        last_close = closes[window:]
        positive = last_close > 0
        natr_tail = np.zeros(m)
        natr_tail[positive] = (tr_sum[positive] / window) / last_close[positive] * 100
        natr[window:] = natr_tail

        # CHOP: 100 * log10(sum(TR) / (max high - min low)) / log10(14).
        rng = hi - lo
        ranged = rng > 0
        chop_tail = np.full(m, 50.0)
        chop_tail[ranged] = 100 * np.log10(tr_sum[ranged] / rng[ranged]) / np.log10(window)
        chop[window:] = chop_tail

    return z_scores, natr, chop


# ============================================================
# SIMPLE DEAL (no SO)
# ============================================================
def simulate_deals(z_scores, natr, chop, closes, highs, lows, timestamps, params):
    """Replay the single-entry Z-VWAP strategy over one symbol's bars.

    At most one deal is open at a time. A deal opens at the bar's close when
    ``|z|`` exceeds ``z_entry`` and every enabled filter passes; it goes LONG
    for negative z and SHORT for positive z. On each later bar the exits are
    checked in this order: stop-loss, take-profit, Z-TP (z back within 0.3
    of zero on the deal's side while the close is in profit), and finally a
    36-bar time stop. A deal still open after the last bar is closed at the
    final close with reason ``"END"``.

    Args:
        z_scores, natr, chop: Per-bar indicator arrays.
        closes, highs, lows: Per-bar price arrays.
        timestamps: Per-bar timestamps (accepted but not used here).
        params: Dict with keys ``z_entry``, ``z_max``, ``tp_pct``, ``sl_pct``,
            ``natr_min``, ``natr_max``, ``chop_min`` and ``cooldown``.

    Returns:
        A list of ``{"side", "pnl", "reason", "bars"}`` dicts, one per deal,
        where ``pnl`` is net of taker fees on both legs.
    """
    total_bars = len(closes)
    entry_z = params["z_entry"]
    z_cap = params["z_max"]
    tp_pct = params["tp_pct"]
    sl_pct = params["sl_pct"]
    natr_floor = params["natr_min"]
    natr_cap = params["natr_max"]
    chop_floor = params["chop_min"]
    cooldown = params["cooldown"]

    def net_pnl(direction, entry_px, exit_px):
        # Gross move on the leveraged size, minus taker fees on both legs.
        size = (ORDER_USD * LEVERAGE) / entry_px
        if direction == "LONG":
            gross = size * (exit_px - entry_px)
        else:
            gross = size * (entry_px - exit_px)
        costs = size * entry_px * TAKER_FEE + size * exit_px * TAKER_FEE
        return gross - costs

    trades = []
    position = None      # None while flat, otherwise "LONG" or "SHORT"
    entry_px = 0
    opened_at = 0
    cooldown_until = 0

    for bar in range(VWAP_PERIOD, total_bars):
        if position is not None:
            exit_px = None
            why = ""
            z_now = z_scores[bar]

            if position == "LONG":
                target = entry_px * (1 + tp_pct / 100)
                stop = entry_px * (1 - sl_pct / 100)
                # The stop is tested first when both levels fall inside one bar.
                if lows[bar] <= stop:
                    exit_px, why = stop, "SL"
                elif highs[bar] >= target:
                    exit_px, why = target, "TP"
                elif z_now >= -0.3 and closes[bar] > entry_px:
                    exit_px, why = closes[bar], "Z-TP"
            else:  # SHORT
                target = entry_px * (1 - tp_pct / 100)
                stop = entry_px * (1 + sl_pct / 100)
                if highs[bar] >= stop:
                    exit_px, why = stop, "SL"
                elif lows[bar] <= target:
                    exit_px, why = target, "TP"
                elif z_now <= 0.3 and closes[bar] < entry_px:
                    exit_px, why = closes[bar], "Z-TP"

            held = bar - opened_at
            # Time stop: 36 bars (3h)
            if exit_px is None and held >= 36:
                exit_px, why = closes[bar], "TIME"

            if exit_px is not None:
                trades.append({
                    "side": position,
                    "pnl": net_pnl(position, entry_px, exit_px),
                    "reason": why,
                    "bars": held,
                })
                position = None
                cooldown_until = bar + cooldown
            # No new entry is considered on a bar that started with an open deal.
            continue

        if bar < cooldown_until:
            continue

        z_now = z_scores[bar]
        if abs(z_now) <= entry_z:
            continue
        # Z max filter (skip breakouts)
        if z_cap > 0 and abs(z_now) > z_cap:
            continue
        # NATR band
        if natr[bar] < natr_floor or (natr_cap > 0 and natr[bar] > natr_cap):
            continue
        # CHOP filter
        if chop_floor > 0 and chop[bar] < chop_floor:
            continue

        position = "LONG" if z_now < -entry_z else "SHORT"
        entry_px = closes[bar]
        opened_at = bar

    # Force-close whatever is still open at the final close.
    if position is not None:
        trades.append({
            "side": position,
            "pnl": net_pnl(position, entry_px, closes[-1]),
            "reason": "END",
            "bars": total_bars - 1 - opened_at,
        })

    return trades


# ============================================================
# MAIN
# ============================================================
def _evaluate_combo(params, all_data):
    """Backtest one parameter combination over every loaded symbol.

    Args:
        params: Dict mapping each ``PARAM_GRID`` key to one chosen value.
        all_data: Dict ``symbol -> {"closes", "highs", "lows", "z", "natr",
            "chop", "ts", "vol"}`` as built by ``main``.

    Returns:
        A result row (the params plus aggregate statistics), or ``None``
        when the combination is contradictory or yields fewer than 5 deals.
    """
    # Quick sanity: tp must be > fees
    if params["tp_pct"] < 0.1:
        return None
    # Impossible: the entry threshold is not below the enabled z cap.
    if params["z_max"] > 0 and params["z_entry"] >= params["z_max"]:
        return None
    # Impossible: the NATR floor is not below the enabled NATR cap.
    if params["natr_max"] > 0 and params["natr_min"] >= params["natr_max"]:
        return None

    all_deals = []
    for sym, data in all_data.items():
        # Volume filter
        if data["vol"] < params["vol_min"]:
            continue
        deals = simulate_deals(
            data["z"], data["natr"], data["chop"],
            data["closes"], data["highs"], data["lows"],
            data["ts"], params
        )
        for d in deals:
            d["symbol"] = sym
        all_deals.extend(deals)

    if len(all_deals) < 5:
        return None

    pnls = [d["pnl"] for d in all_deals]
    total_pnl = sum(pnls)
    wins = [p for p in pnls if p > 0]
    losses = [p for p in pnls if p <= 0]
    gp = sum(wins) if wins else 0
    # With no losing deal, use a tiny denominator so PF stays finite.
    gl = abs(sum(losses)) if losses else 0.001
    pf = gp / gl if gl > 0 else 999

    reason_counts = {}
    for d in all_deals:
        reason_counts[d["reason"]] = reason_counts.get(d["reason"], 0) + 1

    return {
        **params,
        "deals": len(all_deals),
        "pnl": round(total_pnl, 2),
        "pnl_per_day": round(total_pnl / DAYS, 2),
        "wr": round(len(wins) / len(all_deals) * 100, 1),
        "pf": round(pf, 2),
        "avg_win": round(gp / len(wins), 3) if wins else 0,
        "avg_loss": round(gl / len(losses), 3) if losses else 0,
        "avg_bars": round(sum(d["bars"] for d in all_deals) / len(all_deals), 1),
        "tp": reason_counts.get("TP", 0),
        "ztp": reason_counts.get("Z-TP", 0),
        "sl": reason_counts.get("SL", 0),
        "time": reason_counts.get("TIME", 0),
    }


def _print_results_table(title, rows, limit=30, skip_if_empty=False):
    """Print a titled, fixed-width table with the first ``limit`` result rows."""
    if skip_if_empty and not rows:
        return

    header = f"{'#':>3} {'Z':>4} {'Zmax':>4} {'TP%':>5} {'SL%':>5} {'NATRm':>5} {'NATRx':>5} {'CHOP':>4} {'VolM':>4} {'CD':>3} | {'Deals':>5} {'PnL':>8} {'$/d':>6} {'WR%':>5} {'PF':>6} {'AvgW':>7} {'AvgL':>7} {'Bars':>5} | {'TP':>4} {'ZTP':>4} {'SL':>4} {'TIM':>4}"
    rule = "=" * 140

    print(f"\n{rule}")
    print(title)
    print(rule)
    print(header)
    print("-" * 140)
    for i, r in enumerate(rows[:limit], start=1):
        vol_m = r["vol_min"] // 1_000_000
        natr_x = r["natr_max"]
        z_mx = r["z_max"]
        print(f"{i:>3} {r['z_entry']:>4.1f} {z_mx:>4.1f} {r['tp_pct']:>5.1f} {r['sl_pct']:>5.1f} {r['natr_min']:>5.2f} {natr_x:>5.1f} {r['chop_min']:>4.0f} {vol_m:>4.0f} {r['cooldown']:>3} | {r['deals']:>5} ${r['pnl']:>+7.2f} ${r['pnl_per_day']:>+5.2f} {r['wr']:>5.1f} {r['pf']:>6.2f} ${r['avg_win']:>6.3f} ${r['avg_loss']:>6.3f} {r['avg_bars']:>5.1f} | {r['tp']:>4} {r['ztp']:>4} {r['sl']:>4} {r['time']:>4}")


def main():
    """Run the full parameter sweep: download data, backtest, report, save.

    Steps:
      1. Pick symbols and download ``DAYS`` of ``TIMEFRAME`` klines for each.
      2. Pre-compute the indicators once per symbol.
      3. Backtest every combination of ``PARAM_GRID`` (lazily enumerated).
      4. Print ranking tables and a summary.
      5. Write the top 500 rows by PnL to ``results_mega_sweep.json`` next
         to this script.

    Progress is reported every 500 combinations, including combinations
    that are skipped as contradictory or that produce too few deals.
    """
    banner = "=" * 80
    print(banner)
    print(f"  MEGA PARAMETER SWEEP — Z-VWAP NO SO — {DAYS} DAYS")
    print(banner)

    # Grid size is computed arithmetically so the product never has to be
    # materialised as a list.
    keys = list(PARAM_GRID.keys())
    values = list(PARAM_GRID.values())
    total = 1
    for v in values:
        total *= len(v)
    print(f"\n  Grid: {' × '.join(str(len(v)) for v in values)} = {total} combinations")

    # Fetch symbols
    print("\n  Fetching symbols...")
    symbols_data = get_symbols()
    print(f"  {len(symbols_data)} symbols with vol >= $20M")

    # Fetch all kline data and pre-compute indicators once per symbol.
    print(f"\n  Downloading {DAYS}-day klines ({TIMEFRAME}m)...")
    all_data = {}
    for idx, sd in enumerate(symbols_data, start=1):
        sym = sd["symbol"]
        print(f"  [{idx}/{len(symbols_data)}] {sym}...", end=" ", flush=True)
        time.sleep(0.12)  # pause between symbols
        klines = fetch_klines(sym, TIMEFRAME, days=DAYS)
        # Require the Z-VWAP warm-up plus 100 further bars.
        if len(klines) < VWAP_PERIOD + 100:
            print(f"skip ({len(klines)} bars)")
            continue
        closes = np.array([k["c"] for k in klines])
        highs = np.array([k["h"] for k in klines])
        lows = np.array([k["l"] for k in klines])
        volumes = np.array([k["v"] for k in klines])
        timestamps = [k["ts"] for k in klines]

        z_scores, natr, chop = calc_all_indicators(closes, highs, lows, volumes)
        all_data[sym] = {
            "closes": closes, "highs": highs, "lows": lows,
            "z": z_scores, "natr": natr, "chop": chop, "ts": timestamps,
            "vol": sd["volume_24h"],
        }
        print(f"OK ({len(klines)} bars)")

    print(f"\n  {len(all_data)} symbols loaded")

    # Run sweep
    print(f"\n  Running {total} combinations...")
    results = []
    profitable = 0
    t0 = time.time()

    for ci, combo in enumerate(product(*values), start=1):
        row = _evaluate_combo(dict(zip(keys, combo)), all_data)
        if row is not None:
            results.append(row)
            if row["pnl"] > 0:
                profitable += 1

        # Reported for every combo, including skipped ones.
        if ci % 500 == 0:
            # Guard against a zero reading from a coarse clock.
            elapsed = max(time.time() - t0, 1e-9)
            rate = ci / elapsed
            eta = (total - ci) / rate / 60
            print(f"  ... {ci}/{total} ({rate:.0f}/s, ETA {eta:.1f}min) — {len(results)} valid, {profitable} profitable")

    elapsed = time.time() - t0
    print(f"\n  Done in {elapsed:.0f}s! {len(results)} valid combos, {profitable} profitable")

    # ============================================================
    # DISPLAY RESULTS
    # ============================================================
    min_deals = 20
    enough_deals = [r for r in results if r["deals"] >= min_deals]

    results_pnl = sorted(enough_deals, key=lambda x: x["pnl"], reverse=True)
    _print_results_table(f"  TOP 30 by Total PnL (min {min_deals} deals)", results_pnl)

    # PF values of 100 or more are left out of this ranking.
    results_pf = sorted([r for r in enough_deals if r["pf"] < 100],
                        key=lambda x: x["pf"], reverse=True)
    _print_results_table(f"  TOP 30 by Profit Factor (min {min_deals} deals)", results_pf)

    results_daily = sorted([r for r in enough_deals if r["pf"] > 1.0],
                           key=lambda x: x["pnl_per_day"], reverse=True)
    _print_results_table(f"  TOP 30 by $/day (min {min_deals} deals, PF > 1)", results_daily)

    balanced = sorted(
        [r for r in results if r["pf"] > 1.3 and r["deals"] >= 30 and r["wr"] > 50],
        key=lambda x: x["pnl"], reverse=True)
    _print_results_table("  BALANCED PICK (PF > 1.3, >= 30 deals, WR > 50%)",
                         balanced, skip_if_empty=True)

    conservative = sorted(
        [r for r in results if r["pf"] > 1.5 and r["deals"] >= 50 and r["wr"] > 60],
        key=lambda x: x["pnl"], reverse=True)
    _print_results_table("  CONSERVATIVE (PF > 1.5, >= 50 deals, WR > 60%)",
                         conservative, limit=20, skip_if_empty=True)

    # SUMMARY STATS
    print(f"\n{banner}")
    print("  SUMMARY")
    print(banner)
    print(f"  Total combinations tested: {total}")
    print(f"  Valid (>= 5 deals): {len(results)}")
    if results:
        print(f"  Profitable: {profitable} ({profitable/len(results)*100:.1f}%)")
    if results_pnl:
        print(f"  Best PnL/{DAYS}d: ${results_pnl[0]['pnl']:+.2f} ({results_pnl[0]['pnl_per_day']:+.2f}/day)")
        if results_pf:
            print(f"  Best PF: {results_pf[0]['pf']:.2f}")
    print(f"  Elapsed: {elapsed:.0f}s")

    # Save the best 500 rows by PnL next to this script.
    output_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                               "results_mega_sweep.json")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump({
            "grid": {k: [str(v) for v in vs] for k, vs in PARAM_GRID.items()},
            "days": DAYS,
            "symbols": len(all_data),
            "total_combos": total,
            "valid": len(results),
            "profitable": profitable,
            "elapsed_sec": round(elapsed),
            "results": sorted(results, key=lambda x: x["pnl"], reverse=True)[:500],
        }, f, indent=2)
    print(f"  Saved top 500 to {output_path}")


if __name__ == "__main__":
    main()

📜 Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...