"""
Backtest v2: EMA distance filter + Z-VWAP period optimization.
1) Distance from EMA50/100/200 โ granular thresholds 1-7%
2) Z-VWAP recalculation with periods 50/100/200 at entry time
"""
import json
import time
import numpy as np
import requests
from collections import defaultdict
BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"
def fetch_klines(symbol: str, interval: str, end_time_ms: int, limit: int = 250):
"""Fetch klines from Bybit ending at end_time_ms."""
params = {
"category": "linear",
"symbol": symbol,
"interval": interval,
"endTime": end_time_ms,
"limit": limit,
}
try:
r = requests.get(BYBIT_KLINE_URL, params=params, timeout=10)
data = r.json()
if data.get("retCode") != 0:
return None
rows = data["result"]["list"]
rows.reverse()
return rows
except Exception as e:
print(f" Error fetching {symbol}: {e}")
return None
def calc_ema(closes: np.ndarray, period: int) -> float:
if len(closes) < period:
return None
multiplier = 2 / (period + 1)
ema = closes[0]
for c in closes[1:]:
ema = (c - ema) * multiplier + ema
return ema
def calc_zvwap(highs, lows, closes, volumes, period):
"""Calculate Z-score from VWAP for given period."""
if len(closes) < period:
return None
h = highs[-period:]
l = lows[-period:]
c = closes[-period:]
v = volumes[-period:]
tp = (h + l + c) / 3
cum_tp_vol = np.cumsum(tp * v)
cum_vol = np.cumsum(v)
cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
vwap_arr = cum_tp_vol / cum_vol_safe
vwap = vwap_arr[-1]
deviations = c - vwap_arr
std = np.std(deviations)
if std == 0:
return None
return float((c[-1] - vwap) / std)
def run():
with open("/tmp/matched_trades.json") as f:
trades = json.load(f)
print(f"Total trades: {len(trades)}")
print(f"Fetching klines (250 candles each for EMA200 + Z200)...\n")
enriched = []
failed = 0
from datetime import datetime
for idx, trade in enumerate(trades):
symbol = trade["symbol"]
dt = datetime.fromisoformat(trade["entry_ts"])
end_time_ms = int(dt.timestamp() * 1000)
klines = fetch_klines(symbol, "5", end_time_ms, limit=250)
if not klines or len(klines) < 60:
failed += 1
continue
closes = np.array([float(k[4]) for k in klines])
highs = np.array([float(k[2]) for k in klines])
lows = np.array([float(k[3]) for k in klines])
volumes = np.array([float(k[5]) for k in klines])
entry_price = trade["entry_price"]
# EMA distances
ema_dist = {}
for period in [20, 50, 100, 200]:
ema = calc_ema(closes, period)
if ema and ema > 0:
# "MR distance" โ positive means price is on the right side for our trade
raw_dist = (entry_price - ema) / ema * 100
if trade["side"] == "BUY":
ema_dist[period] = -raw_dist # long: below EMA = positive
else:
ema_dist[period] = raw_dist # short: above EMA = positive
else:
ema_dist[period] = None
# Z-VWAP for different periods
z_scores = {}
for period in [50, 100, 150, 200]:
z = calc_zvwap(highs, lows, closes, volumes, period)
z_scores[period] = z
enriched.append({
**trade,
"ema_dist": ema_dist,
"z_scores": z_scores,
})
if (idx + 1) % 10 == 0:
print(f" {idx + 1}/{len(trades)}...")
time.sleep(0.5)
else:
time.sleep(0.15)
print(f"\nEnriched: {len(enriched)}, failed: {failed}")
baseline_pnl = sum(t["pnl"] for t in enriched)
baseline_w = sum(1 for t in enriched if t["pnl"] > 0)
baseline_l = len(enriched) - baseline_w
baseline_wr = baseline_w / len(enriched) * 100
print(f"\n{'='*75}")
print(f"BASELINE: {len(enriched)} trades | WR {baseline_wr:.1f}% ({baseline_w}W/{baseline_l}L) | PnL ${baseline_pnl:+.2f}")
print(f"{'='*75}")
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# PART 1: EMA Distance filter โ granular
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
print(f"\n{'='*75}")
print("PART 1: EMA DISTANCE FILTER (max distance from EMA)")
print(f"{'='*75}")
for ema_period in [50, 100, 200]:
print(f"\n--- EMA {ema_period} ---")
print(f"{'Max Dist':<10} {'Trades':>7} {'Cut%':>6} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'vs Base':>10}")
print("-" * 65)
for max_dist in [1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 5.0, 6.0, 7.0, 99.0]:
filtered = []
for t in enriched:
d = t["ema_dist"].get(ema_period)
if d is None:
continue
# abs distance โ we want trades that are NOT too far from EMA
# d > 0 means "right side" for MR, but we want to cap how far
abs_d = abs((t["entry_price"] - calc_ema(
# Shortcut: use raw distance
np.array([1.0]), 1) if False else 0))
# Recalculate raw distance
# We stored MR-normalized dist. To get absolute distance:
# For BUY: raw = -ema_dist, for SELL: raw = ema_dist
# abs raw distance = |ema_dist| (already normalized)
# Actually ema_dist represents how far on MR side
# But we want TOTAL distance from EMA regardless of direction
# Since ema_dist = -raw for BUY and +raw for SELL,
# The wrong-side trades have negative ema_dist
# So abs total distance = abs(ema_dist) when ema_dist > 0 = on right side
# But wrong side trades also need filtering
# Let's just use absolute raw distance
pass
# Redo with raw absolute distance
filtered = []
for t in enriched:
d = t["ema_dist"].get(ema_period)
if d is None:
continue
# ema_dist is MR-normalized. Absolute distance from EMA:
# We need the raw (unsigned) distance
# For BUY: ema_dist = -raw_dist, so raw_dist = -ema_dist
# For SELL: ema_dist = raw_dist
# abs_raw = |raw| = |ema_dist| in both cases? No.
# BUY: raw = (price - ema)/ema*100, ema_dist = -raw โ abs(raw) = abs(ema_dist)
# SELL: raw = (price - ema)/ema*100, ema_dist = raw โ abs(raw) = abs(ema_dist)
# YES: abs_raw_distance = abs(ema_dist)
abs_dist = abs(d)
if abs_dist <= max_dist or max_dist >= 99:
filtered.append(t)
if not filtered:
continue
pnl = sum(t["pnl"] for t in filtered)
w = sum(1 for t in filtered if t["pnl"] > 0)
wr = w / len(filtered) * 100
avg = pnl / len(filtered)
cut = (1 - len(filtered) / len(enriched)) * 100
diff = pnl - baseline_pnl
label = "ALL" if max_dist >= 99 else f"โค{max_dist}%"
print(
f"{label:<10} {len(filtered):>7} {cut:>5.0f}% {wr:>5.1f}% "
f"${pnl:>+8.2f} ${avg:>+7.4f} ${diff:>+8.2f}"
)
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# PART 2: Z-VWAP Period comparison
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
print(f"\n{'='*75}")
print("PART 2: Z-VWAP PERIOD OPTIMIZATION")
print("Would we get better signals with longer VWAP lookback?")
print(f"{'='*75}")
# For each Z period, check: would trades with |Z| > 1.8 on THAT period
# have been better or worse?
for z_period in [50, 100, 150, 200]:
# Split trades: those where Z on this period still triggers vs not
triggered = []
skipped = []
for t in enriched:
z = t["z_scores"].get(z_period)
if z is None:
continue
# Original trade was BUY if z50 < -1.8, SELL if z50 > 1.8
if t["side"] == "BUY" and z < -1.8:
triggered.append(t)
elif t["side"] == "SELL" and z > 1.8:
triggered.append(t)
else:
skipped.append(t)
if not triggered:
print(f"\nZ-VWAP period={z_period}: 0 triggered")
continue
t_pnl = sum(t["pnl"] for t in triggered)
t_w = sum(1 for t in triggered if t["pnl"] > 0)
t_wr = t_w / len(triggered) * 100
s_pnl = sum(t["pnl"] for t in skipped) if skipped else 0
s_w = sum(1 for t in skipped if t["pnl"] > 0) if skipped else 0
s_wr = (s_w / len(skipped) * 100) if skipped else 0
print(f"\nZ-VWAP period={z_period}:")
print(f" Triggered (|Z|>1.8): {len(triggered)} trades | WR {t_wr:.1f}% | PnL ${t_pnl:+.2f}")
print(f" Skipped (|Z|โค1.8): {len(skipped)} trades | WR {s_wr:.1f}% | PnL ${s_pnl:+.2f}")
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# PART 3: COMBO โ best EMA distance + Z period
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
print(f"\n{'='*75}")
print("PART 3: COMBO MATRIX (EMA dist ร Z period)")
print(f"{'='*75}")
print(f"\n{'Combo':<30} {'Trades':>7} {'Cut%':>6} {'WR%':>6} {'PnL':>10} {'Avg':>8}")
print("-" * 72)
combos = []
for ema_p in [50, 100]:
for max_d in [2.0, 3.0, 4.0, 5.0]:
for z_p in [50, 100, 200]:
filtered = []
for t in enriched:
d = t["ema_dist"].get(ema_p)
z = t["z_scores"].get(z_p)
if d is None or z is None:
continue
if abs(d) > max_d:
continue
if t["side"] == "BUY" and z >= -1.8:
continue
if t["side"] == "SELL" and z <= 1.8:
continue
filtered.append(t)
if len(filtered) < 5:
continue
pnl = sum(t["pnl"] for t in filtered)
w = sum(1 for t in filtered if t["pnl"] > 0)
wr = w / len(filtered) * 100
avg = pnl / len(filtered)
cut = (1 - len(filtered) / len(enriched)) * 100
label = f"EMA{ema_p}โค{max_d}% Z{z_p}"
combos.append((label, len(filtered), cut, wr, pnl, avg))
combos.sort(key=lambda x: x[4], reverse=True)
for label, n, cut, wr, pnl, avg in combos:
print(f"{label:<30} {n:>7} {cut:>5.0f}% {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f}")
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# PART 4: Bucketed deep-dive โ what's in 3%+ bucket
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
print(f"\n{'='*75}")
print("PART 4: TRADES WITH EMA50 DIST > 3% (the losers)")
print(f"{'='*75}")
losers = [t for t in enriched if t["ema_dist"].get(50) is not None and abs(t["ema_dist"][50]) > 3.0]
losers.sort(key=lambda x: x["pnl"])
print(f"\n{'Symbol':<15} {'Side':<6} {'PnL':>8} {'Dist%':>7} {'Z50':>6} {'Z100':>6} {'Z200':>6} {'Reason':<8} {'SOs':>4}")
print("-" * 75)
for t in losers[:25]:
z50 = t["z_scores"].get(50, 0) or 0
z100 = t["z_scores"].get(100, 0) or 0
z200 = t["z_scores"].get(200, 0) or 0
dist = t["ema_dist"].get(50, 0) or 0
print(
f"{t['symbol']:<15} {t['side']:<6} ${t['pnl']:>+7.2f} {dist:>+6.1f}% "
f"{z50:>+5.1f} {z100:>+5.1f} {z200:>+5.1f} {t['reason']:<8} {t['sos']:>4}"
)
if __name__ == "__main__":
run()