# ← Back
"""
Parameter Sweep: Z-VWAP Clean (no DCA)
=======================================
Downloads data ONCE, then sweeps parameter combos.
Saves results to sweep_results.json.

Usage:
    python backtests/sweep_zvwap.py --days 7 --top 35
"""
import sys
import os
import json
import time
import argparse
import numpy as np
from datetime import datetime
from itertools import product

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

try:
    from pybit.unified_trading import HTTP
except ImportError:
    # pybit is only needed by main() for live downloads; the indicator and
    # backtest functions below stay importable without it.
    HTTP = None

# ============================================================
# CONSTANTS
# ============================================================
VWAP_PERIOD = 50
NATR_PERIOD = 14
CHOP_PERIOD = 14
MAKER_FEE = 0.0002
TAKER_FEE = 0.00055
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

# Bars to wait after any exit before a new entry is allowed on that symbol.
COOLDOWN_BARS = 12
# |z| threshold for the "Z-TP" exit (z back near VWAP while in profit).
Z_EXIT = 0.3
# Notional per trade in quote currency.
# NOTE(review): presumably $5 margin x 3 leverage -- confirm.
POSITION_NOTIONAL = 5.0 * 3
# Minimum number of trades for a config to be kept in the sweep results.
MIN_TRADES = 5

# Config currently running in production; used as the comparison baseline.
CURRENT_PROD = {
    "z_entry": 1.8, "z_max": 2.5, "tp_pct": 1.5, "sl_pct": 8.0,
    "chop_min": 45, "natr_min": 0.75, "natr_max": 2.5,
}

# ============================================================
# PARAMETER GRID
# ============================================================
GRID = {
    "z_entry": [1.8, 2.0, 2.5, 3.0],
    "z_max": [2.5, 3.5, 5.0],
    "tp_pct": [1.0, 1.5, 2.0, 3.0],
    "sl_pct": [1.0, 2.0, 4.0, 8.0],
    "chop_min": [0, 45, 50, 55],  # 0 = disabled
    "natr_min": [0.5, 0.75],
    "natr_max": [2.5, 3.5],
}


# ============================================================
# DATA FETCHING (once)
# ============================================================
def fetch_klines(session, symbol, interval="5", days=7):
    """Download up to ``days`` of klines for ``symbol``, paging backwards.

    Args:
        session: object exposing ``get_kline(category, symbol, interval,
            limit, end)`` and returning a dict with ``retCode`` and
            ``result["list"]``.
        symbol: instrument name passed through to the API.
        interval: bar size in minutes, as a string convertible to ``int``.
        days: how many days of history to request.

    Returns:
        A list of dicts with keys ``ts, o, h, l, c, v``, oldest bar first,
        de-duplicated by ``ts`` and trimmed to the most recent
        ``days * 24 * 60 // interval`` bars. On an API error or exception the
        bars collected so far are returned (best effort).
    """
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)

    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear", symbol=symbol, interval=interval,
                limit=1000, end=end_time,
            )
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append({
                    "ts": int(item[0]),
                    "o": float(item[1]),
                    "h": float(item[2]),
                    "l": float(item[3]),
                    "c": float(item[4]),
                    "v": float(item[5]),
                })
            # Assumes each page is newest-first, so the last item is the
            # oldest bar; the next page ends just before it.
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
        except Exception as e:
            # Deliberate best effort: keep whatever was downloaded so far.
            print(f" ERR {symbol}: {e}")
            break
        time.sleep(0.15)

    # Pages were appended newest-first; flip to chronological order.
    all_klines.reverse()
    seen = set()
    unique = []
    for k in all_klines:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique


def get_top_symbols(session, top_n=35, min_vol=20_000_000):
    """Return the ``top_n`` USDT symbols by 24h turnover.

    Symbols in ``BLACKLIST``, symbols not ending in ``USDT`` and symbols with
    ``turnover24h`` below ``min_vol`` are skipped.

    Returns:
        A list of ``(symbol, turnover)`` tuples sorted by turnover, descending.
    """
    tickers = session.get_tickers(category="linear")
    cands = []
    for t in tickers["result"]["list"]:
        sym = t["symbol"]
        if not sym.endswith("USDT") or sym in BLACKLIST:
            continue
        vol = float(t.get("turnover24h", 0))
        if vol >= min_vol:
            cands.append((sym, vol))
    cands.sort(key=lambda x: x[1], reverse=True)
    return cands[:top_n]


# ============================================================
# INDICATORS
# ============================================================
def calc_zvwap(h, l, c, v, period=VWAP_PERIOD):
    """Z-score of the close against a rolling VWAP.

    For each bar ``i >= period`` the window is the ``period`` bars *before*
    ``i`` (bar ``i`` itself is excluded). ``vwap`` is the volume-weighted
    typical price of that window, ``std`` is the standard deviation of
    (close - running cumulative VWAP) inside the window, and
    ``z[i] = (c[i] - vwap) / std``. Bars before ``period`` and bars where
    ``std`` is zero are left at 0.0.
    """
    n = len(c)
    z = np.full(n, 0.0)
    for i in range(period, n):
        hi = h[i - period:i]
        lo = l[i - period:i]
        ci = c[i - period:i]
        vi = v[i - period:i]
        tp = (hi + lo + ci) / 3
        cum_tv = np.cumsum(tp * vi)
        cum_v = np.cumsum(vi)
        # Avoid division by zero while no volume has accumulated yet.
        cum_v_s = np.where(cum_v == 0, 1, cum_v)
        vwap_arr = cum_tv / cum_v_s
        vwap = vwap_arr[-1]
        std = np.std(ci - vwap_arr)
        if std > 0:
            z[i] = (c[i] - vwap) / std
    return z


def _true_range(h, l, c):
    """Return the per-bar true range; index 0 is 0.0 (no previous close)."""
    h = np.asarray(h, dtype=float)
    l = np.asarray(l, dtype=float)
    c = np.asarray(c, dtype=float)
    tr = np.zeros(len(c))
    if len(c) > 1:
        prev_c = c[:-1]
        tr[1:] = np.maximum(
            h[1:] - l[1:],
            np.maximum(np.abs(h[1:] - prev_c), np.abs(l[1:] - prev_c)),
        )
    return tr


def calc_natr(h, l, c, period=NATR_PERIOD):
    """Normalized ATR in percent: mean true range over ``period`` / close * 100.

    Bars before ``period`` and bars whose close is not positive are 0.0.
    """
    n = len(c)
    natr = np.full(n, 0.0)
    tr = _true_range(h, l, c)
    for i in range(period, n):
        atr = np.mean(tr[i - period + 1:i + 1])
        natr[i] = (atr / c[i]) * 100 if c[i] > 0 else 0
    return natr


def calc_chop(h, l, c, period=CHOP_PERIOD):
    """Choppiness Index: 100 * log10(sum(TR) / range) / log10(period).

    ``range`` is highest high minus lowest low over the last ``period`` bars.
    Bars before ``period`` and bars with a zero range keep the neutral
    default of 50.0.
    """
    n = len(c)
    chop = np.full(n, 50.0)
    tr = _true_range(h, l, c)
    for i in range(period, n):
        atr_sum = np.sum(tr[i - period + 1:i + 1])
        highest = np.max(h[i - period + 1:i + 1])
        lowest = np.min(l[i - period + 1:i + 1])
        rng = highest - lowest
        if rng > 0:
            chop[i] = 100 * np.log10(atr_sum / rng) / np.log10(period)
    return chop


# ============================================================
# BACKTEST (single config, all symbols)
# ============================================================
def _check_exit(trade, hi, lo, close, z):
    """Return ``(exit_price, reason)`` for this bar, or ``None`` to stay in.

    Priority is SL, then TP, then Z-TP, so a bar that touches both the stop
    and the target is booked as a loss.
    """
    is_long = trade["side"] == "LONG"

    sl_hit = lo <= trade["sl"] if is_long else hi >= trade["sl"]
    if sl_hit:
        return trade["sl"], "SL"

    tp_hit = hi >= trade["tp"] if is_long else lo <= trade["tp"]
    if tp_hit:
        return trade["tp"], "TP"

    # Z-TP: z has reverted towards VWAP and the close is in profit.
    if is_long:
        z_tp = z >= -Z_EXIT and close > trade["ep"]
    else:
        z_tp = z <= Z_EXIT and close < trade["ep"]
    if z_tp:
        return close, "Z-TP"
    return None


def _summarize(all_trades):
    """Aggregate a non-empty list of trade dicts into the summary dict."""
    wins = [t for t in all_trades if t["pnl"] > 0]
    losses = [t for t in all_trades if t["pnl"] <= 0]
    total_pnl = sum(t["pnl"] for t in all_trades)
    gp = sum(t["pnl"] for t in wins)
    gl = abs(sum(t["pnl"] for t in losses))
    # 999 is the sentinel profit factor when there are no losing trades.
    pf = gp / gl if gl > 0 else 999
    return {
        "trades": len(all_trades),
        "wr": round(len(wins) / len(all_trades) * 100, 1),
        "pnl": round(total_pnl, 2),
        "pf": round(pf, 2),
        "tp_count": sum(1 for t in all_trades if t["reason"] == "TP"),
        "sl_count": sum(1 for t in all_trades if t["reason"] == "SL"),
        "ztp_count": sum(1 for t in all_trades if t["reason"] == "Z-TP"),
    }


def run_one(data_cache, cfg):
    """Run backtest with given config across all symbols.

    Args:
        data_cache: mapping ``symbol -> (h, l, c, v, z, natr, chop)`` of
            equally long per-bar arrays.
        cfg: dict with keys ``z_entry, z_max, tp_pct, sl_pct, chop_min,
            natr_min, natr_max``.

    Returns:
        Summary dict (``trades, wr, pnl, pf, tp_count, sl_count, ztp_count``)
        or ``None`` when no trade was taken.
    """
    all_trades = []
    for sym, (h, l, c, v, z_arr, natr_arr, chop_arr) in data_cache.items():
        n = len(c)
        active = None
        cooldown = 0
        for i in range(VWAP_PERIOD, n):
            z = z_arr[i]
            natr = natr_arr[i]
            chop = chop_arr[i]

            # Process active trade; at most one position per symbol, so no
            # new entry is considered on a bar that started with a position.
            if active:
                exit_info = _check_exit(active, h[i], l[i], c[i], z)
                if exit_info is not None:
                    price, reason = exit_info
                    pnl = _calc_pnl(active, price, reason)
                    all_trades.append({"sym": sym, "pnl": pnl, "reason": reason})
                    active = None
                    cooldown = i + COOLDOWN_BARS
                continue

            if i < cooldown:
                continue

            # Filters
            if natr < cfg["natr_min"] or natr > cfg["natr_max"]:
                continue
            if cfg["chop_min"] > 0 and chop < cfg["chop_min"]:
                continue
            if abs(z) > cfg["z_max"]:
                continue

            # Entry at the close of the signal bar (mean reversion: buy
            # below VWAP, sell above it).
            if z < -cfg["z_entry"]:
                ep = c[i]
                active = {
                    "side": "LONG", "ep": ep, "qty": POSITION_NOTIONAL / ep,
                    "tp": ep * (1 + cfg["tp_pct"] / 100),
                    "sl": ep * (1 - cfg["sl_pct"] / 100),
                }
            elif z > cfg["z_entry"]:
                ep = c[i]
                active = {
                    "side": "SHORT", "ep": ep, "qty": POSITION_NOTIONAL / ep,
                    "tp": ep * (1 - cfg["tp_pct"] / 100),
                    "sl": ep * (1 + cfg["sl_pct"] / 100),
                }

        # Force close open
        if active:
            pnl = _calc_pnl(active, c[-1], "END")
            all_trades.append({"sym": sym, "pnl": pnl, "reason": "END"})

    if not all_trades:
        return None
    return _summarize(all_trades)


def _calc_pnl(trade, close_price, reason):
    """Net PnL of ``trade`` closed at ``close_price``, after fees.

    Entry is charged the taker fee; the exit is charged the maker fee only
    when ``reason`` is ``"TP"``, otherwise the taker fee.
    """
    if trade["side"] == "LONG":
        pnl = trade["qty"] * (close_price - trade["ep"])
    else:
        pnl = trade["qty"] * (trade["ep"] - close_price)
    entry_fee = trade["qty"] * trade["ep"] * TAKER_FEE
    exit_fee = trade["qty"] * close_price * (MAKER_FEE if reason == "TP" else TAKER_FEE)
    return pnl - entry_fee - exit_fee


# ============================================================
# MAIN
# ============================================================
def main():
    """CLI entry point: download data once, sweep GRID, save and print results."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    parser.add_argument("--top", type=int, default=35)
    args = parser.parse_args()

    if HTTP is None:
        raise SystemExit("pybit is required to download data (pip install pybit)")
    session = HTTP(testnet=False)

    # Step 1: get symbols
    print(f"[1/3] Fetching top {args.top} symbols...")
    symbols = get_top_symbols(session, args.top)
    print(f" Found {len(symbols)} symbols")

    # Step 2: download data ONCE
    print(f"[2/3] Downloading {args.days}d klines...")
    data_cache = {}
    for idx, (sym, vol) in enumerate(symbols):
        klines = fetch_klines(session, sym, days=args.days)
        if len(klines) < VWAP_PERIOD + 50:
            continue
        h = np.array([k["h"] for k in klines])
        l = np.array([k["l"] for k in klines])
        c = np.array([k["c"] for k in klines])
        v = np.array([k["v"] for k in klines])
        z = calc_zvwap(h, l, c, v)
        natr = calc_natr(h, l, c)
        chop = calc_chop(h, l, c)
        data_cache[sym] = (h, l, c, v, z, natr, chop)
        if (idx + 1) % 10 == 0:
            print(f" {idx+1}/{len(symbols)} downloaded")
        time.sleep(0.2)
    print(f" Cached {len(data_cache)} symbols")

    # Step 3: sweep
    keys = list(GRID.keys())
    combos = list(product(*[GRID[k] for k in keys]))
    print(f"[3/3] Sweeping {len(combos)} parameter combos...")
    results = []
    tested = 0
    for idx, vals in enumerate(combos):
        cfg = dict(zip(keys, vals))
        # Skip invalid: z_entry must be < z_max
        if cfg["z_entry"] >= cfg["z_max"]:
            continue
        tested += 1
        res = run_one(data_cache, cfg)
        if res and res["trades"] >= MIN_TRADES:  # min trades for significance
            res["config"] = cfg
            results.append(res)
        if (idx + 1) % 200 == 0:
            print(f" {idx+1}/{len(combos)} tested...")

    # Sort by PnL
    results.sort(key=lambda x: x["pnl"], reverse=True)

    # Save
    out_path = os.path.join(os.path.dirname(__file__), "sweep_results.json")
    with open(out_path, "w") as f:
        json.dump({
            "date": datetime.now().isoformat(),
            "days": args.days,
            "symbols": len(data_cache),
            # Combos actually backtested vs. those that passed MIN_TRADES.
            "combos_tested": tested,
            "combos_valid": len(results),
            "top_20": results[:20],
            "bottom_5": results[-5:],
            "current_prod": CURRENT_PROD,
        }, f, indent=2)

    # Print top 10
    print(f"\n{'='*80}")
    print(f"SWEEP DONE: {len(results)} valid combos ({args.days}d, {len(data_cache)} coins)")
    print(f"{'='*80}")
    print(f"{'#':>3} {'PnL':>8} {'WR%':>6} {'PF':>6} {'Trd':>5} | Z_ent Z_max TP% SL% CHOP NATR")
    print("-" * 80)
    for rank, r in enumerate(results[:15], start=1):
        rc = r["config"]
        print(f"{rank:>3} ${r['pnl']:>7.2f} {r['wr']:>5.1f}% {r['pf']:>5.2f} {r['trades']:>5} | "
              f"{rc['z_entry']:>4.1f} {rc['z_max']:>4.1f} {rc['tp_pct']:>4.1f} {rc['sl_pct']:>4.1f} "
              f"{rc['chop_min']:>4} {rc['natr_min']:.2f}-{rc['natr_max']:.1f}")

    # Find current prod config result
    prod_res = run_one(data_cache, CURRENT_PROD)
    if prod_res:
        print(f"\nCURRENT PROD: PnL ${prod_res['pnl']:.2f}, WR {prod_res['wr']}%, "
              f"PF {prod_res['pf']}, {prod_res['trades']} trades")
    print(f"\nResults saved to {out_path}")


if __name__ == "__main__":
    main()