← Back
"""
R:R Sweep — positive risk/reward combos
=========================================
Fixed: TP/SL ratio >= 2:1, CHOP on, no SO
Sweep: Z, TP, SL, NATR, CHOP

Usage: python3 backtests/backtest_rr_sweep.py
"""

import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json, time
import numpy as np
from datetime import datetime
from itertools import product
from pybit.unified_trading import HTTP

ORDER_USD = 7.0
LEVERAGE = 3
VWAP_PERIOD = 50
TAKER_FEE = 0.00055
TIMEFRAME = "5"
DAYS = 30
MIN_VOLUME_24H = 20_000_000
BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

# R:R combos (all >= 2:1)
RR_COMBOS = [
    (2.0, 1.0),   # 2:1
    (3.0, 1.0),   # 3:1
    (3.0, 1.5),   # 2:1
    (4.0, 1.5),   # 2.7:1
    (5.0, 2.0),   # 2.5:1
    (2.0, 0.5),   # 4:1
    (3.0, 0.5),   # 6:1
    (1.5, 0.5),   # 3:1
    (1.5, 0.75),  # 2:1
    (2.0, 0.75),  # 2.7:1
]

GRID = {
    "z_entry":  [1.8, 2.0, 2.5, 3.0],
    "z_max":    [0, 2.5, 3.5],  # 0 = off
    "natr_min": [0.5, 0.75, 1.0, 1.5],
    "natr_max": [0, 2.0, 3.0],  # 0 = off
    "chop_min": [45, 50, 55, 60],
    "cooldown": [6, 14],
}

session = HTTP(testnet=False)

def get_symbols():
    resp = session.get_tickers(category="linear")
    if resp["retCode"] != 0: return []
    out = []
    for t in resp["result"]["list"]:
        s = t["symbol"]
        if not s.endswith("USDT") or s in BLACKLIST: continue
        v = float(t.get("turnover24h", 0))
        if v >= MIN_VOLUME_24H:
            out.append({"symbol": s, "volume_24h": v})
    out.sort(key=lambda x: x["volume_24h"], reverse=True)
    return out[:50]

def fetch_klines(symbol, interval, days):
    kls = []
    need = days * 24 * 60 // int(interval)
    end = int(datetime.now().timestamp() * 1000)
    while len(kls) < need:
        try:
            r = session.get_kline(category="linear", symbol=symbol, interval=interval, limit=1000, end=end)
            if r["retCode"] != 0: break
            items = r["result"]["list"]
            if not items: break
            for i in items:
                kls.append({"ts": int(i[0]), "h": float(i[2]), "l": float(i[3]), "c": float(i[4]), "v": float(i[5])})
            end = int(items[-1][0]) - 1
            if len(items) < 1000: break
        except: break
    kls.reverse()
    seen = set(); u = []
    for k in kls:
        if k["ts"] not in seen: seen.add(k["ts"]); u.append(k)
    return u[-need:] if len(u) > need else u

def calc_indicators(c, h, l, v):
    n = len(c)
    z = np.zeros(n)
    for i in range(VWAP_PERIOD, n):
        hh, ll, cc, vv = h[i-VWAP_PERIOD:i], l[i-VWAP_PERIOD:i], c[i-VWAP_PERIOD:i], v[i-VWAP_PERIOD:i]
        tp = (hh+ll+cc)/3; ctv = np.cumsum(tp*vv); cv = np.cumsum(vv)
        cv_s = np.where(cv==0,1,cv); va = ctv/cv_s
        dev = cc - va; std = np.std(dev)
        if std > 0: z[i] = (c[i] - va[-1]) / std

    natr = np.zeros(n)
    for i in range(14, n):
        trs = [max(h[j]-l[j], abs(h[j]-c[j-1]), abs(l[j]-c[j-1])) for j in range(i-13,i+1)]
        natr[i] = (np.mean(trs)/c[i])*100 if c[i]>0 else 0

    chop = np.full(n, 50.0)
    for i in range(14, n):
        atr_s = sum(max(h[j]-l[j], abs(h[j]-c[j-1]), abs(l[j]-c[j-1])) for j in range(i-13,i+1))
        hi, lo = np.max(h[i-13:i+1]), np.min(l[i-13:i+1])
        rng = hi - lo
        if rng > 0: chop[i] = 100*np.log10(atr_s/rng)/np.log10(14)
    return z, natr, chop

def sim(z_arr, natr_arr, chop_arr, c, h, l, p):
    n = len(c)
    deals = []; in_t = False; side = None; ep = 0; eb = 0; cu = 0
    ze, zm, ztp = p["z_entry"], p["z_max"], 0.3
    tp, sl = p["tp_pct"], p["sl_pct"]
    nm, nx, cm, cd = p["natr_min"], p["natr_max"], p["chop_min"], p["cooldown"]

    for i in range(VWAP_PERIOD, n):
        if in_t:
            z = z_arr[i]; closed = False; cp = 0; reason = ""
            if side == "LONG":
                tp_p, sl_p = ep*(1+tp/100), ep*(1-sl/100)
                if l[i] <= sl_p: cp,reason,closed = sl_p,"SL",True
                elif h[i] >= tp_p: cp,reason,closed = tp_p,"TP",True
                elif z >= -ztp and c[i] > ep: cp,reason,closed = c[i],"Z-TP",True
            else:
                tp_p, sl_p = ep*(1-tp/100), ep*(1+sl/100)
                if h[i] >= sl_p: cp,reason,closed = sl_p,"SL",True
                elif l[i] <= tp_p: cp,reason,closed = tp_p,"TP",True
                elif z <= ztp and c[i] < ep: cp,reason,closed = c[i],"Z-TP",True
            if not closed and (i-eb) >= 36: cp,reason,closed = c[i],"TIME",True
            if closed:
                qty = (ORDER_USD*LEVERAGE)/ep
                pnl = qty*(cp-ep) if side=="LONG" else qty*(ep-cp)
                pnl -= qty*ep*TAKER_FEE + qty*cp*TAKER_FEE
                deals.append({"pnl": pnl, "reason": reason})
                in_t = False; cu = i + cd
            continue
        if i < cu: continue
        z = z_arr[i]
        if abs(z) <= ze: continue
        if zm > 0 and abs(z) > zm: continue
        if natr_arr[i] < nm: continue
        if nx > 0 and natr_arr[i] > nx: continue
        if chop_arr[i] < cm: continue
        side = "LONG" if z < -ze else "SHORT"
        ep = c[i]; eb = i; in_t = True

    if in_t:
        qty = (ORDER_USD*LEVERAGE)/ep; cp = c[-1]
        pnl = qty*(cp-ep) if side=="LONG" else qty*(ep-cp)
        pnl -= qty*ep*TAKER_FEE + qty*cp*TAKER_FEE
        deals.append({"pnl": pnl, "reason": "END"})
    return deals

def main():
    print("="*80)
    print("  R:R SWEEP — Positive Risk/Reward + All Filters")
    print(f"  {len(RR_COMBOS)} R:R combos x {len(list(product(*GRID.values())))} filter combos")
    print("="*80)

    syms = get_symbols()
    print(f"\n  {len(syms)} symbols, downloading {DAYS}d klines...")

    data = {}
    for idx, sd in enumerate(syms):
        s = sd["symbol"]
        print(f"  [{idx+1}/{len(syms)}] {s}...", end=" ", flush=True)
        time.sleep(0.15)
        kl = fetch_klines(s, TIMEFRAME, days=DAYS)
        if len(kl) < VWAP_PERIOD+100: print("skip"); continue
        c = np.array([k["c"] for k in kl]); h = np.array([k["h"] for k in kl])
        l = np.array([k["l"] for k in kl]); v = np.array([k["v"] for k in kl])
        zz, natr, chop = calc_indicators(c,h,l,v)
        data[s] = {"c":c,"h":h,"l":l,"z":zz,"natr":natr,"chop":chop}
        print("OK")

    print(f"\n  {len(data)} symbols loaded. Running sweep...\n")

    gkeys = list(GRID.keys())
    gvals = list(GRID.values())
    gcombos = list(product(*gvals))
    total = len(RR_COMBOS) * len(gcombos)
    print(f"  Total combos: {total}")

    results = []
    cnt = 0

    for tp_pct, sl_pct in RR_COMBOS:
        for gcombo in gcombos:
            gp = dict(zip(gkeys, gcombo))
            # skip z_max <= z_entry (makes no sense)
            if gp["z_max"] > 0 and gp["z_max"] <= gp["z_entry"]:
                continue
            params = {**gp, "tp_pct": tp_pct, "sl_pct": sl_pct}

            all_deals = []
            for s, d in data.items():
                deals = sim(d["z"], d["natr"], d["chop"], d["c"], d["h"], d["l"], params)
                all_deals.extend(deals)

            cnt += 1
            if cnt % 500 == 0:
                print(f"  ... {cnt}/{total}")

            if len(all_deals) < 10: continue

            pnl = sum(d["pnl"] for d in all_deals)
            wins = [d for d in all_deals if d["pnl"] > 0]
            losses = [d for d in all_deals if d["pnl"] <= 0]
            wr = len(wins)/len(all_deals)*100
            gp_val = sum(d["pnl"] for d in wins) if wins else 0
            gl_val = abs(sum(d["pnl"] for d in losses)) if losses else 0.001
            pf = gp_val/gl_val

            reasons = {}
            for d in all_deals: reasons[d["reason"]] = reasons.get(d["reason"],0)+1

            results.append({
                "tp": tp_pct, "sl": sl_pct, "rr": round(tp_pct/sl_pct, 1),
                **gp,
                "deals": len(all_deals), "pnl": round(pnl,2), "wr": round(wr,1),
                "pf": round(pf,2),
                "avg_w": round(gp_val/len(wins),3) if wins else 0,
                "avg_l": round(gl_val/len(losses),3) if losses else 0,
                "tp_cnt": reasons.get("TP",0), "ztp_cnt": reasons.get("Z-TP",0),
                "sl_cnt": reasons.get("SL",0), "time_cnt": reasons.get("TIME",0),
            })

    print(f"\n  {len(results)} valid combos (>=10 deals)")

    # Sort by PF
    results.sort(key=lambda x: (x["pf"], x["pnl"]), reverse=True)

    hdr = f"{'#':>3} {'TP':>4} {'SL':>4} {'R:R':>4} {'Z':>4} {'Zm':>4} {'NRm':>4} {'NRx':>4} {'CHP':>3} {'CD':>3} | {'Dls':>4} {'PnL':>8} {'WR%':>5} {'PF':>5} {'AvW':>6} {'AvL':>6} | {'TP':>3} {'ZTP':>3} {'SL':>3} {'TIM':>3}"
    sep = "-"*105

    # TOP 25 by PF (min 15 deals)
    print(f"\n{'='*105}")
    print(f"  TOP 25 by Profit Factor (min 15 deals, R:R >= 2:1)")
    print(f"{'='*105}")
    print(hdr); print(sep)
    shown = 0
    for r in results:
        if r["deals"] < 15: continue
        shown += 1
        if shown > 25: break
        print(f"{shown:>3} {r['tp']:>4.1f} {r['sl']:>4.1f} {r['rr']:>4.1f} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['natr_min']:>4.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}")

    # TOP 25 by PnL
    results_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True)
    print(f"\n{'='*105}")
    print(f"  TOP 25 by PnL (min 15 deals)")
    print(f"{'='*105}")
    print(hdr); print(sep)
    shown = 0
    for r in results_pnl:
        if r["deals"] < 15: continue
        shown += 1
        if shown > 25: break
        print(f"{shown:>3} {r['tp']:>4.1f} {r['sl']:>4.1f} {r['rr']:>4.1f} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['natr_min']:>4.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}")

    # BEST BALANCED (PF>1.2, deals>=20, best PnL)
    bal = sorted([r for r in results if r["pf"] >= 1.2 and r["deals"] >= 20], key=lambda x: x["pnl"], reverse=True)
    if bal:
        print(f"\n{'='*105}")
        print(f"  RECOMMENDED (PF>=1.2, >=20 deals, R:R>=2:1, by PnL)")
        print(f"{'='*105}")
        print(hdr); print(sep)
        for i, r in enumerate(bal[:15]):
            print(f"{i+1:>3} {r['tp']:>4.1f} {r['sl']:>4.1f} {r['rr']:>4.1f} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['natr_min']:>4.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}")

    out = os.path.join(os.path.dirname(__file__), "results_rr_sweep.json")
    with open(out, "w") as f:
        json.dump({"rr_combos": RR_COMBOS, "grid": GRID, "top100_pf": results[:100], "top100_pnl": results_pnl[:100]}, f, indent=2)
    print(f"\n  Saved to {out}")

if __name__ == "__main__":
    main()

📜 Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...