# ← Back  (page-navigation residue from the web viewer; not part of the script)
"""
Sweep v2: Only R:R >= 1 combos (TP >= SL)
==========================================
Reuses data from sweep_zvwap.py logic.
Saves to sweep_rr_results.json.

Usage:
  python3 backtests/sweep_rr.py
"""

import sys, os, json, time, argparse
import numpy as np
from datetime import datetime
from itertools import product

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pybit.unified_trading import HTTP

# Default look-back (in bars) for calc_zvwap; run_one also starts scanning at
# this bar index and main() requires VWAP_PERIOD + 50 klines per symbol.
VWAP_PERIOD = 50
# Default look-back (in bars) for calc_natr.
NATR_PERIOD = 14
# Default look-back (in bars) for calc_chop.
CHOP_PERIOD = 14
# Fee rate _pnl charges on exits whose reason is "TP".
MAKER_FEE = 0.0002
# Fee rate _pnl charges on every entry and on all non-"TP" exits.
TAKER_FEE = 0.00055
# Symbols get_top_symbols always excludes, whatever their turnover.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

# R:R focused grid
# main() sweeps the Cartesian product of these lists, dropping combos where
# z_entry >= z_max or tp_pct < sl_pct; each surviving combo is the `cfg`
# dict handed to run_one.
GRID = {
    "z_entry":   [1.8, 2.0, 2.5, 3.0],  # enter LONG when z < -z_entry, SHORT when z > z_entry
    "z_max":     [2.5, 3.5, 5.0],  # no entry while |z| exceeds this
    "tp_pct":    [1.0, 1.5, 2.0, 3.0, 4.0, 5.0],  # take-profit distance, percent of entry price
    "sl_pct":    [0.5, 1.0, 1.5, 2.0, 3.0],  # stop-loss distance, percent of entry price
    "chop_min":  [0, 45, 50, 55],  # minimum chop value for entry; 0 disables the filter
    "natr_min":  [0.5, 0.75],  # no entry when NATR is below this
    "natr_max":  [2.5, 3.5],  # no entry when NATR is above this
}


def fetch_klines(session, symbol, interval="5", days=7):
    """Download recent klines for *symbol* and return them oldest first.

    Pages backwards from the current time through ``session.get_kline``
    (category ``"linear"``, 1000 rows per request). Paging stops once enough
    bars are collected, when the API reports a non-zero ``retCode``, when a
    page is empty or shorter than 1000 rows, or when anything in the request
    or row parsing raises (the error is printed and whatever was collected so
    far is kept).

    Args:
        session: Client object exposing ``get_kline(**kwargs)``.
        symbol: Instrument symbol, passed to the API unchanged.
        interval: Bar size as a string that ``int()`` can parse; it is
            treated as minutes when sizing the request.
        days: Look-back window in days.

    Returns:
        A list of dicts with keys ``ts``, ``o``, ``h``, ``l``, ``c``, ``v``,
        de-duplicated on ``ts`` and trimmed to at most the newest
        ``days * 24 * 60 // int(interval)`` bars.
    """
    target = days * 24 * 60 // int(interval)
    cursor = int(datetime.now().timestamp() * 1000)
    collected = []

    while len(collected) < target:
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=1000,
                end=cursor,
            )
            if resp["retCode"] != 0:
                break
            page = resp["result"]["list"]
            if not page:
                break
            for row in page:
                collected.append({
                    "ts": int(row[0]),
                    "o": float(row[1]),
                    "h": float(row[2]),
                    "l": float(row[3]),
                    "c": float(row[4]),
                    "v": float(row[5]),
                })
            # The last row of the page is used as the next page's upper bound,
            # i.e. pages are expected to arrive newest first.
            cursor = int(page[-1][0]) - 1
            if len(page) < 1000:
                break
        except Exception as e:
            print(f"  ERR {symbol}: {e}")
            break
        time.sleep(0.15)

    # Walk the rows in reverse of arrival order and keep the first bar seen
    # for each timestamp.
    by_ts = {}
    for bar in reversed(collected):
        by_ts.setdefault(bar["ts"], bar)
    ordered = list(by_ts.values())

    if len(ordered) > target:
        return ordered[-target:]
    return ordered


def get_top_symbols(session, top_n=35, min_vol=20_000_000):
    """Return the highest-turnover USDT symbols from the linear tickers.

    Args:
        session: Client object exposing ``get_tickers(category=...)``.
        top_n: Maximum number of symbols to return.
        min_vol: Minimum ``turnover24h`` a symbol needs to qualify.

    Returns:
        A list of ``(symbol, turnover24h)`` tuples sorted by turnover,
        largest first. Symbols not ending in ``"USDT"`` or listed in
        ``BLACKLIST`` are skipped; a missing ``turnover24h`` counts as 0.
    """
    rows = session.get_tickers(category="linear")["result"]["list"]
    qualified = []
    for row in rows:
        name = row["symbol"]
        if name.endswith("USDT") and name not in BLACKLIST:
            turnover = float(row.get("turnover24h", 0))
            if turnover >= min_vol:
                qualified.append((name, turnover))
    ranked = sorted(qualified, key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]


def calc_zvwap(h, l, c, v, period=VWAP_PERIOD):
    """Z-score of each close against a rolling volume-weighted average price.

    For every index ``i >= period`` the window is the ``period`` bars that
    precede ``i`` (bar ``i`` itself is not part of it). Within the window a
    running VWAP of the typical price ``(h + l + c) / 3`` is accumulated; the
    score is ``(c[i] - last running VWAP) / std(window closes - running VWAP)``.

    Args:
        h, l, c, v: High, low, close and volume series (used with NumPy
            arithmetic and slicing) of equal length.
        period: Window length in bars.

    Returns:
        A float array the length of ``c``. Entries before ``period`` and
        entries whose window deviation is not positive stay ``0.0``.
    """
    size = len(c)
    zscores = np.full(size, 0.0)
    for idx in range(period, size):
        window = slice(idx - period, idx)
        closes = c[window]
        volumes = v[window]
        typical = (h[window] + l[window] + closes) / 3
        cum_pv = np.cumsum(typical * volumes)
        cum_vol = np.cumsum(volumes)
        # Substitute 1 where no volume has accumulated yet to avoid 0/0.
        running_vwap = cum_pv / np.where(cum_vol == 0, 1, cum_vol)
        anchor = running_vwap[-1]
        spread = np.std(closes - running_vwap)
        if spread > 0:
            zscores[idx] = (c[idx] - anchor) / spread
    return zscores


def calc_natr(h, l, c, period=NATR_PERIOD):
    """Average true range expressed as a percentage of the close.

    True range at bar ``k`` is the largest of ``h[k] - l[k]``,
    ``|h[k] - c[k-1]|`` and ``|l[k] - c[k-1]|`` (bar 0 has true range 0).
    The ATR is the plain mean of the true range over the ``period`` bars
    ending at ``k``.

    Args:
        h, l, c: High, low and close series of equal length.
        period: Averaging window in bars.

    Returns:
        A float array the length of ``c`` holding ``ATR / close * 100``.
        Entries before ``period`` and entries whose close is not positive
        are ``0.0``.
    """
    size = len(c)
    true_range = np.zeros(size)
    for k in range(1, size):
        prev_close = c[k - 1]
        true_range[k] = max(
            h[k] - l[k],
            abs(h[k] - prev_close),
            abs(l[k] - prev_close),
        )

    result = np.full(size, 0.0)
    for k in range(period, size):
        window_atr = np.mean(true_range[k - period + 1:k + 1])
        close = c[k]
        if close > 0:
            result[k] = (window_atr / close) * 100
    return result


def calc_chop(h, l, c, period=CHOP_PERIOD):
    """Rolling choppiness value built from true range and the high-low span.

    For each bar ``k >= period`` the value is
    ``100 * log10(sum(true range) / (max high - min low)) / log10(period)``
    taken over the ``period`` bars ending at ``k``. True range uses the same
    definition as ``max(h - l, |h - prev close|, |l - prev close|)``.

    Args:
        h, l, c: High, low and close series of equal length.
        period: Window length in bars.

    Returns:
        A float array the length of ``c``. Entries before ``period`` and
        entries whose high-low span is not positive keep the initial
        value ``50.0``.
    """
    size = len(c)
    true_range = np.zeros(size)
    for k in range(1, size):
        prev_close = c[k - 1]
        true_range[k] = max(
            h[k] - l[k],
            abs(h[k] - prev_close),
            abs(l[k] - prev_close),
        )

    values = np.full(size, 50.0)
    for k in range(period, size):
        start, stop = k - period + 1, k + 1
        tr_total = np.sum(true_range[start:stop])
        top = np.max(h[start:stop])
        bottom = np.min(l[start:stop])
        span = top - bottom
        if span > 0:
            values[k] = 100 * np.log10(tr_total / span) / np.log10(period)
    return values


def run_one(data_cache, cfg):
    """Simulate one parameter set over every cached symbol and summarise it.

    Per symbol, bars are scanned from index ``VWAP_PERIOD``. At most one
    position is open at a time:

    * While a position is open, the bar is checked for a stop-loss touch,
      then a take-profit touch, then a z-score reversion exit (z back inside
      +/-0.3 with the close on the profitable side of entry). The first that
      applies, in that order, closes the trade and blocks new entries until
      12 bars later. No entry is taken on a bar where a position was open.
    * Otherwise a LONG opens when ``z < -z_entry`` and a SHORT when
      ``z > z_entry``, at the bar's close, provided the cooldown has passed,
      NATR lies within ``[natr_min, natr_max]``, chop is at least
      ``chop_min`` (filter off when ``chop_min`` is 0) and ``|z| <= z_max``.

    A position still open after the last bar is closed at the final close
    with reason ``"END"``.

    Args:
        data_cache: Mapping of symbol to a 7-tuple
            ``(h, l, c, v, z_arr, natr_arr, chop_arr)``.
        cfg: Dict with keys ``z_entry``, ``z_max``, ``tp_pct``, ``sl_pct``,
            ``chop_min``, ``natr_min`` and ``natr_max``.

    Returns:
        ``None`` when no trade was taken, else a dict with ``trades``,
        ``wr`` (win rate in percent), ``pnl``, ``pf`` (gross profit over
        gross loss, 999 when there is no loss), ``tp_count``, ``sl_count``
        and ``ztp_count``.
    """
    closed = []
    for sym, (h, l, c, v, z_arr, natr_arr, chop_arr) in data_cache.items():
        position = None
        next_entry_bar = 0
        for bar in range(VWAP_PERIOD, len(c)):
            z, natr, chop = z_arr[bar], natr_arr[bar], chop_arr[bar]

            if position:
                if position["side"] == "LONG":
                    stopped = l[bar] <= position["sl"]
                    target_hit = h[bar] >= position["tp"]
                    reverted = z >= -0.3 and c[bar] > position["ep"]
                else:
                    stopped = h[bar] >= position["sl"]
                    target_hit = l[bar] <= position["tp"]
                    reverted = z <= 0.3 and c[bar] < position["ep"]

                # Exit priority: stop-loss, then take-profit, then z reversion.
                if stopped:
                    reason, exit_price = "SL", position["sl"]
                elif target_hit:
                    reason, exit_price = "TP", position["tp"]
                elif reverted:
                    reason, exit_price = "Z-TP", c[bar]
                else:
                    continue  # position stays open
                closed.append({
                    "sym": sym,
                    "pnl": _pnl(position, exit_price, reason),
                    "reason": reason,
                })
                position = None
                next_entry_bar = bar + 12
                continue

            if bar < next_entry_bar:
                continue
            if natr < cfg["natr_min"] or natr > cfg["natr_max"]:
                continue
            if cfg["chop_min"] > 0 and chop < cfg["chop_min"]:
                continue
            if abs(z) > cfg["z_max"]:
                continue

            if z < -cfg["z_entry"]:
                entry = c[bar]
                # qty is sized so that qty * entry equals 5.0 * 3.
                position = {
                    "side": "LONG",
                    "ep": entry,
                    "qty": (5.0 * 3) / entry,
                    "tp": entry * (1 + cfg["tp_pct"] / 100),
                    "sl": entry * (1 - cfg["sl_pct"] / 100),
                }
            elif z > cfg["z_entry"]:
                entry = c[bar]
                position = {
                    "side": "SHORT",
                    "ep": entry,
                    "qty": (5.0 * 3) / entry,
                    "tp": entry * (1 - cfg["tp_pct"] / 100),
                    "sl": entry * (1 + cfg["sl_pct"] / 100),
                }

        if position:
            closed.append({
                "sym": sym,
                "pnl": _pnl(position, c[-1], "END"),
                "reason": "END",
            })

    if not closed:
        return None

    pnls = [t["pnl"] for t in closed]
    reasons = [t["reason"] for t in closed]
    winners = [p for p in pnls if p > 0]
    net = sum(pnls)
    gross_profit = sum(winners)
    gross_loss = abs(sum(p for p in pnls if p <= 0))
    if gross_loss > 0:
        profit_factor = gross_profit / gross_loss
    else:
        profit_factor = 999

    return {
        "trades": len(closed),
        "wr": round(len(winners) / len(closed) * 100, 1),
        "pnl": round(net, 2),
        "pf": round(profit_factor, 2),
        "tp_count": reasons.count("TP"),
        "sl_count": reasons.count("SL"),
        "ztp_count": reasons.count("Z-TP"),
    }


def _pnl(trade, close_price, reason):
    """Return the net profit of closing *trade* at *close_price*, after fees.

    Args:
        trade: Dict with ``side`` (``"LONG"`` or anything else, treated as
            short), ``ep`` (entry price) and ``qty``.
        close_price: Price at which the position is closed.
        reason: Exit label; ``"TP"`` exits pay ``MAKER_FEE``, every other
            exit pays ``TAKER_FEE``. The entry always pays ``TAKER_FEE``.

    Returns:
        Gross price profit minus the entry fee and the exit fee.
    """
    qty = trade["qty"]
    entry = trade["ep"]
    if trade["side"] == "LONG":
        gross = qty * (close_price - entry)
    else:
        gross = qty * (entry - close_price)
    exit_rate = MAKER_FEE if reason == "TP" else TAKER_FEE
    entry_fee = qty * entry * TAKER_FEE
    exit_fee = qty * close_price * exit_rate
    return gross - entry_fee - exit_fee


def main():
    """Run the R:R >= 1 parameter sweep from the command line.

    Steps:
      1. Parse ``--days`` (kline look-back, default 7) and ``--top``
         (number of symbols, default 35).
      2. Pick the top symbols, download their klines and pre-compute the
         z-VWAP, NATR and chop arrays once per symbol. Symbols with fewer
         than ``VWAP_PERIOD + 50`` klines are skipped.
      3. Build every ``GRID`` combination, dropping those with
         ``z_entry >= z_max`` or ``tp_pct < sl_pct``, and run each through
         ``run_one``. Only combos producing at least 5 trades are kept.
      4. Sort the kept results by PnL (best first), write a summary to
         ``sweep_rr_results.json`` next to this file and print the top 20.

    In the JSON summary ``combos_tested`` is the number of combos actually
    swept and ``combos_valid`` the number that met the 5-trade minimum.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    parser.add_argument("--top", type=int, default=35)
    args = parser.parse_args()

    session = HTTP(testnet=False)
    print(f"[1/3] Fetching top {args.top} symbols...")
    symbols = get_top_symbols(session, args.top)
    print(f"  Found {len(symbols)} symbols")

    print(f"[2/3] Downloading {args.days}d klines...")
    data_cache = {}
    for done, (sym, _vol) in enumerate(symbols, start=1):
        klines = fetch_klines(session, sym, days=args.days)
        if len(klines) < VWAP_PERIOD + 50:
            continue
        h = np.array([k["h"] for k in klines])
        l = np.array([k["l"] for k in klines])
        c = np.array([k["c"] for k in klines])
        v = np.array([k["v"] for k in klines])
        # Indicators depend only on the price data, so compute them once
        # here rather than once per parameter combo.
        data_cache[sym] = (
            h, l, c, v,
            calc_zvwap(h, l, c, v), calc_natr(h, l, c), calc_chop(h, l, c),
        )
        if done % 10 == 0:
            print(f"  {done}/{len(symbols)}")
        time.sleep(0.2)
    print(f"  Cached {len(data_cache)} symbols")

    # Build combos, ONLY R:R >= 1
    keys = list(GRID)
    combos = []
    for vals in product(*(GRID[k] for k in keys)):
        cfg = dict(zip(keys, vals))
        if cfg["z_entry"] >= cfg["z_max"]:
            continue
        if cfg["tp_pct"] < cfg["sl_pct"]:
            continue  # R:R < 1 — skip
        combos.append(cfg)

    print(f"[3/3] Sweeping {len(combos)} R:R≥1 combos...")
    results = []
    for done, cfg in enumerate(combos, start=1):
        res = run_one(data_cache, cfg)
        if res and res["trades"] >= 5:
            res["config"] = cfg
            res["rr"] = round(cfg["tp_pct"] / cfg["sl_pct"], 1)
            results.append(res)
        if done % 200 == 0:
            print(f"  {done}/{len(combos)}")

    results.sort(key=lambda x: x["pnl"], reverse=True)

    out_path = os.path.join(os.path.dirname(__file__), "sweep_rr_results.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump({
            "date": datetime.now().isoformat(),
            "days": args.days, "symbols": len(data_cache),
            # Previously this reported len(results), i.e. only the combos
            # that reached 5 trades, under the name "combos_tested".
            "combos_tested": len(combos),
            "combos_valid": len(results),
            "top_30": results[:30],
            # A negative slice already returns the whole list when it holds
            # fewer than five entries.
            "bottom_5": results[-5:],
        }, f, indent=2)

    print(f"\n{'='*85}")
    print(f"SWEEP R:R≥1: {len(results)} valid combos ({args.days}d, {len(data_cache)} coins)")
    print(f"{'='*85}")
    print(f"{'#':>3} {'PnL':>8} {'WR%':>6} {'PF':>6} {'Trd':>5} {'R:R':>4} | Z    TP%  SL%  CHOP NATR")
    print("-"*85)
    for rank, r in enumerate(results[:20], start=1):
        cfg = r["config"]
        print(f"{rank:>3} ${r['pnl']:>7.2f} {r['wr']:>5.1f}% {r['pf']:>5.2f} {r['trades']:>5} {r['rr']:>4.1f} | "
              f"{cfg['z_entry']:>4.1f} {cfg['tp_pct']:>4.1f} {cfg['sl_pct']:>4.1f} {cfg['chop_min']:>4} {cfg['natr_min']:.2f}-{cfg['natr_max']:.1f}")

    print(f"\nResults saved to {out_path}")

if __name__ == "__main__":
    main()

# 📜 Git History
#
# dd32dfd  chore: initial commit — version control setup  (5 weeks ago)
# Show last diff
# Loading...