"""
Sweep v2: Only R:R >= 1 combos (TP >= SL)
==========================================
Reuses the data logic from sweep_zvwap.py.
Saves results to sweep_rr_results.json.

Usage:
    python3 backtests/sweep_rr.py
"""
import sys, os, json, time, argparse
import numpy as np
from datetime import datetime
from itertools import product
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pybit.unified_trading import HTTP
# Default look-back windows (in bars) for the indicator helpers below.
# VWAP_PERIOD is also the first bar index that run_one() evaluates.
VWAP_PERIOD = 50
NATR_PERIOD = 14
CHOP_PERIOD = 14
# Fee rates as a fraction of notional (see _pnl): entries and every exit other
# than "TP" are charged TAKER_FEE; exits with reason "TP" are charged MAKER_FEE.
MAKER_FEE = 0.0002
TAKER_FEE = 0.00055
# Symbols that get_top_symbols() never returns, regardless of turnover.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}
# R:R focused grid
# main() takes the cartesian product of these lists and drops combinations
# where z_entry >= z_max or tp_pct < sl_pct.
GRID = {
    # |z| must exceed z_entry to open a trade and must not exceed z_max.
    "z_entry": [1.8, 2.0, 2.5, 3.0],
    "z_max": [2.5, 3.5, 5.0],
    # Take-profit / stop-loss distance, in percent of the entry price.
    "tp_pct": [1.0, 1.5, 2.0, 3.0, 4.0, 5.0],
    "sl_pct": [0.5, 1.0, 1.5, 2.0, 3.0],
    # Minimum choppiness value required for entry; 0 disables the filter.
    "chop_min": [0, 45, 50, 55],
    # Allowed NATR band (percent) for entry.
    "natr_min": [0.5, 0.75],
    "natr_max": [2.5, 3.5],
}
def fetch_klines(session, symbol, interval="5", days=7):
    """Download recent klines for one linear-category symbol, oldest bar first.

    Pages backwards in time through ``session.get_kline`` (1000 bars per
    request) until roughly ``days`` worth of ``interval``-minute bars have been
    collected, the API reports an error or an empty page, or a short page
    signals that no older data exists.

    Args:
        session: Object exposing ``get_kline(category, symbol, interval,
            limit, end)`` and returning a dict with ``retCode`` and
            ``result["list"]``.
        symbol: Instrument symbol to request.
        interval: Bar length in minutes, as a string convertible by ``int``.
        days: Amount of history to collect, in days.

    Returns:
        A list of dicts with keys ``ts``, ``o``, ``h``, ``l``, ``c``, ``v``,
        de-duplicated by ``ts`` and trimmed to at most the requested number of
        bars. Any exception during a request is printed and ends the download;
        whatever was collected up to that point is still returned.
    """
    page_size = 1000
    bars_needed = days * 24 * 60 // int(interval)
    cursor_ms = int(datetime.now().timestamp() * 1000)

    # Pages are treated as newest-first, so this list is newest-first as well
    # until it is reversed further down.
    collected = []
    while len(collected) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=page_size,
                end=cursor_ms,
            )
            if resp["retCode"] != 0:
                break
            page = resp["result"]["list"]
            if not page:
                break
            for row in page:
                bar = {
                    "ts": int(row[0]),
                    "o": float(row[1]),
                    "h": float(row[2]),
                    "l": float(row[3]),
                    "c": float(row[4]),
                    "v": float(row[5]),
                }
                collected.append(bar)
            # Continue just before the last bar of this page.
            cursor_ms = int(page[-1][0]) - 1
            if len(page) < page_size:
                break
        except Exception as e:
            print(f" ERR {symbol}: {e}")
            break
        time.sleep(0.15)

    # Walk oldest-to-newest and keep the first bar seen for each timestamp.
    by_ts = {}
    for bar in reversed(collected):
        by_ts.setdefault(bar["ts"], bar)
    unique = list(by_ts.values())

    if len(unique) > bars_needed:
        return unique[-bars_needed:]
    return unique
def get_top_symbols(session, top_n=35, min_vol=20_000_000):
    """Return the highest-turnover USDT symbols as ``(symbol, turnover)`` pairs.

    Args:
        session: Object exposing ``get_tickers(category=...)`` whose response
            holds ticker dicts under ``["result"]["list"]``.
        top_n: Maximum number of pairs to return.
        min_vol: Minimum ``turnover24h`` a symbol needs to qualify.

    Returns:
        Up to ``top_n`` tuples sorted by turnover, largest first. Symbols that
        do not end in ``"USDT"`` or that appear in ``BLACKLIST`` are excluded;
        a ticker without ``turnover24h`` counts as zero turnover.
    """
    response = session.get_tickers(category="linear")
    eligible = (
        ticker
        for ticker in response["result"]["list"]
        if ticker["symbol"].endswith("USDT") and ticker["symbol"] not in BLACKLIST
    )
    with_turnover = (
        (ticker["symbol"], float(ticker.get("turnover24h", 0)))
        for ticker in eligible
    )
    liquid = [pair for pair in with_turnover if pair[1] >= min_vol]
    ranked = sorted(liquid, key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]
def calc_zvwap(h, l, c, v, period=VWAP_PERIOD):
    """Z-score of each close relative to a rolling volume-weighted price.

    For bar ``i`` the window is the ``period`` bars *before* it (bar ``i``
    itself is excluded). Inside the window a running VWAP of the typical price
    ``(h + l + c) / 3`` is built; the score is the distance of ``c[i]`` from
    the window's final VWAP, divided by the standard deviation of the window
    closes around that running VWAP.

    Args:
        h, l, c, v: Equal-length arrays of highs, lows, closes and volumes.
        period: Window length in bars.

    Returns:
        Float array of ``len(c)``; entries stay 0.0 for the first ``period``
        bars and wherever the deviation is not positive.
    """
    size = len(c)
    scores = np.zeros(size)
    for i in range(period, size):
        window = slice(i - period, i)
        closes = c[window]
        volumes = v[window]
        typical = (h[window] + l[window] + closes) / 3
        cum_price_vol = np.cumsum(typical * volumes)
        cum_vol = np.cumsum(volumes)
        # Substitute 1 for a zero cumulative volume to avoid dividing by zero.
        running_vwap = cum_price_vol / np.where(cum_vol == 0, 1, cum_vol)
        spread = np.std(closes - running_vwap)
        if spread > 0:
            scores[i] = (c[i] - running_vwap[-1]) / spread
    return scores
def calc_natr(h, l, c, period=NATR_PERIOD):
    """Normalised average true range, as a percentage of the close.

    True range of bar ``i`` is the largest of ``h[i] - l[i]``,
    ``|h[i] - c[i-1]|`` and ``|l[i] - c[i-1]|``. The ATR is the plain mean of
    the last ``period`` true ranges, including bar ``i``.

    Args:
        h, l, c: Equal-length arrays of highs, lows and closes.
        period: Averaging window in bars.

    Returns:
        Float array of ``len(c)``; 0.0 for the first ``period`` bars and for
        any bar whose close is not positive.
    """
    count = len(c)

    true_range = np.zeros(count)
    for i in range(1, count):
        prev_close = c[i - 1]
        true_range[i] = max(
            h[i] - l[i],
            abs(h[i] - prev_close),
            abs(l[i] - prev_close),
        )

    result = np.zeros(count)
    for i in range(period, count):
        close = c[i]
        if close > 0:
            atr = np.mean(true_range[i - period + 1:i + 1])
            result[i] = (atr / close) * 100
    return result
def calc_chop(h, l, c, period=CHOP_PERIOD):
    """Choppiness-style index: ``100 * log10(sum(TR) / range) / log10(period)``.

    ``sum(TR)`` is the sum of true ranges over the last ``period`` bars
    (including bar ``i``) and ``range`` is the highest high minus the lowest
    low over the same bars.

    Args:
        h, l, c: Equal-length arrays of highs, lows and closes.
        period: Window length in bars.

    Returns:
        Float array of ``len(c)``; entries keep the neutral value 50.0 for the
        first ``period`` bars and wherever the window's range is not positive.
    """
    count = len(c)

    true_range = np.zeros(count)
    for i in range(1, count):
        prev_close = c[i - 1]
        true_range[i] = max(
            h[i] - l[i],
            abs(h[i] - prev_close),
            abs(l[i] - prev_close),
        )

    index = np.full(count, 50.0)
    for i in range(period, count):
        start, stop = i - period + 1, i + 1
        span = np.max(h[start:stop]) - np.min(l[start:stop])
        if span > 0:
            tr_total = np.sum(true_range[start:stop])
            index[i] = 100 * np.log10(tr_total / span) / np.log10(period)
    return index
def run_one(data_cache, cfg):
    """Backtest one parameter set over every cached symbol and summarise it.

    Per symbol, bars are walked from ``VWAP_PERIOD`` onward with at most one
    open position at a time.

    While a position is open, each bar is checked for an exit, in this
    priority: stop-loss touched ("SL"), take-profit touched ("TP"), or z-score
    back inside +/-0.3 with the close in profit ("Z-TP"). No new entry is
    considered on a bar where a position was open, and an exit blocks entries
    until 12 bars later.

    While flat, a bar opens a trade at its close when NATR lies within
    ``[natr_min, natr_max]``, choppiness is at least ``chop_min`` (a value of
    0 disables this filter), ``|z| <= z_max`` and ``|z| > z_entry``: LONG for
    negative z, SHORT for positive z. Quantity is a fixed notional of
    ``5.0 * 3`` divided by the entry price.

    A position still open on the symbol's last bar is closed at the final
    close with reason "END".

    Args:
        data_cache: Maps symbol -> ``(h, l, c, v, z, natr, chop)`` arrays.
        cfg: Dict with keys ``z_entry``, ``z_max``, ``tp_pct``, ``sl_pct``,
            ``chop_min``, ``natr_min`` and ``natr_max``.

    Returns:
        ``None`` when no trade was taken; otherwise a dict with ``trades``,
        ``wr`` (win rate, %), ``pnl``, ``pf`` (profit factor, 999 when there
        are no losses), ``tp_count``, ``sl_count`` and ``ztp_count``.
    """
    closed = []
    for sym, (h, l, c, v, z_arr, natr_arr, chop_arr) in data_cache.items():
        position = None
        resume_at = 0
        for i in range(VWAP_PERIOD, len(c)):
            z = z_arr[i]
            natr = natr_arr[i]
            chop = chop_arr[i]

            if position:
                if position["side"] == "LONG":
                    stop_hit = l[i] <= position["sl"]
                    target_hit = h[i] >= position["tp"]
                    reverted = z >= -0.3 and c[i] > position["ep"]
                else:
                    stop_hit = h[i] >= position["sl"]
                    target_hit = l[i] <= position["tp"]
                    reverted = z <= 0.3 and c[i] < position["ep"]

                # The stop wins when a single bar touches more than one level.
                exit_reason = None
                if stop_hit:
                    exit_reason, exit_price = "SL", position["sl"]
                elif target_hit:
                    exit_reason, exit_price = "TP", position["tp"]
                elif reverted:
                    exit_reason, exit_price = "Z-TP", c[i]

                if exit_reason is not None:
                    closed.append({
                        "sym": sym,
                        "pnl": _pnl(position, exit_price, exit_reason),
                        "reason": exit_reason,
                    })
                    position = None
                    resume_at = i + 12
                continue

            if i < resume_at:
                continue
            if natr < cfg["natr_min"] or natr > cfg["natr_max"]:
                continue
            if cfg["chop_min"] > 0 and chop < cfg["chop_min"]:
                continue
            if abs(z) > cfg["z_max"]:
                continue

            if z < -cfg["z_entry"]:
                ep = c[i]
                position = {
                    "side": "LONG",
                    "ep": ep,
                    "qty": (5.0 * 3) / ep,
                    "tp": ep * (1 + cfg["tp_pct"] / 100),
                    "sl": ep * (1 - cfg["sl_pct"] / 100),
                }
            elif z > cfg["z_entry"]:
                ep = c[i]
                position = {
                    "side": "SHORT",
                    "ep": ep,
                    "qty": (5.0 * 3) / ep,
                    "tp": ep * (1 - cfg["tp_pct"] / 100),
                    "sl": ep * (1 + cfg["sl_pct"] / 100),
                }

        # Mark any position left open to the symbol's final close.
        if position:
            closed.append({
                "sym": sym,
                "pnl": _pnl(position, c[-1], "END"),
                "reason": "END",
            })

    if not closed:
        return None

    winners = [t for t in closed if t["pnl"] > 0]
    total_pnl = sum(t["pnl"] for t in closed)
    gross_profit = sum(t["pnl"] for t in winners)
    gross_loss = abs(sum(t["pnl"] for t in closed if t["pnl"] <= 0))
    profit_factor = gross_profit / gross_loss if gross_loss > 0 else 999
    reasons = [t["reason"] for t in closed]

    return {
        "trades": len(closed),
        "wr": round(len(winners) / len(closed) * 100, 1),
        "pnl": round(total_pnl, 2),
        "pf": round(profit_factor, 2),
        "tp_count": reasons.count("TP"),
        "sl_count": reasons.count("SL"),
        "ztp_count": reasons.count("Z-TP"),
    }
def _pnl(trade, close_price, reason):
    """Net profit of a closed trade after entry and exit fees.

    Args:
        trade: Dict with ``side`` ("LONG", anything else is treated as short),
            ``qty`` and ``ep`` (entry price).
        close_price: Price at which the position is closed.
        reason: Exit reason; "TP" exits pay ``MAKER_FEE``, every other reason
            pays ``TAKER_FEE``. The entry always pays ``TAKER_FEE``.

    Returns:
        Gross price P&L minus both fees, in quote units.
    """
    is_long = trade["side"] == "LONG"
    qty = trade["qty"]
    entry_price = trade["ep"]

    price_move = (close_price - entry_price) if is_long else (entry_price - close_price)
    gross = qty * price_move

    entry_fee = qty * entry_price * TAKER_FEE
    exit_rate = MAKER_FEE if reason == "TP" else TAKER_FEE
    exit_fee = qty * close_price * exit_rate
    return gross - entry_fee - exit_fee
def main():
    """Run the R:R >= 1 parameter sweep end to end.

    Steps:
      1. Pick the top ``--top`` symbols by turnover.
      2. Download ``--days`` of klines per symbol and cache the price arrays
         together with their z-VWAP, NATR and choppiness series. Symbols with
         fewer than ``VWAP_PERIOD + 50`` bars are skipped.
      3. Backtest every ``GRID`` combination with ``z_entry < z_max`` and
         ``tp_pct >= sl_pct``, keep those that produced at least 5 trades,
         rank them by PnL, write ``sweep_rr_results.json`` next to this script
         and print the top 20.

    In the JSON output ``combos_tested`` is the number of parameter sets that
    were actually backtested, and ``valid_combos`` is how many of those met
    the minimum-trade threshold and were ranked.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    parser.add_argument("--top", type=int, default=35)
    args = parser.parse_args()

    session = HTTP(testnet=False)

    print(f"[1/3] Fetching top {args.top} symbols...")
    symbols = get_top_symbols(session, args.top)
    print(f" Found {len(symbols)} symbols")

    print(f"[2/3] Downloading {args.days}d klines...")
    data_cache = {}
    for idx, (sym, _vol) in enumerate(symbols):
        klines = fetch_klines(session, sym, days=args.days)
        # Too little history to warm up the indicators and still trade.
        if len(klines) < VWAP_PERIOD + 50:
            continue
        h = np.array([k["h"] for k in klines])
        l = np.array([k["l"] for k in klines])
        c = np.array([k["c"] for k in klines])
        v = np.array([k["v"] for k in klines])
        data_cache[sym] = (
            h, l, c, v,
            calc_zvwap(h, l, c, v), calc_natr(h, l, c), calc_chop(h, l, c),
        )
        if (idx + 1) % 10 == 0:
            print(f" {idx+1}/{len(symbols)}")
        time.sleep(0.2)
    print(f" Cached {len(data_cache)} symbols")

    # Build combos, ONLY R:R >= 1
    keys = list(GRID)
    combos = []
    for vals in product(*(GRID[k] for k in keys)):
        cfg = dict(zip(keys, vals))
        if cfg["z_entry"] >= cfg["z_max"]:
            continue
        if cfg["tp_pct"] < cfg["sl_pct"]:  # R:R < 1 — skip
            continue
        combos.append(cfg)

    print(f"[3/3] Sweeping {len(combos)} R:R≥1 combos...")
    results = []
    for idx, cfg in enumerate(combos):
        res = run_one(data_cache, cfg)
        # Ignore parameter sets with too few trades to be meaningful.
        if res and res["trades"] >= 5:
            res["config"] = cfg
            res["rr"] = round(cfg["tp_pct"] / cfg["sl_pct"], 1)
            results.append(res)
        if (idx + 1) % 200 == 0:
            print(f" {idx+1}/{len(combos)}")
    results.sort(key=lambda x: x["pnl"], reverse=True)

    out_path = os.path.join(os.path.dirname(__file__), "sweep_rr_results.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump({
            "date": datetime.now().isoformat(),
            "days": args.days,
            "symbols": len(data_cache),
            # Number of parameter sets actually backtested; previously this
            # key held the post-filter result count instead.
            "combos_tested": len(combos),
            "valid_combos": len(results),
            "top_30": results[:30],
            "bottom_5": results[-5:],
        }, f, indent=2)

    print(f"\n{'='*85}")
    print(f"SWEEP R:R≥1: {len(results)} valid combos ({args.days}d, {len(data_cache)} coins)")
    print(f"{'='*85}")
    print(f"{'#':>3} {'PnL':>8} {'WR%':>6} {'PF':>6} {'Trd':>5} {'R:R':>4} | Z TP% SL% CHOP NATR")
    print("-"*85)
    for i, r in enumerate(results[:20]):
        p = r["config"]
        print(f"{i+1:>3} ${r['pnl']:>7.2f} {r['wr']:>5.1f}% {r['pf']:>5.2f} {r['trades']:>5} {r['rr']:>4.1f} | "
              f"{p['z_entry']:>4.1f} {p['tp_pct']:>4.1f} {p['sl_pct']:>4.1f} {p['chop_min']:>4} {p['natr_min']:.2f}-{p['natr_max']:.1f}")
    print(f"\nResults saved to {out_path}")
# Run the sweep only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()