← Назад
""" Deep analysis: EMA50 threshold 3% vs 5% β€” granular 0.5% steps. Uses the enriched trade data from backtest_chop_vs_ema (full simulation). Re-runs simulation with fine-grained EMA thresholds + bucket analysis. """ import json import time import numpy as np import requests from datetime import datetime, timezone, timedelta from collections import defaultdict BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline" BYBIT_TICKER_URL = "https://api.bybit.com/v5/market/tickers" TP_PCT = 1.5 SL_PCT = 10.0 Z_THRESHOLD = 1.8 NATR_MIN = 0.75 NATR_MAX = 2.5 COOLDOWN_BARS = 60 BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT"} def fetch_klines_paginated(symbol, interval, start_ms, end_ms, limit=1000): all_klines = [] cursor_end = end_ms while cursor_end > start_ms: params = {"category": "linear", "symbol": symbol, "interval": interval, "endTime": cursor_end, "limit": limit} try: r = requests.get(BYBIT_KLINE_URL, params=params, timeout=15) data = r.json() if data.get("retCode") != 0: break rows = data["result"]["list"] if not rows: break rows.reverse() for row in rows: ts = int(row[0]) if start_ms <= ts <= end_ms: all_klines.append(row) oldest = int(rows[0][0]) if oldest <= start_ms: break cursor_end = oldest - 1 time.sleep(0.2) except Exception as e: print(f" Error {symbol}: {e}") break seen = set() unique = [] for k in all_klines: ts = int(k[0]) if ts not in seen: seen.add(ts) unique.append(k) unique.sort(key=lambda x: int(x[0])) return unique def get_top_symbols(n=50): r = requests.get(BYBIT_TICKER_URL, params={"category": "linear"}, timeout=10) data = r.json() candidates = [] for t in data["result"]["list"]: sym = t["symbol"] if not sym.endswith("USDT") or sym in BLACKLIST: continue vol = float(t.get("turnover24h", 0)) if vol >= 20_000_000: candidates.append((sym, vol)) candidates.sort(key=lambda x: x[1], reverse=True) return [c[0] for c in candidates[:n]] def calc_indicators(highs, lows, closes, volumes, idx, z_period=50): if idx < max(z_period + 10, 200): return None h = highs[idx - z_period:idx + 1] l = lows[idx - z_period:idx + 1] c = closes[idx - z_period:idx + 1] v = volumes[idx - z_period:idx + 1] tp = (h + l + c) / 3 cum_tp_vol = np.cumsum(tp * v) cum_vol = np.cumsum(v) cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol) vwap_arr = cum_tp_vol / cum_vol_safe vwap = vwap_arr[-1] deviations = c - vwap_arr std = np.std(deviations) if std == 0: return None z_score = float((c[-1] - vwap) / std) tr_slice = 15 h_tr = highs[idx - tr_slice + 1:idx + 1] l_tr = lows[idx - tr_slice + 1:idx + 1] c_tr = closes[idx - tr_slice:idx + 1] tr = np.maximum(h_tr - l_tr, np.maximum(np.abs(h_tr - c_tr[:-1]), np.abs(l_tr - c_tr[:-1]))) atr = np.mean(tr[-14:]) natr = (atr / closes[idx]) * 100 if closes[idx] > 0 else 0 ema50 = None if idx >= 50: ema = closes[0] mult = 2 / 51 for i in range(1, idx + 1): ema = (closes[i] - ema) * mult + ema ema50 = ema ema50_dist = None if ema50 and ema50 > 0: ema50_dist = abs((closes[idx] - ema50) / ema50 * 100) return {"z": z_score, "natr": natr, "ema50_dist": ema50_dist, "price": float(closes[idx])} def simulate_deal(closes, start_idx, side, tp_pct, sl_pct): entry = closes[start_idx] for i in range(start_idx + 1, min(start_idx + 500, len(closes))): price = closes[i] pnl_pct = ((price - entry) / entry * 100) if side == "BUY" else ((entry - price) / entry * 100) if pnl_pct >= tp_pct: return pnl_pct, "TP", i - start_idx if pnl_pct <= -sl_pct: return pnl_pct, "SL", i - start_idx price = closes[min(start_idx + 499, len(closes) - 1)] pnl_pct = ((price - entry) / entry * 100) if side == "BUY" else ((entry - price) / entry * 100) return pnl_pct, "TIMEOUT", 500 def run(): start_dt = datetime(2026, 4, 7, 0, 0, tzinfo=timezone.utc) end_dt = datetime(2026, 4, 11, 0, 0, tzinfo=timezone.utc) start_ms = int(start_dt.timestamp() * 1000) end_ms = int(end_dt.timestamp() * 1000) warmup_start = start_dt - timedelta(days=4) warmup_ms = int(warmup_start.timestamp() * 1000) symbols = get_top_symbols(50) print(f"Symbols: {len(symbols)}") symbol_data = {} for idx, sym in enumerate(symbols): klines = fetch_klines_paginated(sym, "5", warmup_ms, end_ms) if klines and len(klines) > 300: symbol_data[sym] = klines if (idx + 1) % 5 == 0: print(f" {idx+1}/{len(symbols)}...") time.sleep(1) else: time.sleep(0.3) print(f"Loaded: {len(symbol_data)} symbols\n") # Generate ALL signals (no EMA filter, only NATR) all_signals = [] for symbol, klines in symbol_data.items(): closes = np.array([float(k[4]) for k in klines]) highs = np.array([float(k[2]) for k in klines]) lows = np.array([float(k[3]) for k in klines]) volumes = np.array([float(k[5]) for k in klines]) timestamps = [int(k[0]) for k in klines] scan_start = None for i, ts in enumerate(timestamps): if ts >= start_ms: scan_start = i break if scan_start is None or scan_start < 210: continue scan_end = len(timestamps) - 1 for i, ts in enumerate(timestamps): if ts > end_ms: scan_end = i break cooldown_until = 0 for idx in range(scan_start, scan_end, 12): if idx <= cooldown_until: continue ind = calc_indicators(highs, lows, closes, volumes, idx) if ind is None: continue if ind["natr"] < NATR_MIN or ind["natr"] > NATR_MAX: continue side = None if ind["z"] < -Z_THRESHOLD: side = "BUY" elif ind["z"] > Z_THRESHOLD: side = "SELL" if not side: continue pnl_pct, reason, duration = simulate_deal(closes, idx, side, TP_PCT, SL_PCT) pnl_usd = pnl_pct / 100 * 3.0 * 3 all_signals.append({ "symbol": symbol, "side": side, "z": ind["z"], "natr": ind["natr"], "ema50_dist": ind["ema50_dist"], "pnl_pct": pnl_pct, "pnl_usd": pnl_usd, "reason": reason, "duration": duration, }) cooldown_until = idx + COOLDOWN_BARS print(f"Total unfiltered signals: {len(all_signals)}") base_pnl = sum(t["pnl_usd"] for t in all_signals) base_w = sum(1 for t in all_signals if t["pnl_usd"] > 0) base_wr = base_w / len(all_signals) * 100 print(f"Baseline (NATR only): {len(all_signals)} | WR {base_wr:.1f}% | PnL ${base_pnl:+.2f}\n") # ═══════════════════════════════════════════════ # PART 1: Granular EMA thresholds (0.5% steps) # ═══════════════════════════════════════════════ print("=" * 85) print("PART 1: EMA50 distance threshold β€” 0.5% steps") print("=" * 85) print(f"\n{'Threshold':<12} {'Trades':>7} {'vs Base':>8} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'SL$':>10} {'$/trade↑':>10}") print("-" * 90) for threshold in [1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 7.0, 8.0, 99]: filtered = [t for t in all_signals if t["ema50_dist"] is not None and (t["ema50_dist"] <= threshold or threshold >= 99)] if not filtered: continue pnl = sum(t["pnl_usd"] for t in filtered) w = sum(1 for t in filtered if t["pnl_usd"] > 0) wr = w / len(filtered) * 100 avg = pnl / len(filtered) sls = sum(1 for t in filtered if t["reason"] == "SL") sl_pnl = sum(t["pnl_usd"] for t in filtered if t["reason"] == "SL") cut = len(all_signals) - len(filtered) label = "ALL" if threshold >= 99 else f"≀{threshold}%" # marginal: what does the NEXT 0.5% add? print(f"{label:<12} {len(filtered):>7} {f'-{cut}':>8} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f} {sls:>5} ${sl_pnl:>+8.2f}") # ═══════════════════════════════════════════════ # PART 2: What's IN each bucket (marginal analysis) # ═══════════════════════════════════════════════ print(f"\n{'='*85}") print("PART 2: MARGINAL ANALYSIS β€” what each 1% band adds") print(f"{'='*85}") bands = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 99)] print(f"\n{'Band':<12} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'Top Losers'}") print("-" * 90) for lo, hi in bands: band_label = f"{lo}-{hi}%" if hi < 99 else f"{lo}%+" band_trades = [t for t in all_signals if t["ema50_dist"] is not None and lo <= t["ema50_dist"] < hi] if not band_trades: continue pnl = sum(t["pnl_usd"] for t in band_trades) w = sum(1 for t in band_trades if t["pnl_usd"] > 0) wr = w / len(band_trades) * 100 avg = pnl / len(band_trades) sls = sum(1 for t in band_trades if t["reason"] == "SL") # Top losers in this band losers = sorted(band_trades, key=lambda x: x["pnl_usd"])[:3] loser_str = ", ".join(f"{t['symbol'][:8]} ${t['pnl_usd']:+.2f}" for t in losers if t["pnl_usd"] < 0) print(f"{band_label:<12} {len(band_trades):>7} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f} {sls:>5} {loser_str or 'none'}") # ═══════════════════════════════════════════════ # PART 3: Cumulative PnL curve (add trades by dist) # ═══════════════════════════════════════════════ print(f"\n{'='*85}") print("PART 3: CUMULATIVE PnL β€” adding trades from closest β†’ farthest from EMA50") print("(Shows when adding more distant trades starts HURTING)") print(f"{'='*85}") valid = [t for t in all_signals if t["ema50_dist"] is not None] valid.sort(key=lambda x: x["ema50_dist"]) cumul_pnl = 0 peak_pnl = -999 peak_idx = 0 peak_dist = 0 print(f"\n{'#':>5} {'Dist%':>7} {'Symbol':<12} {'Side':<5} {'PnL':>8} {'Cumul':>10} {'WR%':>6}") print("-" * 60) wins = 0 for i, t in enumerate(valid): cumul_pnl += t["pnl_usd"] if t["pnl_usd"] > 0: wins += 1 wr = wins / (i + 1) * 100 if cumul_pnl > peak_pnl: peak_pnl = cumul_pnl peak_idx = i peak_dist = t["ema50_dist"] # Print every 10th + last + around interesting zones if (i + 1) % 10 == 0 or i == len(valid) - 1 or t["pnl_usd"] < -0.5: print(f"{i+1:>5} {t['ema50_dist']:>6.1f}% {t['symbol']:<12} {t['side']:<5} ${t['pnl_usd']:>+6.2f} ${cumul_pnl:>+8.2f} {wr:>5.1f}%") print(f"\nπŸ† Peak cumulative PnL: ${peak_pnl:+.2f} at trade #{peak_idx+1} (EMA50 dist ≀{peak_dist:.1f}%)") # ═══════════════════════════════════════════════ # PART 4: Risk-adjusted: PnL per trade (efficiency) # ═══════════════════════════════════════════════ print(f"\n{'='*85}") print("PART 4: EFFICIENCY β€” PnL/trade Γ— volume (total PnL) for each threshold") print("Finding the OPTIMAL balance between quality and quantity") print(f"{'='*85}") print(f"\n{'Threshold':<12} {'Trades':>7} {'PnL':>10} {'$/trade':>8} {'WR%':>6} {'Profit Factor':>14} {'Max Drawdown':>13}") print("-" * 80) for threshold in [2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 6.0, 99]: filtered = [t for t in all_signals if t["ema50_dist"] is not None and (t["ema50_dist"] <= threshold or threshold >= 99)] if not filtered: continue gross_profit = sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] > 0) gross_loss = abs(sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] <= 0)) pf = gross_profit / gross_loss if gross_loss > 0 else 999 pnl = sum(t["pnl_usd"] for t in filtered) avg = pnl / len(filtered) w = sum(1 for t in filtered if t["pnl_usd"] > 0) wr = w / len(filtered) * 100 # Max drawdown cumul = 0 peak = 0 max_dd = 0 for t in filtered: cumul += t["pnl_usd"] if cumul > peak: peak = cumul dd = peak - cumul if dd > max_dd: max_dd = dd label = "ALL" if threshold >= 99 else f"≀{threshold}%" print(f"{label:<12} {len(filtered):>7} ${pnl:>+8.2f} ${avg:>+7.4f} {wr:>5.1f}% {pf:>13.2f} ${max_dd:>11.2f}") # ═══════════════════════════════════════════════ # PART 5: Per-symbol breakdown (who benefits from filter) # ═══════════════════════════════════════════════ print(f"\n{'='*85}") print("PART 5: PER-SYMBOL β€” which coins hurt most at dist>3% vs >5%") print(f"{'='*85}") by_sym_3plus = defaultdict(list) by_sym_5plus = defaultdict(list) for t in all_signals: d = t.get("ema50_dist") if d is None: continue if d > 3.0: by_sym_3plus[t["symbol"]].append(t) if d > 5.0: by_sym_5plus[t["symbol"]].append(t) print(f"\n--- Trades with EMA50 dist 3-5% (the contested zone) ---") zone_3_5 = [t for t in all_signals if t["ema50_dist"] is not None and 3.0 < t["ema50_dist"] <= 5.0] if zone_3_5: by_sym = defaultdict(list) for t in zone_3_5: by_sym[t["symbol"]].append(t) print(f"\n{'Symbol':<15} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg Dist':>9}") print("-" * 55) sym_stats = [] for sym, trades in sorted(by_sym.items()): pnl = sum(t["pnl_usd"] for t in trades) w = sum(1 for t in trades if t["pnl_usd"] > 0) wr = w / len(trades) * 100 avg_dist = np.mean([t["ema50_dist"] for t in trades]) sym_stats.append((sym, len(trades), wr, pnl, avg_dist)) sym_stats.sort(key=lambda x: x[3]) for sym, n, wr, pnl, avg_d in sym_stats: print(f"{sym:<15} {n:>7} {wr:>5.1f}% ${pnl:>+8.2f} {avg_d:>8.1f}%") total_pnl = sum(t["pnl_usd"] for t in zone_3_5) total_w = sum(1 for t in zone_3_5 if t["pnl_usd"] > 0) total_wr = total_w / len(zone_3_5) * 100 print(f"\n{'TOTAL 3-5%':<15} {len(zone_3_5):>7} {total_wr:>5.1f}% ${total_pnl:>+8.2f}") print(f"\nβ†’ Cutting ≀3% LOSES these {len(zone_3_5)} trades (PnL ${total_pnl:+.2f})") print(f"β†’ Cutting ≀5% KEEPS them") if __name__ == "__main__": run()