← Back
β˜†
"""
Deep analysis: EMA50 threshold 3% vs 5% — granular 0.5% steps.
Uses the enriched trade data from backtest_chop_vs_ema (full simulation).
Re-runs simulation with fine-grained EMA thresholds + bucket analysis.
"""

import json
import time
import numpy as np
import requests
from datetime import datetime, timezone, timedelta
from collections import defaultdict

# Bybit v5 REST endpoints used for candle history and 24h ticker statistics.
BYBIT_KLINE_URL = "https://api.bybit.com/v5/market/kline"
BYBIT_TICKER_URL = "https://api.bybit.com/v5/market/tickers"

# Simulated deal exits: take-profit / stop-loss distance from entry, in percent
# (passed to simulate_deal as tp_pct / sl_pct).
TP_PCT = 1.5
SL_PCT = 10.0
# A signal fires when the z-score is below -Z_THRESHOLD (BUY) or above +Z_THRESHOLD (SELL).
Z_THRESHOLD = 1.8
# Only bars whose NATR (in percent) lies inside [NATR_MIN, NATR_MAX] may produce a signal.
NATR_MIN = 0.75
NATR_MAX = 2.5
# After a signal, bars up to and including index signal_idx + COOLDOWN_BARS are skipped for that symbol.
COOLDOWN_BARS = 60
# Symbols never considered by get_top_symbols.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "USDCUSDT"}


def fetch_klines_paginated(symbol, interval, start_ms, end_ms, limit=1000):
    """Download kline rows for ``symbol`` with open time in [start_ms, end_ms].

    The kline endpoint is queried repeatedly with a shrinking ``endTime``
    cursor: after each page the cursor moves to one millisecond before the
    oldest row received, until ``start_ms`` is reached, the API reports a
    non-zero ``retCode``, or an empty page comes back.

    Any exception raised while fetching or parsing a page is printed and
    stops the paging; rows gathered up to that point are still returned.

    Returns:
        A list of raw kline rows, unique by open timestamp (``row[0]``)
        and sorted ascending by that timestamp. Empty when nothing was
        fetched or when ``end_ms <= start_ms``.
    """
    rows_by_ts = {}
    window_end = end_ms
    while window_end > start_ms:
        query = {
            "category": "linear",
            "symbol": symbol,
            "interval": interval,
            "endTime": window_end,
            "limit": limit,
        }
        try:
            response = requests.get(BYBIT_KLINE_URL, params=query, timeout=15)
            payload = response.json()
            if payload.get("retCode") != 0:
                break
            page = payload["result"]["list"]
            if not page:
                break
            # The code treats each page as newest-first, so flip it to walk
            # oldest-first and to find the oldest row at position 0.
            page.reverse()
            for row in page:
                opened_at = int(row[0])
                if start_ms <= opened_at <= end_ms:
                    # setdefault keeps the first row seen for a timestamp.
                    rows_by_ts.setdefault(opened_at, row)
            oldest_ts = int(page[0][0])
            if oldest_ts <= start_ms:
                break
            window_end = oldest_ts - 1
            time.sleep(0.2)  # pause between pages
        except Exception as exc:
            print(f"  Error {symbol}: {exc}")
            break
    return [rows_by_ts[ts] for ts in sorted(rows_by_ts)]


def get_top_symbols(n=50):
    """Return up to ``n`` symbols ranked by 24h turnover, highest first.

    Tickers are read from the linear-category ticker endpoint. A symbol is
    kept only if its name ends with ``USDT``, it is not in ``BLACKLIST``
    and its ``turnover24h`` is at least 20,000,000.
    """
    response = requests.get(BYBIT_TICKER_URL, params={"category": "linear"}, timeout=10)
    payload = response.json()

    liquid = []
    for ticker in payload["result"]["list"]:
        name = ticker["symbol"]
        if name.endswith("USDT") and name not in BLACKLIST:
            turnover = float(ticker.get("turnover24h", 0))
            if turnover >= 20_000_000:
                liquid.append((name, turnover))

    ranked = sorted(liquid, key=lambda pair: pair[1], reverse=True)
    return [name for name, _ in ranked[:n]]


def calc_indicators(highs, lows, closes, volumes, idx, z_period=50):
    """Compute the signal indicators for the bar at position ``idx``.

    Args:
        highs, lows, closes, volumes: numeric arrays supporting slicing and
            element-wise arithmetic (NumPy arrays in this script).
        idx: index of the bar to evaluate.
        z_period: number of bars before ``idx`` included in the rolling
            VWAP window (the window has ``z_period + 1`` bars).

    Returns:
        ``None`` when ``idx`` is below ``max(z_period + 10, 200)`` or when
        the close-minus-VWAP deviations have zero standard deviation.
        Otherwise a dict with:
          * ``z``: last close minus the window VWAP, divided by the standard
            deviation of (close - running VWAP) over the window;
          * ``natr``: mean of the last 14 true ranges as a percentage of the
            current close (0 when the close is not positive);
          * ``ema50_dist``: absolute percentage distance of the close from
            an EMA (multiplier 2/51) seeded at ``closes[0]``, or ``None``
            when the EMA is missing or not positive;
          * ``price``: the current close as ``float``.
    """
    min_history = max(z_period + 10, 200)
    if idx < min_history:
        return None

    # --- z-score of the close against a volume-weighted running average ---
    window = slice(idx - z_period, idx + 1)
    win_close = closes[window]
    win_vol = volumes[window]
    typical_price = (highs[window] + lows[window] + win_close) / 3
    running_vol = np.cumsum(win_vol)
    # A zero cumulative volume is replaced by 1 to avoid dividing by zero.
    running_vwap = np.cumsum(typical_price * win_vol) / np.where(running_vol == 0, 1, running_vol)
    spread = np.std(win_close - running_vwap)
    if spread == 0:
        return None
    z_score = float((win_close[-1] - running_vwap[-1]) / spread)

    # --- normalised ATR over the most recent true ranges ---
    recent = slice(idx - 14, idx + 1)
    hi, lo = highs[recent], lows[recent]
    prev_close = closes[idx - 15:idx]
    gap_up = np.abs(hi - prev_close)
    gap_down = np.abs(lo - prev_close)
    true_range = np.maximum(hi - lo, np.maximum(gap_up, gap_down))
    atr = np.mean(true_range[-14:])
    last_close = closes[idx]
    if last_close > 0:
        natr = (atr / last_close) * 100
    else:
        natr = 0

    # --- distance from the EMA, computed over every bar from 0 to idx ---
    ema50_dist = None
    if idx >= 50:
        alpha = 2 / 51
        ema = closes[0]
        for price in closes[1:idx + 1]:
            ema = ema + (price - ema) * alpha
        if ema and ema > 0:
            ema50_dist = abs((last_close - ema) / ema * 100)

    return {
        "z": z_score,
        "natr": natr,
        "ema50_dist": ema50_dist,
        "price": float(last_close),
    }


def simulate_deal(closes, start_idx, side, tp_pct, sl_pct, max_bars=500):
    """Simulate one deal opened at ``closes[start_idx]`` and walk it forward.

    The position is checked on every following close. It exits on the first
    bar whose PnL reaches ``tp_pct`` (take profit) or falls to ``-sl_pct``
    (stop loss). The returned PnL is the PnL at that close, so it may
    overshoot the configured level.

    Args:
        closes: sequence of close prices.
        start_idx: index of the entry bar.
        side: ``"BUY"`` for a long; any other value is treated as a short.
        tp_pct: take-profit distance in percent (positive).
        sl_pct: stop-loss distance in percent (positive).
        max_bars: holding horizon. Bars ``start_idx + 1`` through
            ``start_idx + max_bars - 1`` are checked. Defaults to 500,
            the value that was previously hard-coded.

    Returns:
        ``(pnl_pct, reason, duration)`` where ``reason`` is ``"TP"``,
        ``"SL"`` or ``"TIMEOUT"``. For TP/SL ``duration`` is the number of
        bars held. For TIMEOUT the PnL is marked at the last bar of the
        horizon (or the last available bar if the series ends earlier) and
        ``duration`` is always ``max_bars``.

    Raises:
        ValueError: if ``max_bars`` is less than 1.
    """
    if max_bars < 1:
        raise ValueError("max_bars must be >= 1")

    entry = closes[start_idx]

    def pnl_at(price):
        # Signed percentage move in the direction of the position.
        move = (price - entry) if side == "BUY" else (entry - price)
        return move / entry * 100

    horizon_end = min(start_idx + max_bars, len(closes))
    for i in range(start_idx + 1, horizon_end):
        pnl_pct = pnl_at(closes[i])
        if pnl_pct >= tp_pct:
            return pnl_pct, "TP", i - start_idx
        if pnl_pct <= -sl_pct:
            return pnl_pct, "SL", i - start_idx

    # Neither level was hit: mark to the last bar inside the horizon.
    final_price = closes[min(start_idx + max_bars - 1, len(closes) - 1)]
    return pnl_at(final_price), "TIMEOUT", max_bars


def _pnl_summary(trades):
    """Return ``(total_pnl_usd, win_rate_pct, avg_pnl_usd)`` for a non-empty trade list."""
    total = sum(t["pnl_usd"] for t in trades)
    winners = sum(1 for t in trades if t["pnl_usd"] > 0)
    return total, winners / len(trades) * 100, total / len(trades)


def _within_ema_threshold(trades, threshold):
    """Keep trades with a known EMA50 distance that is <= ``threshold`` (>= 99 means no cap)."""
    return [t for t in trades
            if t["ema50_dist"] is not None
            and (threshold >= 99 or t["ema50_dist"] <= threshold)]


def run():
    """Run the EMA50-distance threshold study and print the report to stdout.

    Steps:
      1. Take the top 50 symbols by turnover and download 5-minute klines
         covering the fixed scan window plus 4 days of warm-up history.
      2. For every 12th bar inside the scan window compute the indicators.
         A bar whose NATR lies in [NATR_MIN, NATR_MAX] and whose |z| exceeds
         Z_THRESHOLD opens a simulated deal, after which the symbol is
         skipped through bar index + COOLDOWN_BARS.
      3. Print five sections: cumulative EMA50-distance thresholds,
         per-band marginal results, a cumulative PnL curve ordered by
         distance, profit factor / drawdown per threshold, and a
         per-symbol view of the 3-5% distance zone.

    Returns None. If no signal is generated, a short notice is printed and
    the report sections are skipped.
    """
    start_dt = datetime(2026, 4, 7, 0, 0, tzinfo=timezone.utc)
    end_dt = datetime(2026, 4, 11, 0, 0, tzinfo=timezone.utc)
    start_ms = int(start_dt.timestamp() * 1000)
    end_ms = int(end_dt.timestamp() * 1000)
    # Extra history before the scan window so the indicators have enough bars.
    warmup_start = start_dt - timedelta(days=4)
    warmup_ms = int(warmup_start.timestamp() * 1000)

    symbols = get_top_symbols(50)
    print(f"Symbols: {len(symbols)}")

    symbol_data = {}
    for done, sym in enumerate(symbols, start=1):
        klines = fetch_klines_paginated(sym, "5", warmup_ms, end_ms)
        if klines and len(klines) > 300:
            symbol_data[sym] = klines
        # Longer pause after every fifth symbol, short pause otherwise.
        if done % 5 == 0:
            print(f"  {done}/{len(symbols)}...")
            time.sleep(1)
        else:
            time.sleep(0.3)

    print(f"Loaded: {len(symbol_data)} symbols\n")

    # Generate ALL signals (no EMA filter, only NATR)
    all_signals = []
    for symbol, klines in symbol_data.items():
        closes = np.array([float(k[4]) for k in klines])
        highs = np.array([float(k[2]) for k in klines])
        lows = np.array([float(k[3]) for k in klines])
        volumes = np.array([float(k[5]) for k in klines])
        timestamps = [int(k[0]) for k in klines]

        # First bar at or after the scan start; require >= 210 warm-up bars.
        scan_start = next((i for i, ts in enumerate(timestamps) if ts >= start_ms), None)
        if scan_start is None or scan_start < 210:
            continue

        # First bar past the scan end, or the last bar if none is.
        scan_end = next((i for i, ts in enumerate(timestamps) if ts > end_ms),
                        len(timestamps) - 1)

        cooldown_until = 0
        for bar in range(scan_start, scan_end, 12):
            if bar <= cooldown_until:
                continue
            ind = calc_indicators(highs, lows, closes, volumes, bar)
            if ind is None:
                continue
            if ind["natr"] < NATR_MIN or ind["natr"] > NATR_MAX:
                continue

            if ind["z"] < -Z_THRESHOLD:
                side = "BUY"
            elif ind["z"] > Z_THRESHOLD:
                side = "SELL"
            else:
                continue

            pnl_pct, reason, duration = simulate_deal(closes, bar, side, TP_PCT, SL_PCT)
            # NOTE(review): 3.0 * 3 looks like position size x leverage in USD — confirm.
            pnl_usd = pnl_pct / 100 * 3.0 * 3

            all_signals.append({
                "symbol": symbol,
                "side": side,
                "z": ind["z"],
                "natr": ind["natr"],
                "ema50_dist": ind["ema50_dist"],
                "pnl_pct": pnl_pct,
                "pnl_usd": pnl_usd,
                "reason": reason,
                "duration": duration,
            })
            cooldown_until = bar + COOLDOWN_BARS

    print(f"Total unfiltered signals: {len(all_signals)}")
    if not all_signals:
        # Without this guard the win-rate computation below divides by zero.
        print("No signals generated; nothing to analyse.")
        return

    base_pnl, base_wr, _ = _pnl_summary(all_signals)
    print(f"Baseline (NATR only): {len(all_signals)} | WR {base_wr:.1f}% | PnL ${base_pnl:+.2f}\n")

    # ═══════════════════════════════════════════════
    # PART 1: Granular EMA thresholds (0.5% steps)
    # ═══════════════════════════════════════════════
    print("=" * 85)
    print("PART 1: EMA50 distance threshold — 0.5% steps")
    print("=" * 85)
    print(f"\n{'Threshold':<12} {'Trades':>7} {'vs Base':>8} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'SL$':>10}")
    print("-" * 90)

    for threshold in [1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 7.0, 8.0, 99]:
        filtered = _within_ema_threshold(all_signals, threshold)
        if not filtered:
            continue
        pnl, wr, avg = _pnl_summary(filtered)
        stopped = [t for t in filtered if t["reason"] == "SL"]
        sls = len(stopped)
        sl_pnl = sum(t["pnl_usd"] for t in stopped)
        cut = len(all_signals) - len(filtered)
        label = "ALL" if threshold >= 99 else f"≤{threshold}%"
        print(f"{label:<12} {len(filtered):>7} {f'-{cut}':>8} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f} {sls:>5} ${sl_pnl:>+8.2f}")

    # ═══════════════════════════════════════════════
    # PART 2: What's IN each bucket (marginal analysis)
    # ═══════════════════════════════════════════════
    print(f"\n{'='*85}")
    print("PART 2: MARGINAL ANALYSIS — what each 1% band adds")
    print(f"{'='*85}")

    bands = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 99)]
    print(f"\n{'Band':<12} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg':>8} {'SLs':>5} {'Top Losers'}")
    print("-" * 90)

    for lo, hi in bands:
        band_label = f"{lo}-{hi}%" if hi < 99 else f"{lo}%+"
        band_trades = [t for t in all_signals
                       if t["ema50_dist"] is not None and lo <= t["ema50_dist"] < hi]
        if not band_trades:
            continue
        pnl, wr, avg = _pnl_summary(band_trades)
        sls = sum(1 for t in band_trades if t["reason"] == "SL")
        # Up to three worst trades in this band, listed only if they lost money.
        losers = sorted(band_trades, key=lambda x: x["pnl_usd"])[:3]
        loser_str = ", ".join(f"{t['symbol'][:8]} ${t['pnl_usd']:+.2f}" for t in losers if t["pnl_usd"] < 0)
        print(f"{band_label:<12} {len(band_trades):>7} {wr:>5.1f}% ${pnl:>+8.2f} ${avg:>+7.4f} {sls:>5}  {loser_str or 'none'}")

    # ═══════════════════════════════════════════════
    # PART 3: Cumulative PnL curve (add trades by dist)
    # ═══════════════════════════════════════════════
    print(f"\n{'='*85}")
    print("PART 3: CUMULATIVE PnL — adding trades from closest → farthest from EMA50")
    print("(Shows when adding more distant trades starts HURTING)")
    print(f"{'='*85}")

    valid = [t for t in all_signals if t["ema50_dist"] is not None]
    valid.sort(key=lambda x: x["ema50_dist"])

    cumul_pnl = 0
    peak_pnl = float("-inf")  # any real cumulative value replaces this
    peak_rank = 0
    peak_dist = 0

    print(f"\n{'#':>5} {'Dist%':>7} {'Symbol':<12} {'Side':<5} {'PnL':>8} {'Cumul':>10} {'WR%':>6}")
    print("-" * 60)

    wins = 0
    for rank, t in enumerate(valid, start=1):
        cumul_pnl += t["pnl_usd"]
        if t["pnl_usd"] > 0:
            wins += 1
        wr = wins / rank * 100
        if cumul_pnl > peak_pnl:
            peak_pnl = cumul_pnl
            peak_rank = rank
            peak_dist = t["ema50_dist"]
        # Print every 10th row, the last row, and any trade losing more than $0.50.
        if rank % 10 == 0 or rank == len(valid) or t["pnl_usd"] < -0.5:
            print(f"{rank:>5} {t['ema50_dist']:>6.1f}% {t['symbol']:<12} {t['side']:<5} ${t['pnl_usd']:>+6.2f} ${cumul_pnl:>+8.2f} {wr:>5.1f}%")

    if valid:
        print(f"\n🏆 Peak cumulative PnL: ${peak_pnl:+.2f} at trade #{peak_rank} (EMA50 dist ≤{peak_dist:.1f}%)")

    # ═══════════════════════════════════════════════
    # PART 4: Risk-adjusted: PnL per trade (efficiency)
    # ═══════════════════════════════════════════════
    print(f"\n{'='*85}")
    print("PART 4: EFFICIENCY — PnL/trade × volume (total PnL) for each threshold")
    print("Finding the OPTIMAL balance between quality and quantity")
    print(f"{'='*85}")

    print(f"\n{'Threshold':<12} {'Trades':>7} {'PnL':>10} {'$/trade':>8} {'WR%':>6} {'Profit Factor':>14} {'Max Drawdown':>13}")
    print("-" * 80)

    for threshold in [2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 6.0, 99]:
        filtered = _within_ema_threshold(all_signals, threshold)
        if not filtered:
            continue
        gross_profit = sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] > 0)
        gross_loss = abs(sum(t["pnl_usd"] for t in filtered if t["pnl_usd"] <= 0))
        pf = gross_profit / gross_loss if gross_loss > 0 else 999
        pnl, wr, avg = _pnl_summary(filtered)
        # Max drawdown of the running PnL.
        # NOTE(review): trades are in generation order (symbol by symbol),
        # not portfolio-chronological — confirm this is the intended curve.
        cumul = 0
        peak = 0
        max_dd = 0
        for t in filtered:
            cumul += t["pnl_usd"]
            peak = max(peak, cumul)
            max_dd = max(max_dd, peak - cumul)
        label = "ALL" if threshold >= 99 else f"≤{threshold}%"
        print(f"{label:<12} {len(filtered):>7} ${pnl:>+8.2f} ${avg:>+7.4f} {wr:>5.1f}% {pf:>13.2f} ${max_dd:>11.2f}")

    # ═══════════════════════════════════════════════
    # PART 5: Per-symbol breakdown (who benefits from filter)
    # ═══════════════════════════════════════════════
    print(f"\n{'='*85}")
    print("PART 5: PER-SYMBOL — which coins hurt most at dist>3% vs >5%")
    print(f"{'='*85}")

    print("\n--- Trades with EMA50 dist 3-5% (the contested zone) ---")
    zone_3_5 = [t for t in all_signals if t["ema50_dist"] is not None and 3.0 < t["ema50_dist"] <= 5.0]
    if zone_3_5:
        by_sym = defaultdict(list)
        for t in zone_3_5:
            by_sym[t["symbol"]].append(t)

        print(f"\n{'Symbol':<15} {'Trades':>7} {'WR%':>6} {'PnL':>10} {'Avg Dist':>9}")
        print("-" * 55)
        sym_stats = []
        for sym, trades in sorted(by_sym.items()):
            pnl, wr, _ = _pnl_summary(trades)
            avg_dist = np.mean([t["ema50_dist"] for t in trades])
            sym_stats.append((sym, len(trades), wr, pnl, avg_dist))

        # Worst PnL first; ties stay in alphabetical order (stable sort).
        sym_stats.sort(key=lambda x: x[3])
        for sym, n, wr, pnl, avg_d in sym_stats:
            print(f"{sym:<15} {n:>7} {wr:>5.1f}% ${pnl:>+8.2f} {avg_d:>8.1f}%")

        total_pnl, total_wr, _ = _pnl_summary(zone_3_5)
        print(f"\n{'TOTAL 3-5%':<15} {len(zone_3_5):>7} {total_wr:>5.1f}% ${total_pnl:>+8.2f}")
        print(f"\n→ Cutting ≤3% LOSES these {len(zone_3_5)} trades (PnL ${total_pnl:+.2f})")
        print("→ Cutting ≤5% KEEPS them")


# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    run()

πŸ“œ Git History

dd32dfdchore: initial commit β€” version control setup5 weeks ago
Show last diff
Loading...