← Назад
""" R:R Sweep — positive risk/reward combos ========================================= Fixed: TP/SL ratio >= 2:1, CHOP on, no SO Sweep: Z, TP, SL, NATR, CHOP Usage: python3 backtests/backtest_rr_sweep.py """ import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import json, time import numpy as np from datetime import datetime from itertools import product from pybit.unified_trading import HTTP ORDER_USD = 7.0 LEVERAGE = 3 VWAP_PERIOD = 50 TAKER_FEE = 0.00055 TIMEFRAME = "5" DAYS = 30 MIN_VOLUME_24H = 20_000_000 BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"} # R:R combos (all >= 2:1) RR_COMBOS = [ (2.0, 1.0), # 2:1 (3.0, 1.0), # 3:1 (3.0, 1.5), # 2:1 (4.0, 1.5), # 2.7:1 (5.0, 2.0), # 2.5:1 (2.0, 0.5), # 4:1 (3.0, 0.5), # 6:1 (1.5, 0.5), # 3:1 (1.5, 0.75), # 2:1 (2.0, 0.75), # 2.7:1 ] GRID = { "z_entry": [1.8, 2.0, 2.5, 3.0], "z_max": [0, 2.5, 3.5], # 0 = off "natr_min": [0.5, 0.75, 1.0, 1.5], "natr_max": [0, 2.0, 3.0], # 0 = off "chop_min": [45, 50, 55, 60], "cooldown": [6, 14], } session = HTTP(testnet=False) def get_symbols(): resp = session.get_tickers(category="linear") if resp["retCode"] != 0: return [] out = [] for t in resp["result"]["list"]: s = t["symbol"] if not s.endswith("USDT") or s in BLACKLIST: continue v = float(t.get("turnover24h", 0)) if v >= MIN_VOLUME_24H: out.append({"symbol": s, "volume_24h": v}) out.sort(key=lambda x: x["volume_24h"], reverse=True) return out[:50] def fetch_klines(symbol, interval, days): kls = [] need = days * 24 * 60 // int(interval) end = int(datetime.now().timestamp() * 1000) while len(kls) < need: try: r = session.get_kline(category="linear", symbol=symbol, interval=interval, limit=1000, end=end) if r["retCode"] != 0: break items = r["result"]["list"] if not items: break for i in items: kls.append({"ts": int(i[0]), "h": float(i[2]), "l": float(i[3]), "c": float(i[4]), "v": float(i[5])}) end = int(items[-1][0]) - 1 if len(items) < 1000: break except: break kls.reverse() seen = set(); u = [] for k in kls: if k["ts"] not in seen: seen.add(k["ts"]); u.append(k) return u[-need:] if len(u) > need else u def calc_indicators(c, h, l, v): n = len(c) z = np.zeros(n) for i in range(VWAP_PERIOD, n): hh, ll, cc, vv = h[i-VWAP_PERIOD:i], l[i-VWAP_PERIOD:i], c[i-VWAP_PERIOD:i], v[i-VWAP_PERIOD:i] tp = (hh+ll+cc)/3; ctv = np.cumsum(tp*vv); cv = np.cumsum(vv) cv_s = np.where(cv==0,1,cv); va = ctv/cv_s dev = cc - va; std = np.std(dev) if std > 0: z[i] = (c[i] - va[-1]) / std natr = np.zeros(n) for i in range(14, n): trs = [max(h[j]-l[j], abs(h[j]-c[j-1]), abs(l[j]-c[j-1])) for j in range(i-13,i+1)] natr[i] = (np.mean(trs)/c[i])*100 if c[i]>0 else 0 chop = np.full(n, 50.0) for i in range(14, n): atr_s = sum(max(h[j]-l[j], abs(h[j]-c[j-1]), abs(l[j]-c[j-1])) for j in range(i-13,i+1)) hi, lo = np.max(h[i-13:i+1]), np.min(l[i-13:i+1]) rng = hi - lo if rng > 0: chop[i] = 100*np.log10(atr_s/rng)/np.log10(14) return z, natr, chop def sim(z_arr, natr_arr, chop_arr, c, h, l, p): n = len(c) deals = []; in_t = False; side = None; ep = 0; eb = 0; cu = 0 ze, zm, ztp = p["z_entry"], p["z_max"], 0.3 tp, sl = p["tp_pct"], p["sl_pct"] nm, nx, cm, cd = p["natr_min"], p["natr_max"], p["chop_min"], p["cooldown"] for i in range(VWAP_PERIOD, n): if in_t: z = z_arr[i]; closed = False; cp = 0; reason = "" if side == "LONG": tp_p, sl_p = ep*(1+tp/100), ep*(1-sl/100) if l[i] <= sl_p: cp,reason,closed = sl_p,"SL",True elif h[i] >= tp_p: cp,reason,closed = tp_p,"TP",True elif z >= -ztp and c[i] > ep: cp,reason,closed = c[i],"Z-TP",True else: tp_p, sl_p = ep*(1-tp/100), ep*(1+sl/100) if h[i] >= sl_p: cp,reason,closed = sl_p,"SL",True elif l[i] <= tp_p: cp,reason,closed = tp_p,"TP",True elif z <= ztp and c[i] < ep: cp,reason,closed = c[i],"Z-TP",True if not closed and (i-eb) >= 36: cp,reason,closed = c[i],"TIME",True if closed: qty = (ORDER_USD*LEVERAGE)/ep pnl = qty*(cp-ep) if side=="LONG" else qty*(ep-cp) pnl -= qty*ep*TAKER_FEE + qty*cp*TAKER_FEE deals.append({"pnl": pnl, "reason": reason}) in_t = False; cu = i + cd continue if i < cu: continue z = z_arr[i] if abs(z) <= ze: continue if zm > 0 and abs(z) > zm: continue if natr_arr[i] < nm: continue if nx > 0 and natr_arr[i] > nx: continue if chop_arr[i] < cm: continue side = "LONG" if z < -ze else "SHORT" ep = c[i]; eb = i; in_t = True if in_t: qty = (ORDER_USD*LEVERAGE)/ep; cp = c[-1] pnl = qty*(cp-ep) if side=="LONG" else qty*(ep-cp) pnl -= qty*ep*TAKER_FEE + qty*cp*TAKER_FEE deals.append({"pnl": pnl, "reason": "END"}) return deals def main(): print("="*80) print(" R:R SWEEP — Positive Risk/Reward + All Filters") print(f" {len(RR_COMBOS)} R:R combos x {len(list(product(*GRID.values())))} filter combos") print("="*80) syms = get_symbols() print(f"\n {len(syms)} symbols, downloading {DAYS}d klines...") data = {} for idx, sd in enumerate(syms): s = sd["symbol"] print(f" [{idx+1}/{len(syms)}] {s}...", end=" ", flush=True) time.sleep(0.15) kl = fetch_klines(s, TIMEFRAME, days=DAYS) if len(kl) < VWAP_PERIOD+100: print("skip"); continue c = np.array([k["c"] for k in kl]); h = np.array([k["h"] for k in kl]) l = np.array([k["l"] for k in kl]); v = np.array([k["v"] for k in kl]) zz, natr, chop = calc_indicators(c,h,l,v) data[s] = {"c":c,"h":h,"l":l,"z":zz,"natr":natr,"chop":chop} print("OK") print(f"\n {len(data)} symbols loaded. Running sweep...\n") gkeys = list(GRID.keys()) gvals = list(GRID.values()) gcombos = list(product(*gvals)) total = len(RR_COMBOS) * len(gcombos) print(f" Total combos: {total}") results = [] cnt = 0 for tp_pct, sl_pct in RR_COMBOS: for gcombo in gcombos: gp = dict(zip(gkeys, gcombo)) # skip z_max <= z_entry (makes no sense) if gp["z_max"] > 0 and gp["z_max"] <= gp["z_entry"]: continue params = {**gp, "tp_pct": tp_pct, "sl_pct": sl_pct} all_deals = [] for s, d in data.items(): deals = sim(d["z"], d["natr"], d["chop"], d["c"], d["h"], d["l"], params) all_deals.extend(deals) cnt += 1 if cnt % 500 == 0: print(f" ... {cnt}/{total}") if len(all_deals) < 10: continue pnl = sum(d["pnl"] for d in all_deals) wins = [d for d in all_deals if d["pnl"] > 0] losses = [d for d in all_deals if d["pnl"] <= 0] wr = len(wins)/len(all_deals)*100 gp_val = sum(d["pnl"] for d in wins) if wins else 0 gl_val = abs(sum(d["pnl"] for d in losses)) if losses else 0.001 pf = gp_val/gl_val reasons = {} for d in all_deals: reasons[d["reason"]] = reasons.get(d["reason"],0)+1 results.append({ "tp": tp_pct, "sl": sl_pct, "rr": round(tp_pct/sl_pct, 1), **gp, "deals": len(all_deals), "pnl": round(pnl,2), "wr": round(wr,1), "pf": round(pf,2), "avg_w": round(gp_val/len(wins),3) if wins else 0, "avg_l": round(gl_val/len(losses),3) if losses else 0, "tp_cnt": reasons.get("TP",0), "ztp_cnt": reasons.get("Z-TP",0), "sl_cnt": reasons.get("SL",0), "time_cnt": reasons.get("TIME",0), }) print(f"\n {len(results)} valid combos (>=10 deals)") # Sort by PF results.sort(key=lambda x: (x["pf"], x["pnl"]), reverse=True) hdr = f"{'#':>3} {'TP':>4} {'SL':>4} {'R:R':>4} {'Z':>4} {'Zm':>4} {'NRm':>4} {'NRx':>4} {'CHP':>3} {'CD':>3} | {'Dls':>4} {'PnL':>8} {'WR%':>5} {'PF':>5} {'AvW':>6} {'AvL':>6} | {'TP':>3} {'ZTP':>3} {'SL':>3} {'TIM':>3}" sep = "-"*105 # TOP 25 by PF (min 15 deals) print(f"\n{'='*105}") print(f" TOP 25 by Profit Factor (min 15 deals, R:R >= 2:1)") print(f"{'='*105}") print(hdr); print(sep) shown = 0 for r in results: if r["deals"] < 15: continue shown += 1 if shown > 25: break print(f"{shown:>3} {r['tp']:>4.1f} {r['sl']:>4.1f} {r['rr']:>4.1f} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['natr_min']:>4.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}") # TOP 25 by PnL results_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True) print(f"\n{'='*105}") print(f" TOP 25 by PnL (min 15 deals)") print(f"{'='*105}") print(hdr); print(sep) shown = 0 for r in results_pnl: if r["deals"] < 15: continue shown += 1 if shown > 25: break print(f"{shown:>3} {r['tp']:>4.1f} {r['sl']:>4.1f} {r['rr']:>4.1f} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['natr_min']:>4.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}") # BEST BALANCED (PF>1.2, deals>=20, best PnL) bal = sorted([r for r in results if r["pf"] >= 1.2 and r["deals"] >= 20], key=lambda x: x["pnl"], reverse=True) if bal: print(f"\n{'='*105}") print(f" RECOMMENDED (PF>=1.2, >=20 deals, R:R>=2:1, by PnL)") print(f"{'='*105}") print(hdr); print(sep) for i, r in enumerate(bal[:15]): print(f"{i+1:>3} {r['tp']:>4.1f} {r['sl']:>4.1f} {r['rr']:>4.1f} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['natr_min']:>4.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}") out = os.path.join(os.path.dirname(__file__), "results_rr_sweep.json") with open(out, "w") as f: json.dump({"rr_combos": RR_COMBOS, "grid": GRID, "top100_pf": results[:100], "top100_pnl": results_pnl[:100]}, f, indent=2) print(f"\n Saved to {out}") if __name__ == "__main__": main()