← Back
β˜†
"""
Backtest: CHOPβ‰₯45 (keep) + EMA50 distance cap (add) β€” loose filter for fat losers.
Uses full simulation Apr 7-10.
"""

import json, time, numpy as np, requests
from datetime import datetime, timezone, timedelta
from collections import defaultdict

BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"
BYBIT_TICKER_URL = "https://api.bybit.com/v5/market/tickers"

TP_PCT = 1.5; SL_PCT = 10.0; Z_THRESHOLD = 1.8
NATR_MIN = 0.75; NATR_MAX = 2.5; COOLDOWN_BARS = 60
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT"}


def fetch_klines_paginated(symbol, interval, start_ms, end_ms, limit=1000):
    all_klines = []; cursor_end = end_ms
    while cursor_end > start_ms:
        params = {"category": "linear", "symbol": symbol, "interval": interval,
                  "endTime": cursor_end, "limit": limit}
        try:
            r = requests.get(BYBIT_KLINE_URL, params=params, timeout=15)
            data = r.json()
            if data.get("retCode") != 0: break
            rows = data["result"]["list"]
            if not rows: break
            rows.reverse()
            for row in rows:
                ts = int(row[0])
                if start_ms <= ts <= end_ms: all_klines.append(row)
            oldest = int(rows[0][0])
            if oldest <= start_ms: break
            cursor_end = oldest - 1; time.sleep(0.2)
        except: break
    seen = set(); unique = []
    for k in all_klines:
        ts = int(k[0])
        if ts not in seen: seen.add(ts); unique.append(k)
    unique.sort(key=lambda x: int(x[0])); return unique


def get_top_symbols(n=50):
    r = requests.get(BYBIT_TICKER_URL, params={"category": "linear"}, timeout=10)
    data = r.json()
    cands = [(t["symbol"], float(t.get("turnover24h", 0))) for t in data["result"]["list"]
             if t["symbol"].endswith("USDT") and t["symbol"] not in BLACKLIST and float(t.get("turnover24h", 0)) >= 20_000_000]
    cands.sort(key=lambda x: x[1], reverse=True)
    return [c[0] for c in cands[:n]]


def calc_indicators(highs, lows, closes, volumes, idx, z_period=50):
    if idx < max(z_period + 10, 200): return None
    h = highs[idx-z_period:idx+1]; l = lows[idx-z_period:idx+1]
    c = closes[idx-z_period:idx+1]; v = volumes[idx-z_period:idx+1]
    tp = (h+l+c)/3; ctv = np.cumsum(tp*v); cv = np.cumsum(v)
    cvs = np.where(cv==0, 1, cv); vwap_arr = ctv/cvs
    dev = c - vwap_arr; std = np.std(dev)
    if std == 0: return None
    z = float((c[-1] - vwap_arr[-1]) / std)

    s = 15; h_tr = highs[idx-s+1:idx+1]; l_tr = lows[idx-s+1:idx+1]; c_tr = closes[idx-s:idx+1]
    tr = np.maximum(h_tr-l_tr, np.maximum(np.abs(h_tr-c_tr[:-1]), np.abs(l_tr-c_tr[:-1])))
    natr = (np.mean(tr[-14:]) / closes[idx]) * 100 if closes[idx] > 0 else 0

    chop = 50.0
    ct = tr[-14:]; atr_sum = np.sum(ct)
    hi = np.max(h_tr[-14:]); lo = np.min(l_tr[-14:])
    if hi > lo: chop = float(100 * np.log10(atr_sum / (hi-lo)) / np.log10(14))

    ema50 = None
    if idx >= 50:
        ema = closes[0]; mult = 2/51
        for i in range(1, idx+1): ema = (closes[i]-ema)*mult + ema
        ema50 = ema
    ema50_dist = abs((closes[idx]-ema50)/ema50*100) if ema50 and ema50 > 0 else None

    return {"z": z, "natr": natr, "chop": chop, "ema50_dist": ema50_dist, "price": float(closes[idx])}


def simulate_deal(closes, start_idx, side):
    entry = closes[start_idx]
    for i in range(start_idx+1, min(start_idx+500, len(closes))):
        p = closes[i]
        pnl = ((p-entry)/entry*100) if side=="BUY" else ((entry-p)/entry*100)
        if pnl >= TP_PCT: return pnl, "TP", i-start_idx
        if pnl <= -SL_PCT: return pnl, "SL", i-start_idx
    p = closes[min(start_idx+499, len(closes)-1)]
    pnl = ((p-entry)/entry*100) if side=="BUY" else ((entry-p)/entry*100)
    return pnl, "TIMEOUT", 500


def run():
    start_dt = datetime(2026, 4, 7, 0, 0, tzinfo=timezone.utc)
    end_dt = datetime(2026, 4, 11, 0, 0, tzinfo=timezone.utc)
    start_ms = int(start_dt.timestamp()*1000); end_ms = int(end_dt.timestamp()*1000)
    warmup_ms = int((start_dt - timedelta(days=4)).timestamp()*1000)

    symbols = get_top_symbols(50)
    print(f"Symbols: {len(symbols)}")
    symbol_data = {}
    for idx, sym in enumerate(symbols):
        kl = fetch_klines_paginated(sym, "5", warmup_ms, end_ms)
        if kl and len(kl) > 300: symbol_data[sym] = kl
        if (idx+1) % 5 == 0: print(f"  {idx+1}/{len(symbols)}..."); time.sleep(1)
        else: time.sleep(0.3)
    print(f"Loaded: {len(symbol_data)}\n")

    # Generate ALL signals with NATR filter only (no CHOP, no EMA)
    all_signals = []
    for symbol, klines in symbol_data.items():
        closes = np.array([float(k[4]) for k in klines])
        highs = np.array([float(k[2]) for k in klines])
        lows = np.array([float(k[3]) for k in klines])
        volumes = np.array([float(k[5]) for k in klines])
        timestamps = [int(k[0]) for k in klines]
        scan_start = next((i for i, ts in enumerate(timestamps) if ts >= start_ms), None)
        if scan_start is None or scan_start < 210: continue
        scan_end = next((i for i, ts in enumerate(timestamps) if ts > end_ms), len(timestamps)-1)
        cd = 0
        for idx in range(scan_start, scan_end, 12):
            if idx <= cd: continue
            ind = calc_indicators(highs, lows, closes, volumes, idx)
            if ind is None: continue
            if ind["natr"] < NATR_MIN or ind["natr"] > NATR_MAX: continue
            side = "BUY" if ind["z"] < -Z_THRESHOLD else ("SELL" if ind["z"] > Z_THRESHOLD else None)
            if not side: continue
            pnl_pct, reason, dur = simulate_deal(closes, idx, side)
            pnl_usd = pnl_pct / 100 * 9  # $3 Γ— 3x
            all_signals.append({
                "symbol": symbol, "side": side, "z": ind["z"], "natr": ind["natr"],
                "chop": ind["chop"], "ema50_dist": ind["ema50_dist"],
                "pnl_usd": pnl_usd, "reason": reason, "duration": dur,
            })
            cd = idx + COOLDOWN_BARS

    print(f"Total signals (NATR only): {len(all_signals)}")

    # ═══════════════════════════════════════════════
    # TEST CONFIGS
    # ═══════════════════════════════════════════════
    configs = [
        ("No filters",            None,  None),
        ("CHOPβ‰₯45 only",          45,    None),
        ("CHOPβ‰₯45 + EMA≀3%",      45,    3.0),
        ("CHOPβ‰₯45 + EMA≀4%",      45,    4.0),
        ("CHOPβ‰₯45 + EMA≀5%",      45,    5.0),
        ("CHOPβ‰₯45 + EMA≀6%",      45,    6.0),
        ("CHOPβ‰₯45 + EMA≀7%",      45,    7.0),
        ("CHOPβ‰₯45 + EMA≀8%",      45,    8.0),
        ("CHOPβ‰₯45 + EMA≀10%",     45,    10.0),
    ]

    print(f"\n{'='*90}")
    print(f"{'Config':<25} {'Trades':>7} {'Cut':>5} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'SL PnL':>10} {'PF':>6}")
    print("="*90)

    for name, chop_min, ema_max in configs:
        filtered = []
        for t in all_signals:
            if chop_min is not None and t["chop"] < chop_min: continue
            if ema_max is not None and t["ema50_dist"] is not None and t["ema50_dist"] > ema_max: continue
            filtered.append(t)
        if not filtered: continue
        pnl = sum(t["pnl_usd"] for t in filtered)
        w = sum(1 for t in filtered if t["pnl_usd"] > 0)
        wr = w / len(filtered) * 100
        avg = pnl / len(filtered)
        sls = sum(1 for t in filtered if t["reason"] == "SL")
        sl_pnl = sum(t["pnl_usd"] for t in filtered if t["reason"] == "SL")
        gp = sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] > 0)
        gl = abs(sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] <= 0))
        pf = gp / gl if gl > 0 else 999
        cut = len(all_signals) - len(filtered)
        print(f"{name:<25} {len(filtered):>7} {f'-{cut}':>5} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f} {sls:>5} ${sl_pnl:>+8.2f} {pf:>5.2f}")

    # ═══════════════════════════════════════════════
    # What EMA catches that CHOP misses (CHOP passes, EMA blocks)
    # ═══════════════════════════════════════════════
    print(f"\n{'='*90}")
    print("WHAT DOES EMA CATCH THAT CHOP MISSES?")
    print(f"{'='*90}")

    for ema_cap in [5.0, 6.0, 7.0, 8.0]:
        # Trades that pass CHOPβ‰₯45 but have EMA > cap
        caught = [t for t in all_signals if t["chop"] >= 45 and t["ema50_dist"] is not None and t["ema50_dist"] > ema_cap]
        if not caught: continue
        pnl = sum(t["pnl_usd"] for t in caught)
        w = sum(1 for t in caught if t["pnl_usd"] > 0)
        wr = w / len(caught) * 100
        sls = [t for t in caught if t["reason"] == "SL"]

        print(f"\nCHOP passes but EMA50 > {ema_cap}%: {len(caught)} trades | WR {wr:.0f}% | PnL ${pnl:+.2f}")
        # Show them
        caught.sort(key=lambda x: x["pnl_usd"])
        for t in caught:
            emoji = "❌" if t["pnl_usd"] < -0.3 else ("βœ…" if t["pnl_usd"] > 0 else "βšͺ")
            print(f"  {emoji} {t['symbol']:<15} {t['side']:<5} dist={t['ema50_dist']:.1f}%  PnL ${t['pnl_usd']:+.2f}  CHOP={t['chop']:.0f}  {t['reason']}")

    # ═══════════════════════════════════════════════
    # Marginal: what each EMA band adds/removes from CHOP baseline
    # ═══════════════════════════════════════════════
    print(f"\n{'='*90}")
    print("MARGINAL: trades REMOVED from CHOP baseline by each EMA band")
    print(f"{'='*90}")

    chop_trades = [t for t in all_signals if t["chop"] >= 45]
    bands = [(3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 10), (10, 99)]
    print(f"\n{'EMA Band':<12} {'Removed':>8} {'WR%':>6} {'PnL':>10} {'Verdict'}")
    print("-" * 55)
    for lo, hi in bands:
        removed = [t for t in chop_trades if t["ema50_dist"] is not None and lo <= t["ema50_dist"] < hi]
        if not removed: continue
        pnl = sum(t["pnl_usd"] for t in removed)
        w = sum(1 for t in removed if t["pnl_usd"] > 0)
        wr = w / len(removed) * 100
        verdict = "βœ… CUT" if pnl < -0.5 else ("⚠️ MIXED" if pnl < 0 else "❌ KEEP")
        label = f"{lo}-{hi}%" if hi < 99 else f"{lo}%+"
        print(f"{label:<12} {len(removed):>8} {wr:>5.1f}% ${pnl:>+8.2f}  {verdict}")


if __name__ == "__main__":
    run()

πŸ“œ Git History

dd32dfdchore: initial commit β€” version control setup5 weeks ago
Show last diff
Loading...