โ† ะะฐะทะฐะด
""" FULL Parameter Sweep: DCA Z-VWAP โ€” NO-SO + WITH-SO variants ============================================================= 30-day backtest, all key parameters + DCA safety orders. Usage: cd /home/app/trading-bot-bybit python3 backtests/backtest_full_sweep.py """ import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import json import time import numpy as np from datetime import datetime from itertools import product from pybit.unified_trading import HTTP # ============================================================ # FIXED CONFIG # ============================================================ ORDER_USD = 7.0 SO_BASE_USD = 5.0 LEVERAGE = 3 VWAP_PERIOD = 50 MAKER_FEE = 0.0002 TAKER_FEE = 0.00055 TIMEFRAME = "5" DAYS = 30 MAX_SYMBOLS = 60 BLACKLIST = { "BTCUSDT", "ETHUSDT", "USDCUSDT", "BTCPERP", "ETHPERP", "RAVEUSDT", "FARTCOINUSDT", "TAOUSDT", "ARIAUSDT", "SIRENUSDT", "MAGMAUSDT", } # ============================================================ # PARAMETER GRID # ============================================================ PARAM_GRID = { "z_entry": [1.8, 2.0, 2.5, 3.0], "z_max": [0, 2.5, 3.5, 5.0], "tp_pct": [1.0, 1.5, 2.0, 3.0, 5.0], "sl_pct": [1.0, 2.0, 3.0, 5.0, 8.0], "natr_min": [0.5, 0.75, 1.0, 1.5], "natr_max": [0, 2.0, 2.5, 3.0], "chop_min": [0, 45, 50, 55, 60], "vol_min": [20_000_000, 50_000_000, 100_000_000], "max_so": [0, 2, 3], # 0 = no SO, 2 or 3 SOs } # SO config (fixed for simplicity โ€” vary only max_so count) SO_VOLUME_SCALE = 1.3 SO_STEP_SCALE = 1.3 SO_NATR_FACTOR = 1.0 session = HTTP(testnet=False) # ============================================================ # DATA # ============================================================ def get_symbols(): resp = session.get_tickers(category="linear") if resp["retCode"] != 0: return [] symbols = [] for t in resp["result"]["list"]: sym = t["symbol"] if not sym.endswith("USDT") or sym in BLACKLIST: continue vol = float(t.get("turnover24h", 0)) if vol >= 20_000_000: symbols.append({"symbol": sym, "volume_24h": vol}) symbols.sort(key=lambda x: x["volume_24h"], reverse=True) return symbols[:MAX_SYMBOLS] def fetch_klines(symbol, interval, days=30): all_klines = [] bars_needed = days * 24 * 60 // int(interval) end_time = int(datetime.now().timestamp() * 1000) while len(all_klines) < bars_needed: try: resp = session.get_kline(category="linear", symbol=symbol, interval=interval, limit=1000, end=end_time) if resp["retCode"] != 0: break items = resp["result"]["list"] if not items: break for item in items: all_klines.append({ "ts": int(item[0]), "o": float(item[1]), "h": float(item[2]), "l": float(item[3]), "c": float(item[4]), "v": float(item[5]), }) end_time = int(items[-1][0]) - 1 if len(items) < 1000: break time.sleep(0.05) except Exception as e: print(f" ERR: {e}") break all_klines.reverse() seen = set() unique = [] for k in all_klines: if k["ts"] not in seen: seen.add(k["ts"]) unique.append(k) return unique[-bars_needed:] if len(unique) > bars_needed else unique # ============================================================ # INDICATORS (vectorized) # ============================================================ def calc_all_indicators(closes, highs, lows, volumes): n = len(closes) # Z-VWAP z_scores = np.zeros(n) for i in range(VWAP_PERIOD, n): h = highs[i-VWAP_PERIOD:i] l = lows[i-VWAP_PERIOD:i] c = closes[i-VWAP_PERIOD:i] v = volumes[i-VWAP_PERIOD:i] tp = (h + l + c) / 3 ctv = np.cumsum(tp * v) cv = np.cumsum(v) cv_safe = np.where(cv == 0, 1, cv) vwap_arr = ctv / cv_safe vwap = vwap_arr[-1] dev = c - vwap_arr std = np.std(dev) if std > 0: z_scores[i] = (closes[i] - vwap) / std # NATR (rolling 14) natr = np.zeros(n) for i in range(14, n): trs = [] for j in range(i-13, i+1): tr = max(highs[j] - lows[j], abs(highs[j] - closes[j-1]), abs(lows[j] - closes[j-1])) trs.append(tr) atr = np.mean(trs) natr[i] = (atr / closes[i]) * 100 if closes[i] > 0 else 0 # CHOP Index (14) chop = np.full(n, 50.0) for i in range(14, n): atr_sum = 0 for j in range(i-13, i+1): tr = max(highs[j] - lows[j], abs(highs[j] - closes[j-1]), abs(lows[j] - closes[j-1])) atr_sum += tr hi = np.max(highs[i-13:i+1]) lo = np.min(lows[i-13:i+1]) rng = hi - lo if rng > 0: chop[i] = 100 * np.log10(atr_sum / rng) / np.log10(14) return z_scores, natr, chop # ============================================================ # DEAL SIMULATOR โ€” with optional DCA Safety Orders # ============================================================ def simulate_deals(z_scores, natr, chop, closes, highs, lows, timestamps, params): n = len(closes) z_entry = params["z_entry"] z_max = params["z_max"] tp_pct = params["tp_pct"] sl_pct = params["sl_pct"] natr_min = params["natr_min"] natr_max = params["natr_max"] chop_min = params["chop_min"] max_so = params["max_so"] deals = [] in_trade = False side = None entry_price = 0 entry_bar = 0 cooldown_until = 0 # DCA state avg_price = 0 total_qty = 0 total_invested = 0 so_filled = 0 so_triggers = [] # list of (trigger_price, so_usd) natr_at_entry = 0 def calc_so_triggers(entry_p, trade_side, natr_val, max_sos): """Calculate SO trigger prices based on NATR spacing""" triggers = [] natr_spacing = max(natr_val, 0.5) # floor 0.5% cumulative = 0 for i in range(max_sos): dev_pct = natr_spacing * SO_NATR_FACTOR * (SO_STEP_SCALE ** i) cumulative += dev_pct # Don't place SO beyond 90% of SL if cumulative >= sl_pct * 0.9: break so_usd = SO_BASE_USD * (SO_VOLUME_SCALE ** i) if trade_side == "LONG": trigger_p = entry_p * (1 - cumulative / 100) else: trigger_p = entry_p * (1 + cumulative / 100) triggers.append((trigger_p, so_usd)) return triggers def calc_avg_price_after_so(old_avg, old_qty, fill_price, fill_usd): new_qty = fill_usd * LEVERAGE / fill_price new_avg = (old_avg * old_qty + fill_price * new_qty) / (old_qty + new_qty) return new_avg, old_qty + new_qty, fill_usd for i in range(VWAP_PERIOD, n): if in_trade: z = z_scores[i] closed = False close_price = 0 reason = "" # Check SO fills first (before TP/SL check, because avg changes) if max_so > 0 and so_filled < len(so_triggers): trigger_p, so_usd = so_triggers[so_filled] so_hit = False if side == "LONG" and lows[i] <= trigger_p: so_hit = True elif side == "SHORT" and highs[i] >= trigger_p: so_hit = True if so_hit: avg_price, total_qty, added_usd = calc_avg_price_after_so( avg_price, total_qty, trigger_p, so_usd ) total_invested += added_usd so_filled += 1 # TP/SL based on avg_price (DCA adjusted) if side == "LONG": tp_price = avg_price * (1 + tp_pct / 100) sl_price = entry_price * (1 - sl_pct / 100) # SL from original entry if lows[i] <= sl_price: close_price = sl_price reason = "SL" closed = True elif highs[i] >= tp_price: close_price = tp_price reason = "TP" closed = True elif z >= -0.3 and closes[i] > avg_price: close_price = closes[i] reason = "Z-TP" closed = True else: # SHORT tp_price = avg_price * (1 - tp_pct / 100) sl_price = entry_price * (1 + sl_pct / 100) if highs[i] >= sl_price: close_price = sl_price reason = "SL" closed = True elif lows[i] <= tp_price: close_price = tp_price reason = "TP" closed = True elif z <= 0.3 and closes[i] < avg_price: close_price = closes[i] reason = "Z-TP" closed = True # Time stop: progressive by SO count if not closed: time_limits = {0: 36, 1: 36, 2: 24, 3: 18} max_bars = time_limits.get(so_filled, 18) if (i - entry_bar) >= max_bars: close_price = closes[i] reason = "TIME" closed = True if closed: if side == "LONG": pnl = total_qty * (close_price - avg_price) else: pnl = total_qty * (avg_price - close_price) # Fees: entry taker + SO taker + exit (TP=maker, else=taker) exit_fee_rate = MAKER_FEE if reason == "TP" else TAKER_FEE entry_fees = (ORDER_USD * LEVERAGE) * TAKER_FEE so_fees = sum(so_triggers[j][1] * LEVERAGE * TAKER_FEE for j in range(so_filled)) exit_fees = total_qty * close_price * exit_fee_rate pnl -= (entry_fees + so_fees + exit_fees) deals.append({ "side": side, "pnl": pnl, "reason": reason, "bars": i - entry_bar, "sos": so_filled, "invested": total_invested, }) in_trade = False cooldown_until = i + 12 # fixed cooldown 12 bars = 1h continue # Check entry if i < cooldown_until: continue z = z_scores[i] if abs(z) <= z_entry: continue if z_max > 0 and abs(z) > z_max: continue if natr[i] < natr_min: continue if natr_max > 0 and natr[i] > natr_max: continue if chop_min > 0 and chop[i] < chop_min: continue side = "LONG" if z < -z_entry else "SHORT" entry_price = closes[i] entry_bar = i in_trade = True # Init DCA state total_qty = (ORDER_USD * LEVERAGE) / entry_price avg_price = entry_price total_invested = ORDER_USD so_filled = 0 natr_at_entry = natr[i] so_triggers = calc_so_triggers(entry_price, side, natr_at_entry, max_so) if max_so > 0 else [] # Force close open if in_trade: cp = closes[-1] if side == "LONG": pnl = total_qty * (cp - avg_price) else: pnl = total_qty * (avg_price - cp) entry_fees = (ORDER_USD * LEVERAGE) * TAKER_FEE so_fees = sum(so_triggers[j][1] * LEVERAGE * TAKER_FEE for j in range(so_filled)) exit_fees = total_qty * cp * TAKER_FEE pnl -= (entry_fees + so_fees + exit_fees) deals.append({"side": side, "pnl": pnl, "reason": "END", "bars": len(closes) - 1 - entry_bar, "sos": so_filled, "invested": total_invested}) return deals # ============================================================ # MAIN # ============================================================ def main(): print("=" * 80) print(" FULL PARAMETER SWEEP โ€” Z-VWAP + DCA โ€” 30 DAYS") print("=" * 80) keys = list(PARAM_GRID.keys()) values = list(PARAM_GRID.values()) raw_combos = list(product(*values)) total_raw = len(raw_combos) # Pre-filter impossible combos combos = [] for combo in raw_combos: p = dict(zip(keys, combo)) if p["z_max"] > 0 and p["z_entry"] >= p["z_max"]: continue if p["natr_max"] > 0 and p["natr_min"] >= p["natr_max"]: continue if p["tp_pct"] < 0.1: continue combos.append(combo) print(f"\n Grid: {' ร— '.join(str(len(v)) for v in values)} = {total_raw} raw") print(f" After filtering impossible: {len(combos)} combinations") # Fetch symbols print("\n Fetching symbols...") symbols_data = get_symbols() print(f" {len(symbols_data)} symbols (vol >= $20M, excl blacklist)") # Fetch klines print(f"\n Downloading {DAYS}-day klines ({TIMEFRAME}m)...") all_data = {} for idx, sd in enumerate(symbols_data): sym = sd["symbol"] print(f" [{idx+1}/{len(symbols_data)}] {sym}...", end=" ", flush=True) time.sleep(0.12) klines = fetch_klines(sym, TIMEFRAME, days=DAYS) if len(klines) < VWAP_PERIOD + 100: print(f"skip ({len(klines)} bars)") continue closes = np.array([k["c"] for k in klines]) highs = np.array([k["h"] for k in klines]) lows = np.array([k["l"] for k in klines]) volumes = np.array([k["v"] for k in klines]) timestamps = [k["ts"] for k in klines] z_scores, natr, chop = calc_all_indicators(closes, highs, lows, volumes) all_data[sym] = { "closes": closes, "highs": highs, "lows": lows, "z": z_scores, "natr": natr, "chop": chop, "ts": timestamps, "vol": sd["volume_24h"], } print(f"OK ({len(klines)} bars)") print(f"\n {len(all_data)} symbols loaded") sym_vol = {sd["symbol"]: sd["volume_24h"] for sd in symbols_data} # Run sweep print(f"\n Running {len(combos)} combinations across {len(all_data)} symbols...") results = [] t0 = time.time() for ci, combo in enumerate(combos): params = dict(zip(keys, combo)) all_deals = [] for sym, data in all_data.items(): if sym_vol.get(sym, 0) < params["vol_min"]: continue deals = simulate_deals( data["z"], data["natr"], data["chop"], data["closes"], data["highs"], data["lows"], data["ts"], params ) for d in deals: d["symbol"] = sym all_deals.extend(deals) if len(all_deals) < 5: continue total_pnl = sum(d["pnl"] for d in all_deals) wins = [d for d in all_deals if d["pnl"] > 0] losses = [d for d in all_deals if d["pnl"] <= 0] wr = len(wins) / len(all_deals) * 100 gp = sum(d["pnl"] for d in wins) if wins else 0 gl = abs(sum(d["pnl"] for d in losses)) if losses else 0.001 pf = gp / gl if gl > 0 else 999 avg_bars = sum(d["bars"] for d in all_deals) / len(all_deals) avg_sos = sum(d.get("sos", 0) for d in all_deals) / len(all_deals) avg_invested = sum(d.get("invested", ORDER_USD) for d in all_deals) / len(all_deals) tp_cnt = sum(1 for d in all_deals if d["reason"] == "TP") ztp_cnt = sum(1 for d in all_deals if d["reason"] == "Z-TP") sl_cnt = sum(1 for d in all_deals if d["reason"] == "SL") time_cnt = sum(1 for d in all_deals if d["reason"] == "TIME") # Per-symbol breakdown for top results sym_pnl = {} for d in all_deals: s = d["symbol"] if s not in sym_pnl: sym_pnl[s] = {"pnl": 0, "cnt": 0, "wins": 0} sym_pnl[s]["pnl"] += d["pnl"] sym_pnl[s]["cnt"] += 1 if d["pnl"] > 0: sym_pnl[s]["wins"] += 1 # Top 3 winners and losers by symbol sorted_syms = sorted(sym_pnl.items(), key=lambda x: x[1]["pnl"], reverse=True) top_winners = [(s, round(v["pnl"], 2)) for s, v in sorted_syms[:3]] top_losers = [(s, round(v["pnl"], 2)) for s, v in sorted_syms[-3:]] results.append({ **params, "deals": len(all_deals), "pnl": round(total_pnl, 2), "pnl_per_day": round(total_pnl / DAYS, 2), "wr": round(wr, 1), "pf": round(pf, 2), "avg_win": round(gp / len(wins), 3) if wins else 0, "avg_loss": round(gl / len(losses), 3) if losses else 0, "avg_bars": round(avg_bars, 1), "avg_sos": round(avg_sos, 2), "avg_invested": round(avg_invested, 1), "tp": tp_cnt, "ztp": ztp_cnt, "sl": sl_cnt, "time": time_cnt, "top_win_syms": top_winners, "top_lose_syms": top_losers, }) if (ci + 1) % 1000 == 0: elapsed = time.time() - t0 rate = (ci + 1) / elapsed eta = (len(combos) - ci - 1) / rate / 60 profitable = sum(1 for r in results if r["pnl"] > 0) print(f" ... {ci+1}/{len(combos)} ({rate:.0f}/s, ETA {eta:.1f}min) โ€” {len(results)} valid, {profitable} profitable") elapsed = time.time() - t0 profitable = sum(1 for r in results if r["pnl"] > 0) print(f"\n Done in {elapsed:.0f}s! {len(results)} valid, {profitable} profitable") # ============================================================ # DISPLAY RESULTS # ============================================================ header = f"{'#':>3} {'Z':>4} {'Zmax':>4} {'TP%':>5} {'SL%':>5} {'NATRm':>5} {'NATRx':>5} {'CHOP':>4} {'VolM':>4} {'SO':>3} | {'Deals':>5} {'PnL':>8} {'$/d':>6} {'WR%':>5} {'PF':>6} {'AvgW':>7} {'AvgL':>7} {'Bars':>5} {'SOs':>4} | {'TP':>4} {'ZTP':>4} {'SL':>4} {'TIM':>4}" def print_row(i, r): vol_m = r["vol_min"] // 1_000_000 natr_x = r["natr_max"] if r["natr_max"] > 0 else 0 z_mx = r["z_max"] if r["z_max"] > 0 else 0 print(f"{i:>3} {r['z_entry']:>4.1f} {z_mx:>4.1f} {r['tp_pct']:>5.1f} {r['sl_pct']:>5.1f} {r['natr_min']:>5.2f} {natr_x:>5.1f} {r['chop_min']:>4.0f} {vol_m:>4.0f} {r['max_so']:>3} | {r['deals']:>5} ${r['pnl']:>+7.2f} ${r['pnl_per_day']:>+5.2f} {r['wr']:>5.1f} {r['pf']:>6.2f} ${r['avg_win']:>6.3f} ${r['avg_loss']:>6.3f} {r['avg_bars']:>5.1f} {r['avg_sos']:>4.1f} | {r['tp']:>4} {r['ztp']:>4} {r['sl']:>4} {r['time']:>4}") min_deals = 15 # ===== TOP 30 by PnL ===== results_pnl = sorted([r for r in results if r["deals"] >= min_deals], key=lambda x: x["pnl"], reverse=True) print(f"\n{'='*160}") print(f" TOP 30 by Total PnL (min {min_deals} deals)") print(f"{'='*160}") print(header) print("-" * 160) for i, r in enumerate(results_pnl[:30]): print_row(i+1, r) if i < 5: print(f" Winners: {r['top_win_syms']}") print(f" Losers: {r['top_lose_syms']}") # ===== TOP 30 by PF (min 20 deals) ===== results_pf = sorted([r for r in results if r["deals"] >= 20 and r["pf"] < 100], key=lambda x: x["pf"], reverse=True) print(f"\n{'='*160}") print(f" TOP 30 by Profit Factor (min 20 deals)") print(f"{'='*160}") print(header) print("-" * 160) for i, r in enumerate(results_pf[:30]): print_row(i+1, r) # ===== TOP by $/day (PF > 1) ===== results_daily = sorted([r for r in results if r["deals"] >= min_deals and r["pf"] > 1.0], key=lambda x: x["pnl_per_day"], reverse=True) print(f"\n{'='*160}") print(f" TOP 30 by $/day (min {min_deals} deals, PF > 1)") print(f"{'='*160}") print(header) print("-" * 160) for i, r in enumerate(results_daily[:30]): print_row(i+1, r) # ===== BALANCED: PF > 1.3, >= 30 deals, WR > 45% ===== balanced = sorted([r for r in results if r["pf"] > 1.3 and r["deals"] >= 30 and r["wr"] > 45], key=lambda x: x["pnl"], reverse=True) if balanced: print(f"\n{'='*160}") print(f" BALANCED (PF > 1.3, >= 30 deals, WR > 45%)") print(f"{'='*160}") print(header) print("-" * 160) for i, r in enumerate(balanced[:30]): print_row(i+1, r) # ===== WITH-SO only (best DCA combos) ===== so_results = sorted([r for r in results if r["max_so"] > 0 and r["deals"] >= min_deals], key=lambda x: x["pnl"], reverse=True) if so_results: print(f"\n{'='*160}") print(f" BEST DCA/SO COMBOS (max_so > 0, min {min_deals} deals)") print(f"{'='*160}") print(header) print("-" * 160) for i, r in enumerate(so_results[:30]): print_row(i+1, r) if i < 5: print(f" Winners: {r['top_win_syms']}") print(f" Losers: {r['top_lose_syms']}") # ===== NO-SO only (best simple combos) ===== noso_results = sorted([r for r in results if r["max_so"] == 0 and r["deals"] >= min_deals], key=lambda x: x["pnl"], reverse=True) if noso_results: print(f"\n{'='*160}") print(f" BEST NO-SO COMBOS (simple entry/exit, min {min_deals} deals)") print(f"{'='*160}") print(header) print("-" * 160) for i, r in enumerate(noso_results[:30]): print_row(i+1, r) # ===== PARAMETER HEATMAP: what values appear most in profitable combos ===== profitable_results = [r for r in results if r["pnl"] > 0 and r["deals"] >= 15] if profitable_results: print(f"\n{'='*80}") print(f" PARAMETER FREQUENCY IN PROFITABLE COMBOS ({len(profitable_results)} combos)") print(f"{'='*80}") for param in keys: counts = {} pnl_sums = {} for r in profitable_results: v = r[param] counts[v] = counts.get(v, 0) + 1 pnl_sums[v] = pnl_sums.get(v, 0) + r["pnl"] print(f"\n {param}:") for v in sorted(counts.keys()): avg_pnl = pnl_sums[v] / counts[v] bar = "โ–ˆ" * (counts[v] * 40 // max(counts.values())) print(f" {str(v):>8s} : {counts[v]:>4} combos, avg PnL ${avg_pnl:>+6.2f} {bar}") # ===== SUMMARY ===== print(f"\n{'='*80}") print(f" SUMMARY") print(f"{'='*80}") print(f" Total tested: {len(combos)}") print(f" Valid (>= 5 deals): {len(results)}") print(f" Profitable: {profitable} ({profitable/len(results)*100:.1f}%)" if results else "") if results_pnl: best = results_pnl[0] print(f" Best PnL/30d: ${best['pnl']:+.2f} ({best['pnl_per_day']:+.2f}/day), {best['deals']} deals, WR {best['wr']}%, PF {best['pf']}") print(f" Params: Z={best['z_entry']}, Zmax={best['z_max']}, TP={best['tp_pct']}%, SL={best['sl_pct']}%, NATR={best['natr_min']}-{best['natr_max']}, CHOP>={best['chop_min']}, Vol>={best['vol_min']//1e6}M, SO={best['max_so']}") if results_pf: best_pf = results_pf[0] print(f" Best PF: {best_pf['pf']} ({best_pf['deals']} deals, ${best_pf['pnl']:+.2f})") print(f" Elapsed: {elapsed:.0f}s ({elapsed/60:.1f}min)") # Save output_path = os.path.join(os.path.dirname(__file__), "results_full_sweep.json") with open(output_path, "w") as f: json.dump({ "grid": {k: [str(v) for v in vs] for k, vs in PARAM_GRID.items()}, "config": { "days": DAYS, "order_usd": ORDER_USD, "so_base_usd": SO_BASE_USD, "leverage": LEVERAGE, "timeframe": TIMEFRAME, "blacklist": list(BLACKLIST), }, "symbols_count": len(all_data), "total_combos": len(combos), "valid": len(results), "profitable": profitable, "elapsed_sec": round(elapsed), "top_by_pnl": sorted(results, key=lambda x: x["pnl"], reverse=True)[:200], "top_by_pf": sorted([r for r in results if r["deals"] >= 20], key=lambda x: x["pf"], reverse=True)[:100], "profitable_combos": sorted(profitable_results, key=lambda x: x["pnl"], reverse=True)[:300] if profitable_results else [], }, f, indent=2, default=str) print(f" Saved to {output_path}") if __name__ == "__main__": main()