"""
Deep analysis: EMA50 threshold 3% vs 5% — granular 0.5% steps.
Uses the enriched trade data from backtest_chop_vs_ema (full simulation).
Re-runs simulation with fine-grained EMA thresholds + bucket analysis.
"""
import json
import time
import numpy as np
import requests
from datetime import datetime, timezone, timedelta
from collections import defaultdict
# Bybit v5 public REST endpoints used below: candles and 24h tickers.
BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"
BYBIT_TICKER_URL = "https://api.bybit.com/v5/market/tickers"
# Take-profit / stop-loss distances in percent of the entry price,
# passed to simulate_deal() by run().
TP_PCT = 1.5
SL_PCT = 10.0
# Signal trigger: z < -Z_THRESHOLD opens a BUY, z > Z_THRESHOLD opens a SELL.
Z_THRESHOLD = 1.8
# Signals are only taken while NATR (in percent) lies within [NATR_MIN, NATR_MAX].
NATR_MIN = 0.75
NATR_MAX = 2.5
# After a signal on a symbol, bar indices up to idx + COOLDOWN_BARS are skipped.
COOLDOWN_BARS = 60
# Symbols that get_top_symbols() never returns.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT"}
def fetch_klines_paginated(symbol, interval, start_ms, end_ms, limit=1000):
    """Download candles for ``symbol`` covering ``[start_ms, end_ms]``.

    Pages backwards in time: each request asks for up to ``limit`` candles
    ending at the current cursor, then the cursor moves to just before the
    oldest candle received. Paging stops once ``start_ms`` is reached, the
    API returns no rows, or an error occurs.

    The function is deliberately best-effort: on any failure it prints a
    message and returns whatever was collected so far instead of raising.

    Args:
        symbol: Instrument name sent as the ``symbol`` query parameter.
        interval: Candle interval sent as the ``interval`` query parameter.
        start_ms: Inclusive lower bound on the candle timestamp (ms).
        end_ms: Inclusive upper bound on the candle timestamp (ms).
        limit: Page size requested from the API.

    Returns:
        The raw kline rows (as returned by the API) whose first field lies in
        ``[start_ms, end_ms]``, de-duplicated by timestamp and sorted ascending.
    """
    all_klines = []
    cursor_end = end_ms
    while cursor_end > start_ms:
        # The v5 kline endpoint names its time bounds ``start`` / ``end``.
        # An unrecognised name would leave the bound unapplied, every page
        # would be the same latest candles, and the cursor would never move.
        params = {"category": "linear", "symbol": symbol, "interval": interval,
                  "end": cursor_end, "limit": limit}
        try:
            r = requests.get(BYBIT_KLINE_URL, params=params, timeout=15)
            r.raise_for_status()
            data = r.json()
            if data.get("retCode") != 0:
                # Report API-level failures instead of silently truncating.
                print(f" Error {symbol}: retCode={data.get('retCode')} {data.get('retMsg')}")
                break
            rows = data["result"]["list"]
            if not rows:
                break
            # Pages are treated as newest-first; flip so rows[0] is the oldest.
            rows.reverse()
            oldest = int(rows[0][0])
            if oldest > cursor_end:
                # The page lies entirely after the requested bound, so the
                # cursor cannot advance; stop rather than loop forever.
                print(f" Error {symbol}: page ignored end={cursor_end}, stopping")
                break
            all_klines.extend(
                row for row in rows if start_ms <= int(row[0]) <= end_ms
            )
            if oldest <= start_ms:
                break
            cursor_end = oldest - 1
            time.sleep(0.2)  # stay gentle on the public endpoint
        except Exception as e:  # best-effort boundary: keep partial data
            print(f" Error {symbol}: {e}")
            break

    # Keep the first row seen for each timestamp, then order by time.
    by_ts = {}
    for k in all_klines:
        by_ts.setdefault(int(k[0]), k)
    return [by_ts[ts] for ts in sorted(by_ts)]
def get_top_symbols(n=50, min_turnover=20_000_000):
    """Return the ``n`` most-traded USDT linear symbols by 24h turnover.

    Fetches the linear tickers list, drops symbols that do not end in
    ``USDT`` or that are in ``BLACKLIST``, keeps those whose ``turnover24h``
    is at least ``min_turnover``, and returns the symbol names ordered by
    turnover, highest first.

    Args:
        n: Maximum number of symbols to return.
        min_turnover: Minimum ``turnover24h`` value for a symbol to qualify.

    Returns:
        A list of at most ``n`` symbol names.

    Raises:
        Whatever ``requests`` raises on a network failure, and ``KeyError``
        if the response lacks ``result.list`` or a ticker lacks ``symbol``.
    """
    r = requests.get(BYBIT_TICKER_URL, params={"category": "linear"}, timeout=10)
    data = r.json()
    candidates = []
    for t in data["result"]["list"]:
        sym = t["symbol"]
        if not sym.endswith("USDT") or sym in BLACKLIST:
            continue
        # A missing, None or empty turnover counts as zero instead of
        # crashing float().
        vol = float(t.get("turnover24h") or 0)
        if vol >= min_turnover:
            candidates.append((sym, vol))
    candidates.sort(key=lambda x: x[1], reverse=True)
    return [sym for sym, _ in candidates[:n]]
def calc_indicators(highs, lows, closes, volumes, idx, z_period=50):
    """Compute the signal inputs for bar ``idx``.

    Args:
        highs, lows, closes, volumes: NumPy arrays of equal length, one
            element per bar.
        idx: Index of the bar to evaluate.
        z_period: Look-back for the VWAP z-score; the window holds
            ``z_period + 1`` bars ending at ``idx``.

    Returns:
        ``None`` when ``idx`` is below ``max(z_period + 10, 200)`` or when the
        close-minus-VWAP deviations in the window have zero standard
        deviation. Otherwise a dict with:

        * ``z``: last close minus the window VWAP, divided by the standard
          deviation of (close - running VWAP) over the window.
        * ``natr``: mean of the last 14 true ranges as a percentage of the
          last close (0 if that close is not positive).
        * ``ema50_dist``: absolute percentage distance of the last close from
          an EMA (multiplier 2/51) seeded with ``closes[0]``; ``None`` if the
          EMA is not positive.
        * ``price``: the last close as a float.
    """
    min_history = max(z_period + 10, 200)
    if idx < min_history:
        return None

    # --- VWAP z-score over the trailing window ---
    window = slice(idx - z_period, idx + 1)
    win_high = highs[window]
    win_low = lows[window]
    win_close = closes[window]
    win_vol = volumes[window]

    typical = (win_high + win_low + win_close) / 3
    running_pv = np.cumsum(typical * win_vol)
    running_vol = np.cumsum(win_vol)
    # Avoid dividing by zero while no volume has accumulated yet.
    running_vwap = running_pv / np.where(running_vol == 0, 1, running_vol)
    last_vwap = running_vwap[-1]

    spread = np.std(win_close - running_vwap)
    if spread == 0:
        return None
    z_score = float((win_close[-1] - last_vwap) / spread)

    # --- NATR: 15 bars of highs/lows against the 15 preceding closes ---
    lookback = 15
    recent_high = highs[idx - lookback + 1:idx + 1]
    recent_low = lows[idx - lookback + 1:idx + 1]
    prev_close = closes[idx - lookback:idx + 1][:-1]
    gap_up = np.abs(recent_high - prev_close)
    gap_down = np.abs(recent_low - prev_close)
    true_range = np.maximum(recent_high - recent_low, np.maximum(gap_up, gap_down))
    atr = np.mean(true_range[-14:])
    last_close = closes[idx]
    natr = (atr / last_close) * 100 if last_close > 0 else 0

    # --- EMA50, recomputed from the first bar of the series ---
    # idx is at least 200 here, so the EMA is always available.
    alpha = 2 / 51
    ema50 = closes[0]
    for px in closes[1:idx + 1]:
        ema50 = (px - ema50) * alpha + ema50

    ema50_dist = None
    if ema50 > 0:
        ema50_dist = abs((last_close - ema50) / ema50 * 100)

    return {
        "z": z_score,
        "natr": natr,
        "ema50_dist": ema50_dist,
        "price": float(last_close),
    }
def simulate_deal(closes, start_idx, side, tp_pct, sl_pct, max_bars=500):
    """Walk forward from ``start_idx`` until take-profit, stop-loss or timeout.

    Only closing prices are examined. The entry price is
    ``closes[start_idx]``; each following close is converted to a percentage
    PnL and compared with the two thresholds, take-profit first.

    Args:
        closes: Indexable sequence of closing prices.
        start_idx: Index of the entry bar.
        side: ``"BUY"`` for a long; any other value is treated as a short.
        tp_pct: Take-profit distance in percent (exit when PnL >= tp_pct).
        sl_pct: Stop-loss distance in percent (exit when PnL <= -sl_pct).
        max_bars: Holding horizon. Bars ``start_idx + 1`` up to
            ``start_idx + max_bars - 1`` are checked.

    Returns:
        ``(pnl_pct, reason, duration)`` where ``reason`` is ``"TP"``,
        ``"SL"`` or ``"TIMEOUT"``. For TP/SL ``duration`` is the number of
        bars held. On timeout the PnL is taken at the last examined bar (or
        the last available bar if the series ends sooner) and ``duration`` is
        reported as ``max_bars`` in both cases.
    """
    entry = closes[start_idx]

    def pnl_at(price):
        # Percentage move in the trade's favour.
        if side == "BUY":
            return (price - entry) / entry * 100
        return (entry - price) / entry * 100

    stop = min(start_idx + max_bars, len(closes))
    for i in range(start_idx + 1, stop):
        pnl_pct = pnl_at(closes[i])
        if pnl_pct >= tp_pct:
            return pnl_pct, "TP", i - start_idx
        if pnl_pct <= -sl_pct:
            return pnl_pct, "SL", i - start_idx

    final_price = closes[min(start_idx + max_bars - 1, len(closes) - 1)]
    return pnl_at(final_price), "TIMEOUT", max_bars
def _win_rate(trades):
    """Return the percentage of ``trades`` (non-empty) with positive ``pnl_usd``."""
    return sum(1 for t in trades if t["pnl_usd"] > 0) / len(trades) * 100


def _within_ema_threshold(signals, threshold):
    """Return signals with a known EMA50 distance <= ``threshold`` (>= 99 means no cap)."""
    return [t for t in signals
            if t["ema50_dist"] is not None
            and (t["ema50_dist"] <= threshold or threshold >= 99)]


def run():
    """Fetch candles, simulate every z-score signal and print five reports.

    Steps:
      1. Load interval-"5" candles for the top 50 symbols over the fixed
         analysis window plus four days of warm-up.
      2. Scan every 12th bar of the window; where NATR is in range and the
         z-score crosses ``Z_THRESHOLD``, simulate a trade and record it.
      3. Print the baseline plus five breakdowns by EMA50 distance
         (thresholds, bands, cumulative curve, efficiency, per-symbol).

    NOTE(review): several printed literals below contain characters that look
    mis-decoded (UTF-8 read through a Greek code page). They are kept exactly
    as found because the intended characters cannot all be recovered with
    certainty; restore them from the original file.
    """
    start_dt = datetime(2026, 4, 7, 0, 0, tzinfo=timezone.utc)
    end_dt = datetime(2026, 4, 11, 0, 0, tzinfo=timezone.utc)
    start_ms = int(start_dt.timestamp() * 1000)
    end_ms = int(end_dt.timestamp() * 1000)
    # Extra history so indicators are warmed up at the first scanned bar.
    warmup_start = start_dt - timedelta(days=4)
    warmup_ms = int(warmup_start.timestamp() * 1000)

    symbols = get_top_symbols(50)
    print(f"Symbols: {len(symbols)}")
    symbol_data = {}
    for idx, sym in enumerate(symbols):
        klines = fetch_klines_paginated(sym, "5", warmup_ms, end_ms)
        if klines and len(klines) > 300:
            symbol_data[sym] = klines
        # Longer pause (with a progress line) after every fifth symbol.
        if (idx + 1) % 5 == 0:
            print(f" {idx+1}/{len(symbols)}...")
            time.sleep(1)
        else:
            time.sleep(0.3)
    print(f"Loaded: {len(symbol_data)} symbols\n")

    # Generate ALL signals (no EMA filter, only NATR)
    all_signals = []
    for symbol, klines in symbol_data.items():
        closes = np.array([float(k[4]) for k in klines])
        highs = np.array([float(k[2]) for k in klines])
        lows = np.array([float(k[3]) for k in klines])
        volumes = np.array([float(k[5]) for k in klines])
        timestamps = [int(k[0]) for k in klines]

        # First bar inside the analysis window; require enough warm-up before it.
        scan_start = next(
            (i for i, ts in enumerate(timestamps) if ts >= start_ms), None)
        if scan_start is None or scan_start < 210:
            continue
        scan_end = next(
            (i for i, ts in enumerate(timestamps) if ts > end_ms),
            len(timestamps) - 1)

        cooldown_until = 0
        for idx in range(scan_start, scan_end, 12):
            if idx <= cooldown_until:
                continue
            ind = calc_indicators(highs, lows, closes, volumes, idx)
            if ind is None:
                continue
            if ind["natr"] < NATR_MIN or ind["natr"] > NATR_MAX:
                continue
            side = None
            if ind["z"] < -Z_THRESHOLD:
                side = "BUY"
            elif ind["z"] > Z_THRESHOLD:
                side = "SELL"
            if not side:
                continue
            pnl_pct, reason, duration = simulate_deal(closes, idx, side, TP_PCT, SL_PCT)
            # NOTE(review): 3.0 * 3 looks like position size x leverage in USD - confirm.
            pnl_usd = pnl_pct / 100 * 3.0 * 3
            all_signals.append({
                "symbol": symbol,
                "side": side,
                "ts": timestamps[idx],  # entry time, used to order the drawdown curve
                "z": ind["z"],
                "natr": ind["natr"],
                "ema50_dist": ind["ema50_dist"],
                "pnl_pct": pnl_pct,
                "pnl_usd": pnl_usd,
                "reason": reason,
                "duration": duration,
            })
            cooldown_until = idx + COOLDOWN_BARS

    print(f"Total unfiltered signals: {len(all_signals)}")
    if not all_signals:
        # Nothing to analyse; the win-rate division below would fail on zero trades.
        print("No signals generated - nothing to analyse.")
        return
    base_pnl = sum(t["pnl_usd"] for t in all_signals)
    base_wr = _win_rate(all_signals)
    print(f"Baseline (NATR only): {len(all_signals)} | WR {base_wr:.1f}% | PnL ${base_pnl:+.2f}\n")

    # -----------------------------------------------
    # PART 1: Granular EMA thresholds (0.5% steps)
    # -----------------------------------------------
    print("=" * 85)
    print("PART 1: EMA50 distance threshold β 0.5% steps")
    print("=" * 85)
    print(f"\n{'Threshold':<12} {'Trades':>7} {'vs Base':>8} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'SL$':>10} {'$/tradeβ':>10}")
    print("-" * 90)
    for threshold in [1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 7.0, 8.0, 99]:
        filtered = _within_ema_threshold(all_signals, threshold)
        if not filtered:
            continue
        pnl = sum(t["pnl_usd"] for t in filtered)
        wr = _win_rate(filtered)
        avg = pnl / len(filtered)
        stopped = [t for t in filtered if t["reason"] == "SL"]
        sls = len(stopped)
        sl_pnl = sum(t["pnl_usd"] for t in stopped)
        cut = len(all_signals) - len(filtered)
        label = "ALL" if threshold >= 99 else f"β€{threshold}%"
        # The header's last column has no matching value in this row.
        print(f"{label:<12} {len(filtered):>7} {f'-{cut}':>8} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f} {sls:>5} ${sl_pnl:>+8.2f}")

    # -----------------------------------------------
    # PART 2: What's IN each bucket (marginal analysis)
    # -----------------------------------------------
    print(f"\n{'='*85}")
    print("PART 2: MARGINAL ANALYSIS β what each 1% band adds")
    print(f"{'='*85}")
    bands = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 99)]
    print(f"\n{'Band':<12} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'Top Losers'}")
    print("-" * 90)
    for lo, hi in bands:
        band_label = f"{lo}-{hi}%" if hi < 99 else f"{lo}%+"
        band_trades = [t for t in all_signals
                       if t["ema50_dist"] is not None and lo <= t["ema50_dist"] < hi]
        if not band_trades:
            continue
        pnl = sum(t["pnl_usd"] for t in band_trades)
        wr = _win_rate(band_trades)
        avg = pnl / len(band_trades)
        sls = sum(1 for t in band_trades if t["reason"] == "SL")
        # Up to three worst trades of the band, losers only.
        losers = sorted(band_trades, key=lambda x: x["pnl_usd"])[:3]
        loser_str = ", ".join(f"{t['symbol'][:8]} ${t['pnl_usd']:+.2f}" for t in losers if t["pnl_usd"] < 0)
        print(f"{band_label:<12} {len(band_trades):>7} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f} {sls:>5} {loser_str or 'none'}")

    # -----------------------------------------------
    # PART 3: Cumulative PnL curve (add trades by dist)
    # -----------------------------------------------
    print(f"\n{'='*85}")
    print("PART 3: CUMULATIVE PnL β adding trades from closest β farthest from EMA50")
    print("(Shows when adding more distant trades starts HURTING)")
    print(f"{'='*85}")
    valid = [t for t in all_signals if t["ema50_dist"] is not None]
    valid.sort(key=lambda x: x["ema50_dist"])
    cumul_pnl = 0
    peak_pnl = float("-inf")  # any real cumulative value beats this
    peak_idx = 0
    peak_dist = 0
    print(f"\n{'#':>5} {'Dist%':>7} {'Symbol':<12} {'Side':<5} {'PnL':>8} {'Cumul':>10} {'WR%':>6}")
    print("-" * 60)
    wins = 0
    for i, t in enumerate(valid):
        cumul_pnl += t["pnl_usd"]
        if t["pnl_usd"] > 0:
            wins += 1
        wr = wins / (i + 1) * 100
        if cumul_pnl > peak_pnl:
            peak_pnl = cumul_pnl
            peak_idx = i
            peak_dist = t["ema50_dist"]
        # Print every 10th + last + every notable loser
        if (i + 1) % 10 == 0 or i == len(valid) - 1 or t["pnl_usd"] < -0.5:
            print(f"{i+1:>5} {t['ema50_dist']:>6.1f}% {t['symbol']:<12} {t['side']:<5} ${t['pnl_usd']:>+6.2f} ${cumul_pnl:>+8.2f} {wr:>5.1f}%")
    if valid:
        print(f"\nπ Peak cumulative PnL: ${peak_pnl:+.2f} at trade #{peak_idx+1} (EMA50 dist β€{peak_dist:.1f}%)")

    # -----------------------------------------------
    # PART 4: Risk-adjusted: PnL per trade (efficiency)
    # -----------------------------------------------
    print(f"\n{'='*85}")
    print("PART 4: EFFICIENCY β PnL/trade Γ volume (total PnL) for each threshold")
    print("Finding the OPTIMAL balance between quality and quantity")
    print(f"{'='*85}")
    print(f"\n{'Threshold':<12} {'Trades':>7} {'PnL':>10} {'$/trade':>8} {'WR%':>6} {'Profit Factor':>14} {'Max Drawdown':>13}")
    print("-" * 80)
    for threshold in [2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 6.0, 99]:
        filtered = _within_ema_threshold(all_signals, threshold)
        if not filtered:
            continue
        gross_profit = sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] > 0)
        gross_loss = abs(sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] <= 0))
        pf = gross_profit / gross_loss if gross_loss > 0 else 999
        pnl = sum(t["pnl_usd"] for t in filtered)
        avg = pnl / len(filtered)
        wr = _win_rate(filtered)
        # Max drawdown is path-dependent, so walk the trades in entry-time
        # order rather than in per-symbol collection order.
        cumul = 0
        peak = 0
        max_dd = 0
        for t in sorted(filtered, key=lambda x: x["ts"]):
            cumul += t["pnl_usd"]
            peak = max(peak, cumul)
            max_dd = max(max_dd, peak - cumul)
        label = "ALL" if threshold >= 99 else f"β€{threshold}%"
        print(f"{label:<12} {len(filtered):>7} ${pnl:>+8.2f} ${avg:>+7.4f} {wr:>5.1f}% {pf:>13.2f} ${max_dd:>11.2f}")

    # -----------------------------------------------
    # PART 5: Per-symbol breakdown (who benefits from filter)
    # -----------------------------------------------
    print(f"\n{'='*85}")
    print("PART 5: PER-SYMBOL β which coins hurt most at dist>3% vs >5%")
    print(f"{'='*85}")
    print(f"\n--- Trades with EMA50 dist 3-5% (the contested zone) ---")
    zone_3_5 = [t for t in all_signals if t["ema50_dist"] is not None and 3.0 < t["ema50_dist"] <= 5.0]
    if zone_3_5:
        by_sym = defaultdict(list)
        for t in zone_3_5:
            by_sym[t["symbol"]].append(t)
        print(f"\n{'Symbol':<15} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg Dist':>9}")
        print("-" * 55)
        sym_stats = []
        for sym, trades in sorted(by_sym.items()):
            pnl = sum(t["pnl_usd"] for t in trades)
            wr = _win_rate(trades)
            avg_dist = np.mean([t["ema50_dist"] for t in trades])
            sym_stats.append((sym, len(trades), wr, pnl, avg_dist))
        # Worst PnL first; ties stay in alphabetical order.
        sym_stats.sort(key=lambda x: x[3])
        for sym, n, wr, pnl, avg_d in sym_stats:
            print(f"{sym:<15} {n:>7} {wr:>5.1f}% ${pnl:>+8.2f} {avg_d:>8.1f}%")
        total_pnl = sum(t["pnl_usd"] for t in zone_3_5)
        total_wr = _win_rate(zone_3_5)
        print(f"\n{'TOTAL 3-5%':<15} {len(zone_3_5):>7} {total_wr:>5.1f}% ${total_pnl:>+8.2f}")
        print(f"\nβ Cutting β€3% LOSES these {len(zone_3_5)} trades (PnL ${total_pnl:+.2f})")
        print(f"β Cutting β€5% KEEPS them")
# Run the full analysis only when executed as a script, not on import.
if __name__ == "__main__":
    run()