← Back
☆
"""
Backtest: EMA/SMA filter on real DCA Z-VWAP trades.
Tests if adding MA filter (price vs MA) would improve PnL.

Logic:
  LONG only if price < MA (buying dip below trend)
  SHORT only if price > MA (shorting above trend)

  OR inverted:
  LONG only if price > MA (with trend)
  SHORT only if price < MA (with trend)

Fetches 5m klines from Bybit for each trade's entry time.
"""

import json
import time
import numpy as np
import requests
from datetime import datetime, timezone
from collections import defaultdict

# Bybit v5 REST endpoint queried by fetch_klines() for candlestick (kline) data.
BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"

def fetch_klines(symbol: str, interval: str, end_time_ms: int, limit: int = 210):
    """Fetch klines from Bybit ending at end_time_ms.

    Args:
        symbol: Instrument name sent as the ``symbol`` query parameter.
        interval: Candle interval string sent as-is (the caller passes "5").
        end_time_ms: Upper time bound for the query, in epoch milliseconds.
        limit: Maximum number of candles to request.

    Returns:
        The candle rows in oldest-first order, each row being
        ``[timestamp, open, high, low, close, volume, turnover]``, or ``None``
        when the request fails, the API reports a non-zero ``retCode``, or the
        payload does not have the expected shape.
    """
    params = {
        "category": "linear",
        "symbol": symbol,
        "interval": interval,
        # The v5 kline endpoint documents its time bounds as ``start``/``end``.
        # ``endTime`` is not a documented parameter there, so sending it would
        # leave the query unbounded and return the most recent candles instead
        # of the ones around the trade's entry.
        "end": end_time_ms,
        "limit": limit,
    }
    try:
        r = requests.get(BYBIT_KLINE_URL, params=params, timeout=10)
        # Surface HTTP-level failures (403/429/5xx) instead of trying to parse
        # an error page as JSON.
        r.raise_for_status()
        data = r.json()
        if data.get("retCode") != 0:
            print(
                f"  Error fetching {symbol}: "
                f"retCode={data.get('retCode')} {data.get('retMsg')}"
            )
            return None
        # Bybit returns newest first, reverse
        rows = data["result"]["list"]
        rows.reverse()
        return rows  # [timestamp, open, high, low, close, volume, turnover]
    except (
        requests.RequestException,  # connection, timeout, HTTP status
        ValueError,                 # body is not valid JSON
        KeyError,                   # "result"/"list" missing
        TypeError,                  # payload section is not subscriptable
        AttributeError,             # payload is not a dict / list
    ) as e:
        # Best-effort: the caller counts a None result as a failed trade.
        print(f"  Error fetching {symbol}: {e}")
        return None


def calc_ema(closes: np.ndarray, period: int) -> "float | None":
    """Calculate EMA of closes, return last value.

    The EMA is seeded with the first close and then folded over every
    remaining close using the smoothing factor ``2 / (period + 1)``.

    Args:
        closes: Close prices in chronological order (oldest first).
        period: EMA period; must be at least 1.

    Returns:
        The final EMA value as a builtin ``float``, or ``None`` when ``period``
        is not positive or there are fewer than ``period`` closes.
    """
    # A non-positive period has no meaningful smoothing factor (period == -1
    # would even divide by zero), so treat it like insufficient data.
    if period < 1 or len(closes) < period:
        return None
    multiplier = 2 / (period + 1)
    # Convert to builtin float so the result type matches calc_sma().
    ema = float(closes[0])
    for c in closes[1:]:
        ema = (float(c) - ema) * multiplier + ema
    return ema


def calc_sma(closes: np.ndarray, period: int) -> "float | None":
    """Calculate SMA of closes, return last value.

    Args:
        closes: Close prices in chronological order (oldest first).
        period: Number of most recent closes to average; must be at least 1.

    Returns:
        The mean of the last ``period`` closes as a builtin ``float``, or
        ``None`` when ``period`` is not positive or there are fewer than
        ``period`` closes.
    """
    # period == 0 would slice closes[-0:], i.e. the whole array, and silently
    # average everything; negative periods slice arbitrarily. Reject both.
    if period < 1 or len(closes) < period:
        return None
    return float(np.mean(closes[-period:]))


# Moving-average look-back lengths (in candles) evaluated for both SMA and EMA.
_MA_PERIODS = (20, 50, 100, 200)

# A trade whose kline fetch yields fewer candles than this is counted as failed.
_MIN_KLINES = 50

# (exclusive upper bound in %, label) pairs for the distance-from-EMA50
# breakdown, checked in order; anything not below the last bound goes to the
# overflow bucket.
_DISTANCE_BUCKETS = (
    (0, "wrong_side"),
    (1, "0-1%"),
    (2, "1-2%"),
    (3, "2-3%"),
)
_DISTANCE_OVERFLOW_BUCKET = "3%+"


def _win_stats(trades):
    """Return ``(total_pnl, wins, win_rate_pct)``; win rate is 0 for no trades."""
    pnl = sum(t["pnl"] for t in trades)
    wins = sum(1 for t in trades if t["pnl"] > 0)
    win_rate = wins / len(trades) * 100 if trades else 0
    return pnl, wins, win_rate


def _passes_ma_filter(side, price, ma_val, mode):
    """Return True when a trade's entry price is on the side of the MA that `mode` requires."""
    if side == "BUY":
        # counter: LONG below MA (cheap); otherwise LONG above MA (uptrend)
        want_below = mode == "counter"
    elif side == "SELL":
        # counter: SHORT above MA (expensive); otherwise SHORT below MA (downtrend)
        want_below = mode != "counter"
    else:
        return False
    # Strict comparisons: a price exactly on the MA never passes.
    return price < ma_val if want_below else price > ma_val


def _distance_bucket(norm_dist):
    """Map a side-normalised % distance from the MA to its bucket label."""
    for upper, label in _DISTANCE_BUCKETS:
        if norm_dist < upper:
            return label
    return _DISTANCE_OVERFLOW_BUCKET


def run_backtest():
    """Replay recorded trades against SMA/EMA filters and print a PnL report.

    Steps:
      1. Load trades from ``/tmp/matched_trades.json``. Each trade is read for
         its ``symbol``, ``entry_ts``, ``entry_price``, ``side`` and ``pnl``.
      2. For every trade, fetch 5m klines ending at the entry time and compute
         SMA/EMA for each period in ``_MA_PERIODS``. Trades with no usable
         klines are counted as failed and left out of the analysis.
      3. Write the enriched trades to ``/tmp/enriched_trades.json``.
      4. Print the unfiltered baseline, a table of every (MA type, period,
         mode) filter sorted by kept PnL, the top five configs, and a
         breakdown by distance from EMA50.
    """
    with open("/tmp/matched_trades.json", encoding="utf-8") as f:
        trades = json.load(f)

    symbols = {t["symbol"] for t in trades}
    print(f"Total matched trades: {len(trades)}")
    print(f"Unique symbols: {len(symbols)}")
    print()

    # Test configs: (type, period, mode)
    # mode: "counter" = LONG below MA, SHORT above MA (counter-trend / MR)
    # mode: "trend" = LONG above MA, SHORT below MA (with trend)
    configs = [
        (ma_type, period, mode)
        for ma_type in ("SMA", "EMA")
        for period in _MA_PERIODS
        for mode in ("counter", "trend")
    ]

    print(f"Fetching klines for {len(symbols)} symbols...")

    # For each trade, calculate MA values at entry time. One request is made
    # per trade; identical (symbol, entry time) pairs are not deduplicated.
    enriched = []
    failed = 0

    for count, trade in enumerate(trades, start=1):
        # NOTE(review): a timestamp without a UTC offset is interpreted in the
        # machine's local timezone by .timestamp() — confirm entry_ts carries
        # an offset or is meant to be local time.
        dt = datetime.fromisoformat(trade["entry_ts"])
        end_time_ms = int(dt.timestamp() * 1000)

        # Fetch 5m klines (need 200+ for SMA200)
        # NOTE(review): the newest candle returned may still have been open at
        # entry time, in which case its close is a post-entry price
        # (look-ahead) — confirm whether it should be dropped.
        klines = fetch_klines(trade["symbol"], "5", end_time_ms, limit=210)

        if klines and len(klines) >= _MIN_KLINES:
            closes = np.array([float(k[4]) for k in klines])
            ma_values = {}
            for period in _MA_PERIODS:
                ma_values[f"SMA_{period}"] = calc_sma(closes, period)
                ma_values[f"EMA_{period}"] = calc_ema(closes, period)
            enriched.append({**trade, "ma": ma_values})
        else:
            failed += 1

        # Rate limit. This runs after failed fetches too, so a run of errors
        # (e.g. the API throttling us) does not turn into a tight request loop.
        if count % 10 == 0:
            print(f"  {count}/{len(trades)} fetched...")
            time.sleep(0.5)
        else:
            time.sleep(0.15)

    print(f"\nEnriched: {len(enriched)} trades, failed: {failed}")

    # Save enriched data
    with open("/tmp/enriched_trades.json", "w", encoding="utf-8") as f:
        json.dump(enriched, f, indent=2, default=str)

    # ═══════════════════════════════════════════════
    # BASELINE (no filter)
    # ═══════════════════════════════════════════════
    total_pnl, wins, baseline_wr = _win_stats(enriched)
    losses = sum(1 for t in enriched if t["pnl"] <= 0)

    print("\n" + "=" * 70)
    print(
        f"BASELINE: {len(enriched)} trades | WR {baseline_wr:.1f}% "
        f"({wins}W/{losses}L) | PnL ${total_pnl:+.2f}"
    )
    print("=" * 70)

    # ═══════════════════════════════════════════════
    # TEST EACH CONFIG
    # ═══════════════════════════════════════════════
    results = []

    for ma_type, period, mode in configs:
        key = f"{ma_type}_{period}"

        # Split trades in a single pass. Trades lacking this MA (too little
        # history) are neither kept nor counted as rejected.
        kept = []
        dropped = []
        for t in enriched:
            ma_val = t["ma"].get(key)
            if ma_val is None:
                continue
            if _passes_ma_filter(t["side"], t["entry_price"], ma_val, mode):
                kept.append(t)
            else:
                dropped.append(t)

        if not kept:
            continue

        pnl, _, wr = _win_stats(kept)

        results.append({
            "config": f"{ma_type} {period} ({mode})",
            "trades": len(kept),
            "rejected": len(dropped),
            "wr": wr,
            "pnl": pnl,
            "avg_pnl": pnl / len(kept),
            # PnL of what the filter would have skipped
            "rej_pnl": sum(t["pnl"] for t in dropped),
        })

    # Sort by PnL
    results.sort(key=lambda x: x["pnl"], reverse=True)

    print(f"\n{'Config':<25} {'Trades':>7} {'Reject':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'Rej PnL':>10}")
    print("-" * 80)
    for r in results:
        print(
            f"{r['config']:<25} {r['trades']:>7} {r['rejected']:>7} "
            f"{r['wr']:>5.1f}% ${r['pnl']:>+8.2f} ${r['avg_pnl']:>+7.4f} ${r['rej_pnl']:>+8.2f}"
        )

    # ═══════════════════════════════════════════════
    # DEEP DIVE: best configs
    # ═══════════════════════════════════════════════
    print("\n" + "=" * 70)
    print("TOP 5 CONFIGS (by PnL)")
    print("=" * 70)

    for r in results[:5]:
        print(f"\n📊 {r['config']}")
        print(f"   Kept: {r['trades']} trades → PnL ${r['pnl']:+.2f} (WR {r['wr']:.1f}%)")
        print(f"   Rejected: {r['rejected']} trades → PnL ${r['rej_pnl']:+.2f}")
        improvement = r['pnl'] - total_pnl
        print(f"   vs Baseline: ${improvement:+.2f} improvement")

    # ═══════════════════════════════════════════════
    # DISTANCE FROM MA analysis
    # ═══════════════════════════════════════════════
    print("\n" + "=" * 70)
    print("DISTANCE FROM EMA50 ANALYSIS")
    print("=" * 70)

    buckets = defaultdict(list)
    for t in enriched:
        ema50 = t["ma"].get("EMA_50")
        # Skip trades without an EMA50; a zero EMA cannot be used as a divisor.
        if ema50 is None or ema50 == 0:
            continue
        dist_pct = (t["entry_price"] - ema50) / ema50 * 100
        # Normalize for side: positive = "right direction" for MR
        # (below EMA for a long, above EMA for a short).
        norm_dist = -dist_pct if t["side"] == "BUY" else dist_pct
        buckets[_distance_bucket(norm_dist)].append(t)

    print(f"\n{'Bucket':<15} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8}")
    print("-" * 50)
    ordered_labels = [label for _, label in _DISTANCE_BUCKETS]
    ordered_labels.append(_DISTANCE_OVERFLOW_BUCKET)
    for bucket in ordered_labels:
        trades_b = buckets.get(bucket, [])
        if not trades_b:
            continue
        pnl, _, wr = _win_stats(trades_b)
        avg = pnl / len(trades_b)
        print(f"{bucket:<15} {len(trades_b):>7} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f}")


# Run the backtest only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run_backtest()

📜 Git History

dd32dfd chore: initial commit — version control setup 5 weeks ago
Show last diff
Loading...