"""
Backtest v3: CHOP vs EMA50 distance filter.
Full simulation: scan historical 5m klines, generate Z-VWAP signals,
simulate DCA deals with TP/SL, compare filter combos:
A) CHOP ≥ 45 only (current)
B) EMA50 dist ≤ 3% only
C) EMA50 dist ≤ 5% only
D) CHOP ≥ 45 + EMA50 dist ≤ 3%
E) CHOP ≥ 45 + EMA50 dist ≤ 5%
F) EMA50 dist ≤ 4% only
G) EMA50 dist ≤ 6% only
H) No filters at all
Uses 5m klines from Bybit, Apr 7–10, for up to 50 USDT symbols ranked by 24h turnover.
"""
import json
import time
import numpy as np
import requests
from datetime import datetime, timezone, timedelta
from collections import defaultdict
# --- Bybit v5 market-data endpoints ----------------------------------------
_BYBIT_MARKET_API = "https://api.bybit.com/v5/market"
BYBIT_KLINE_URL = f"{_BYBIT_MARKET_API}/kline"
BYBIT_TICKER_URL = f"{_BYBIT_MARKET_API}/tickers"

# --- DCA config (matches bot) ----------------------------------------------
# Exit thresholds, in percent of the entry price.
TP_PCT = 1.5
SL_PCT = 10.0  # updated

# Z-VWAP signal: a deal is opened when |z| exceeds Z_THRESHOLD.
Z_THRESHOLD = 1.8
Z_EXIT = 0.3  # not referenced by the code visible in this file

# Accepted NATR band (percent); bars outside it never produce a signal.
NATR_MIN = 0.75
NATR_MAX = 2.5

# Per-symbol pause after an entry: 60 bars = 5h cooldown (in 5m bars).
COOLDOWN_BARS = 60
MAX_DEALS = 6  # not referenced by the code visible in this file

# Symbols excluded when ranking the top symbols.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT"}
def fetch_klines_paginated(symbol, interval, start_ms, end_ms, limit=1000):
    """Fetch all klines whose timestamp lies in ``[start_ms, end_ms]``.

    Pages backwards in time: each request asks for up to ``limit`` candles
    ending at the current cursor, then the cursor is moved to just before
    the oldest candle received, until ``start_ms`` is reached.

    The fetch is best-effort: on a transport error, an unexpected payload or
    a non-zero ``retCode`` a message is printed and the rows collected so far
    are returned.

    Args:
        symbol: Instrument name sent as the ``symbol`` query parameter.
        interval: Candle interval sent as the ``interval`` query parameter.
        start_ms: Inclusive lower timestamp bound, in milliseconds.
        end_ms: Inclusive upper timestamp bound, in milliseconds.
        limit: Maximum number of candles requested per page.

    Returns:
        Raw kline rows (``row[0]`` is the timestamp), de-duplicated by
        timestamp and sorted in ascending timestamp order.
    """
    rows_by_ts = {}
    cursor_end = end_ms
    while cursor_end > start_ms:
        params = {
            "category": "linear",
            "symbol": symbol,
            "interval": interval,
            # NOTE(review): the v5 kline endpoint documents the upper bound
            # as "end". If the server ignores an unknown name, every page
            # returns the newest candles and the cursor never moves back.
            "end": cursor_end,
            "limit": limit,
        }
        try:
            r = requests.get(BYBIT_KLINE_URL, params=params, timeout=15)
            r.raise_for_status()
            data = r.json()
            if data.get("retCode") != 0:
                print(
                    f" Error fetching {symbol}: "
                    f"retCode={data.get('retCode')} retMsg={data.get('retMsg')}"
                )
                break
            rows = data["result"]["list"]
            if not rows:
                break
            # Keep only rows inside the requested range; first occurrence wins.
            for row in rows:
                ts = int(row[0])
                if start_ms <= ts <= end_ms:
                    rows_by_ts.setdefault(ts, row)
            oldest = min(int(row[0]) for row in rows)
        except (
            requests.RequestException,
            ValueError,
            KeyError,
            IndexError,
            TypeError,
            AttributeError,
        ) as e:
            print(f" Error fetching {symbol}: {e}")
            break
        if oldest <= start_ms:
            break
        next_cursor = oldest - 1
        if next_cursor >= cursor_end:
            # The page did not end before the cursor, so another request
            # would return the same data; stop instead of looping forever.
            print(f" Error fetching {symbol}: pagination did not advance")
            break
        cursor_end = next_cursor
        time.sleep(0.2)
    return [rows_by_ts[ts] for ts in sorted(rows_by_ts)]
def get_top_symbols(n=50):
    """Return up to ``n`` symbols ending in "USDT", ranked by 24h turnover.

    Tickers are requested for the ``linear`` category. Symbols listed in
    ``BLACKLIST`` and symbols whose ``turnover24h`` is below 20,000,000 are
    dropped. Any failure while fetching or parsing is printed and yields an
    empty list.
    """
    try:
        response = requests.get(
            BYBIT_TICKER_URL, params={"category": "linear"}, timeout=10
        )
        payload = response.json()
        ranked = []
        for ticker in payload["result"]["list"]:
            name = ticker["symbol"]
            if name.endswith("USDT") and name not in BLACKLIST:
                turnover = float(ticker.get("turnover24h", 0))
                if turnover >= 20_000_000:
                    ranked.append((name, turnover))
        # Highest turnover first.
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return [name for name, _ in ranked[:n]]
    except Exception as e:
        print(f"Error fetching tickers: {e}")
        return []
def calc_indicators(highs, lows, closes, volumes, idx, z_period=50):
    """Compute the indicator snapshot for the bar at position ``idx``.

    Args:
        highs, lows, closes, volumes: Parallel NumPy arrays of bar data.
        idx: Index of the bar to evaluate; only bars up to ``idx`` are read.
        z_period: Look-back of the Z-VWAP window (``z_period + 1`` bars).

    Returns:
        ``None`` when ``idx`` is below ``max(z_period + 10, 200)`` or when
        the close-minus-VWAP deviations have zero standard deviation.
        Otherwise a dict with keys ``z``, ``natr``, ``chop``, ``ema50``,
        ``ema50_dist`` and ``price``.
    """
    min_history = max(z_period + 10, 200)
    if idx < min_history:
        return None

    # --- Z-score of the close against the window's running VWAP ---
    window = slice(idx - z_period, idx + 1)
    win_close = closes[window]
    win_volume = volumes[window]
    typical = (highs[window] + lows[window] + win_close) / 3
    running_pv = np.cumsum(typical * win_volume)
    running_vol = np.cumsum(win_volume)
    # Substitute 1 for a zero cumulative volume to avoid dividing by zero.
    vwap_path = running_pv / np.where(running_vol == 0, 1, running_vol)
    spread = np.std(win_close - vwap_path)
    if spread == 0:
        return None
    z_value = float((win_close[-1] - vwap_path[-1]) / spread)

    # --- True range over the last 15 bars (shared by NATR and CHOP) ---
    lookback = 15
    recent_high = highs[idx - lookback + 1:idx + 1]
    recent_low = lows[idx - lookback + 1:idx + 1]
    prev_close = closes[idx - lookback:idx]  # close of the bar before each one
    true_range = np.maximum(
        recent_high - recent_low,
        np.maximum(
            np.abs(recent_high - prev_close),
            np.abs(recent_low - prev_close),
        ),
    )
    tr14 = true_range[-14:]

    # --- NATR: 14-bar mean true range as a percentage of the close ---
    last_close = closes[idx]
    mean_tr = np.mean(tr14)
    natr = (mean_tr / last_close) * 100 if last_close > 0 else 0

    # --- CHOP (14 bars); stays at the neutral 50.0 when the range is flat ---
    top = np.max(recent_high[-14:])
    bottom = np.min(recent_low[-14:])
    if top > bottom:
        chop = float(100 * np.log10(np.sum(tr14) / (top - bottom)) / np.log10(14))
    else:
        chop = 50.0

    # --- EMA 50, seeded with the first close and run over every bar to idx ---
    ema50 = None
    if idx >= 50:
        alpha = 2 / 51
        running = closes[0]
        for price in closes[1:idx + 1]:
            running = (price - running) * alpha + running
        ema50 = running

    # --- Absolute distance of the close from EMA 50, in percent ---
    ema50_dist = None
    if ema50 is not None and ema50 > 0:
        ema50_dist = abs((closes[idx] - ema50) / ema50 * 100)

    return {
        "z": z_value,
        "natr": natr,
        "chop": chop,
        "ema50": ema50,
        "ema50_dist": ema50_dist,
        "price": float(closes[idx]),
    }
def simulate_deal(closes, start_idx, side, tp_pct, sl_pct, max_bars=500):
    """Simulate a simple DCA deal (BO only, no SOs for speed).

    The deal is entered at ``closes[start_idx]`` and checked on each following
    close. Exits are evaluated on closing prices only, so the returned PnL can
    overshoot ``tp_pct`` / ``-sl_pct``.

    Args:
        closes: Indexable sequence of closing prices.
        start_idx: Index of the entry bar.
        side: ``"BUY"`` for a long deal; any other value is treated as short.
        tp_pct: Take-profit threshold in percent (positive).
        sl_pct: Stop-loss threshold in percent (positive).
        max_bars: Size of the evaluation window; the last bar considered is
            ``start_idx + max_bars - 1`` or the end of ``closes``, whichever
            comes first.

    Returns:
        ``(pnl_pct, reason, duration_bars)`` where ``reason`` is ``"TP"``,
        ``"SL"`` or ``"TIMEOUT"`` and ``duration_bars`` is the number of bars
        between the entry bar and the bar the deal was closed on.
    """
    entry = closes[start_idx]

    def pnl_at(price):
        if side == "BUY":
            return (price - entry) / entry * 100
        return (entry - price) / entry * 100

    # Last bar that may be evaluated; never earlier than the entry bar.
    last_idx = max(start_idx, min(start_idx + max_bars - 1, len(closes) - 1))

    for i in range(start_idx + 1, last_idx + 1):
        pnl_pct = pnl_at(closes[i])
        if pnl_pct >= tp_pct:
            return pnl_pct, "TP", i - start_idx
        if pnl_pct <= -sl_pct:
            return pnl_pct, "SL", i - start_idx

    # Timeout — close at the last evaluated bar and report the bars that
    # actually elapsed rather than a fixed window size.
    return pnl_at(closes[last_idx]), "TIMEOUT", last_idx - start_idx
# Bars between successive scans (12 x 5m = 60 min, like the bot's scan_interval).
_SCAN_STEP_BARS = 12
# Bars of history required before the first scanned bar.
_MIN_WARMUP_BARS = 210
# Sizing used to convert percent PnL to USD: $3 base order × 3x leverage.
_BO_USD = 3.0
_LEVERAGE = 3


def _prepare_series(klines, start_ms, end_ms):
    """Turn raw kline rows into arrays and locate the scan window.

    Reads columns 0 (timestamp), 2 (high), 3 (low), 4 (close), 5 (volume).
    Returns ``None`` when no bar starts at or after ``start_ms`` or when
    fewer than ``_MIN_WARMUP_BARS`` bars precede the first such bar.
    """
    timestamps = [int(k[0]) for k in klines]
    scan_start = next(
        (i for i, ts in enumerate(timestamps) if ts >= start_ms), None
    )
    if scan_start is None or scan_start < _MIN_WARMUP_BARS:
        return None
    # First bar past the end bound, or the last bar when there is none.
    scan_end = next(
        (i for i, ts in enumerate(timestamps) if ts > end_ms),
        len(timestamps) - 1,
    )
    return {
        "highs": np.array([float(k[2]) for k in klines]),
        "lows": np.array([float(k[3]) for k in klines]),
        "closes": np.array([float(k[4]) for k in klines]),
        "volumes": np.array([float(k[5]) for k in klines]),
        "scan_start": scan_start,
        "scan_end": scan_end,
    }


def _collect_trades(prepared, config, indicator_cache):
    """Scan every prepared symbol under one filter config; return its trades.

    ``indicator_cache`` maps ``(symbol, idx)`` to the ``calc_indicators``
    result so configs share indicator values instead of recomputing them.
    """
    trades = []
    for symbol, series in prepared.items():
        closes = series["closes"]
        cooldown_until = 0
        for idx in range(series["scan_start"], series["scan_end"], _SCAN_STEP_BARS):
            if idx <= cooldown_until:
                continue
            key = (symbol, idx)
            if key not in indicator_cache:
                indicator_cache[key] = calc_indicators(
                    series["highs"], series["lows"], closes, series["volumes"], idx
                )
            ind = indicator_cache[key]
            if ind is None:
                continue
            z = ind["z"]
            natr = ind["natr"]
            chop = ind["chop"]
            ema_dist = ind["ema50_dist"]
            # NATR filter always on
            if natr < NATR_MIN or natr > NATR_MAX:
                continue
            # Entry signal: fade a stretched Z-VWAP in either direction.
            if z < -Z_THRESHOLD:
                side = "BUY"
            elif z > Z_THRESHOLD:
                side = "SELL"
            else:
                continue
            # Apply filter combo
            if config["chop_min"] is not None and chop < config["chop_min"]:
                continue
            if (
                config["ema_max"] is not None
                and ema_dist is not None
                and ema_dist > config["ema_max"]
            ):
                continue
            pnl_pct, reason, duration = simulate_deal(closes, idx, side, TP_PCT, SL_PCT)
            trades.append({
                "symbol": symbol,
                "side": side,
                "z": z,
                "natr": natr,
                "chop": chop,
                "ema_dist": ema_dist,
                "pnl_pct": pnl_pct,
                "pnl_usd": pnl_pct / 100 * _BO_USD * _LEVERAGE,
                "reason": reason,
                "duration": duration,
            })
            # Block new entries on this symbol for the cooldown period.
            cooldown_until = idx + COOLDOWN_BARS
    return trades


def _summarize(trades):
    """Aggregate one config's trade list into the stats used by the report."""
    if not trades:
        return {"trades": 0}
    total_pnl = sum(t["pnl_usd"] for t in trades)
    wins = sum(1 for t in trades if t["pnl_usd"] > 0)
    stopped = [t for t in trades if t["reason"] == "SL"]
    return {
        "trades": len(trades),
        "wins": wins,
        "losses": len(trades) - wins,
        "wr": wins / len(trades) * 100,
        "pnl": total_pnl,
        "avg_pnl": total_pnl / len(trades),
        "sl_count": len(stopped),
        "tp_count": sum(1 for t in trades if t["reason"] == "TP"),
        "sl_pnl": sum(t["pnl_usd"] for t in stopped),
        "all_trades": trades,
    }


def _print_results_table(filter_configs, results, n_symbols):
    """Print one summary row per filter config."""
    print(f"\n{'='*85}")
    print(f"RESULTS: Apr 7-10, {n_symbols} symbols, BO-only simulation ($3 BO, 3x lev)")
    print(f"{'='*85}")
    print(f"\n{'Config':<30} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'SL PnL':>10} {'TPs':>5}")
    print("-" * 90)
    for name in filter_configs:
        r = results[name]
        if r["trades"] == 0:
            print(f"{name:<30} {'0':>7}")
            continue
        print(
            f"{name:<30} {r['trades']:>7} {r['wr']:>5.1f}% ${r['pnl']:>+8.2f} "
            f"${r['avg_pnl']:>+7.4f} {r['sl_count']:>5} ${r['sl_pnl']:>+8.2f} {r['tp_count']:>5}"
        )


def _print_subset_stats(trades, note):
    """Print count / win rate / PnL for a trade subset, followed by ``note``."""
    if not trades:
        return
    winners = sum(1 for t in trades if t["pnl_usd"] > 0)
    wr = winners / len(trades) * 100
    pnl = sum(t["pnl_usd"] for t in trades)
    print(f" {len(trades)} trades | WR {wr:.1f}% | PnL ${pnl:+.2f}")
    print(note)


def _print_blocker_comparison(unfiltered):
    """Show which unfiltered trades CHOP<45 and EMA50>5% would each block."""
    print(f"\n{'='*85}")
    print("COMPARISON: What CHOP blocks vs what EMA50 blocks")
    print(f"{'='*85}")
    if not unfiltered:
        return

    def total(trades):
        return sum(t["pnl_usd"] for t in trades)

    print(f"\nUnfiltered trades: {len(unfiltered)}")
    chop_blocked = [t for t in unfiltered if t["chop"] < 45]
    ema5_blocked = [
        t for t in unfiltered
        if t["ema_dist"] is not None and t["ema_dist"] > 5.0
    ]
    both_blocked = [
        t for t in unfiltered
        if t["chop"] < 45 and t["ema_dist"] is not None and t["ema_dist"] > 5.0
    ]
    chop_only_blocked = [
        t for t in unfiltered
        if t["chop"] < 45 and (t["ema_dist"] is None or t["ema_dist"] <= 5.0)
    ]
    ema_only_blocked = [
        t for t in unfiltered
        if t["chop"] >= 45 and t["ema_dist"] is not None and t["ema_dist"] > 5.0
    ]
    print(f"\nCHOP<45 blocks: {len(chop_blocked)} trades → PnL ${total(chop_blocked):+.2f}")
    print(f"EMA50>5% blocks: {len(ema5_blocked)} trades → PnL ${total(ema5_blocked):+.2f}")
    print(f"Both block (overlap): {len(both_blocked)} trades → PnL ${total(both_blocked):+.2f}")
    print(f"CHOP-only blocks (EMA would allow): {len(chop_only_blocked)} trades → PnL ${total(chop_only_blocked):+.2f}")
    print(f"EMA-only blocks (CHOP would allow): {len(ema_only_blocked)} trades → PnL ${total(ema_only_blocked):+.2f}")

    print("\n--- CHOP-only blocked trades (CHOP kills, EMA50 ≤5% allows) ---")
    _print_subset_stats(
        chop_only_blocked,
        " → These are trades we'd GAIN by removing CHOP (if using EMA50≤5%)",
    )
    print("\n--- EMA-only blocked trades (EMA50>5% kills, CHOP≥45 allows) ---")
    _print_subset_stats(
        ema_only_blocked,
        " → These are bad trades CHOP misses but EMA50 catches",
    )


def run():
    """Run the whole backtest and print the comparison report.

    Steps: pick the top symbols, download 5m klines (scan period plus four
    warm-up days), replay the Z-VWAP entry logic under each filter combo,
    then print a per-config summary table and a breakdown of which
    unfiltered trades the CHOP and EMA50 filters would block.
    """
    # Period: Apr 7–10 (4 days); the end bound is Apr 11 00:00 UTC.
    start_dt = datetime(2026, 4, 7, 0, 0, tzinfo=timezone.utc)
    end_dt = datetime(2026, 4, 11, 0, 0, tzinfo=timezone.utc)
    start_ms = int(start_dt.timestamp() * 1000)
    end_ms = int(end_dt.timestamp() * 1000)
    # 4 extra days of history so the indicators are warmed up at the first
    # scanned bar.
    warmup_ms = int((start_dt - timedelta(days=4)).timestamp() * 1000)

    symbols = get_top_symbols(50)
    print(f"Top symbols: {len(symbols)}")

    # Fetch klines for all symbols, pausing between requests.
    symbol_data = {}
    for n_done, symbol in enumerate(symbols, start=1):
        klines = fetch_klines_paginated(symbol, "5", warmup_ms, end_ms)
        if klines and len(klines) > 300:
            symbol_data[symbol] = klines
        if n_done % 5 == 0:
            print(f" Fetched {n_done}/{len(symbols)} symbols ({len(klines) if klines else 0} bars)...")
            time.sleep(1)
        else:
            time.sleep(0.3)
    print(f"\nLoaded {len(symbol_data)} symbols with data")

    # Filter combos to compare; None disables that filter.
    filter_configs = {
        "A) CHOP≥45 only (current)": {"chop_min": 45, "ema_max": None},
        "B) EMA50≤3% only": {"chop_min": None, "ema_max": 3.0},
        "C) EMA50≤5% only": {"chop_min": None, "ema_max": 5.0},
        "D) CHOP≥45 + EMA50≤3%": {"chop_min": 45, "ema_max": 3.0},
        "E) CHOP≥45 + EMA50≤5%": {"chop_min": 45, "ema_max": 5.0},
        "F) EMA50≤4% only": {"chop_min": None, "ema_max": 4.0},
        "G) EMA50≤6% only": {"chop_min": None, "ema_max": 6.0},
        "H) No filters": {"chop_min": None, "ema_max": None},
    }
    no_filter_label = "H) No filters"

    # Array conversion and scan-window lookup do not depend on the filter
    # config, so do them once per symbol.
    prepared = {}
    for symbol, klines in symbol_data.items():
        series = _prepare_series(klines, start_ms, end_ms)
        if series is not None:
            prepared[symbol] = series

    # SCAN + SIMULATE for each filter combo
    indicator_cache = {}
    results = {
        name: _summarize(_collect_trades(prepared, config, indicator_cache))
        for name, config in filter_configs.items()
    }

    _print_results_table(filter_configs, results, len(symbol_data))
    _print_blocker_comparison(results[no_filter_label].get("all_trades", []))
# Script entry point: run the backtest only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    run()