# ← Back
"""
Sweep v2: Only R:R >= 1 combos (TP >= SL)
==========================================
Reuses data from sweep_zvwap.py logic.
Saves to sweep_rr_results.json.

Usage:
    python3 backtests/sweep_rr.py
"""
import sys
import os
import json
import time
import argparse
from datetime import datetime
from itertools import product

import numpy as np

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# NOTE: `pybit` is imported inside main() so that the pure indicator/backtest
# helpers below can be imported (and tested) without the exchange SDK.

VWAP_PERIOD = 50
NATR_PERIOD = 14
CHOP_PERIOD = 14
MAKER_FEE = 0.0002
TAKER_FEE = 0.00055
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

# Notional size of every simulated position, in quote currency.
# NOTE(review): 5.0 * 3 presumably means 5 USDT margin at 3x leverage - confirm.
POSITION_NOTIONAL = 5.0 * 3
# Bars to wait after any exit before a new entry is allowed on that symbol.
EXIT_COOLDOWN_BARS = 12
# A profitable trade is closed once z has reverted to within this band of zero.
Z_EXIT_BAND = 0.3

# R:R focused grid
GRID = {
    "z_entry": [1.8, 2.0, 2.5, 3.0],
    "z_max": [2.5, 3.5, 5.0],
    "tp_pct": [1.0, 1.5, 2.0, 3.0, 4.0, 5.0],
    "sl_pct": [0.5, 1.0, 1.5, 2.0, 3.0],
    "chop_min": [0, 45, 50, 55],
    "natr_min": [0.5, 0.75],
    "natr_max": [2.5, 3.5],
}


def fetch_klines(session, symbol, interval="5", days=7):
    """Download up to ``days`` of klines for ``symbol``, oldest bar first.

    Pages backwards in time through ``session.get_kline`` until enough bars
    are collected, the API reports an error, or a short/empty page is
    returned.  Any exception is printed and ends the download; whatever was
    collected so far is still returned (best effort).

    Args:
        session: object exposing ``get_kline(category, symbol, interval,
            limit, end)``.
        symbol: instrument name passed through to the API.
        interval: bar size in minutes, as a string convertible with ``int``.
        days: how many days of history to request.

    Returns:
        List of dicts with keys ``ts, o, h, l, c, v`` in ascending ``ts``
        order, de-duplicated by timestamp and trimmed to the newest
        ``bars_needed`` bars.
    """
    page_size = 1000
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)
    all_klines = []

    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=page_size,
                end=end_time,
            )
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append({
                    "ts": int(item[0]),
                    "o": float(item[1]),
                    "h": float(item[2]),
                    "l": float(item[3]),
                    "c": float(item[4]),
                    "v": float(item[5]),
                })
            # The last row of a page is treated as the oldest one: the next
            # request ends just before it.  (The code assumes the API returns
            # rows newest-first; the list is reversed after the loop.)
            end_time = int(items[-1][0]) - 1
            if len(items) < page_size:
                break
        except Exception as e:
            # Deliberately best-effort: report and keep what we already have.
            print(f" ERR {symbol}: {e}")
            break
        time.sleep(0.15)  # pause between pages

    all_klines.reverse()

    seen = set()
    unique = []
    for k in all_klines:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique


def get_top_symbols(session, top_n=35, min_vol=20_000_000):
    """Return the ``top_n`` USDT symbols by 24h turnover as ``(symbol, turnover)``.

    Symbols that do not end in ``USDT`` or are listed in ``BLACKLIST`` are
    skipped, as are symbols whose turnover is below ``min_vol``.  A missing or
    empty ``turnover24h`` field is treated as zero turnover instead of raising.
    """
    tickers = session.get_tickers(category="linear")
    cands = []
    for t in tickers["result"]["list"]:
        sym = t["symbol"]
        if not sym.endswith("USDT") or sym in BLACKLIST:
            continue
        # `or 0` also covers "" / None, which float() would reject.
        vol = float(t.get("turnover24h") or 0)
        if vol >= min_vol:
            cands.append((sym, vol))
    cands.sort(key=lambda x: x[1], reverse=True)
    return cands[:top_n]


def _true_range(h, l, c):
    """Per-bar true range; element 0 is 0.0 because it has no previous close."""
    tr = np.zeros(len(c))
    prev_close = c[:-1]
    tr[1:] = np.maximum(
        h[1:] - l[1:],
        np.maximum(np.abs(h[1:] - prev_close), np.abs(l[1:] - prev_close)),
    )
    return tr


def calc_zvwap(h, l, c, v, period=VWAP_PERIOD):
    """Z-score of each close against the VWAP of the preceding ``period`` bars.

    For bar ``i`` the window is ``[i - period, i)`` (the current bar is not
    part of it).  The running VWAP inside the window is built from typical
    price ``(h + l + c) / 3``; the z-score is
    ``(c[i] - vwap_at_window_end) / std(close - running_vwap)``.
    Bars before ``period`` and bars whose deviation std is 0 keep z = 0.0.

    Returns:
        ``numpy.ndarray`` of length ``len(c)``.
    """
    h = np.asarray(h, dtype=float)
    l = np.asarray(l, dtype=float)
    c = np.asarray(c, dtype=float)
    v = np.asarray(v, dtype=float)

    n = len(c)
    z = np.full(n, 0.0)
    for i in range(period, n):
        window = slice(i - period, i)
        closes = c[window]
        vols = v[window]
        typical = (h[window] + l[window] + closes) / 3
        cum_pv = np.cumsum(typical * vols)
        cum_vol = np.cumsum(vols)
        # Avoid dividing by zero while no volume has accumulated yet.
        running_vwap = cum_pv / np.where(cum_vol == 0, 1, cum_vol)
        std = np.std(closes - running_vwap)
        if std > 0:
            z[i] = (c[i] - running_vwap[-1]) / std
    return z


def calc_natr(h, l, c, period=NATR_PERIOD):
    """Normalized ATR in percent: mean true range of the last ``period`` bars / close * 100.

    Bars before index ``period`` and bars with a non-positive close are 0.0.
    """
    h = np.asarray(h, dtype=float)
    l = np.asarray(l, dtype=float)
    c = np.asarray(c, dtype=float)

    n = len(c)
    natr = np.full(n, 0.0)
    tr = _true_range(h, l, c)
    for i in range(period, n):
        atr = np.mean(tr[i - period + 1:i + 1])
        natr[i] = (atr / c[i]) * 100 if c[i] > 0 else 0
    return natr


def calc_chop(h, l, c, period=CHOP_PERIOD):
    """Choppiness index: ``100 * log10(sum(TR) / (max high - min low)) / log10(period)``.

    Computed over the last ``period`` bars (current bar included).  Bars
    before index ``period`` and windows with zero high-low range keep the
    neutral default 50.0.
    """
    h = np.asarray(h, dtype=float)
    l = np.asarray(l, dtype=float)
    c = np.asarray(c, dtype=float)

    n = len(c)
    chop = np.full(n, 50.0)
    tr = _true_range(h, l, c)
    for i in range(period, n):
        lo, hi = i - period + 1, i + 1
        rng = np.max(h[lo:hi]) - np.min(l[lo:hi])
        if rng > 0:
            chop[i] = 100 * np.log10(np.sum(tr[lo:hi]) / rng) / np.log10(period)
    return chop


def _open_trade(side, entry_price, cfg):
    """Build the open-position dict for ``side`` ("LONG"/"SHORT") entered at ``entry_price``."""
    direction = 1 if side == "LONG" else -1
    return {
        "side": side,
        "ep": entry_price,
        "qty": POSITION_NOTIONAL / entry_price,
        "tp": entry_price * (1 + direction * cfg["tp_pct"] / 100),
        "sl": entry_price * (1 - direction * cfg["sl_pct"] / 100),
    }


def _check_exit(trade, high, low, close, z):
    """Return ``(reason, exit_price)`` if ``trade`` closes on this bar, else ``None``.

    Priority when several conditions trigger on the same bar: SL, then TP,
    then Z-TP (z back inside the exit band while the close is in profit).
    """
    if trade["side"] == "LONG":
        sl_hit = low <= trade["sl"]
        tp_hit = high >= trade["tp"]
        z_hit = z >= -Z_EXIT_BAND and close > trade["ep"]
    else:
        sl_hit = high >= trade["sl"]
        tp_hit = low <= trade["tp"]
        z_hit = z <= Z_EXIT_BAND and close < trade["ep"]

    if sl_hit:
        return "SL", trade["sl"]
    if tp_hit:
        return "TP", trade["tp"]
    if z_hit:
        return "Z-TP", close
    return None


def _summarize(all_trades):
    """Aggregate closed trades into the stats dict; ``None`` when there are no trades."""
    if not all_trades:
        return None
    wins = [t for t in all_trades if t["pnl"] > 0]
    total_pnl = sum(t["pnl"] for t in all_trades)
    gross_profit = sum(t["pnl"] for t in wins)
    gross_loss = abs(sum(t["pnl"] for t in all_trades if t["pnl"] <= 0))
    # 999 is the sentinel profit factor for "no losing trades".
    pf = gross_profit / gross_loss if gross_loss > 0 else 999
    return {
        "trades": len(all_trades),
        "wr": round(len(wins) / len(all_trades) * 100, 1),
        "pnl": round(total_pnl, 2),
        "pf": round(pf, 2),
        "tp_count": sum(1 for t in all_trades if t["reason"] == "TP"),
        "sl_count": sum(1 for t in all_trades if t["reason"] == "SL"),
        "ztp_count": sum(1 for t in all_trades if t["reason"] == "Z-TP"),
    }


def run_one(data_cache, cfg):
    """Backtest one parameter set over every cached symbol.

    Args:
        data_cache: mapping ``symbol -> (h, l, c, v, z_arr, natr_arr,
            chop_arr)`` of equally long, index-aligned sequences.
        cfg: dict with keys ``z_entry, z_max, tp_pct, sl_pct, chop_min,
            natr_min, natr_max``.

    Rules, per symbol, starting at bar ``VWAP_PERIOD``:
      * at most one open position; while one is open no entries are evaluated;
      * exits are checked from the bar after entry (SL > TP > Z-TP priority);
      * after an exit, entries are blocked for ``EXIT_COOLDOWN_BARS`` bars;
      * entry at the bar close: LONG when ``z < -z_entry``, SHORT when
        ``z > z_entry``, provided NATR is within ``[natr_min, natr_max]``,
        CHOP is at least ``chop_min`` (filter disabled when ``chop_min`` is 0)
        and ``|z| <= z_max``;
      * a position still open at the end is closed at the last close ("END").

    Returns:
        Stats dict (``trades, wr, pnl, pf, tp_count, sl_count, ztp_count``)
        or ``None`` if no trade was taken.
    """
    all_trades = []
    for sym, (h, l, c, v, z_arr, natr_arr, chop_arr) in data_cache.items():
        active = None
        cooldown = 0
        for i in range(VWAP_PERIOD, len(c)):
            z = z_arr[i]

            if active:
                outcome = _check_exit(active, h[i], l[i], c[i], z)
                if outcome is not None:
                    reason, price = outcome
                    all_trades.append(
                        {"sym": sym, "pnl": _pnl(active, price, reason), "reason": reason}
                    )
                    active = None
                    cooldown = i + EXIT_COOLDOWN_BARS
                # Never evaluate a new entry on a bar that started with a position.
                continue

            if i < cooldown:
                continue
            natr = natr_arr[i]
            if natr < cfg["natr_min"] or natr > cfg["natr_max"]:
                continue
            if cfg["chop_min"] > 0 and chop_arr[i] < cfg["chop_min"]:
                continue
            if abs(z) > cfg["z_max"]:
                continue

            if z < -cfg["z_entry"]:
                active = _open_trade("LONG", c[i], cfg)
            elif z > cfg["z_entry"]:
                active = _open_trade("SHORT", c[i], cfg)

        if active:
            all_trades.append(
                {"sym": sym, "pnl": _pnl(active, c[-1], "END"), "reason": "END"}
            )

    return _summarize(all_trades)


def _pnl(trade, close_price, reason):
    """Net PnL of ``trade`` closed at ``close_price``, after entry and exit fees.

    The entry is charged ``TAKER_FEE``; the exit is charged ``MAKER_FEE`` when
    ``reason == "TP"`` and ``TAKER_FEE`` for every other exit reason.
    """
    if trade["side"] == "LONG":
        gross = trade["qty"] * (close_price - trade["ep"])
    else:
        gross = trade["qty"] * (trade["ep"] - close_price)
    entry_fee = trade["qty"] * trade["ep"] * TAKER_FEE
    exit_fee = trade["qty"] * close_price * (MAKER_FEE if reason == "TP" else TAKER_FEE)
    return gross - entry_fee - exit_fee


def _build_combos(grid):
    """Expand ``grid`` into config dicts, keeping only z_entry < z_max and TP >= SL."""
    keys = list(grid)
    combos = []
    for vals in product(*(grid[k] for k in keys)):
        cfg = dict(zip(keys, vals))
        if cfg["z_entry"] >= cfg["z_max"]:
            continue
        if cfg["tp_pct"] < cfg["sl_pct"]:
            continue  # R:R < 1 - skip
        combos.append(cfg)
    return combos


def main():
    """CLI entry point: download data, sweep the R:R >= 1 grid, save and print results."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    parser.add_argument("--top", type=int, default=35)
    args = parser.parse_args()

    # Imported here (not at module top) so the helpers stay importable
    # without the exchange SDK installed.
    from pybit.unified_trading import HTTP

    session = HTTP(testnet=False)

    print(f"[1/3] Fetching top {args.top} symbols...")
    symbols = get_top_symbols(session, args.top)
    print(f" Found {len(symbols)} symbols")

    print(f"[2/3] Downloading {args.days}d klines...")
    data_cache = {}
    for idx, (sym, _vol) in enumerate(symbols):
        klines = fetch_klines(session, sym, days=args.days)
        # Need the VWAP warm-up plus some bars to actually trade on.
        if len(klines) < VWAP_PERIOD + 50:
            continue
        h = np.array([k["h"] for k in klines])
        l = np.array([k["l"] for k in klines])
        c = np.array([k["c"] for k in klines])
        v = np.array([k["v"] for k in klines])
        data_cache[sym] = (
            h, l, c, v,
            calc_zvwap(h, l, c, v), calc_natr(h, l, c), calc_chop(h, l, c),
        )
        if (idx + 1) % 10 == 0:
            print(f" {idx+1}/{len(symbols)}")
        time.sleep(0.2)
    print(f" Cached {len(data_cache)} symbols")

    # Build combos, ONLY R:R >= 1
    combos = _build_combos(GRID)

    print(f"[3/3] Sweeping {len(combos)} R:R≥1 combos...")
    results = []
    for idx, cfg in enumerate(combos):
        res = run_one(data_cache, cfg)
        # Too few trades make the stats meaningless; drop such combos.
        if res and res["trades"] >= 5:
            res["config"] = cfg
            res["rr"] = round(cfg["tp_pct"] / cfg["sl_pct"], 1)
            results.append(res)
        if (idx + 1) % 200 == 0:
            print(f" {idx+1}/{len(combos)}")

    results.sort(key=lambda x: x["pnl"], reverse=True)

    out_path = os.path.join(os.path.dirname(__file__), "sweep_rr_results.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump({
            "date": datetime.now().isoformat(),
            "days": args.days,
            "symbols": len(data_cache),
            # Number of combos actually swept; combos that produced at least
            # 5 trades are reported separately as "combos_valid".
            "combos_tested": len(combos),
            "combos_valid": len(results),
            "top_30": results[:30],
            "bottom_5": results[-5:],
        }, f, indent=2)

    print(f"\n{'='*85}")
    print(f"SWEEP R:R≥1: {len(results)} valid combos ({args.days}d, {len(data_cache)} coins)")
    print(f"{'='*85}")
    print(f"{'#':>3} {'PnL':>8} {'WR%':>6} {'PF':>6} {'Trd':>5} {'R:R':>4} | Z TP% SL% CHOP NATR")
    print("-"*85)
    for i, r in enumerate(results[:20]):
        c = r["config"]
        print(f"{i+1:>3} ${r['pnl']:>7.2f} {r['wr']:>5.1f}% {r['pf']:>5.2f} {r['trades']:>5} {r['rr']:>4.1f} | "
              f"{c['z_entry']:>4.1f} {c['tp_pct']:>4.1f} {c['sl_pct']:>4.1f} {c['chop_min']:>4} {c['natr_min']:.2f}-{c['natr_max']:.1f}")
    print(f"\nResults saved to {out_path}")


if __name__ == "__main__":
    main()