"""
R:R Sweep — positive risk/reward combos
=========================================
Fixed: TP/SL ratio >= 2:1, CHOP on, no SO
Sweep: Z, TP, SL, NATR, CHOP
Usage: python3 backtests/backtest_rr_sweep.py
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json, time
import numpy as np
from datetime import datetime
from itertools import product
from pybit.unified_trading import HTTP
# --- Position sizing and costs (read by sim) ---
# Base order size in USD; sim sizes each position as ORDER_USD * LEVERAGE / entry price.
ORDER_USD = 7.0
LEVERAGE = 3
# Look-back window (bars) for the VWAP z-score; sim also starts evaluating at this bar index.
VWAP_PERIOD = 50
# Fee rate; sim charges it on both the entry notional and the exit notional.
TAKER_FEE = 0.00055
# --- Data selection ---
# Kline interval passed to the exchange; fetch_klines converts it with int() and
# treats it as minutes per bar when computing how many bars are needed.
TIMEFRAME = "5"
# Days of history to download per symbol.
DAYS = 30
# Minimum `turnover24h` a ticker must report to be included by get_symbols.
MIN_VOLUME_24H = 20_000_000
# Symbols that get_symbols always excludes.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}
# (take-profit %, stop-loss %) pairs, both as a percentage of the entry price.
# Every pair has a TP/SL ratio of at least 2; the ratio is noted next to each pair.
RR_COMBOS = [
(2.0, 1.0), # 2:1
(3.0, 1.0), # 3:1
(3.0, 1.5), # 2:1
(4.0, 1.5), # 2.7:1
(5.0, 2.0), # 2.5:1
(2.0, 0.5), # 4:1
(3.0, 0.5), # 6:1
(1.5, 0.5), # 3:1
(1.5, 0.75), # 2:1
(2.0, 0.75), # 2.7:1
]
# Entry-filter grid; main() takes the cartesian product of these lists and
# passes each combination (plus one RR_COMBOS pair) to sim as its parameter dict.
GRID = {
# An entry requires |z| to exceed this value.
"z_entry": [1.8, 2.0, 2.5, 3.0],
# Entries are skipped when |z| exceeds this cap.
"z_max": [0, 2.5, 3.5], # 0 = off
# Entries are skipped when NATR is below this floor.
"natr_min": [0.5, 0.75, 1.0, 1.5],
# Entries are skipped when NATR is above this cap.
"natr_max": [0, 2.0, 3.0], # 0 = off
# Entries are skipped when the choppiness value is below this floor.
"chop_min": [45, 50, 55, 60],
# Bars to wait after a position closes before a new entry is allowed.
"cooldown": [6, 14],
}
# Shared pybit HTTP client (testnet=False), created at import time and used by
# get_symbols and fetch_klines.
session = HTTP(testnet=False)
def get_symbols():
    """Return the 50 highest-turnover USDT symbols eligible for the sweep.

    Requests the "linear" category tickers and keeps every symbol that ends
    with "USDT", is not listed in BLACKLIST and reports a ``turnover24h`` of
    at least MIN_VOLUME_24H.

    Returns:
        A list of ``{"symbol": str, "volume_24h": float}`` dicts ordered by
        descending turnover and truncated to 50 entries, or an empty list
        when the response carries a non-zero ``retCode``.
    """
    response = session.get_tickers(category="linear")
    if response["retCode"] != 0:
        return []

    candidates = []
    for ticker in response["result"]["list"]:
        name = ticker["symbol"]
        if name.endswith("USDT") and name not in BLACKLIST:
            # Turnover is only parsed for symbols that passed the name filter.
            turnover = float(ticker.get("turnover24h", 0))
            if turnover >= MIN_VOLUME_24H:
                candidates.append({"symbol": name, "volume_24h": turnover})

    ranked = sorted(candidates, key=lambda row: row["volume_24h"], reverse=True)
    return ranked[:50]
def fetch_klines(symbol, interval, days):
    """Download up to ``days`` of klines for ``symbol``, oldest bar first.

    Pages backwards from the current time, 1000 candles per request, moving
    the ``end`` cursor to just before the last candle of each page. Paging
    stops once enough bars are collected, or when the API reports a non-zero
    ``retCode``, an empty page or a short page, or when a request/parse error
    occurs. The download is best-effort: whatever was gathered before a
    failure is still returned.

    Args:
        symbol: Instrument name forwarded to ``session.get_kline``.
        interval: Kline interval forwarded to the API; it is also converted
            with ``int()`` and treated as minutes per bar.
        days: Number of days of history wanted.

    Returns:
        A list of dicts with keys ``ts`` (int), ``h``, ``l``, ``c`` and ``v``
        (floats), de-duplicated by ``ts`` and limited to the most recent
        ``days * 24 * 60 // int(interval)`` bars.
    """
    need = days * 24 * 60 // int(interval)
    end = int(datetime.now().timestamp() * 1000)
    kls = []
    while len(kls) < need:
        try:
            r = session.get_kline(
                category="linear", symbol=symbol, interval=interval,
                limit=1000, end=end,
            )
            if r["retCode"] != 0:
                break
            items = r["result"]["list"]
            if not items:
                break
            for item in items:
                kls.append({
                    "ts": int(item[0]),
                    "h": float(item[2]),
                    "l": float(item[3]),
                    "c": float(item[4]),
                    "v": float(item[5]),
                })
            # Next page ends one millisecond before the last candle received.
            end = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
        except Exception:
            # Best-effort: keep what we have. Unlike a bare ``except``, this
            # lets KeyboardInterrupt / SystemExit propagate so the user can
            # abort a long download.
            break

    # Pages were accumulated newest-page-first (the cursor moves backwards in
    # time), so flip the list before de-duplicating.
    kls.reverse()
    seen = set()
    unique = []
    for k in kls:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)
    return unique[-need:] if len(unique) > need else unique
def calc_indicators(c, h, l, v, vwap_period=None, atr_period=14):
    """Compute the VWAP z-score, NATR and choppiness series for one symbol.

    Args:
        c, h, l, v: Close, high, low and volume series of equal length.
            They must support NumPy slice arithmetic (e.g. ``np.ndarray``).
        vwap_period: Look-back window in bars for the z-score. ``None``
            (default) uses the module-level ``VWAP_PERIOD``.
        atr_period: Window in bars for NATR and choppiness (default 14).

    Returns:
        ``(z, natr, chop)`` — three arrays with the same length as ``c``.

        * ``z[i]``: a running VWAP is accumulated over the ``vwap_period``
          bars *preceding* bar ``i``; ``z[i]`` is ``c[i]`` minus the final
          VWAP, divided by the standard deviation of the closes' deviations
          from that running VWAP. It is 0 during warm-up or when the
          deviation is 0.
        * ``natr[i]``: mean true range of the last ``atr_period`` bars
          (including ``i``) as a percentage of ``c[i]``; 0 during warm-up or
          when ``c[i]`` is not positive.
        * ``chop[i]``: ``100 * log10(sum(TR) / (max high - min low)) /
          log10(atr_period)`` over the same window; 50.0 during warm-up or
          when the high-low range is 0.

    Raises:
        ValueError: If ``vwap_period < 1`` or ``atr_period < 2`` (the latter
            would make ``log10(atr_period)`` zero or undefined).
    """
    if vwap_period is None:
        vwap_period = VWAP_PERIOD
    if vwap_period < 1:
        raise ValueError("vwap_period must be >= 1")
    if atr_period < 2:
        raise ValueError("atr_period must be >= 2")

    n = len(c)

    z = np.zeros(n)
    for i in range(vwap_period, n):
        start = i - vwap_period
        win_h, win_l, win_c, win_v = h[start:i], l[start:i], c[start:i], v[start:i]
        typical = (win_h + win_l + win_c) / 3
        cum_vol = np.cumsum(win_v)
        # Guard against zero cumulative volume at the start of the window.
        vwap = np.cumsum(typical * win_v) / np.where(cum_vol == 0, 1, cum_vol)
        std = np.std(win_c - vwap)
        if std > 0:
            z[i] = (c[i] - vwap[-1]) / std

    # True range of every bar, computed once and shared by NATR and CHOP.
    # Index 0 has no previous close and is never read (windows start at >= 1).
    tr = [0.0] * n
    for j in range(1, n):
        tr[j] = max(h[j] - l[j], abs(h[j] - c[j - 1]), abs(l[j] - c[j - 1]))

    natr = np.zeros(n)
    chop = np.full(n, 50.0)
    log_period = np.log10(atr_period)
    for i in range(atr_period, n):
        start = i - atr_period + 1
        window_tr = tr[start:i + 1]
        natr[i] = (np.mean(window_tr) / c[i]) * 100 if c[i] > 0 else 0
        # Built-in sum keeps plain left-to-right addition, so the value does
        # not depend on NumPy's internal summation order.
        tr_sum = sum(window_tr)
        rng = np.max(h[start:i + 1]) - np.min(l[start:i + 1])
        if rng > 0:
            chop[i] = 100 * np.log10(tr_sum / rng) / log_period
    return z, natr, chop
def sim(z_arr, natr_arr, chop_arr, c, h, l, p):
    """Replay one symbol bar by bar and return the list of closed deals.

    At most one position is open at a time. An entry is taken at the close of
    a bar whose |z| exceeds ``z_entry`` and that passes the optional ``z_max``
    cap, the NATR band and the ``chop_min`` floor: LONG when z is below
    ``-z_entry``, SHORT otherwise. Exits are evaluated from the following bar
    on, in this order: stop-loss, take-profit, z-score reversion ("Z-TP",
    only when the close is in profit), then a 36-bar time stop. After a close,
    no new entry is taken for ``cooldown`` bars. A position still open on the
    last bar is closed at the final close with reason "END".

    Args:
        z_arr, natr_arr, chop_arr: Indicator series aligned with the prices.
        c, h, l: Close, high and low series.
        p: Parameter dict with keys ``z_entry``, ``z_max``, ``tp_pct``,
            ``sl_pct``, ``natr_min``, ``natr_max``, ``chop_min``, ``cooldown``.
            ``z_max`` and ``natr_max`` are ignored when they are 0.

    Returns:
        A list of ``{"pnl": float, "reason": str}`` dicts, where ``pnl`` is
        net of fees charged on both the entry and the exit notional.
    """
    bar_count = len(c)
    z_entry, z_cap, z_exit = p["z_entry"], p["z_max"], 0.3
    tp_pct, sl_pct = p["tp_pct"], p["sl_pct"]
    natr_lo, natr_hi = p["natr_min"], p["natr_max"]
    chop_lo, cooldown = p["chop_min"], p["cooldown"]

    def net_pnl(direction, entry, exit_price):
        # Position size in contracts, gross result, minus entry and exit fees.
        size = (ORDER_USD * LEVERAGE) / entry
        if direction == "LONG":
            gross = size * (exit_price - entry)
        else:
            gross = size * (entry - exit_price)
        return gross - (size * entry * TAKER_FEE + size * exit_price * TAKER_FEE)

    trades = []
    position = None  # None when flat, otherwise "LONG" or "SHORT"
    entry_price = 0
    entry_bar = 0
    next_entry_bar = 0

    for bar in range(VWAP_PERIOD, bar_count):
        if position is not None:
            z_now = z_arr[bar]
            exit_price = None
            why = ""
            if position == "LONG":
                target = entry_price * (1 + tp_pct / 100)
                stop = entry_price * (1 - sl_pct / 100)
                # The stop is checked first, so a bar touching both levels
                # counts as a loss.
                if l[bar] <= stop:
                    exit_price, why = stop, "SL"
                elif h[bar] >= target:
                    exit_price, why = target, "TP"
                elif z_now >= -z_exit and c[bar] > entry_price:
                    exit_price, why = c[bar], "Z-TP"
            else:
                target = entry_price * (1 - tp_pct / 100)
                stop = entry_price * (1 + sl_pct / 100)
                if h[bar] >= stop:
                    exit_price, why = stop, "SL"
                elif l[bar] <= target:
                    exit_price, why = target, "TP"
                elif z_now <= z_exit and c[bar] < entry_price:
                    exit_price, why = c[bar], "Z-TP"

            if exit_price is None and (bar - entry_bar) >= 36:
                exit_price, why = c[bar], "TIME"

            if exit_price is not None:
                trades.append({
                    "pnl": net_pnl(position, entry_price, exit_price),
                    "reason": why,
                })
                position = None
                next_entry_bar = bar + cooldown
            # A bar that manages (or closes) a position never opens a new one.
            continue

        if bar < next_entry_bar:
            continue

        z_now = z_arr[bar]
        rejected = (
            abs(z_now) <= z_entry
            or (z_cap > 0 and abs(z_now) > z_cap)
            or natr_arr[bar] < natr_lo
            or (natr_hi > 0 and natr_arr[bar] > natr_hi)
            or chop_arr[bar] < chop_lo
        )
        if rejected:
            continue

        position = "LONG" if z_now < -z_entry else "SHORT"
        entry_price = c[bar]
        entry_bar = bar

    if position is not None:
        trades.append({
            "pnl": net_pnl(position, entry_price, c[-1]),
            "reason": "END",
        })
    return trades
def _summarize(tp_pct, sl_pct, grid_params, deals):
    """Aggregate the deals of one parameter combo into a result row."""
    pnl = sum(d["pnl"] for d in deals)
    wins = [d for d in deals if d["pnl"] > 0]
    losses = [d for d in deals if d["pnl"] <= 0]
    gross_profit = sum(d["pnl"] for d in wins)
    gross_loss = abs(sum(d["pnl"] for d in losses))
    # 0.001 stands in for "no losses" so the profit factor stays finite; it
    # also covers losing deals that sum to exactly zero, which would
    # otherwise divide by zero.
    profit_factor = gross_profit / (gross_loss if gross_loss > 0 else 0.001)
    reasons = {}
    for d in deals:
        reasons[d["reason"]] = reasons.get(d["reason"], 0) + 1
    return {
        "tp": tp_pct, "sl": sl_pct, "rr": round(tp_pct / sl_pct, 1),
        **grid_params,
        "deals": len(deals), "pnl": round(pnl, 2),
        "wr": round(len(wins) / len(deals) * 100, 1),
        "pf": round(profit_factor, 2),
        "avg_w": round(gross_profit / len(wins), 3) if wins else 0,
        "avg_l": round(gross_loss / len(losses), 3) if losses else 0,
        "tp_cnt": reasons.get("TP", 0), "ztp_cnt": reasons.get("Z-TP", 0),
        "sl_cnt": reasons.get("SL", 0), "time_cnt": reasons.get("TIME", 0),
    }


def _format_row(rank, r):
    """Render one result row using the column layout of the table header."""
    return f"{rank:>3} {r['tp']:>4.1f} {r['sl']:>4.1f} {r['rr']:>4.1f} {r['z_entry']:>4.1f} {r['z_max']:>4.1f} {r['natr_min']:>4.2f} {r['natr_max']:>4.1f} {r['chop_min']:>3.0f} {r['cooldown']:>3} | {r['deals']:>4} ${r['pnl']:>+7.2f} {r['wr']:>5.1f} {r['pf']:>5.2f} ${r['avg_w']:>5.3f} ${r['avg_l']:>5.3f} | {r['tp_cnt']:>3} {r['ztp_cnt']:>3} {r['sl_cnt']:>3} {r['time_cnt']:>3}"


def _print_table(title, rows):
    """Print a titled results table; ranks start at 1."""
    hdr = f"{'#':>3} {'TP':>4} {'SL':>4} {'R:R':>4} {'Z':>4} {'Zm':>4} {'NRm':>4} {'NRx':>4} {'CHP':>3} {'CD':>3} | {'Dls':>4} {'PnL':>8} {'WR%':>5} {'PF':>5} {'AvW':>6} {'AvL':>6} | {'TP':>3} {'ZTP':>3} {'SL':>3} {'TIM':>3}"
    sep = "-"*105
    print(f"\n{'='*105}")
    print(title)
    print(f"{'='*105}")
    print(hdr)
    print(sep)
    for rank, row in enumerate(rows, 1):
        print(_format_row(rank, row))


def main():
    """Download klines, run the full R:R x filter sweep and report the results.

    Steps: select symbols, download and pre-compute indicators per symbol,
    simulate every (RR_COMBOS x GRID) combination across all symbols, print
    three ranked tables and save the top rows to ``results_rr_sweep.json``
    next to this script. Combos producing fewer than 10 deals are dropped.
    """
    gkeys = list(GRID.keys())
    # Drop grid points whose z_max cap does not exceed z_entry: no |z| could
    # satisfy both, so they would never trade. Filtering up front keeps the
    # banner and the progress denominator equal to the work actually done.
    gcombos = []
    for gcombo in product(*GRID.values()):
        gp = dict(zip(gkeys, gcombo))
        if gp["z_max"] > 0 and gp["z_max"] <= gp["z_entry"]:
            continue
        gcombos.append(gp)

    print("="*80)
    print(" R:R SWEEP — Positive Risk/Reward + All Filters")
    print(f" {len(RR_COMBOS)} R:R combos x {len(gcombos)} filter combos")
    print("="*80)

    syms = get_symbols()
    print(f"\n {len(syms)} symbols, downloading {DAYS}d klines...")
    data = {}
    for idx, sd in enumerate(syms):
        s = sd["symbol"]
        print(f" [{idx+1}/{len(syms)}] {s}...", end=" ", flush=True)
        time.sleep(0.15)  # pause between per-symbol downloads
        kl = fetch_klines(s, TIMEFRAME, days=DAYS)
        if len(kl) < VWAP_PERIOD+100:
            print("skip")
            continue
        c = np.array([k["c"] for k in kl])
        h = np.array([k["h"] for k in kl])
        l = np.array([k["l"] for k in kl])
        v = np.array([k["v"] for k in kl])
        zz, natr, chop = calc_indicators(c, h, l, v)
        data[s] = {"c": c, "h": h, "l": l, "z": zz, "natr": natr, "chop": chop}
        print("OK")
    print(f"\n {len(data)} symbols loaded. Running sweep...\n")

    total = len(RR_COMBOS) * len(gcombos)
    print(f" Total combos: {total}")
    results = []
    cnt = 0
    for tp_pct, sl_pct in RR_COMBOS:
        for gp in gcombos:
            params = {**gp, "tp_pct": tp_pct, "sl_pct": sl_pct}
            all_deals = []
            for d in data.values():
                all_deals.extend(
                    sim(d["z"], d["natr"], d["chop"], d["c"], d["h"], d["l"], params)
                )
            cnt += 1
            if cnt % 500 == 0:
                print(f" ... {cnt}/{total}")
            if len(all_deals) < 10:
                continue
            results.append(_summarize(tp_pct, sl_pct, gp, all_deals))
    print(f"\n {len(results)} valid combos (>=10 deals)")

    # Rank by profit factor, breaking ties by PnL.
    results.sort(key=lambda x: (x["pf"], x["pnl"]), reverse=True)
    _print_table(
        " TOP 25 by Profit Factor (min 15 deals, R:R >= 2:1)",
        [r for r in results if r["deals"] >= 15][:25],
    )

    results_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True)
    _print_table(
        " TOP 25 by PnL (min 15 deals)",
        [r for r in results_pnl if r["deals"] >= 15][:25],
    )

    # Balanced shortlist: decent profit factor and sample size, best PnL first.
    bal = sorted(
        [r for r in results if r["pf"] >= 1.2 and r["deals"] >= 20],
        key=lambda x: x["pnl"], reverse=True,
    )
    if bal:
        _print_table(
            " RECOMMENDED (PF>=1.2, >=20 deals, R:R>=2:1, by PnL)",
            bal[:15],
        )

    out = os.path.join(os.path.dirname(__file__), "results_rr_sweep.json")
    with open(out, "w", encoding="utf-8") as f:
        json.dump(
            {
                "rr_combos": RR_COMBOS,
                "grid": GRID,
                "top100_pf": results[:100],
                "top100_pnl": results_pnl[:100],
            },
            f,
            indent=2,
        )
    print(f"\n Saved to {out}")


if __name__ == "__main__":
    main()