← Back
"""
Backtest v2: EMA distance filter + Z-VWAP period optimization.

1) Distance from EMA50/100/200 — granular thresholds 1-7%
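2) Z-VWAP recalculation with periods 50/100/150/200 at entry time
3) Combo matrix of EMA-distance caps × Z-VWAP periods
4) Listing of the worst trades entered more than 3% away from EMA50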
"""

import json
import time
from collections import defaultdict
from datetime import datetime
from typing import Optional

import numpy as np
import requests

# Bybit v5 REST endpoint that fetch_klines() queries for kline (candlestick) data.
BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"


def fetch_klines(symbol: str, interval: str, end_time_ms: int, limit: int = 250):
    """Fetch klines from Bybit ending at end_time_ms.

    Args:
        symbol: Contract symbol, sent as the ``symbol`` query parameter.
        interval: Kline interval, sent unchanged as the ``interval`` parameter.
        end_time_ms: Upper time bound in epoch milliseconds (``endTime``).
        limit: Maximum number of klines to request.

    Returns:
        The rows of ``result.list`` in reversed order, or ``None`` when the
        request fails, the HTTP status is an error, the API reports a
        non-zero ``retCode``, or the payload does not have the expected shape.
        Bybit documents the list as newest-first, so the reversed result is
        expected to be oldest-first.
    """
    params = {
        "category": "linear",
        "symbol": symbol,
        "interval": interval,
        "endTime": end_time_ms,
        "limit": limit,
    }
    try:
        response = requests.get(BYBIT_KLINE_URL, params=params, timeout=10)
        # Surface HTTP-level failures (rate limit, geo block, 5xx) as such
        # instead of as a confusing JSON decoding error on an HTML body.
        response.raise_for_status()
        payload = response.json()
        if payload.get("retCode") != 0:
            # API-level rejection: report it so failed trades can be diagnosed.
            print(
                f"  Error fetching {symbol}: "
                f"retCode={payload.get('retCode')} retMsg={payload.get('retMsg')}"
            )
            return None
        return list(reversed(payload["result"]["list"]))
    except (
        requests.RequestException,  # transport / HTTP status / body decoding
        ValueError,                 # invalid JSON on older requests versions
        KeyError,                   # "result" or "list" missing
        TypeError,                  # payload parts are not dict / sequence
        AttributeError,             # top-level payload is not a dict
    ) as e:
        print(f"  Error fetching {symbol}: {e}")
        return None


def calc_ema(closes: np.ndarray, period: int) -> Optional[float]:
    """Return the exponential moving average of *closes* at its last element.

    The average is seeded with the first close and then updated with every
    following close using the smoothing factor ``2 / (period + 1)``. The whole
    series therefore contributes to the result, not only the last ``period``
    values.

    Args:
        closes: Close prices in chronological order.
        period: EMA period; also the minimum number of closes required.

    Returns:
        The EMA as a builtin ``float``, or ``None`` when fewer than ``period``
        closes are available.
    """
    if len(closes) < period:
        return None
    multiplier = 2 / (period + 1)
    ema = closes[0]
    for c in closes[1:]:
        ema = (c - ema) * multiplier + ema
    # Cast so callers always get a plain float (matching calc_zvwap) rather
    # than a NumPy scalar.
    return float(ema)


def calc_zvwap(highs, lows, closes, volumes, period):
    """Calculate Z-score from VWAP for given period.

    Over the last ``period`` bars a running VWAP is built from the typical
    price ``(high + low + close) / 3``. The Z-score is the last close's
    deviation from the last VWAP value, divided by the standard deviation of
    all close-minus-running-VWAP deviations in the window.

    Bars that precede the first traded volume in the window have no defined
    VWAP and are excluded from the deviation statistics.

    Args:
        highs, lows, closes, volumes: Equal-length sequences or arrays in
            chronological order.
        period: Number of trailing bars to use.

    Returns:
        The Z-score as a float, or ``None`` when fewer than ``period`` closes
        are available, the window has no volume at all, or the deviations have
        zero standard deviation.
    """
    if len(closes) < period:
        return None
    h = np.asarray(highs, dtype=float)[-period:]
    l = np.asarray(lows, dtype=float)[-period:]
    c = np.asarray(closes, dtype=float)[-period:]
    v = np.asarray(volumes, dtype=float)[-period:]

    tp = (h + l + c) / 3
    cum_tp_vol = np.cumsum(tp * v)
    cum_vol = np.cumsum(v)

    # No volume in the whole window: VWAP is undefined, so there is no signal.
    if cum_vol[-1] == 0:
        return None

    # Keep only bars where the running VWAP is defined. Substituting a fake
    # denominator would give those bars a VWAP of 0 and a "deviation" equal to
    # the raw price, which blows up the standard deviation.
    traded = cum_vol != 0
    vwap_arr = cum_tp_vol[traded] / cum_vol[traded]

    deviations = c[traded] - vwap_arr
    std = np.std(deviations)

    if std == 0:
        return None
    return float((c[-1] - vwap_arr[-1]) / std)


# |Z| level that must be exceeded on the trade's own side for the Z-VWAP
# signal to count as "triggered".
_Z_TRIGGER = 1.8


def _banner(*lines):
    """Print a section header framed by two 75-character rules."""
    rule = "=" * 75
    print(f"\n{rule}")
    for line in lines:
        print(line)
    print(rule)


def _stats(trades):
    """Return ``(pnl, wins, win_rate_pct, avg_pnl)``; rates are 0 when empty."""
    count = len(trades)
    pnl = sum(t["pnl"] for t in trades)
    wins = sum(1 for t in trades if t["pnl"] > 0)
    if not count:
        return pnl, wins, 0, 0
    return pnl, wins, wins / count * 100, pnl / count


def _enrich_trade(trade):
    """Attach EMA distances and Z-VWAP scores at entry time to one trade.

    Returns a copy of *trade* with ``ema_dist`` and ``z_scores`` dicts keyed
    by period, or ``None`` when fewer than 60 klines could be fetched.
    """
    # NOTE(review): a naive ``entry_ts`` is interpreted in the local timezone
    # by ``timestamp()`` — confirm the timestamps carry an offset or are local.
    entry_dt = datetime.fromisoformat(trade["entry_ts"])
    end_time_ms = int(entry_dt.timestamp() * 1000)

    # NOTE(review): if the API includes the candle that is still open at
    # ``endTime``, its close is only known after entry (look-ahead) — verify.
    klines = fetch_klines(trade["symbol"], "5", end_time_ms, limit=250)
    if not klines or len(klines) < 60:
        return None

    # Row layout used here: index 2 = high, 3 = low, 4 = close, 5 = volume.
    closes = np.array([float(k[4]) for k in klines])
    highs = np.array([float(k[2]) for k in klines])
    lows = np.array([float(k[3]) for k in klines])
    volumes = np.array([float(k[5]) for k in klines])

    entry_price = trade["entry_price"]
    is_long = trade["side"] == "BUY"

    # "MR distance": signed % distance from the EMA, flipped for longs so that
    # a positive value always means price is on the mean-reversion side
    # (below the EMA for a long, above it for a short).
    ema_dist = {}
    for period in (20, 50, 100, 200):
        ema = calc_ema(closes, period)
        if ema is not None and ema > 0:
            raw_dist = (entry_price - ema) / ema * 100
            ema_dist[period] = -raw_dist if is_long else raw_dist
        else:
            ema_dist[period] = None

    z_scores = {
        period: calc_zvwap(highs, lows, closes, volumes, period)
        for period in (50, 100, 150, 200)
    }
    return {**trade, "ema_dist": ema_dist, "z_scores": z_scores}


def _ema_capped(trades, ema_period, max_dist=None):
    """Return trades with a known EMA distance of at most *max_dist* percent.

    The stored distance is only sign-normalised, so its absolute value is the
    unsigned distance from the EMA. ``max_dist=None`` applies no cap.
    """
    kept = []
    for t in trades:
        d = t["ema_dist"].get(ema_period)
        if d is None:
            continue
        if max_dist is None or abs(d) <= max_dist:
            kept.append(t)
    return kept


def _z_triggered(trade, z_period):
    """Return whether Z-VWAP on *z_period* confirms the trade's side.

    ``None`` means no Z-score is available for that period. A BUY needs
    ``z < -_Z_TRIGGER`` and a SELL needs ``z > _Z_TRIGGER``.
    """
    z = trade["z_scores"].get(z_period)
    if z is None:
        return None
    if trade["side"] == "BUY":
        return z < -_Z_TRIGGER
    if trade["side"] == "SELL":
        return z > _Z_TRIGGER
    return False


def _report_ema_distance(enriched, baseline_pnl):
    """Part 1: results when capping the absolute distance from each EMA."""
    _banner("PART 1: EMA DISTANCE FILTER (max distance from EMA)")

    for ema_period in (50, 100, 200):
        print(f"\n--- EMA {ema_period} ---")
        print(f"{'Max Dist':<10} {'Trades':>7} {'Cut%':>6} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'vs Base':>10}")
        print("-" * 65)

        for max_dist in (1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 5.0, 6.0, 7.0, 99.0):
            uncapped = max_dist >= 99  # sentinel row: every trade with a known distance
            filtered = _ema_capped(enriched, ema_period, None if uncapped else max_dist)
            if not filtered:
                continue

            pnl, _, wr, avg = _stats(filtered)
            cut = (1 - len(filtered) / len(enriched)) * 100
            diff = pnl - baseline_pnl
            label = "ALL" if uncapped else f"≤{max_dist}%"

            print(
                f"{label:<10} {len(filtered):>7} {cut:>5.0f}% {wr:>5.1f}% "
                f"${pnl:>+8.2f} ${avg:>+7.4f} ${diff:>+8.2f}"
            )


def _report_z_periods(enriched):
    """Part 2: split trades by whether each Z-VWAP period still triggers."""
    _banner(
        "PART 2: Z-VWAP PERIOD OPTIMIZATION",
        "Would we get better signals with longer VWAP lookback?",
    )

    for z_period in (50, 100, 150, 200):
        triggered = []
        skipped = []
        for t in enriched:
            hit = _z_triggered(t, z_period)
            if hit is None:
                continue
            (triggered if hit else skipped).append(t)

        if not triggered:
            print(f"\nZ-VWAP period={z_period}: 0 triggered")
            continue

        t_pnl, _, t_wr, _ = _stats(triggered)
        s_pnl, _, s_wr, _ = _stats(skipped)

        print(f"\nZ-VWAP period={z_period}:")
        print(f"  Triggered (|Z|>{_Z_TRIGGER}): {len(triggered)} trades | WR {t_wr:.1f}% | PnL ${t_pnl:+.2f}")
        print(f"  Skipped  (|Z|≤{_Z_TRIGGER}): {len(skipped)} trades | WR {s_wr:.1f}% | PnL ${s_pnl:+.2f}")


def _report_combos(enriched):
    """Part 3: EMA-distance cap combined with a Z-VWAP trigger, best PnL first."""
    _banner("PART 3: COMBO MATRIX (EMA dist × Z period)")

    print(f"\n{'Combo':<30} {'Trades':>7} {'Cut%':>6} {'WR%':>6} {'PnL':>10} {'Avg':>8}")
    print("-" * 72)

    combos = []
    for ema_p in (50, 100):
        for max_d in (2.0, 3.0, 4.0, 5.0):
            near_ema = _ema_capped(enriched, ema_p, max_d)
            for z_p in (50, 100, 200):
                # Same trigger rule as Part 2, so both parts agree on which
                # trades count as confirmed.
                filtered = [t for t in near_ema if _z_triggered(t, z_p)]
                if len(filtered) < 5:
                    continue  # too few trades to say anything

                pnl, _, wr, avg = _stats(filtered)
                cut = (1 - len(filtered) / len(enriched)) * 100
                label = f"EMA{ema_p}≤{max_d}% Z{z_p}"
                combos.append((label, len(filtered), cut, wr, pnl, avg))

    combos.sort(key=lambda x: x[4], reverse=True)
    for label, n, cut, wr, pnl, avg in combos:
        print(f"{label:<30} {n:>7} {cut:>5.0f}% {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f}")


def _report_far_trades(enriched):
    """Part 4: list the 25 lowest-PnL trades entered >3% away from EMA50."""
    _banner("PART 4: TRADES WITH EMA50 DIST > 3% (the losers)")

    losers = [
        t for t in enriched
        if t["ema_dist"].get(50) is not None and abs(t["ema_dist"][50]) > 3.0
    ]
    losers.sort(key=lambda x: x["pnl"])

    print(f"\n{'Symbol':<15} {'Side':<6} {'PnL':>8} {'Dist%':>7} {'Z50':>6} {'Z100':>6} {'Z200':>6} {'Reason':<8} {'SOs':>4}")
    print("-" * 75)
    for t in losers[:25]:
        # Missing values are shown as 0 so the row still formats.
        z50 = t["z_scores"].get(50, 0) or 0
        z100 = t["z_scores"].get(100, 0) or 0
        z200 = t["z_scores"].get(200, 0) or 0
        dist = t["ema_dist"].get(50, 0) or 0
        print(
            f"{t['symbol']:<15} {t['side']:<6} ${t['pnl']:>+7.2f} {dist:>+6.1f}% "
            f"{z50:>+5.1f} {z100:>+5.1f} {z200:>+5.1f} {t['reason']:<8} {t['sos']:>4}"
        )


def run():
    """Run the backtest and print all report sections.

    Loads trades from ``/tmp/matched_trades.json``, enriches each one with
    EMA distances and Z-VWAP scores computed from 5-minute klines ending at
    the entry time, then prints a baseline line and four analysis sections.
    Trades whose klines cannot be fetched are counted as failed and skipped.
    Each trade is expected to provide ``symbol``, ``entry_ts``,
    ``entry_price``, ``side``, ``pnl``, ``reason`` and ``sos``.
    """
    with open("/tmp/matched_trades.json") as f:
        trades = json.load(f)

    print(f"Total trades: {len(trades)}")
    print("Fetching klines (250 candles each for EMA200 + Z200)...\n")

    enriched = []
    failed = 0
    for idx, trade in enumerate(trades):
        record = _enrich_trade(trade)
        if record is None:
            failed += 1
        else:
            enriched.append(record)

        # Pace requests after every attempt, including failed ones, so a
        # failing API is not hit again immediately.
        if (idx + 1) % 10 == 0:
            print(f"  {idx + 1}/{len(trades)}...")
            time.sleep(0.5)
        else:
            time.sleep(0.15)

    print(f"\nEnriched: {len(enriched)}, failed: {failed}")
    if not enriched:
        # Every later section divides by len(enriched).
        print("No trades could be enriched; nothing to analyse.")
        return

    baseline_pnl, baseline_w, baseline_wr, _ = _stats(enriched)
    baseline_l = len(enriched) - baseline_w
    _banner(
        f"BASELINE: {len(enriched)} trades | WR {baseline_wr:.1f}% "
        f"({baseline_w}W/{baseline_l}L) | PnL ${baseline_pnl:+.2f}"
    )

    _report_ema_distance(enriched, baseline_pnl)
    _report_z_periods(enriched)
    _report_combos(enriched)
    _report_far_trades(enriched)


# Run the backtest only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run()

📜 Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...