← Back
ā˜†
"""
Backtest: DCA Z-VWAP with Simplified Filters v2
=================================================
Dynamic screener (ALL Bybit USDT perps), filters:
  - Volume >= $20M
  - NATR >= 0.75%  (no max cap, no CHOP)
  - EMA trend filter: LONG only above EMA, SHORT only below
  - No Z-max cap (removed)
  - Blacklist: only heavyweights (BTC, ETH, TRX, stables)

Usage:
  python backtests/backtest_simplified_filters.py
"""

import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json
import time
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP

# ============================================================
# CONFIG
# ============================================================
DCA_CONFIG = {
    "base_order_usd": 7.0,
    "safety_order_usd": 5.0,
    "max_safety_orders": 3,
    "natr_factor": 1.0,             # SO spacing = NATR Ɨ factor Ɨ step_scale^n
    "step_scale": 1.3,
    "volume_scale": 1.3,
    "take_profit_pct": 1.5,
    "stop_loss_pct": 8.0,
    "z_entry_threshold": 1.8,
    "z_tp_threshold": 0.3,
    "cooldown_bars": 12,            # 12Ɨ5m = 1h
    "leverage": 3,
    "ema_period": 50,               # EMA trend filter (50 bars Ɨ 5m = ~4h)
}

VWAP_PERIOD = 50
MAKER_FEE = 0.0002
TAKER_FEE = 0.00055
TIMEFRAME = "5"
DAYS = 7

# === SIMPLIFIED FILTERS ===
MIN_NATR_PCT = 0.75       # only minimum, no max
MIN_VOLUME_24H = 20_000_000
MAX_DEALS_CONCURRENT = 6

# === SLIM BLACKLIST — only heavyweights ===
BLACKLIST = {
    "BTCUSDT", "ETHUSDT", "TRXUSDT",
    "USDCUSDT", "BTCPERP", "ETHPERP",
}

session = HTTP(testnet=False)


# ============================================================
# DYNAMIC SCREENER — fetch all USDT perps, filter by volume
# ============================================================
def get_screener_symbols():
    """Get all tradeable USDT perps with volume >= threshold."""
    print("šŸ“” Fetching all Bybit USDT perps...")
    resp = session.get_tickers(category="linear")
    if resp["retCode"] != 0:
        print(f"  āŒ Ticker error: {resp['retMsg']}")
        return []

    symbols = []
    for t in resp["result"]["list"]:
        sym = t["symbol"]
        if not sym.endswith("USDT"):
            continue
        if sym in BLACKLIST:
            continue
        vol24h = float(t.get("turnover24h", 0))
        if vol24h < MIN_VOLUME_24H:
            continue
        symbols.append({
            "symbol": sym,
            "volume_24h": vol24h,
            "price": float(t.get("lastPrice", 0)),
        })

    symbols.sort(key=lambda x: x["volume_24h"], reverse=True)
    print(f"  āœ… {len(symbols)} symbols pass volume filter (>= ${MIN_VOLUME_24H/1e6:.0f}M)")
    return symbols


# ============================================================
# DATA FETCHING
# ============================================================
def fetch_klines(symbol: str, interval: str, days: int = 7) -> list[dict]:
    """Fetch klines from Bybit."""
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)

    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=1000,
                end=end_time,
            )
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append({
                    "ts": int(item[0]),
                    "open": float(item[1]),
                    "high": float(item[2]),
                    "low": float(item[3]),
                    "close": float(item[4]),
                    "volume": float(item[5]),
                })
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
        except Exception as e:
            print(f"  Fetch error {symbol}: {e}")
            break

    all_klines.reverse()
    seen = set()
    unique = []
    for k in all_klines:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique


# ============================================================
# INDICATORS
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
    """Z-Score from rolling VWAP."""
    n = len(closes)
    z_scores = np.full(n, 0.0)
    for i in range(period, n):
        h = highs[i-period:i]
        l = lows[i-period:i]
        c = closes[i-period:i]
        v = volumes[i-period:i]
        tp = (h + l + c) / 3
        cum_tp_vol = np.cumsum(tp * v)
        cum_vol = np.cumsum(v)
        cum_vol_safe = np.where(cum_vol == 0, 1, cum_vol)
        vwap_arr = cum_tp_vol / cum_vol_safe
        vwap = vwap_arr[-1]
        deviations = c - vwap_arr
        std = np.std(deviations)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std
    return z_scores


def calc_natr(highs, lows, closes, period=14):
    """Normalized ATR (%) over last `period` bars."""
    n = len(closes)
    if n < period + 1:
        return 0.0
    trs = []
    for i in range(n - period, n):
        tr = max(highs[i] - lows[i],
                 abs(highs[i] - closes[i-1]),
                 abs(lows[i] - closes[i-1]))
        trs.append(tr)
    atr = np.mean(trs)
    return (atr / closes[-1]) * 100 if closes[-1] > 0 else 0.0


def calc_ema(closes, period):
    """Calculate EMA array. Returns np array same length as closes."""
    ema = np.full(len(closes), np.nan)
    if len(closes) < period:
        return ema
    # Seed with SMA
    ema[period - 1] = np.mean(closes[:period])
    k = 2.0 / (period + 1)
    for i in range(period, len(closes)):
        ema[i] = closes[i] * k + ema[i - 1] * (1 - k)
    return ema


# ============================================================
# DCA DEAL SIMULATOR
# ============================================================
class DCADeal:
    def __init__(self, symbol, side, entry_price, entry_bar, config, natr_pct):
        self.symbol = symbol
        self.side = side
        self.config = config
        self.entry_bar = entry_bar
        self.natr_pct = natr_pct

        leverage = config["leverage"]
        self.orders = [{
            "price": entry_price,
            "usd": config["base_order_usd"],
            "qty": (config["base_order_usd"] * leverage) / entry_price,
            "type": "BO",
        }]

        # NATR-based SO spacing (like live bot)
        self.so_triggers = []
        cumul_deviation = 0
        for i in range(config["max_safety_orders"]):
            spacing = natr_pct * config["natr_factor"] * (config["step_scale"] ** i)
            cumul_deviation += spacing
            so_size = config["safety_order_usd"] * (config["volume_scale"] ** i)

            if side == "LONG":
                trigger_price = entry_price * (1 - cumul_deviation / 100)
            else:
                trigger_price = entry_price * (1 + cumul_deviation / 100)

            self.so_triggers.append({
                "price": trigger_price,
                "usd": so_size,
                "deviation_pct": cumul_deviation,
                "filled": False,
            })

        self.so_filled = 0
        self.so_fill_times = []
        self.closed = False
        self.close_price = 0
        self.close_bar = 0
        self.close_reason = ""
        self.pnl = 0
        self.fees = 0
        self.max_drawdown_pct = 0

    @property
    def avg_entry(self):
        total_qty = sum(o["qty"] for o in self.orders)
        total_cost = sum(o["qty"] * o["price"] for o in self.orders)
        return total_cost / total_qty if total_qty > 0 else 0

    @property
    def total_qty(self):
        return sum(o["qty"] for o in self.orders)

    @property
    def total_invested(self):
        return sum(o["usd"] for o in self.orders)

    def tick(self, bar_idx, high, low, close, z_score):
        if self.closed:
            return True

        leverage = self.config["leverage"]

        # Check SO fills
        for i, so in enumerate(self.so_triggers):
            if so["filled"]:
                continue
            filled = False
            if self.side == "LONG" and low <= so["price"]:
                filled = True
            elif self.side == "SHORT" and high >= so["price"]:
                filled = True
            if filled:
                qty = (so["usd"] * leverage) / so["price"]
                self.orders.append({
                    "price": so["price"],
                    "usd": so["usd"],
                    "qty": qty,
                    "type": f"SO{i+1}",
                })
                so["filled"] = True
                self.so_filled += 1
                self.so_fill_times.append(bar_idx)
                self.fees += so["usd"] * leverage * MAKER_FEE

        avg = self.avg_entry

        # Max drawdown
        if self.side == "LONG":
            dd_pct = (avg - low) / avg * 100
        else:
            dd_pct = (high - avg) / avg * 100
        self.max_drawdown_pct = max(self.max_drawdown_pct, dd_pct)

        # SO Guard: 3 SOs in 15 bars (75 min on 5m) → force close
        if len(self.so_fill_times) >= 3:
            last3 = self.so_fill_times[-3:]
            if last3[-1] - last3[0] <= 15:
                self.close_price = close
                self._close(bar_idx, "SO_GUARD")
                return True

        # Time Stop (progressive)
        bars_open = bar_idx - self.entry_bar
        hours_open = bars_open * 5 / 60  # 5m bars
        time_limits = {1: 3.0, 2: 2.0, 3: 1.5}
        for min_sos, max_hours in time_limits.items():
            if self.so_filled >= min_sos and hours_open >= max_hours:
                self.close_price = close
                self._close(bar_idx, f"TIME_STOP({self.so_filled}SO/{hours_open:.1f}h)")
                return True

        # TP
        tp_price = avg * (1 + self.config["take_profit_pct"] / 100) if self.side == "LONG" else \
                   avg * (1 - self.config["take_profit_pct"] / 100)
        tp_hit = False
        if self.side == "LONG" and high >= tp_price:
            tp_hit = True
            self.close_price = tp_price
        elif self.side == "SHORT" and low <= tp_price:
            tp_hit = True
            self.close_price = tp_price

        # Z-reversion TP
        z_tp = False
        if self.side == "LONG" and z_score >= -self.config["z_tp_threshold"]:
            if close > avg:
                z_tp = True
                self.close_price = close
        elif self.side == "SHORT" and z_score <= self.config["z_tp_threshold"]:
            if close < avg:
                z_tp = True
                self.close_price = close

        # SL
        sl_price = avg * (1 - self.config["stop_loss_pct"] / 100) if self.side == "LONG" else \
                   avg * (1 + self.config["stop_loss_pct"] / 100)
        sl_hit = False
        if self.side == "LONG" and low <= sl_price:
            sl_hit = True
            self.close_price = sl_price
        elif self.side == "SHORT" and high >= sl_price:
            sl_hit = True
            self.close_price = sl_price

        if sl_hit:
            self._close(bar_idx, "SL")
            return True
        elif tp_hit:
            self._close(bar_idx, "TP%")
            return True
        elif z_tp:
            self._close(bar_idx, "Z-TP")
            return True
        return False

    def _close(self, bar_idx, reason):
        self.closed = True
        self.close_bar = bar_idx
        self.close_reason = reason
        total_qty = self.total_qty
        avg = self.avg_entry
        if self.side == "LONG":
            self.pnl = total_qty * (self.close_price - avg)
        else:
            self.pnl = total_qty * (avg - self.close_price)
        exit_fee = total_qty * self.close_price * TAKER_FEE
        entry_fee = self.orders[0]["qty"] * self.orders[0]["price"] * TAKER_FEE
        self.fees += exit_fee + entry_fee
        self.pnl -= self.fees


# ============================================================
# BACKTEST ENGINE (multi-symbol concurrent deals)
# ============================================================
def run_backtest(symbols_data, config):
    """Run backtest across all symbols with concurrent deal limit."""
    print(f"\n{'='*70}")
    print(f"  Running {DAYS}-day backtest on {len(symbols_data)} symbols")
    print(f"  Filters: NATR >= {MIN_NATR_PCT}%, Vol >= ${MIN_VOLUME_24H/1e6:.0f}M")
    print(f"  Blacklist: {sorted(BLACKLIST)}")
    print(f"  Max concurrent deals: {MAX_DEALS_CONCURRENT}")
    print(f"{'='*70}")
    print(f"  BO=${config['base_order_usd']}, SO=${config['safety_order_usd']}, "
          f"MaxSO={config['max_safety_orders']}, SL={config['stop_loss_pct']}%, "
          f"TP={config['take_profit_pct']}%, Lev={config['leverage']}x")
    print(f"  Z-entry={config['z_entry_threshold']}, Z-TP={config['z_tp_threshold']}, "
          f"EMA trend={config['ema_period']} (trade WITH trend only)")
    print()

    # Fetch data for all symbols
    all_kline_data = {}
    skipped_natr = []

    for idx, sd in enumerate(symbols_data):
        sym = sd["symbol"]
        print(f"  [{idx+1}/{len(symbols_data)}] {sym} (Vol ${sd['volume_24h']/1e6:.1f}M)...", end=" ", flush=True)
        time.sleep(0.15)  # rate limit

        klines = fetch_klines(sym, TIMEFRAME, days=DAYS)
        if len(klines) < VWAP_PERIOD + 50:
            print(f"āš ļø {len(klines)} bars, skip")
            continue

        # Calculate NATR on fetched data
        closes = np.array([k["close"] for k in klines])
        highs = np.array([k["high"] for k in klines])
        lows = np.array([k["low"] for k in klines])
        natr = calc_natr(highs, lows, closes, 14)

        if natr < MIN_NATR_PCT:
            print(f"āš ļø NATR {natr:.2f}% < {MIN_NATR_PCT}%, skip")
            skipped_natr.append((sym, natr))
            continue

        print(f"āœ… {len(klines)} bars, NATR={natr:.2f}%")
        all_kline_data[sym] = {
            "klines": klines,
            "natr": natr,
            "volume": sd["volume_24h"],
        }

    print(f"\n  šŸ“Š {len(all_kline_data)} symbols ready for backtest")
    if skipped_natr:
        print(f"  ā­ļø  Skipped (low NATR): {', '.join(f'{s}({n:.2f}%)' for s,n in skipped_natr[:10])}")

    # Align all data to common timeline using bar index
    # For simplicity: run each symbol independently but track concurrent deals globally
    # Build unified timeline: merge all bars from all symbols
    all_timestamps = set()
    for sym, data in all_kline_data.items():
        for k in data["klines"]:
            all_timestamps.add(k["ts"])
    timeline = sorted(all_timestamps)
    ts_to_idx = {ts: i for i, ts in enumerate(timeline)}

    # Index klines by timestamp for each symbol
    sym_bars = {}
    for sym, data in all_kline_data.items():
        bars_by_ts = {}
        for k in data["klines"]:
            bars_by_ts[k["ts"]] = k
        sym_bars[sym] = bars_by_ts

    # Pre-compute Z-scores and EMA for each symbol
    sym_z = {}
    sym_ema = {}
    sym_np = {}
    ema_period = config.get("ema_period", 50)
    for sym, data in all_kline_data.items():
        klines = data["klines"]
        closes = np.array([k["close"] for k in klines])
        highs = np.array([k["high"] for k in klines])
        lows = np.array([k["low"] for k in klines])
        volumes = np.array([k["volume"] for k in klines])
        z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD)
        ema_arr = calc_ema(closes, ema_period)
        sym_z[sym] = {klines[i]["ts"]: z_scores[i] for i in range(len(klines))}
        sym_ema[sym] = {klines[i]["ts"]: ema_arr[i] for i in range(len(klines))}
        sym_np[sym] = {"highs": highs, "lows": lows, "closes": closes, "volumes": volumes,
                       "timestamps": [k["ts"] for k in klines]}

    # Simulate with global concurrent deal limit
    active_deals = {}  # symbol -> DCADeal
    all_deals = []
    cooldowns = {}  # symbol -> cooldown_until_ts
    signals_generated = 0
    signals_skipped_max_deals = 0
    signals_skipped_ema = 0

    for ts in timeline:
        # Check active deals
        for sym in list(active_deals.keys()):
            deal = active_deals[sym]
            if ts not in sym_bars.get(sym, {}):
                continue
            bar = sym_bars[sym][ts]
            z = sym_z[sym].get(ts, 0)
            closed = deal.tick(
                ts_to_idx[ts],
                bar["high"], bar["low"], bar["close"], z
            )
            if closed:
                all_deals.append(deal)
                cooldowns[sym] = ts + config["cooldown_bars"] * 5 * 60 * 1000  # cooldown in ms
                del active_deals[sym]

        # Check for new entries
        for sym, data in all_kline_data.items():
            if sym in active_deals:
                continue
            if ts not in sym_bars.get(sym, {}):
                continue
            if sym in cooldowns and ts < cooldowns[sym]:
                continue

            z = sym_z[sym].get(ts, 0)
            bar = sym_bars[sym][ts]
            ema_val = sym_ema[sym].get(ts, np.nan)

            # Check Z thresholds
            if abs(z) <= config["z_entry_threshold"]:
                continue

            signals_generated += 1

            # EMA trend filter: only trade WITH trend
            # Price above EMA → uptrend → only LONG allowed
            # Price below EMA → downtrend → only SHORT allowed
            side = "LONG" if z < -config["z_entry_threshold"] else "SHORT"

            if not np.isnan(ema_val):
                if side == "LONG" and bar["close"] < ema_val:
                    signals_skipped_ema += 1
                    continue
                if side == "SHORT" and bar["close"] > ema_val:
                    signals_skipped_ema += 1
                    continue

            if len(active_deals) >= MAX_DEALS_CONCURRENT:
                signals_skipped_max_deals += 1
                continue

            natr = data["natr"]
            deal = DCADeal(sym, side, bar["close"], ts_to_idx[ts], config, natr)
            active_deals[sym] = deal

    # Force-close remaining deals
    for sym, deal in active_deals.items():
        klines = all_kline_data[sym]["klines"]
        deal.close_price = klines[-1]["close"]
        deal._close(ts_to_idx[klines[-1]["ts"]], "END")
        all_deals.append(deal)

    return all_deals, signals_generated, signals_skipped_max_deals, signals_skipped_ema, all_kline_data


# ============================================================
# DISPLAY RESULTS
# ============================================================
def print_results(all_deals, signals_gen, skip_max, skip_ema, kline_data):
    if not all_deals:
        print("\nāŒ No deals generated!")
        return

    total_pnl = sum(d.pnl for d in all_deals)
    wins = [d for d in all_deals if d.pnl > 0]
    losses = [d for d in all_deals if d.pnl <= 0]
    wr = len(wins) / len(all_deals) * 100

    gross_profit = sum(d.pnl for d in wins) if wins else 0
    gross_loss = abs(sum(d.pnl for d in losses)) if losses else 0.001
    pf = gross_profit / gross_loss if gross_loss > 0 else float('inf')

    avg_win = gross_profit / len(wins) if wins else 0
    avg_loss = gross_loss / len(losses) if losses else 0
    max_dd = max(d.max_drawdown_pct for d in all_deals)
    total_fees = sum(d.fees for d in all_deals)
    avg_sos = sum(d.so_filled for d in all_deals) / len(all_deals)
    avg_bars = sum(d.close_bar - d.entry_bar for d in all_deals) / len(all_deals)

    longs = [d for d in all_deals if d.side == "LONG"]
    shorts = [d for d in all_deals if d.side == "SHORT"]

    # Close reasons
    reasons = {}
    for d in all_deals:
        r = d.close_reason.split("(")[0]  # group TIME_STOP variants
        reasons[r] = reasons.get(r, 0) + 1

    print(f"\n{'='*70}")
    print(f"  RESULTS — {DAYS}-day backtest, {len(kline_data)} symbols")
    print(f"{'='*70}")

    pnl_emoji = "🟢" if total_pnl > 0 else "šŸ”“"
    print(f"\n  {pnl_emoji} Total PnL:     ${total_pnl:+.2f}")
    print(f"  šŸ“Š Profit Factor: {pf:.2f}")
    print(f"  šŸŽÆ Win Rate:      {wr:.1f}% ({len(wins)}W / {len(losses)}L)")
    print(f"  šŸ“ˆ Avg Win:       ${avg_win:.3f}")
    print(f"  šŸ“‰ Avg Loss:      ${avg_loss:.3f}")
    print(f"  šŸ’° Deals:         {len(all_deals)} (L:{len(longs)} / S:{len(shorts)})")
    print(f"  šŸ”„ Avg SOs:       {avg_sos:.1f}")
    print(f"  ā±ļø  Avg Duration:  {avg_bars:.0f} bars ({avg_bars*5/60:.1f}h)")
    print(f"  šŸ“‰ Max DD:        {max_dd:.1f}%")
    print(f"  šŸ’ø Total Fees:    ${total_fees:.2f}")
    print(f"  šŸ“” Signals:       {signals_gen} (skipped: {skip_ema} EMA-trend, {skip_max} max-deals)")

    print(f"\n  Close Reasons:")
    for r, cnt in sorted(reasons.items(), key=lambda x: -x[1]):
        pct = cnt / len(all_deals) * 100
        print(f"    {r:20s} {cnt:4d} ({pct:.0f}%)")

    # Long vs Short
    long_pnl = sum(d.pnl for d in longs)
    short_pnl = sum(d.pnl for d in shorts)
    long_wr = sum(1 for d in longs if d.pnl > 0) / len(longs) * 100 if longs else 0
    short_wr = sum(1 for d in shorts if d.pnl > 0) / len(shorts) * 100 if shorts else 0
    print(f"\n  LONG:  ${long_pnl:+.2f} ({len(longs)} deals, WR={long_wr:.0f}%)")
    print(f"  SHORT: ${short_pnl:+.2f} ({len(shorts)} deals, WR={short_wr:.0f}%)")

    # Per-symbol breakdown (top 10 by deal count)
    sym_stats = {}
    for d in all_deals:
        if d.symbol not in sym_stats:
            sym_stats[d.symbol] = {"deals": 0, "pnl": 0, "wins": 0}
        sym_stats[d.symbol]["deals"] += 1
        sym_stats[d.symbol]["pnl"] += d.pnl
        if d.pnl > 0:
            sym_stats[d.symbol]["wins"] += 1

    print(f"\n  {'─'*60}")
    print(f"  Per-Symbol Breakdown (sorted by PnL):")
    print(f"  {'Symbol':15s} {'Deals':>6s} {'PnL':>10s} {'WR':>6s} {'AvgPnL':>10s}")
    print(f"  {'─'*60}")

    sorted_syms = sorted(sym_stats.items(), key=lambda x: x[1]["pnl"], reverse=True)
    for sym, st in sorted_syms:
        wr_sym = st["wins"] / st["deals"] * 100 if st["deals"] > 0 else 0
        avg_pnl = st["pnl"] / st["deals"]
        emoji = "🟢" if st["pnl"] > 0 else "šŸ”“"
        print(f"  {emoji} {sym:13s} {st['deals']:>5d}  ${st['pnl']:>+8.2f}  {wr_sym:>5.0f}%  ${avg_pnl:>+8.3f}")

    # Would-be blacklist comparison
    old_blacklist_coins = {"RAVEUSDT", "FARTCOINUSDT", "TAOUSDT", "ARIAUSDT", "SIRENUSDT", "MAGMAUSDT"}
    old_bl_deals = [d for d in all_deals if d.symbol in old_blacklist_coins]
    if old_bl_deals:
        old_bl_pnl = sum(d.pnl for d in old_bl_deals)
        old_bl_wins = sum(1 for d in old_bl_deals if d.pnl > 0)
        old_bl_wr = old_bl_wins / len(old_bl_deals) * 100
        print(f"\n  {'─'*60}")
        print(f"  āš ļø  Ex-blacklist coins (RAVE/FART/TAO/ARIA/SIREN/MAGMA):")
        print(f"     {len(old_bl_deals)} deals, PnL=${old_bl_pnl:+.2f}, WR={old_bl_wr:.0f}%")
        pnl_without = total_pnl - old_bl_pnl
        print(f"     Without them: PnL=${pnl_without:+.2f} (delta ${-old_bl_pnl:+.2f})")

    return sym_stats


# ============================================================
# MAIN
# ============================================================
def main():
    print("=" * 70)
    print("  DCA Z-VWAP Backtest — SIMPLIFIED FILTERS v2")
    print("  NATR >= 0.75% | No max NATR | No CHOP | No Z-max")
    print("  + EMA50 trend filter (trade WITH trend)")
    print("  Blacklist: BTC, ETH, TRX only")
    print("=" * 70)

    symbols_data = get_screener_symbols()
    if not symbols_data:
        print("āŒ No symbols found")
        return

    # Limit to top 50 by volume to keep backtest manageable
    symbols_data = symbols_data[:50]
    print(f"  Testing top {len(symbols_data)} by volume")

    all_deals, sig_gen, skip_max, skip_zmax, kline_data = run_backtest(symbols_data, DCA_CONFIG)
    sym_stats = print_results(all_deals, sig_gen, skip_max, skip_zmax, kline_data)

    # Save results
    output_path = os.path.join(os.path.dirname(__file__), "results_simplified_filters.json")
    save_data = {
        "config": DCA_CONFIG,
        "filters": {
            "min_natr_pct": MIN_NATR_PCT,
            "max_natr_pct": "NONE (removed)",
            "min_chop": "NONE (removed)",
            "min_volume_24h": MIN_VOLUME_24H,
            "blacklist": sorted(BLACKLIST),
        },
        "summary": {
            "total_deals": len(all_deals),
            "total_pnl": round(sum(d.pnl for d in all_deals), 4),
            "win_rate": round(sum(1 for d in all_deals if d.pnl > 0) / len(all_deals) * 100, 1) if all_deals else 0,
            "symbols_tested": len(kline_data),
            "signals_generated": sig_gen,
        },
        "per_symbol": {sym: {"deals": st["deals"], "pnl": round(st["pnl"], 4),
                             "wr": round(st["wins"]/st["deals"]*100, 1) if st["deals"] > 0 else 0}
                       for sym, st in (sym_stats or {}).items()},
        "deals": [
            {
                "symbol": d.symbol, "side": d.side,
                "entry": round(d.orders[0]["price"], 6),
                "avg": round(d.avg_entry, 6),
                "exit": round(d.close_price, 6),
                "pnl": round(d.pnl, 4),
                "sos": d.so_filled,
                "natr": round(d.natr_pct, 2),
                "reason": d.close_reason,
                "invested": round(d.total_invested, 2),
                "dd": round(d.max_drawdown_pct, 2),
            }
            for d in all_deals
        ],
    }
    with open(output_path, "w") as f:
        json.dump(save_data, f, indent=2)
    print(f"\nšŸ’¾ Results saved to {output_path}")


if __name__ == "__main__":
    main()

šŸ“œ Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...