"""
Backtest: CHOPβ₯45 (keep) + EMA50 distance cap (add) β loose filter for fat losers.
Uses full simulation Apr 7-10.
"""
import json, time, numpy as np, requests
from datetime import datetime, timezone, timedelta
from collections import defaultdict
BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"
BYBIT_TICKER_URL = "https://api.bybit.com/v5/market/tickers"
TP_PCT = 1.5; SL_PCT = 10.0; Z_THRESHOLD = 1.8
NATR_MIN = 0.75; NATR_MAX = 2.5; COOLDOWN_BARS = 60
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT"}
def fetch_klines_paginated(symbol, interval, start_ms, end_ms, limit=1000):
all_klines = []; cursor_end = end_ms
while cursor_end > start_ms:
params = {"category": "linear", "symbol": symbol, "interval": interval,
"endTime": cursor_end, "limit": limit}
try:
r = requests.get(BYBIT_KLINE_URL, params=params, timeout=15)
data = r.json()
if data.get("retCode") != 0: break
rows = data["result"]["list"]
if not rows: break
rows.reverse()
for row in rows:
ts = int(row[0])
if start_ms <= ts <= end_ms: all_klines.append(row)
oldest = int(rows[0][0])
if oldest <= start_ms: break
cursor_end = oldest - 1; time.sleep(0.2)
except: break
seen = set(); unique = []
for k in all_klines:
ts = int(k[0])
if ts not in seen: seen.add(ts); unique.append(k)
unique.sort(key=lambda x: int(x[0])); return unique
def get_top_symbols(n=50):
r = requests.get(BYBIT_TICKER_URL, params={"category": "linear"}, timeout=10)
data = r.json()
cands = [(t["symbol"], float(t.get("turnover24h", 0))) for t in data["result"]["list"]
if t["symbol"].endswith("USDT") and t["symbol"] not in BLACKLIST and float(t.get("turnover24h", 0)) >= 20_000_000]
cands.sort(key=lambda x: x[1], reverse=True)
return [c[0] for c in cands[:n]]
def calc_indicators(highs, lows, closes, volumes, idx, z_period=50):
if idx < max(z_period + 10, 200): return None
h = highs[idx-z_period:idx+1]; l = lows[idx-z_period:idx+1]
c = closes[idx-z_period:idx+1]; v = volumes[idx-z_period:idx+1]
tp = (h+l+c)/3; ctv = np.cumsum(tp*v); cv = np.cumsum(v)
cvs = np.where(cv==0, 1, cv); vwap_arr = ctv/cvs
dev = c - vwap_arr; std = np.std(dev)
if std == 0: return None
z = float((c[-1] - vwap_arr[-1]) / std)
s = 15; h_tr = highs[idx-s+1:idx+1]; l_tr = lows[idx-s+1:idx+1]; c_tr = closes[idx-s:idx+1]
tr = np.maximum(h_tr-l_tr, np.maximum(np.abs(h_tr-c_tr[:-1]), np.abs(l_tr-c_tr[:-1])))
natr = (np.mean(tr[-14:]) / closes[idx]) * 100 if closes[idx] > 0 else 0
chop = 50.0
ct = tr[-14:]; atr_sum = np.sum(ct)
hi = np.max(h_tr[-14:]); lo = np.min(l_tr[-14:])
if hi > lo: chop = float(100 * np.log10(atr_sum / (hi-lo)) / np.log10(14))
ema50 = None
if idx >= 50:
ema = closes[0]; mult = 2/51
for i in range(1, idx+1): ema = (closes[i]-ema)*mult + ema
ema50 = ema
ema50_dist = abs((closes[idx]-ema50)/ema50*100) if ema50 and ema50 > 0 else None
return {"z": z, "natr": natr, "chop": chop, "ema50_dist": ema50_dist, "price": float(closes[idx])}
def simulate_deal(closes, start_idx, side):
entry = closes[start_idx]
for i in range(start_idx+1, min(start_idx+500, len(closes))):
p = closes[i]
pnl = ((p-entry)/entry*100) if side=="BUY" else ((entry-p)/entry*100)
if pnl >= TP_PCT: return pnl, "TP", i-start_idx
if pnl <= -SL_PCT: return pnl, "SL", i-start_idx
p = closes[min(start_idx+499, len(closes)-1)]
pnl = ((p-entry)/entry*100) if side=="BUY" else ((entry-p)/entry*100)
return pnl, "TIMEOUT", 500
def run():
start_dt = datetime(2026, 4, 7, 0, 0, tzinfo=timezone.utc)
end_dt = datetime(2026, 4, 11, 0, 0, tzinfo=timezone.utc)
start_ms = int(start_dt.timestamp()*1000); end_ms = int(end_dt.timestamp()*1000)
warmup_ms = int((start_dt - timedelta(days=4)).timestamp()*1000)
symbols = get_top_symbols(50)
print(f"Symbols: {len(symbols)}")
symbol_data = {}
for idx, sym in enumerate(symbols):
kl = fetch_klines_paginated(sym, "5", warmup_ms, end_ms)
if kl and len(kl) > 300: symbol_data[sym] = kl
if (idx+1) % 5 == 0: print(f" {idx+1}/{len(symbols)}..."); time.sleep(1)
else: time.sleep(0.3)
print(f"Loaded: {len(symbol_data)}\n")
# Generate ALL signals with NATR filter only (no CHOP, no EMA)
all_signals = []
for symbol, klines in symbol_data.items():
closes = np.array([float(k[4]) for k in klines])
highs = np.array([float(k[2]) for k in klines])
lows = np.array([float(k[3]) for k in klines])
volumes = np.array([float(k[5]) for k in klines])
timestamps = [int(k[0]) for k in klines]
scan_start = next((i for i, ts in enumerate(timestamps) if ts >= start_ms), None)
if scan_start is None or scan_start < 210: continue
scan_end = next((i for i, ts in enumerate(timestamps) if ts > end_ms), len(timestamps)-1)
cd = 0
for idx in range(scan_start, scan_end, 12):
if idx <= cd: continue
ind = calc_indicators(highs, lows, closes, volumes, idx)
if ind is None: continue
if ind["natr"] < NATR_MIN or ind["natr"] > NATR_MAX: continue
side = "BUY" if ind["z"] < -Z_THRESHOLD else ("SELL" if ind["z"] > Z_THRESHOLD else None)
if not side: continue
pnl_pct, reason, dur = simulate_deal(closes, idx, side)
pnl_usd = pnl_pct / 100 * 9 # $3 Γ 3x
all_signals.append({
"symbol": symbol, "side": side, "z": ind["z"], "natr": ind["natr"],
"chop": ind["chop"], "ema50_dist": ind["ema50_dist"],
"pnl_usd": pnl_usd, "reason": reason, "duration": dur,
})
cd = idx + COOLDOWN_BARS
print(f"Total signals (NATR only): {len(all_signals)}")
# βββββββββββββββββββββββββββββββββββββββββββββββ
# TEST CONFIGS
# βββββββββββββββββββββββββββββββββββββββββββββββ
configs = [
("No filters", None, None),
("CHOPβ₯45 only", 45, None),
("CHOPβ₯45 + EMAβ€3%", 45, 3.0),
("CHOPβ₯45 + EMAβ€4%", 45, 4.0),
("CHOPβ₯45 + EMAβ€5%", 45, 5.0),
("CHOPβ₯45 + EMAβ€6%", 45, 6.0),
("CHOPβ₯45 + EMAβ€7%", 45, 7.0),
("CHOPβ₯45 + EMAβ€8%", 45, 8.0),
("CHOPβ₯45 + EMAβ€10%", 45, 10.0),
]
print(f"\n{'='*90}")
print(f"{'Config':<25} {'Trades':>7} {'Cut':>5} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'SL PnL':>10} {'PF':>6}")
print("="*90)
for name, chop_min, ema_max in configs:
filtered = []
for t in all_signals:
if chop_min is not None and t["chop"] < chop_min: continue
if ema_max is not None and t["ema50_dist"] is not None and t["ema50_dist"] > ema_max: continue
filtered.append(t)
if not filtered: continue
pnl = sum(t["pnl_usd"] for t in filtered)
w = sum(1 for t in filtered if t["pnl_usd"] > 0)
wr = w / len(filtered) * 100
avg = pnl / len(filtered)
sls = sum(1 for t in filtered if t["reason"] == "SL")
sl_pnl = sum(t["pnl_usd"] for t in filtered if t["reason"] == "SL")
gp = sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] > 0)
gl = abs(sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] <= 0))
pf = gp / gl if gl > 0 else 999
cut = len(all_signals) - len(filtered)
print(f"{name:<25} {len(filtered):>7} {f'-{cut}':>5} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f} {sls:>5} ${sl_pnl:>+8.2f} {pf:>5.2f}")
# βββββββββββββββββββββββββββββββββββββββββββββββ
# What EMA catches that CHOP misses (CHOP passes, EMA blocks)
# βββββββββββββββββββββββββββββββββββββββββββββββ
print(f"\n{'='*90}")
print("WHAT DOES EMA CATCH THAT CHOP MISSES?")
print(f"{'='*90}")
for ema_cap in [5.0, 6.0, 7.0, 8.0]:
# Trades that pass CHOPβ₯45 but have EMA > cap
caught = [t for t in all_signals if t["chop"] >= 45 and t["ema50_dist"] is not None and t["ema50_dist"] > ema_cap]
if not caught: continue
pnl = sum(t["pnl_usd"] for t in caught)
w = sum(1 for t in caught if t["pnl_usd"] > 0)
wr = w / len(caught) * 100
sls = [t for t in caught if t["reason"] == "SL"]
print(f"\nCHOP passes but EMA50 > {ema_cap}%: {len(caught)} trades | WR {wr:.0f}% | PnL ${pnl:+.2f}")
# Show them
caught.sort(key=lambda x: x["pnl_usd"])
for t in caught:
emoji = "β" if t["pnl_usd"] < -0.3 else ("β " if t["pnl_usd"] > 0 else "βͺ")
print(f" {emoji} {t['symbol']:<15} {t['side']:<5} dist={t['ema50_dist']:.1f}% PnL ${t['pnl_usd']:+.2f} CHOP={t['chop']:.0f} {t['reason']}")
# βββββββββββββββββββββββββββββββββββββββββββββββ
# Marginal: what each EMA band adds/removes from CHOP baseline
# βββββββββββββββββββββββββββββββββββββββββββββββ
print(f"\n{'='*90}")
print("MARGINAL: trades REMOVED from CHOP baseline by each EMA band")
print(f"{'='*90}")
chop_trades = [t for t in all_signals if t["chop"] >= 45]
bands = [(3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 10), (10, 99)]
print(f"\n{'EMA Band':<12} {'Removed':>8} {'WR%':>6} {'PnL':>10} {'Verdict'}")
print("-" * 55)
for lo, hi in bands:
removed = [t for t in chop_trades if t["ema50_dist"] is not None and lo <= t["ema50_dist"] < hi]
if not removed: continue
pnl = sum(t["pnl_usd"] for t in removed)
w = sum(1 for t in removed if t["pnl_usd"] > 0)
wr = w / len(removed) * 100
verdict = "β CUT" if pnl < -0.5 else ("β οΈ MIXED" if pnl < 0 else "β KEEP")
label = f"{lo}-{hi}%" if hi < 99 else f"{lo}%+"
print(f"{label:<12} {len(removed):>8} {wr:>5.1f}% ${pnl:>+8.2f} {verdict}")
if __name__ == "__main__":
run()