← Back
β˜†
"""
Backtest v3: CHOP vs EMA50 distance filter.

Full simulation: scan historical 5m klines, generate Z-VWAP signals,
simulate DCA deals with TP/SL, compare filter combos:
  A) CHOP β‰₯ 45 only (current)
  B) EMA50 dist ≀ 3% only
  C) EMA50 dist ≀ 5% only
  D) CHOP β‰₯ 45 + EMA50 ≀ 5%
  E) EMA50 ≀ 5% (no CHOP)
  F) No filters at all

Uses 5m klines from Bybit, Apr 7–10, top 40 symbols by volume.
"""

import json
import time
import numpy as np
import requests
from datetime import datetime, timezone, timedelta
from collections import defaultdict

BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"
BYBIT_TICKER_URL = "https://api.bybit.com/v5/market/tickers"

# DCA config (matches bot)
TP_PCT = 1.5
SL_PCT = 10.0  # updated
Z_THRESHOLD = 1.8
Z_EXIT = 0.3
NATR_MIN = 0.75
NATR_MAX = 2.5
COOLDOWN_BARS = 60  # 60 bars = 5h cooldown (in 5m bars)
MAX_DEALS = 6
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT"}


def fetch_klines_paginated(symbol, interval, start_ms, end_ms, limit=1000):
    """Fetch all klines between start and end, paginating."""
    all_klines = []
    cursor_end = end_ms

    while cursor_end > start_ms:
        params = {
            "category": "linear",
            "symbol": symbol,
            "interval": interval,
            "endTime": cursor_end,
            "limit": limit,
        }
        try:
            r = requests.get(BYBIT_KLINE_URL, params=params, timeout=15)
            data = r.json()
            if data.get("retCode") != 0:
                break
            rows = data["result"]["list"]
            if not rows:
                break
            rows.reverse()
            # Filter to only rows within our range
            for row in rows:
                ts = int(row[0])
                if ts >= start_ms and ts <= end_ms:
                    all_klines.append(row)
            # Move cursor back
            oldest = int(rows[0][0])
            if oldest <= start_ms:
                break
            cursor_end = oldest - 1
            time.sleep(0.2)
        except Exception as e:
            print(f"  Error fetching {symbol}: {e}")
            break

    # Deduplicate and sort
    seen = set()
    unique = []
    for k in all_klines:
        ts = int(k[0])
        if ts not in seen:
            seen.add(ts)
            unique.append(k)
    unique.sort(key=lambda x: int(x[0]))
    return unique


def get_top_symbols(n=50):
    """Get top N USDT perp symbols by 24h volume."""
    try:
        r = requests.get(BYBIT_TICKER_URL, params={"category": "linear"}, timeout=10)
        data = r.json()
        tickers = data["result"]["list"]
        candidates = []
        for t in tickers:
            sym = t["symbol"]
            if not sym.endswith("USDT") or sym in BLACKLIST:
                continue
            vol = float(t.get("turnover24h", 0))
            if vol >= 20_000_000:
                candidates.append((sym, vol))
        candidates.sort(key=lambda x: x[1], reverse=True)
        return [c[0] for c in candidates[:n]]
    except Exception as e:
        print(f"Error fetching tickers: {e}")
        return []


def calc_indicators(highs, lows, closes, volumes, idx, z_period=50):
    """Calculate Z-VWAP, NATR, CHOP, EMA50 at bar index idx."""
    if idx < max(z_period + 10, 200):
        return None

    # Slice for Z-VWAP
    h = highs[idx - z_period:idx + 1]
    l = lows[idx - z_period:idx + 1]
    c = closes[idx - z_period:idx + 1]
    v = volumes[idx - z_period:idx + 1]

    # Z-VWAP
    tp = (h + l + c) / 3
    cum_tp_vol = np.cumsum(tp * v)
    cum_vol = np.cumsum(v)
    cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
    vwap_arr = cum_tp_vol / cum_vol_safe
    vwap = vwap_arr[-1]
    deviations = c - vwap_arr
    std = np.std(deviations)
    if std == 0:
        return None
    z_score = float((c[-1] - vwap) / std)

    # True Range for NATR + CHOP (shared)
    tr_slice = 15  # need 15 bars for TR(14)
    h_tr = highs[idx - tr_slice + 1:idx + 1]
    l_tr = lows[idx - tr_slice + 1:idx + 1]
    c_tr = closes[idx - tr_slice:idx + 1]  # one extra for prev close
    tr = np.maximum(h_tr - l_tr,
                    np.maximum(np.abs(h_tr - c_tr[:-1]),
                               np.abs(l_tr - c_tr[:-1])))

    # NATR (14-period ATR / close * 100)
    atr = np.mean(tr[-14:])
    natr = (atr / closes[idx]) * 100 if closes[idx] > 0 else 0

    # CHOP (14-period)
    chop = 50.0
    chop_tr = tr[-14:]
    atr_sum = np.sum(chop_tr)
    highest = np.max(h_tr[-14:])
    lowest = np.min(l_tr[-14:])
    if highest > lowest:
        chop = float(100 * np.log10(atr_sum / (highest - lowest)) / np.log10(14))

    # EMA 50
    ema50 = None
    if idx >= 50:
        ema = closes[0]
        mult = 2 / 51
        for i in range(1, idx + 1):
            ema = (closes[i] - ema) * mult + ema
        ema50 = ema

    # EMA distance (absolute %)
    ema50_dist = None
    if ema50 and ema50 > 0:
        ema50_dist = abs((closes[idx] - ema50) / ema50 * 100)

    return {
        "z": z_score,
        "natr": natr,
        "chop": chop,
        "ema50": ema50,
        "ema50_dist": ema50_dist,
        "price": float(closes[idx]),
    }


def simulate_deal(closes, start_idx, side, tp_pct, sl_pct):
    """Simulate a simple DCA deal (BO only, no SOs for speed).
    Returns (pnl_pct, reason, duration_bars)."""
    entry = closes[start_idx]

    for i in range(start_idx + 1, min(start_idx + 500, len(closes))):
        price = closes[i]
        if side == "BUY":
            pnl_pct = (price - entry) / entry * 100
        else:
            pnl_pct = (entry - price) / entry * 100

        if pnl_pct >= tp_pct:
            return pnl_pct, "TP", i - start_idx
        if pnl_pct <= -sl_pct:
            return pnl_pct, "SL", i - start_idx

    # Timeout β€” close at last bar
    price = closes[min(start_idx + 499, len(closes) - 1)]
    if side == "BUY":
        pnl_pct = (price - entry) / entry * 100
    else:
        pnl_pct = (entry - price) / entry * 100
    return pnl_pct, "TIMEOUT", 500


def run():
    # Period: Apr 7–10 (4 days)
    start_dt = datetime(2026, 4, 7, 0, 0, tzinfo=timezone.utc)
    end_dt = datetime(2026, 4, 11, 0, 0, tzinfo=timezone.utc)
    start_ms = int(start_dt.timestamp() * 1000)
    end_ms = int(end_dt.timestamp() * 1000)

    # Need extra history for EMA200 warmup
    warmup_start = start_dt - timedelta(days=4)  # 4 extra days
    warmup_ms = int(warmup_start.timestamp() * 1000)

    symbols = get_top_symbols(50)
    print(f"Top symbols: {len(symbols)}")

    # Fetch klines for all symbols
    symbol_data = {}
    for idx, symbol in enumerate(symbols):
        klines = fetch_klines_paginated(symbol, "5", warmup_ms, end_ms)
        if klines and len(klines) > 300:
            symbol_data[symbol] = klines
        if (idx + 1) % 5 == 0:
            print(f"  Fetched {idx + 1}/{len(symbols)} symbols ({len(klines) if klines else 0} bars)...")
            time.sleep(1)
        else:
            time.sleep(0.3)

    print(f"\nLoaded {len(symbol_data)} symbols with data")

    # ═══════════════════════════════════════════════
    # SCAN + SIMULATE for each filter combo
    # ═══════════════════════════════════════════════

    filter_configs = {
        "A) CHOPβ‰₯45 only (current)":  {"chop_min": 45, "ema_max": None},
        "B) EMA50≀3% only":           {"chop_min": None, "ema_max": 3.0},
        "C) EMA50≀5% only":           {"chop_min": None, "ema_max": 5.0},
        "D) CHOPβ‰₯45 + EMA50≀3%":      {"chop_min": 45, "ema_max": 3.0},
        "E) CHOPβ‰₯45 + EMA50≀5%":      {"chop_min": 45, "ema_max": 5.0},
        "F) EMA50≀4% only":           {"chop_min": None, "ema_max": 4.0},
        "G) EMA50≀6% only":           {"chop_min": None, "ema_max": 6.0},
        "H) No filters":              {"chop_min": None, "ema_max": None},
    }

    # Find start index (where Apr 7 begins in the data)
    results = {}

    for config_name, config in filter_configs.items():
        all_trades = []

        for symbol, klines in symbol_data.items():
            closes = np.array([float(k[4]) for k in klines])
            highs = np.array([float(k[2]) for k in klines])
            lows = np.array([float(k[3]) for k in klines])
            volumes = np.array([float(k[5]) for k in klines])
            timestamps = [int(k[0]) for k in klines]

            # Find start of Apr 7 in this data
            scan_start = None
            for i, ts in enumerate(timestamps):
                if ts >= start_ms:
                    scan_start = i
                    break
            if scan_start is None or scan_start < 210:
                continue

            # Find end of Apr 10
            scan_end = len(timestamps) - 1
            for i, ts in enumerate(timestamps):
                if ts > end_ms:
                    scan_end = i
                    break

            cooldown_until = 0

            # Scan every 12 bars (= 60 min, like bot scan_interval)
            for idx in range(scan_start, scan_end, 12):
                if idx <= cooldown_until:
                    continue

                ind = calc_indicators(highs, lows, closes, volumes, idx)
                if ind is None:
                    continue

                z = ind["z"]
                natr = ind["natr"]
                chop = ind["chop"]
                ema_dist = ind["ema50_dist"]

                # NATR filter always on
                if natr < NATR_MIN or natr > NATR_MAX:
                    continue

                # Entry signal
                side = None
                if z < -Z_THRESHOLD:
                    side = "BUY"
                elif z > Z_THRESHOLD:
                    side = "SELL"
                if not side:
                    continue

                # Apply filter combo
                if config["chop_min"] is not None and chop < config["chop_min"]:
                    continue
                if config["ema_max"] is not None and ema_dist is not None and ema_dist > config["ema_max"]:
                    continue

                # Simulate deal
                pnl_pct, reason, duration = simulate_deal(closes, idx, side, TP_PCT, SL_PCT)

                # Convert to USD (approx BO=$3 at 3x)
                pnl_usd = pnl_pct / 100 * 3.0 * 3  # $3 BO Γ— 3x leverage

                all_trades.append({
                    "symbol": symbol,
                    "side": side,
                    "z": z,
                    "natr": natr,
                    "chop": chop,
                    "ema_dist": ema_dist,
                    "pnl_pct": pnl_pct,
                    "pnl_usd": pnl_usd,
                    "reason": reason,
                    "duration": duration,
                })

                # Cooldown
                cooldown_until = idx + COOLDOWN_BARS

        # Aggregate
        if not all_trades:
            results[config_name] = {"trades": 0}
            continue

        total_pnl = sum(t["pnl_usd"] for t in all_trades)
        wins = sum(1 for t in all_trades if t["pnl_usd"] > 0)
        losses = len(all_trades) - wins
        wr = wins / len(all_trades) * 100
        avg_pnl = total_pnl / len(all_trades)
        sl_count = sum(1 for t in all_trades if t["reason"] == "SL")
        tp_count = sum(1 for t in all_trades if t["reason"] == "TP")
        sl_pnl = sum(t["pnl_usd"] for t in all_trades if t["reason"] == "SL")

        results[config_name] = {
            "trades": len(all_trades),
            "wins": wins,
            "losses": losses,
            "wr": wr,
            "pnl": total_pnl,
            "avg_pnl": avg_pnl,
            "sl_count": sl_count,
            "tp_count": tp_count,
            "sl_pnl": sl_pnl,
            "all_trades": all_trades,
        }

    # ═══════════════════════════════════════════════
    # PRINT RESULTS
    # ═══════════════════════════════════════════════
    print(f"\n{'='*85}")
    print(f"RESULTS: Apr 7-10, {len(symbol_data)} symbols, BO-only simulation ($3 BO, 3x lev)")
    print(f"{'='*85}")

    print(f"\n{'Config':<30} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'SL PnL':>10} {'TPs':>5}")
    print("-" * 90)

    for name in filter_configs:
        r = results.get(name, {})
        if r["trades"] == 0:
            print(f"{name:<30} {'0':>7}")
            continue
        print(
            f"{name:<30} {r['trades']:>7} {r['wr']:>5.1f}% ${r['pnl']:>+8.2f} "
            f"${r['avg_pnl']:>+7.4f} {r['sl_count']:>5} ${r['sl_pnl']:>+8.2f} {r['tp_count']:>5}"
        )

    # ═══════════════════════════════════════════════
    # PER-FILTER: trades that differ
    # ═══════════════════════════════════════════════
    print(f"\n{'='*85}")
    print("COMPARISON: What CHOP blocks vs what EMA50 blocks")
    print(f"{'='*85}")

    # Get trade sets for A (CHOP only) and C (EMA50≀5% only)
    a_trades = results.get("A) CHOPβ‰₯45 only (current)", {}).get("all_trades", [])
    c_trades = results.get("C) EMA50≀5% only", {}).get("all_trades", [])
    h_trades = results.get("H) No filters", {}).get("all_trades", [])

    if h_trades:
        # Trades that CHOP blocks but EMA5% would allow
        chop_keys = set(f"{t['symbol']}_{t['z']:.2f}" for t in a_trades)
        ema5_keys = set(f"{t['symbol']}_{t['z']:.2f}" for t in c_trades)
        nofilter_keys = set(f"{t['symbol']}_{t['z']:.2f}" for t in h_trades)

        # All unfiltered trades with CHOP/EMA info
        print(f"\nUnfiltered trades: {len(h_trades)}")
        chop_blocked = [t for t in h_trades if t["chop"] < 45]
        ema5_blocked = [t for t in h_trades if t["ema_dist"] is not None and t["ema_dist"] > 5.0]
        both_blocked = [t for t in h_trades if t["chop"] < 45 and t.get("ema_dist", 0) and t["ema_dist"] > 5.0]

        chop_pnl = sum(t["pnl_usd"] for t in chop_blocked)
        ema5_pnl = sum(t["pnl_usd"] for t in ema5_blocked)
        both_pnl = sum(t["pnl_usd"] for t in both_blocked)

        chop_only_blocked = [t for t in h_trades if t["chop"] < 45 and (t.get("ema_dist") is None or t["ema_dist"] <= 5.0)]
        ema_only_blocked = [t for t in h_trades if t["chop"] >= 45 and t.get("ema_dist") is not None and t["ema_dist"] > 5.0]

        print(f"\nCHOP<45 blocks: {len(chop_blocked)} trades β†’ PnL ${chop_pnl:+.2f}")
        print(f"EMA50>5% blocks: {len(ema5_blocked)} trades β†’ PnL ${ema5_pnl:+.2f}")
        print(f"Both block (overlap): {len(both_blocked)} trades β†’ PnL ${both_pnl:+.2f}")
        print(f"CHOP-only blocks (EMA would allow): {len(chop_only_blocked)} trades β†’ PnL ${sum(t['pnl_usd'] for t in chop_only_blocked):+.2f}")
        print(f"EMA-only blocks (CHOP would allow): {len(ema_only_blocked)} trades β†’ PnL ${sum(t['pnl_usd'] for t in ema_only_blocked):+.2f}")

        # Distribution
        print(f"\n--- CHOP-only blocked trades (CHOP kills, EMA50 ≀5% allows) ---")
        if chop_only_blocked:
            w = sum(1 for t in chop_only_blocked if t["pnl_usd"] > 0)
            wr = w / len(chop_only_blocked) * 100
            pnl = sum(t["pnl_usd"] for t in chop_only_blocked)
            print(f"  {len(chop_only_blocked)} trades | WR {wr:.1f}% | PnL ${pnl:+.2f}")
            print(f"  β†’ These are trades we'd GAIN by removing CHOP (if using EMA50≀5%)")

        print(f"\n--- EMA-only blocked trades (EMA50>5% kills, CHOPβ‰₯45 allows) ---")
        if ema_only_blocked:
            w = sum(1 for t in ema_only_blocked if t["pnl_usd"] > 0)
            wr = w / len(ema_only_blocked) * 100
            pnl = sum(t["pnl_usd"] for t in ema_only_blocked)
            print(f"  {len(ema_only_blocked)} trades | WR {wr:.1f}% | PnL ${pnl:+.2f}")
            print(f"  β†’ These are bad trades CHOP misses but EMA50 catches")


if __name__ == "__main__":
    run()

πŸ“œ Git History

dd32dfdchore: initial commit β€” version control setup5 weeks ago
Show last diff
Loading...