โ† ะะฐะทะฐะด
""" Backtest: DCA Z-VWAP with Simplified Filters v2 ================================================= Dynamic screener (ALL Bybit USDT perps), filters: - Volume >= $20M - NATR >= 0.75% (no max cap, no CHOP) - EMA trend filter: LONG only above EMA, SHORT only below - No Z-max cap (removed) - Blacklist: only heavyweights (BTC, ETH, TRX, stables) Usage: python backtests/backtest_simplified_filters.py """ import sys, os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import json import time import numpy as np from datetime import datetime from pybit.unified_trading import HTTP # ============================================================ # CONFIG # ============================================================ DCA_CONFIG = { "base_order_usd": 7.0, "safety_order_usd": 5.0, "max_safety_orders": 3, "natr_factor": 1.0, # SO spacing = NATR ร— factor ร— step_scale^n "step_scale": 1.3, "volume_scale": 1.3, "take_profit_pct": 1.5, "stop_loss_pct": 8.0, "z_entry_threshold": 1.8, "z_tp_threshold": 0.3, "cooldown_bars": 12, # 12ร—5m = 1h "leverage": 3, "ema_period": 50, # EMA trend filter (50 bars ร— 5m = ~4h) } VWAP_PERIOD = 50 MAKER_FEE = 0.0002 TAKER_FEE = 0.00055 TIMEFRAME = "5" DAYS = 7 # === SIMPLIFIED FILTERS === MIN_NATR_PCT = 0.75 # only minimum, no max MIN_VOLUME_24H = 20_000_000 MAX_DEALS_CONCURRENT = 6 # === SLIM BLACKLIST โ€” only heavyweights === BLACKLIST = { "BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP", } session = HTTP(testnet=False) # ============================================================ # DYNAMIC SCREENER โ€” fetch all USDT perps, filter by volume # ============================================================ def get_screener_symbols(): """Get all tradeable USDT perps with volume >= threshold.""" print("๐Ÿ“ก Fetching all Bybit USDT perps...") resp = session.get_tickers(category="linear") if resp["retCode"] != 0: print(f" โŒ Ticker error: {resp['retMsg']}") return [] symbols = [] for t in resp["result"]["list"]: sym = t["symbol"] if not sym.endswith("USDT"): continue if sym in BLACKLIST: continue vol24h = float(t.get("turnover24h", 0)) if vol24h < MIN_VOLUME_24H: continue symbols.append({ "symbol": sym, "volume_24h": vol24h, "price": float(t.get("lastPrice", 0)), }) symbols.sort(key=lambda x: x["volume_24h"], reverse=True) print(f" โœ… {len(symbols)} symbols pass volume filter (>= ${MIN_VOLUME_24H/1e6:.0f}M)") return symbols # ============================================================ # DATA FETCHING # ============================================================ def fetch_klines(symbol: str, interval: str, days: int = 7) -> list[dict]: """Fetch klines from Bybit.""" all_klines = [] bars_needed = days * 24 * 60 // int(interval) end_time = int(datetime.now().timestamp() * 1000) while len(all_klines) < bars_needed: try: resp = session.get_kline( category="linear", symbol=symbol, interval=interval, limit=1000, end=end_time, ) if resp["retCode"] != 0: break items = resp["result"]["list"] if not items: break for item in items: all_klines.append({ "ts": int(item[0]), "open": float(item[1]), "high": float(item[2]), "low": float(item[3]), "close": float(item[4]), "volume": float(item[5]), }) end_time = int(items[-1][0]) - 1 if len(items) < 1000: break except Exception as e: print(f" Fetch error {symbol}: {e}") break all_klines.reverse() seen = set() unique = [] for k in all_klines: if k["ts"] not in seen: seen.add(k["ts"]) unique.append(k) return unique[-bars_needed:] if len(unique) > bars_needed else unique # ============================================================ # INDICATORS # ============================================================ def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD): """Z-Score from rolling VWAP.""" n = len(closes) z_scores = np.full(n, 0.0) for i in range(period, n): h = highs[i-period:i] l = lows[i-period:i] c = closes[i-period:i] v = volumes[i-period:i] tp = (h + l + c) / 3 cum_tp_vol = np.cumsum(tp * v) cum_vol = np.cumsum(v) cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol) vwap_arr = cum_tp_vol / cum_vol_safe vwap = vwap_arr[-1] deviations = c - vwap_arr std = np.std(deviations) if std > 0: z_scores[i] = (closes[i] - vwap) / std return z_scores def calc_natr(highs, lows, closes, period=14): """Normalized ATR (%) over last `period` bars.""" n = len(closes) if n < period + 1: return 0.0 trs = [] for i in range(n - period, n): tr = max(highs[i] - lows[i], abs(highs[i] - closes[i-1]), abs(lows[i] - closes[i-1])) trs.append(tr) atr = np.mean(trs) return (atr / closes[-1]) * 100 if closes[-1] > 0 else 0.0 def calc_ema(closes, period): """Calculate EMA array. Returns np array same length as closes.""" ema = np.full(len(closes), np.nan) if len(closes) < period: return ema # Seed with SMA ema[period - 1] = np.mean(closes[:period]) k = 2.0 / (period + 1) for i in range(period, len(closes)): ema[i] = closes[i] * k + ema[i - 1] * (1 - k) return ema # ============================================================ # DCA DEAL SIMULATOR # ============================================================ class DCADeal: def __init__(self, symbol, side, entry_price, entry_bar, config, natr_pct): self.symbol = symbol self.side = side self.config = config self.entry_bar = entry_bar self.natr_pct = natr_pct leverage = config["leverage"] self.orders = [{ "price": entry_price, "usd": config["base_order_usd"], "qty": (config["base_order_usd"] * leverage) / entry_price, "type": "BO", }] # NATR-based SO spacing (like live bot) self.so_triggers = [] cumul_deviation = 0 for i in range(config["max_safety_orders"]): spacing = natr_pct * config["natr_factor"] * (config["step_scale"] ** i) cumul_deviation += spacing so_size = config["safety_order_usd"] * (config["volume_scale"] ** i) if side == "LONG": trigger_price = entry_price * (1 - cumul_deviation / 100) else: trigger_price = entry_price * (1 + cumul_deviation / 100) self.so_triggers.append({ "price": trigger_price, "usd": so_size, "deviation_pct": cumul_deviation, "filled": False, }) self.so_filled = 0 self.so_fill_times = [] self.closed = False self.close_price = 0 self.close_bar = 0 self.close_reason = "" self.pnl = 0 self.fees = 0 self.max_drawdown_pct = 0 @property def avg_entry(self): total_qty = sum(o["qty"] for o in self.orders) total_cost = sum(o["qty"] * o["price"] for o in self.orders) return total_cost / total_qty if total_qty > 0 else 0 @property def total_qty(self): return sum(o["qty"] for o in self.orders) @property def total_invested(self): return sum(o["usd"] for o in self.orders) def tick(self, bar_idx, high, low, close, z_score): if self.closed: return True leverage = self.config["leverage"] # Check SO fills for i, so in enumerate(self.so_triggers): if so["filled"]: continue filled = False if self.side == "LONG" and low <= so["price"]: filled = True elif self.side == "SHORT" and high >= so["price"]: filled = True if filled: qty = (so["usd"] * leverage) / so["price"] self.orders.append({ "price": so["price"], "usd": so["usd"], "qty": qty, "type": f"SO{i+1}", }) so["filled"] = True self.so_filled += 1 self.so_fill_times.append(bar_idx) self.fees += so["usd"] * leverage * MAKER_FEE avg = self.avg_entry # Max drawdown if self.side == "LONG": dd_pct = (avg - low) / avg * 100 else: dd_pct = (high - avg) / avg * 100 self.max_drawdown_pct = max(self.max_drawdown_pct, dd_pct) # SO Guard: 3 SOs in 15 bars (75 min on 5m) โ†’ force close if len(self.so_fill_times) >= 3: last3 = self.so_fill_times[-3:] if last3[-1] - last3[0] <= 15: self.close_price = close self._close(bar_idx, "SO_GUARD") return True # Time Stop (progressive) bars_open = bar_idx - self.entry_bar hours_open = bars_open * 5 / 60 # 5m bars time_limits = {1: 3.0, 2: 2.0, 3: 1.5} for min_sos, max_hours in time_limits.items(): if self.so_filled >= min_sos and hours_open >= max_hours: self.close_price = close self._close(bar_idx, f"TIME_STOP({self.so_filled}SO/{hours_open:.1f}h)") return True # TP tp_price = avg * (1 + self.config["take_profit_pct"] / 100) if self.side == "LONG" else \ avg * (1 - self.config["take_profit_pct"] / 100) tp_hit = False if self.side == "LONG" and high >= tp_price: tp_hit = True self.close_price = tp_price elif self.side == "SHORT" and low <= tp_price: tp_hit = True self.close_price = tp_price # Z-reversion TP z_tp = False if self.side == "LONG" and z_score >= -self.config["z_tp_threshold"]: if close > avg: z_tp = True self.close_price = close elif self.side == "SHORT" and z_score <= self.config["z_tp_threshold"]: if close < avg: z_tp = True self.close_price = close # SL sl_price = avg * (1 - self.config["stop_loss_pct"] / 100) if self.side == "LONG" else \ avg * (1 + self.config["stop_loss_pct"] / 100) sl_hit = False if self.side == "LONG" and low <= sl_price: sl_hit = True self.close_price = sl_price elif self.side == "SHORT" and high >= sl_price: sl_hit = True self.close_price = sl_price if sl_hit: self._close(bar_idx, "SL") return True elif tp_hit: self._close(bar_idx, "TP%") return True elif z_tp: self._close(bar_idx, "Z-TP") return True return False def _close(self, bar_idx, reason): self.closed = True self.close_bar = bar_idx self.close_reason = reason total_qty = self.total_qty avg = self.avg_entry if self.side == "LONG": self.pnl = total_qty * (self.close_price - avg) else: self.pnl = total_qty * (avg - self.close_price) exit_fee = total_qty * self.close_price * TAKER_FEE entry_fee = self.orders[0]["qty"] * self.orders[0]["price"] * TAKER_FEE self.fees += exit_fee + entry_fee self.pnl -= self.fees # ============================================================ # BACKTEST ENGINE (multi-symbol concurrent deals) # ============================================================ def run_backtest(symbols_data, config): """Run backtest across all symbols with concurrent deal limit.""" print(f"\n{'='*70}") print(f" Running {DAYS}-day backtest on {len(symbols_data)} symbols") print(f" Filters: NATR >= {MIN_NATR_PCT}%, Vol >= ${MIN_VOLUME_24H/1e6:.0f}M") print(f" Blacklist: {sorted(BLACKLIST)}") print(f" Max concurrent deals: {MAX_DEALS_CONCURRENT}") print(f"{'='*70}") print(f" BO=${config['base_order_usd']}, SO=${config['safety_order_usd']}, " f"MaxSO={config['max_safety_orders']}, SL={config['stop_loss_pct']}%, " f"TP={config['take_profit_pct']}%, Lev={config['leverage']}x") print(f" Z-entry={config['z_entry_threshold']}, Z-TP={config['z_tp_threshold']}, " f"EMA trend={config['ema_period']} (trade WITH trend only)") print() # Fetch data for all symbols all_kline_data = {} skipped_natr = [] for idx, sd in enumerate(symbols_data): sym = sd["symbol"] print(f" [{idx+1}/{len(symbols_data)}] {sym} (Vol ${sd['volume_24h']/1e6:.1f}M)...", end=" ", flush=True) time.sleep(0.15) # rate limit klines = fetch_klines(sym, TIMEFRAME, days=DAYS) if len(klines) < VWAP_PERIOD + 50: print(f"โš ๏ธ {len(klines)} bars, skip") continue # Calculate NATR on fetched data closes = np.array([k["close"] for k in klines]) highs = np.array([k["high"] for k in klines]) lows = np.array([k["low"] for k in klines]) natr = calc_natr(highs, lows, closes, 14) if natr < MIN_NATR_PCT: print(f"โš ๏ธ NATR {natr:.2f}% < {MIN_NATR_PCT}%, skip") skipped_natr.append((sym, natr)) continue print(f"โœ… {len(klines)} bars, NATR={natr:.2f}%") all_kline_data[sym] = { "klines": klines, "natr": natr, "volume": sd["volume_24h"], } print(f"\n ๐Ÿ“Š {len(all_kline_data)} symbols ready for backtest") if skipped_natr: print(f" โญ๏ธ Skipped (low NATR): {', '.join(f'{s}({n:.2f}%)' for s,n in skipped_natr[:10])}") # Align all data to common timeline using bar index # For simplicity: run each symbol independently but track concurrent deals globally # Build unified timeline: merge all bars from all symbols all_timestamps = set() for sym, data in all_kline_data.items(): for k in data["klines"]: all_timestamps.add(k["ts"]) timeline = sorted(all_timestamps) ts_to_idx = {ts: i for i, ts in enumerate(timeline)} # Index klines by timestamp for each symbol sym_bars = {} for sym, data in all_kline_data.items(): bars_by_ts = {} for k in data["klines"]: bars_by_ts[k["ts"]] = k sym_bars[sym] = bars_by_ts # Pre-compute Z-scores and EMA for each symbol sym_z = {} sym_ema = {} sym_np = {} ema_period = config.get("ema_period", 50) for sym, data in all_kline_data.items(): klines = data["klines"] closes = np.array([k["close"] for k in klines]) highs = np.array([k["high"] for k in klines]) lows = np.array([k["low"] for k in klines]) volumes = np.array([k["volume"] for k in klines]) z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD) ema_arr = calc_ema(closes, ema_period) sym_z[sym] = {klines[i]["ts"]: z_scores[i] for i in range(len(klines))} sym_ema[sym] = {klines[i]["ts"]: ema_arr[i] for i in range(len(klines))} sym_np[sym] = {"highs": highs, "lows": lows, "closes": closes, "volumes": volumes, "timestamps": [k["ts"] for k in klines]} # Simulate with global concurrent deal limit active_deals = {} # symbol -> DCADeal all_deals = [] cooldowns = {} # symbol -> cooldown_until_ts signals_generated = 0 signals_skipped_max_deals = 0 signals_skipped_ema = 0 for ts in timeline: # Check active deals for sym in list(active_deals.keys()): deal = active_deals[sym] if ts not in sym_bars.get(sym, {}): continue bar = sym_bars[sym][ts] z = sym_z[sym].get(ts, 0) closed = deal.tick( ts_to_idx[ts], bar["high"], bar["low"], bar["close"], z ) if closed: all_deals.append(deal) cooldowns[sym] = ts + config["cooldown_bars"] * 5 * 60 * 1000 # cooldown in ms del active_deals[sym] # Check for new entries for sym, data in all_kline_data.items(): if sym in active_deals: continue if ts not in sym_bars.get(sym, {}): continue if sym in cooldowns and ts < cooldowns[sym]: continue z = sym_z[sym].get(ts, 0) bar = sym_bars[sym][ts] ema_val = sym_ema[sym].get(ts, np.nan) # Check Z thresholds if abs(z) <= config["z_entry_threshold"]: continue signals_generated += 1 # EMA trend filter: only trade WITH trend # Price above EMA โ†’ uptrend โ†’ only LONG allowed # Price below EMA โ†’ downtrend โ†’ only SHORT allowed side = "LONG" if z < -config["z_entry_threshold"] else "SHORT" if not np.isnan(ema_val): if side == "LONG" and bar["close"] < ema_val: signals_skipped_ema += 1 continue if side == "SHORT" and bar["close"] > ema_val: signals_skipped_ema += 1 continue if len(active_deals) >= MAX_DEALS_CONCURRENT: signals_skipped_max_deals += 1 continue natr = data["natr"] deal = DCADeal(sym, side, bar["close"], ts_to_idx[ts], config, natr) active_deals[sym] = deal # Force-close remaining deals for sym, deal in active_deals.items(): klines = all_kline_data[sym]["klines"] deal.close_price = klines[-1]["close"] deal._close(ts_to_idx[klines[-1]["ts"]], "END") all_deals.append(deal) return all_deals, signals_generated, signals_skipped_max_deals, signals_skipped_ema, all_kline_data # ============================================================ # DISPLAY RESULTS # ============================================================ def print_results(all_deals, signals_gen, skip_max, skip_ema, kline_data): if not all_deals: print("\nโŒ No deals generated!") return total_pnl = sum(d.pnl for d in all_deals) wins = [d for d in all_deals if d.pnl > 0] losses = [d for d in all_deals if d.pnl <= 0] wr = len(wins) / len(all_deals) * 100 gross_profit = sum(d.pnl for d in wins) if wins else 0 gross_loss = abs(sum(d.pnl for d in losses)) if losses else 0.001 pf = gross_profit / gross_loss if gross_loss > 0 else float('inf') avg_win = gross_profit / len(wins) if wins else 0 avg_loss = gross_loss / len(losses) if losses else 0 max_dd = max(d.max_drawdown_pct for d in all_deals) total_fees = sum(d.fees for d in all_deals) avg_sos = sum(d.so_filled for d in all_deals) / len(all_deals) avg_bars = sum(d.close_bar - d.entry_bar for d in all_deals) / len(all_deals) longs = [d for d in all_deals if d.side == "LONG"] shorts = [d for d in all_deals if d.side == "SHORT"] # Close reasons reasons = {} for d in all_deals: r = d.close_reason.split("(")[0] # group TIME_STOP variants reasons[r] = reasons.get(r, 0) + 1 print(f"\n{'='*70}") print(f" RESULTS โ€” {DAYS}-day backtest, {len(kline_data)} symbols") print(f"{'='*70}") pnl_emoji = "๐ŸŸข" if total_pnl > 0 else "๐Ÿ”ด" print(f"\n {pnl_emoji} Total PnL: ${total_pnl:+.2f}") print(f" ๐Ÿ“Š Profit Factor: {pf:.2f}") print(f" ๐ŸŽฏ Win Rate: {wr:.1f}% ({len(wins)}W / {len(losses)}L)") print(f" ๐Ÿ“ˆ Avg Win: ${avg_win:.3f}") print(f" ๐Ÿ“‰ Avg Loss: ${avg_loss:.3f}") print(f" ๐Ÿ’ฐ Deals: {len(all_deals)} (L:{len(longs)} / S:{len(shorts)})") print(f" ๐Ÿ”„ Avg SOs: {avg_sos:.1f}") print(f" โฑ๏ธ Avg Duration: {avg_bars:.0f} bars ({avg_bars*5/60:.1f}h)") print(f" ๐Ÿ“‰ Max DD: {max_dd:.1f}%") print(f" ๐Ÿ’ธ Total Fees: ${total_fees:.2f}") print(f" ๐Ÿ“ก Signals: {signals_gen} (skipped: {skip_ema} EMA-trend, {skip_max} max-deals)") print(f"\n Close Reasons:") for r, cnt in sorted(reasons.items(), key=lambda x: -x[1]): pct = cnt / len(all_deals) * 100 print(f" {r:20s} {cnt:4d} ({pct:.0f}%)") # Long vs Short long_pnl = sum(d.pnl for d in longs) short_pnl = sum(d.pnl for d in shorts) long_wr = sum(1 for d in longs if d.pnl > 0) / len(longs) * 100 if longs else 0 short_wr = sum(1 for d in shorts if d.pnl > 0) / len(shorts) * 100 if shorts else 0 print(f"\n LONG: ${long_pnl:+.2f} ({len(longs)} deals, WR={long_wr:.0f}%)") print(f" SHORT: ${short_pnl:+.2f} ({len(shorts)} deals, WR={short_wr:.0f}%)") # Per-symbol breakdown (top 10 by deal count) sym_stats = {} for d in all_deals: if d.symbol not in sym_stats: sym_stats[d.symbol] = {"deals": 0, "pnl": 0, "wins": 0} sym_stats[d.symbol]["deals"] += 1 sym_stats[d.symbol]["pnl"] += d.pnl if d.pnl > 0: sym_stats[d.symbol]["wins"] += 1 print(f"\n {'โ”€'*60}") print(f" Per-Symbol Breakdown (sorted by PnL):") print(f" {'Symbol':15s} {'Deals':>6s} {'PnL':>10s} {'WR':>6s} {'AvgPnL':>10s}") print(f" {'โ”€'*60}") sorted_syms = sorted(sym_stats.items(), key=lambda x: x[1]["pnl"], reverse=True) for sym, st in sorted_syms: wr_sym = st["wins"] / st["deals"] * 100 if st["deals"] > 0 else 0 avg_pnl = st["pnl"] / st["deals"] emoji = "๐ŸŸข" if st["pnl"] > 0 else "๐Ÿ”ด" print(f" {emoji} {sym:13s} {st['deals']:>5d} ${st['pnl']:>+8.2f} {wr_sym:>5.0f}% ${avg_pnl:>+8.3f}") # Would-be blacklist comparison old_blacklist_coins = {"RAVEUSDT", "FARTCOINUSDT", "TAOUSDT", "ARIAUSDT", "SIRENUSDT", "MAGMAUSDT"} old_bl_deals = [d for d in all_deals if d.symbol in old_blacklist_coins] if old_bl_deals: old_bl_pnl = sum(d.pnl for d in old_bl_deals) old_bl_wins = sum(1 for d in old_bl_deals if d.pnl > 0) old_bl_wr = old_bl_wins / len(old_bl_deals) * 100 print(f"\n {'โ”€'*60}") print(f" โš ๏ธ Ex-blacklist coins (RAVE/FART/TAO/ARIA/SIREN/MAGMA):") print(f" {len(old_bl_deals)} deals, PnL=${old_bl_pnl:+.2f}, WR={old_bl_wr:.0f}%") pnl_without = total_pnl - old_bl_pnl print(f" Without them: PnL=${pnl_without:+.2f} (delta ${-old_bl_pnl:+.2f})") return sym_stats # ============================================================ # MAIN # ============================================================ def main(): print("=" * 70) print(" DCA Z-VWAP Backtest โ€” SIMPLIFIED FILTERS v2") print(" NATR >= 0.75% | No max NATR | No CHOP | No Z-max") print(" + EMA50 trend filter (trade WITH trend)") print(" Blacklist: BTC, ETH, TRX only") print("=" * 70) symbols_data = get_screener_symbols() if not symbols_data: print("โŒ No symbols found") return # Limit to top 50 by volume to keep backtest manageable symbols_data = symbols_data[:50] print(f" Testing top {len(symbols_data)} by volume") all_deals, sig_gen, skip_max, skip_zmax, kline_data = run_backtest(symbols_data, DCA_CONFIG) sym_stats = print_results(all_deals, sig_gen, skip_max, skip_zmax, kline_data) # Save results output_path = os.path.join(os.path.dirname(__file__), "results_simplified_filters.json") save_data = { "config": DCA_CONFIG, "filters": { "min_natr_pct": MIN_NATR_PCT, "max_natr_pct": "NONE (removed)", "min_chop": "NONE (removed)", "min_volume_24h": MIN_VOLUME_24H, "blacklist": sorted(BLACKLIST), }, "summary": { "total_deals": len(all_deals), "total_pnl": round(sum(d.pnl for d in all_deals), 4), "win_rate": round(sum(1 for d in all_deals if d.pnl > 0) / len(all_deals) * 100, 1) if all_deals else 0, "symbols_tested": len(kline_data), "signals_generated": sig_gen, }, "per_symbol": {sym: {"deals": st["deals"], "pnl": round(st["pnl"], 4), "wr": round(st["wins"]/st["deals"]*100, 1) if st["deals"] > 0 else 0} for sym, st in (sym_stats or {}).items()}, "deals": [ { "symbol": d.symbol, "side": d.side, "entry": round(d.orders[0]["price"], 6), "avg": round(d.avg_entry, 6), "exit": round(d.close_price, 6), "pnl": round(d.pnl, 4), "sos": d.so_filled, "natr": round(d.natr_pct, 2), "reason": d.close_reason, "invested": round(d.total_invested, 2), "dd": round(d.max_drawdown_pct, 2), } for d in all_deals ], } with open(output_path, "w") as f: json.dump(save_data, f, indent=2) print(f"\n๐Ÿ’พ Results saved to {output_path}") if __name__ == "__main__": main()