"""
Parameter Sweep: Z-VWAP Clean (no DCA)
=======================================
Downloads data ONCE, then sweeps parameter combos.
Saves results to sweep_results.json.
Usage:
python backtests/sweep_zvwap.py --days 7 --top 35
"""
import sys, os, json, time, argparse
import numpy as np
from datetime import datetime
from itertools import product
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pybit.unified_trading import HTTP
# ============================================================
# CONSTANTS
# ============================================================
# Default look-back window (in bars) for calc_zvwap; run_one also starts its
# bar loop at this index.
VWAP_PERIOD = 50
# Default averaging window (in bars) for calc_natr.
NATR_PERIOD = 14
# Default look-back window (in bars) for calc_chop.
CHOP_PERIOD = 14
# Fee rates as fractions of traded notional (multiplied by qty * price in
# _calc_pnl). MAKER_FEE is applied only to exits with reason "TP"; TAKER_FEE is
# applied to every entry and to all other exits.
MAKER_FEE = 0.0002
TAKER_FEE = 0.00055
# Symbols that get_top_symbols always skips.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}
# ============================================================
# PARAMETER GRID
# ============================================================
# Values swept by main(): the Cartesian product of all lists is tested, except
# combinations where z_entry >= z_max, which main() skips.
GRID = {
    # Entry threshold: run_one goes LONG when z < -z_entry, SHORT when z > z_entry.
    "z_entry": [1.8, 2.0, 2.5, 3.0],
    # No entry is taken when |z| exceeds this value.
    "z_max": [2.5, 3.5, 5.0],
    # Take-profit / stop-loss distance from the entry price, in percent.
    "tp_pct": [1.0, 1.5, 2.0, 3.0],
    "sl_pct": [1.0, 2.0, 4.0, 8.0],
    # Minimum choppiness value required for an entry.
    "chop_min": [0, 45, 50, 55],  # 0 = disabled
    # Entries are only taken while natr_min <= NATR <= natr_max.
    "natr_min": [0.5, 0.75],
    "natr_max": [2.5, 3.5],
}
# ============================================================
# DATA FETCHING (once)
# ============================================================
def fetch_klines(session, symbol, interval="5", days=7):
    """Download up to ``days`` worth of klines for ``symbol``, oldest first.

    Pages backwards in time through ``session.get_kline`` (1000 rows per
    request), starting from "now". Paging stops when enough bars have been
    collected, when the API returns a non-zero ``retCode``, an empty page or
    a short page, or when any exception is raised (the error is printed and
    whatever was collected so far is returned).

    Args:
        session: object exposing ``get_kline(category, symbol, interval,
            limit, end)`` and returning a dict with ``retCode`` and
            ``result.list``.
        symbol: instrument name passed through to the API.
        interval: bar length in minutes, as a numeric string.
        days: how many days of history to request.

    Returns:
        A list of dicts with keys ``ts``, ``o``, ``h``, ``l``, ``c``, ``v``,
        de-duplicated by ``ts`` and trimmed to the most recent
        ``days * 24 * 60 // int(interval)`` entries.
    """
    page_size = 1000
    wanted = days * 24 * 60 // int(interval)
    cursor_ms = int(datetime.now().timestamp() * 1000)

    # Rows are stored in the order the API hands them over; the code treats
    # that order as newest-first (it continues from the last row of each page
    # and reverses everything once paging is done).
    collected = []
    while len(collected) < wanted:
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=page_size,
                end=cursor_ms,
            )
            if resp["retCode"] != 0:
                break
            page = resp["result"]["list"]
            if not page:
                break
            for row in page:
                bar = {
                    "ts": int(row[0]),
                    "o": float(row[1]),
                    "h": float(row[2]),
                    "l": float(row[3]),
                    "c": float(row[4]),
                    "v": float(row[5]),
                }
                collected.append(bar)
            # Next request ends just before the last row of this page.
            cursor_ms = int(page[-1][0]) - 1
            if len(page) < page_size:
                break
        except Exception as e:
            print(f" ERR {symbol}: {e}")
            break
        # Pause between successive page requests.
        time.sleep(0.15)

    # Flip to chronological order and keep the first bar seen per timestamp.
    seen_ts = set()
    bars = []
    for bar in reversed(collected):
        stamp = bar["ts"]
        if stamp in seen_ts:
            continue
        seen_ts.add(stamp)
        bars.append(bar)

    if len(bars) > wanted:
        return bars[-wanted:]
    return bars
def get_top_symbols(session, top_n=35, min_vol=20_000_000):
    """Return the ``top_n`` USDT linear symbols ranked by 24h turnover.

    Tickers are read from ``session.get_tickers(category="linear")``. A symbol
    is kept when its name ends in ``"USDT"``, it is not in ``BLACKLIST``, and
    its ``turnover24h`` (treated as 0 when the key is missing) is at least
    ``min_vol``.

    Returns:
        A list of ``(symbol, turnover)`` tuples, highest turnover first,
        at most ``top_n`` long.
    """
    response = session.get_tickers(category="linear")

    ranked = []
    for ticker in response["result"]["list"]:
        name = ticker["symbol"]
        if not name.endswith("USDT"):
            continue
        if name in BLACKLIST:
            continue
        turnover = float(ticker.get("turnover24h", 0))
        if turnover < min_vol:
            continue
        ranked.append((name, turnover))

    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]
# ============================================================
# INDICATORS (rolling windows over NumPy arrays)
# ============================================================
def calc_zvwap(h, l, c, v, period=VWAP_PERIOD):
    """Z-score of each close against a VWAP built from the preceding bars.

    For bar ``i`` the window is bars ``i - period`` .. ``i - 1`` (the current
    bar is not part of it). Inside the window a running VWAP of the typical
    price ``(h + l + c) / 3`` is accumulated; the reference VWAP is its last
    value and the scale is the standard deviation of ``close - running VWAP``
    over the window.

    Args:
        h, l, c, v: equal-length NumPy arrays of high, low, close and volume.
        period: window length in bars.

    Returns:
        Float array the same length as ``c``. Entries before ``period`` and
        entries whose window has zero deviation stay at 0.0.
    """
    size = len(c)
    zscores = np.zeros(size)

    # Element-wise, so computing it once for the whole series gives the same
    # per-window values as computing it on each slice.
    price_volume = ((h + l + c) / 3) * v

    for i in range(period, size):
        window = slice(i - period, i)
        volume_run = np.cumsum(v[window])
        # Guard against division by zero while cumulative volume is still 0.
        safe_volume = np.where(volume_run == 0, 1, volume_run)
        running_vwap = np.cumsum(price_volume[window]) / safe_volume

        spread = np.std(c[window] - running_vwap)
        if spread > 0:
            zscores[i] = (c[i] - running_vwap[-1]) / spread

    return zscores
def calc_natr(h, l, c, period=NATR_PERIOD):
    """Normalized ATR: mean true range over ``period`` bars as a percent of close.

    True range for bar ``i`` (``i >= 1``) is the largest of ``h - l``,
    ``|h - previous close|`` and ``|l - previous close|``; bar 0 has a true
    range of 0.

    Returns:
        Float array the same length as ``c``. Entries before ``period``, and
        entries whose close is not positive, are 0.
    """
    size = len(c)
    result = np.zeros(size)

    true_range = np.zeros(size)
    for i in range(1, size):
        prev_close = c[i - 1]
        true_range[i] = max(
            h[i] - l[i],
            abs(h[i] - prev_close),
            abs(l[i] - prev_close),
        )

    for i in range(period, size):
        close = c[i]
        if close > 0:
            # Window is the last `period` bars, current bar included.
            atr = true_range[i - period + 1:i + 1].mean()
            result[i] = (atr / close) * 100
        else:
            result[i] = 0

    return result
def calc_chop(h, l, c, period=CHOP_PERIOD):
    """Choppiness Index: 100 * log10(sum(TR) / (max high - min low)) / log10(period).

    The sum, the highest high and the lowest low are all taken over the last
    ``period`` bars including the current one. True range is computed as in
    ``calc_natr`` (bar 0 has a true range of 0).

    Returns:
        Float array the same length as ``c``. Entries before ``period``, and
        entries whose high-low range over the window is not positive, keep
        the neutral default of 50.0.
    """
    size = len(c)
    result = np.full(size, 50.0)

    true_range = np.zeros(size)
    for i in range(1, size):
        prev_close = c[i - 1]
        true_range[i] = max(
            h[i] - l[i],
            abs(h[i] - prev_close),
            abs(l[i] - prev_close),
        )

    log_period = np.log10(period)
    for i in range(period, size):
        window = slice(i - period + 1, i + 1)
        span = h[window].max() - l[window].min()
        if span <= 0:
            continue
        result[i] = 100 * np.log10(true_range[window].sum() / span) / log_period

    return result
# ============================================================
# BACKTEST (single config, all symbols)
# ============================================================
def _exit_signal(trade, high, low, close, z):
    """Return ``(reason, fill_price)`` for the first exit rule hit on this bar, else None."""
    z_exit_band = 0.3  # Z-TP fires once z has reverted to within this band of zero
    is_long = trade["side"] == "LONG"
    is_short = trade["side"] == "SHORT"

    # Stop-loss is checked first, so a bar touching both SL and TP counts as SL.
    if (is_long and low <= trade["sl"]) or (is_short and high >= trade["sl"]):
        return "SL", trade["sl"]
    if (is_long and high >= trade["tp"]) or (is_short and low <= trade["tp"]):
        return "TP", trade["tp"]
    # Z-TP: z has reverted and the close is in profit relative to entry.
    if is_long and z >= -z_exit_band and close > trade["ep"]:
        return "Z-TP", close
    if is_short and z <= z_exit_band and close < trade["ep"]:
        return "Z-TP", close
    return None


def _open_position(side, entry_price, cfg):
    """Build the trade dict for a new LONG/SHORT position entered at ``entry_price``."""
    notional = 5.0 * 3  # fixed position notional used to size every trade
    tp_frac = cfg["tp_pct"] / 100
    sl_frac = cfg["sl_pct"] / 100
    if side == "LONG":
        take_profit = entry_price * (1 + tp_frac)
        stop_loss = entry_price * (1 - sl_frac)
    else:
        take_profit = entry_price * (1 - tp_frac)
        stop_loss = entry_price * (1 + sl_frac)
    return {
        "side": side,
        "ep": entry_price,
        "qty": notional / entry_price,
        "tp": take_profit,
        "sl": stop_loss,
    }


def _summarize_trades(trades):
    """Aggregate closed trades into the summary dict, or None when there are none."""
    if not trades:
        return None
    wins = [t for t in trades if t["pnl"] > 0]
    losses = [t for t in trades if t["pnl"] <= 0]
    gross_profit = sum(t["pnl"] for t in wins)
    gross_loss = abs(sum(t["pnl"] for t in losses))
    # 999 stands in for an infinite profit factor when there are no losses.
    profit_factor = gross_profit / gross_loss if gross_loss > 0 else 999
    return {
        "trades": len(trades),
        "wr": round(len(wins) / len(trades) * 100, 1),
        "pnl": round(sum(t["pnl"] for t in trades), 2),
        "pf": round(profit_factor, 2),
        "tp_count": sum(1 for t in trades if t["reason"] == "TP"),
        "sl_count": sum(1 for t in trades if t["reason"] == "SL"),
        "ztp_count": sum(1 for t in trades if t["reason"] == "Z-TP"),
    }


def run_one(data_cache, cfg):
    """Run the backtest for one config across every cached symbol.

    Per symbol, bars are walked from index ``VWAP_PERIOD`` onwards with at
    most one open position at a time:

    * While a position is open, the bar is checked for an exit in priority
      order SL, TP, Z-TP. No new entry is considered on that bar.
    * After an exit, entries are blocked until 12 bars later.
    * An entry requires NATR within ``[natr_min, natr_max]``, choppiness of at
      least ``chop_min`` (skipped when ``chop_min`` is 0) and ``|z| <= z_max``.
      It goes LONG when ``z < -z_entry`` and SHORT when ``z > z_entry``,
      filled at the bar's close.
    * A position still open after the last bar is closed at the final close
      with reason ``"END"``.

    Args:
        data_cache: mapping ``symbol -> (h, l, c, v, z, natr, chop)`` arrays.
        cfg: dict with keys ``z_entry``, ``z_max``, ``tp_pct``, ``sl_pct``,
            ``chop_min``, ``natr_min``, ``natr_max``.

    Returns:
        Summary dict with ``trades``, ``wr``, ``pnl``, ``pf``, ``tp_count``,
        ``sl_count`` and ``ztp_count``, or None when no trade was taken.
    """
    cooldown_bars = 12
    all_trades = []

    for sym, (h, l, c, _v, z_arr, natr_arr, chop_arr) in data_cache.items():
        active = None
        cooldown = 0  # first bar index at which a new entry is allowed again

        for i in range(VWAP_PERIOD, len(c)):
            z = z_arr[i]

            if active:
                hit = _exit_signal(active, h[i], l[i], c[i], z)
                if hit is not None:
                    reason, fill = hit
                    all_trades.append({
                        "sym": sym,
                        "pnl": _calc_pnl(active, fill, reason),
                        "reason": reason,
                    })
                    active = None
                    cooldown = i + cooldown_bars
                # Never open a new position on a bar that had one open.
                continue

            if i < cooldown:
                continue

            # Filters
            natr = natr_arr[i]
            if natr < cfg["natr_min"] or natr > cfg["natr_max"]:
                continue
            if cfg["chop_min"] > 0 and chop_arr[i] < cfg["chop_min"]:
                continue
            if abs(z) > cfg["z_max"]:
                continue

            # Entry
            if z < -cfg["z_entry"]:
                active = _open_position("LONG", c[i], cfg)
            elif z > cfg["z_entry"]:
                active = _open_position("SHORT", c[i], cfg)

        # Force-close whatever is still open at the end of this symbol's data.
        if active:
            all_trades.append({
                "sym": sym,
                "pnl": _calc_pnl(active, c[-1], "END"),
                "reason": "END",
            })

    return _summarize_trades(all_trades)
def _calc_pnl(trade, close_price, reason):
    """Net PnL of closing ``trade`` at ``close_price``, after entry and exit fees.

    The entry is always charged ``TAKER_FEE``. The exit is charged
    ``MAKER_FEE`` when ``reason`` is ``"TP"`` and ``TAKER_FEE`` otherwise.
    Any side other than ``"LONG"`` is treated as a short.
    """
    qty = trade["qty"]
    entry_price = trade["ep"]

    if trade["side"] == "LONG":
        gross = qty * (close_price - entry_price)
    else:
        gross = qty * (entry_price - close_price)

    exit_rate = MAKER_FEE if reason == "TP" else TAKER_FEE
    entry_fee = qty * entry_price * TAKER_FEE
    exit_fee = qty * close_price * exit_rate
    return gross - entry_fee - exit_fee
# ============================================================
# MAIN
# ============================================================
def main():
    """Download data once, sweep GRID, save and print the ranking.

    Steps:
      1. Pick the top ``--top`` symbols via ``get_top_symbols``.
      2. Fetch ``--days`` of klines per symbol and precompute the indicators;
         symbols with fewer than ``VWAP_PERIOD + 50`` bars are dropped.
      3. Run ``run_one`` for every GRID combination with ``z_entry < z_max``
         and keep results that have at least 5 trades.

    Results are sorted by PnL and written to ``sweep_results.json`` next to
    this script. The top 15 rows and the current production config's result
    are printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    parser.add_argument("--top", type=int, default=35)
    args = parser.parse_args()

    # Current production parameters, defined once so the JSON entry and the
    # comparison run below cannot drift apart.
    prod_cfg = {
        "z_entry": 1.8, "z_max": 2.5, "tp_pct": 1.5,
        "sl_pct": 8.0, "chop_min": 45, "natr_min": 0.75, "natr_max": 2.5,
    }

    session = HTTP(testnet=False)

    # Step 1: get symbols
    print(f"[1/3] Fetching top {args.top} symbols...")
    symbols = get_top_symbols(session, args.top)
    print(f" Found {len(symbols)} symbols")

    # Step 2: download data ONCE
    print(f"[2/3] Downloading {args.days}d klines...")
    data_cache = {}
    for idx, (sym, _vol) in enumerate(symbols):
        klines = fetch_klines(session, sym, days=args.days)
        if len(klines) < VWAP_PERIOD + 50:
            continue
        h = np.array([k["h"] for k in klines])
        l = np.array([k["l"] for k in klines])
        c = np.array([k["c"] for k in klines])
        v = np.array([k["v"] for k in klines])
        z = calc_zvwap(h, l, c, v)
        natr = calc_natr(h, l, c)
        chop = calc_chop(h, l, c)
        data_cache[sym] = (h, l, c, v, z, natr, chop)
        if (idx + 1) % 10 == 0:
            print(f" {idx+1}/{len(symbols)} downloaded")
        time.sleep(0.2)
    print(f" Cached {len(data_cache)} symbols")

    # Step 3: sweep
    keys = list(GRID.keys())
    combos = list(product(*[GRID[k] for k in keys]))
    print(f"[3/3] Sweeping {len(combos)} parameter combos...")
    results = []
    tested = 0  # combos actually backtested (invalid ones excluded)
    for idx, vals in enumerate(combos):
        cfg = dict(zip(keys, vals))
        # Skip invalid: z_entry must be < z_max
        if cfg["z_entry"] >= cfg["z_max"]:
            continue
        tested += 1
        res = run_one(data_cache, cfg)
        if res and res["trades"] >= 5:  # min 5 trades for significance
            res["config"] = cfg
            results.append(res)
        if (idx + 1) % 200 == 0:
            print(f" {idx+1}/{len(combos)} tested...")

    # Sort by PnL
    results.sort(key=lambda x: x["pnl"], reverse=True)

    # Save
    out_path = os.path.join(os.path.dirname(__file__), "sweep_results.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump({
            "date": datetime.now().isoformat(),
            "days": args.days,
            "symbols": len(data_cache),
            # Number of combos that were run, not just the ones that survived
            # the min-trades filter (that count is "combos_valid").
            "combos_tested": tested,
            "combos_valid": len(results),
            "top_20": results[:20],
            "bottom_5": results[-5:],
            "current_prod": prod_cfg,
        }, f, indent=2)

    # Print top 15
    print(f"\n{'='*80}")
    print(f"SWEEP DONE: {len(results)} valid combos ({args.days}d, {len(data_cache)} coins)")
    print(f"{'='*80}")
    print(f"{'#':>3} {'PnL':>8} {'WR%':>6} {'PF':>6} {'Trd':>5} | Z_ent Z_max TP% SL% CHOP NATR")
    print("-" * 80)
    for rank, r in enumerate(results[:15], start=1):
        rc = r["config"]
        print(f"{rank:>3} ${r['pnl']:>7.2f} {r['wr']:>5.1f}% {r['pf']:>5.2f} {r['trades']:>5} | "
              f"{rc['z_entry']:>4.1f} {rc['z_max']:>4.1f} {rc['tp_pct']:>4.1f} {rc['sl_pct']:>4.1f} "
              f"{rc['chop_min']:>4} {rc['natr_min']:.2f}-{rc['natr_max']:.1f}")

    # Compare against the current production config
    prod_res = run_one(data_cache, prod_cfg)
    if prod_res:
        print(f"\nCURRENT PROD: PnL ${prod_res['pnl']:.2f}, WR {prod_res['wr']}%, "
              f"PF {prod_res['pf']}, {prod_res['trades']} trades")
    print(f"\nResults saved to {out_path}")


if __name__ == "__main__":
    main()