"""
Backtest: EMA/SMA filter on real DCA Z-VWAP trades.
Tests if adding MA filter (price vs MA) would improve PnL.
Logic:
LONG only if price < MA (buying dip below trend)
SHORT only if price > MA (shorting above trend)
OR inverted:
LONG only if price > MA (with trend)
SHORT only if price < MA (with trend)
Fetches 5m klines from Bybit for each trade's entry time.
"""
import json
import time
import numpy as np
import requests
from datetime import datetime, timezone
from collections import defaultdict
BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"
def fetch_klines(symbol: str, interval: str, end_time_ms: int, limit: int = 210):
"""Fetch klines from Bybit ending at end_time_ms."""
params = {
"category": "linear",
"symbol": symbol,
"interval": interval,
"endTime": end_time_ms,
"limit": limit,
}
try:
r = requests.get(BYBIT_KLINE_URL, params=params, timeout=10)
data = r.json()
if data.get("retCode") != 0:
return None
# Bybit returns newest first, reverse
rows = data["result"]["list"]
rows.reverse()
return rows # [timestamp, open, high, low, close, volume, turnover]
except Exception as e:
print(f" Error fetching {symbol}: {e}")
return None
def calc_ema(closes: np.ndarray, period: int) -> float:
"""Calculate EMA of closes, return last value."""
if len(closes) < period:
return None
multiplier = 2 / (period + 1)
ema = closes[0]
for c in closes[1:]:
ema = (c - ema) * multiplier + ema
return ema
def calc_sma(closes: np.ndarray, period: int) -> float:
"""Calculate SMA of closes, return last value."""
if len(closes) < period:
return None
return float(np.mean(closes[-period:]))
def run_backtest():
with open("/tmp/matched_trades.json") as f:
trades = json.load(f)
print(f"Total matched trades: {len(trades)}")
print(f"Unique symbols: {len(set(t['symbol'] for t in trades))}")
print()
# Test configs: (type, period, mode)
# mode: "counter" = LONG below MA, SHORT above MA (counter-trend / MR)
# mode: "trend" = LONG above MA, SHORT below MA (with trend)
configs = []
for ma_type in ["SMA", "EMA"]:
for period in [20, 50, 100, 200]:
for mode in ["counter", "trend"]:
configs.append((ma_type, period, mode))
# Fetch klines for each unique (symbol, entry_time) to avoid dupes
# Group trades by symbol
by_symbol = defaultdict(list)
for t in trades:
by_symbol[t["symbol"]].append(t)
print(f"Fetching klines for {len(by_symbol)} symbols...")
# For each trade, calculate MA values at entry time
enriched = []
failed = 0
for idx, trade in enumerate(trades):
symbol = trade["symbol"]
# Parse entry timestamp
entry_ts = trade["entry_ts"]
dt = datetime.fromisoformat(entry_ts)
end_time_ms = int(dt.timestamp() * 1000)
# Fetch 5m klines (need 200+ for SMA200)
klines = fetch_klines(symbol, "5", end_time_ms, limit=210)
if not klines or len(klines) < 50:
failed += 1
continue
closes = np.array([float(k[4]) for k in klines])
entry_price = trade["entry_price"]
# Calculate all MAs
ma_values = {}
for period in [20, 50, 100, 200]:
sma = calc_sma(closes, period)
ema = calc_ema(closes, period)
ma_values[f"SMA_{period}"] = sma
ma_values[f"EMA_{period}"] = ema
enriched.append({**trade, "ma": ma_values})
# Rate limit
if (idx + 1) % 10 == 0:
print(f" {idx + 1}/{len(trades)} fetched...")
time.sleep(0.5)
else:
time.sleep(0.15)
print(f"\nEnriched: {len(enriched)} trades, failed: {failed}")
# Save enriched data
with open("/tmp/enriched_trades.json", "w") as f:
json.dump(enriched, f, indent=2, default=str)
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# BASELINE (no filter)
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
total_pnl = sum(t["pnl"] for t in enriched)
wins = sum(1 for t in enriched if t["pnl"] > 0)
losses = sum(1 for t in enriched if t["pnl"] <= 0)
wr = wins / len(enriched) * 100 if enriched else 0
print("\n" + "=" * 70)
print(f"BASELINE: {len(enriched)} trades | WR {wr:.1f}% ({wins}W/{losses}L) | PnL ${total_pnl:+.2f}")
print("=" * 70)
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# TEST EACH CONFIG
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
results = []
for ma_type, period, mode in configs:
key = f"{ma_type}_{period}"
filtered = []
rejected = 0
for t in enriched:
ma_val = t["ma"].get(key)
if ma_val is None:
rejected += 1
continue
price = t["entry_price"]
side = t["side"]
if mode == "counter":
# MR logic: LONG below MA (cheap), SHORT above MA (expensive)
if side == "BUY" and price < ma_val:
filtered.append(t)
elif side == "SELL" and price > ma_val:
filtered.append(t)
else:
rejected += 1
else:
# Trend logic: LONG above MA (uptrend), SHORT below MA (downtrend)
if side == "BUY" and price > ma_val:
filtered.append(t)
elif side == "SELL" and price < ma_val:
filtered.append(t)
else:
rejected += 1
if not filtered:
continue
pnl = sum(t["pnl"] for t in filtered)
w = sum(1 for t in filtered if t["pnl"] > 0)
l = sum(1 for t in filtered if t["pnl"] <= 0)
wr = w / len(filtered) * 100
avg_pnl = pnl / len(filtered)
# Also check what we REJECTED
rejected_trades = [t for t in enriched if t not in filtered and t["ma"].get(key) is not None]
rej_pnl = sum(t["pnl"] for t in rejected_trades)
results.append({
"config": f"{ma_type} {period} ({mode})",
"trades": len(filtered),
"rejected": len(rejected_trades),
"wr": wr,
"pnl": pnl,
"avg_pnl": avg_pnl,
"rej_pnl": rej_pnl,
})
# Sort by PnL
results.sort(key=lambda x: x["pnl"], reverse=True)
print(f"\n{'Config':<25} {'Trades':>7} {'Reject':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'Rej PnL':>10}")
print("-" * 80)
for r in results:
print(
f"{r['config']:<25} {r['trades']:>7} {r['rejected']:>7} "
f"{r['wr']:>5.1f}% ${r['pnl']:>+8.2f} ${r['avg_pnl']:>+7.4f} ${r['rej_pnl']:>+8.2f}"
)
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# DEEP DIVE: best configs
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
print("\n" + "=" * 70)
print("TOP 5 CONFIGS (by PnL)")
print("=" * 70)
for r in results[:5]:
print(f"\n๐ {r['config']}")
print(f" Kept: {r['trades']} trades โ PnL ${r['pnl']:+.2f} (WR {r['wr']:.1f}%)")
print(f" Rejected: {r['rejected']} trades โ PnL ${r['rej_pnl']:+.2f}")
improvement = r['pnl'] - total_pnl
print(f" vs Baseline: ${improvement:+.2f} improvement")
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
# DISTANCE FROM MA analysis
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
print("\n" + "=" * 70)
print("DISTANCE FROM EMA50 ANALYSIS")
print("=" * 70)
buckets = defaultdict(list)
for t in enriched:
ema50 = t["ma"].get("EMA_50")
if ema50 is None:
continue
dist_pct = (t["entry_price"] - ema50) / ema50 * 100
# Normalize for side: positive = "right direction" for MR
if t["side"] == "BUY":
norm_dist = -dist_pct # below EMA = positive for long MR
else:
norm_dist = dist_pct # above EMA = positive for short MR
if norm_dist < 0:
bucket = "wrong_side"
elif norm_dist < 1:
bucket = "0-1%"
elif norm_dist < 2:
bucket = "1-2%"
elif norm_dist < 3:
bucket = "2-3%"
else:
bucket = "3%+"
buckets[bucket].append(t)
print(f"\n{'Bucket':<15} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8}")
print("-" * 50)
for bucket in ["wrong_side", "0-1%", "1-2%", "2-3%", "3%+"]:
trades_b = buckets.get(bucket, [])
if not trades_b:
continue
pnl = sum(t["pnl"] for t in trades_b)
w = sum(1 for t in trades_b if t["pnl"] > 0)
wr = w / len(trades_b) * 100
avg = pnl / len(trades_b)
print(f"{bucket:<15} {len(trades_b):>7} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f}")
if __name__ == "__main__":
run_backtest()