"""
Backtest: DCA Z-VWAP with Simplified Filters v2
=================================================
Dynamic screener (ALL Bybit USDT perps), filters:
- Volume >= $20M
- NATR >= 0.75% (no max cap, no CHOP)
- EMA trend filter: LONG only above EMA, SHORT only below
- No Z-max cap (removed)
- Blacklist: only heavyweights (BTC, ETH, TRX, stables)
Usage:
python backtests/backtest_simplified_filters.py
"""
import sys, os

# Allow running this script directly from backtests/: put the project root on sys.path.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json
import time
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP

# ============================================================
# CONFIG
# ============================================================
# DCA grid parameters consumed by the DCADeal simulator.
DCA_CONFIG = {
    "base_order_usd": 7.0,        # notional of the base (entry) order, USD margin
    "safety_order_usd": 5.0,      # notional of the first safety order, USD margin
    "max_safety_orders": 3,
    "natr_factor": 1.0,           # SO spacing = NATR * factor * step_scale^n
    "step_scale": 1.3,            # geometric growth of SO spacing
    "volume_scale": 1.3,          # geometric growth of SO size
    "take_profit_pct": 1.5,       # TP distance from average entry, %
    "stop_loss_pct": 8.0,         # SL distance from average entry, %
    "z_entry_threshold": 1.8,     # |Z| must exceed this to open a deal
    "z_tp_threshold": 0.3,        # Z reversion level that allows an early take-profit
    "cooldown_bars": 12,          # 12 x 5m = 1h cooldown per symbol after a close
    "leverage": 3,
    "ema_period": 50,             # EMA trend filter (50 bars x 5m = ~4h)
}

VWAP_PERIOD = 50          # rolling window (bars) for the Z-VWAP indicator
MAKER_FEE = 0.0002        # charged on safety-order fills (assumed resting limits)
TAKER_FEE = 0.00055       # charged on base-order entry and all exits
TIMEFRAME = "5"           # kline interval in minutes (Bybit API string)
DAYS = 7                  # backtest lookback window
# === SIMPLIFIED FILTERS ===
MIN_NATR_PCT = 0.75  # only minimum, no max
MIN_VOLUME_24H = 20_000_000
MAX_DEALS_CONCURRENT = 6
# === SLIM BLACKLIST - only heavyweights ===
BLACKLIST = {
    "BTCUSDT", "ETHUSDT", "TRXUSDT",
    "USDCUSDT", "BTCPERP", "ETHPERP",
}

# Public (unauthenticated) Bybit HTTP client; only market-data endpoints are used.
session = HTTP(testnet=False)
# ============================================================
# DYNAMIC SCREENER โ fetch all USDT perps, filter by volume
# ============================================================
def get_screener_symbols():
    """Get all tradeable USDT perps with volume >= threshold.

    Returns a list of dicts ({symbol, volume_24h, price}) sorted by
    24h turnover, descending. Blacklisted symbols are excluded.
    Returns [] on API error.
    """
    print("๐ก Fetching all Bybit USDT perps...")
    resp = session.get_tickers(category="linear")
    if resp["retCode"] != 0:
        print(f" โ Ticker error: {resp['retMsg']}")
        return []
    symbols = []
    for t in resp["result"]["list"]:
        sym = t["symbol"]
        if not sym.endswith("USDT"):
            continue
        if sym in BLACKLIST:
            continue
        vol24h = float(t.get("turnover24h", 0))
        if vol24h < MIN_VOLUME_24H:
            continue
        symbols.append({
            "symbol": sym,
            "volume_24h": vol24h,
            "price": float(t.get("lastPrice", 0)),
        })
    symbols.sort(key=lambda x: x["volume_24h"], reverse=True)
    print(f" โ {len(symbols)} symbols pass volume filter (>= ${MIN_VOLUME_24H/1e6:.0f}M)")
    return symbols
# ============================================================
# DATA FETCHING
# ============================================================
def fetch_klines(symbol: str, interval: str, days: int = 7) -> list[dict]:
    """Fetch klines from Bybit.

    Pages backwards from "now" in chunks of up to 1000 bars until
    `days` worth of `interval`-minute bars is collected or history
    runs out. Returns bars in chronological order, de-duplicated by
    timestamp and trimmed to at most the requested bar count.
    """
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)  # ms epoch, paging cursor
    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(
                category="linear",
                symbol=symbol,
                interval=interval,
                limit=1000,
                end=end_time,
            )
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            # Rows arrive newest-first: [ts, open, high, low, close, volume, ...]
            for item in items:
                all_klines.append({
                    "ts": int(item[0]),
                    "open": float(item[1]),
                    "high": float(item[2]),
                    "low": float(item[3]),
                    "close": float(item[4]),
                    "volume": float(item[5]),
                })
            # Continue paging from just before the oldest row received.
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break  # short page => no more history available
        except Exception as e:
            print(f" Fetch error {symbol}: {e}")
            break
    all_klines.reverse()  # newest-first pages -> chronological order
    # De-duplicate by timestamp (pages can overlap), keeping the first occurrence.
    seen = set()
    unique = []
    for k in all_klines:
        if k["ts"] not in seen:
            seen.add(k["ts"])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique
# ============================================================
# INDICATORS
# ============================================================
def calc_zvwap(highs, lows, closes, volumes, period=VWAP_PERIOD):
    """Z-score of the close versus a rolling VWAP.

    For each bar i, the VWAP and the std-dev of (close - VWAP) are
    computed over the preceding `period` bars (bar i excluded); bars
    before the first full window keep a z-score of 0.
    """
    n = len(closes)
    z_scores = np.full(n, 0.0)
    for idx in range(period, n):
        hi_win = highs[idx - period:idx]
        lo_win = lows[idx - period:idx]
        cl_win = closes[idx - period:idx]
        vol_win = volumes[idx - period:idx]
        typical = (hi_win + lo_win + cl_win) / 3
        running_pv = np.cumsum(typical * vol_win)
        running_vol = np.cumsum(vol_win)
        # Guard against zero cumulative volume at the window start.
        denom = np.where(running_vol == 0, 1, running_vol)
        vwap_series = running_pv / denom
        spread = cl_win - vwap_series
        sigma = np.std(spread)
        if sigma > 0:
            z_scores[idx] = (closes[idx] - vwap_series[-1]) / sigma
    return z_scores
def calc_natr(highs, lows, closes, period=14):
    """Normalized ATR (%) over the last `period` bars.

    Returns 0.0 when there is not enough history (needs period+1 bars
    for the previous-close term) or when the last close is non-positive.
    """
    n = len(closes)
    if n < period + 1:
        return 0.0
    true_ranges = [
        max(
            highs[j] - lows[j],
            abs(highs[j] - closes[j - 1]),
            abs(lows[j] - closes[j - 1]),
        )
        for j in range(n - period, n)
    ]
    last_close = closes[-1]
    if last_close <= 0:
        return 0.0
    return (np.mean(true_ranges) / last_close) * 100
def calc_ema(closes, period):
    """Exponential moving average, seeded with an SMA.

    Returns a numpy array the same length as `closes`; entries before
    the first full window are NaN. If len(closes) < period the whole
    array is NaN.
    """
    out = np.full(len(closes), np.nan)
    if len(closes) < period:
        return out
    alpha = 2.0 / (period + 1)
    out[period - 1] = np.mean(closes[:period])  # SMA seed
    for j in range(period, len(closes)):
        # Equivalent recurrence: new = prev + alpha * (price - prev)
        out[j] = out[j - 1] + alpha * (closes[j] - out[j - 1])
    return out
# ============================================================
# DCA DEAL SIMULATOR
# ============================================================
class DCADeal:
    """Simulates one DCA (averaging-down) deal on a single symbol.

    A deal opens with a base order (BO) and pre-computes up to
    `max_safety_orders` safety-order (SO) trigger prices whose spacing
    scales with the symbol's NATR. `tick()` replays one 5m bar at a
    time, filling SOs and closing the deal on TP / Z-reversion / SL /
    SO-guard / time-stop conditions. PnL is net of fees once closed.
    """

    def __init__(self, symbol, side, entry_price, entry_bar, config, natr_pct):
        self.symbol = symbol
        self.side = side          # "LONG" or "SHORT"
        self.config = config
        self.entry_bar = entry_bar
        self.natr_pct = natr_pct
        leverage = config["leverage"]
        # Base order fills immediately at entry_price.
        self.orders = [{
            "price": entry_price,
            "usd": config["base_order_usd"],
            "qty": (config["base_order_usd"] * leverage) / entry_price,
            "type": "BO",
        }]
        # NATR-based SO spacing (like live bot): each SO is spaced wider
        # (step_scale^i) and sized larger (volume_scale^i) than the last.
        self.so_triggers = []
        cumul_deviation = 0
        for i in range(config["max_safety_orders"]):
            spacing = natr_pct * config["natr_factor"] * (config["step_scale"] ** i)
            cumul_deviation += spacing
            so_size = config["safety_order_usd"] * (config["volume_scale"] ** i)
            if side == "LONG":
                trigger_price = entry_price * (1 - cumul_deviation / 100)
            else:
                trigger_price = entry_price * (1 + cumul_deviation / 100)
            self.so_triggers.append({
                "price": trigger_price,
                "usd": so_size,
                "deviation_pct": cumul_deviation,
                "filled": False,
            })
        self.so_filled = 0
        self.so_fill_times = []   # bar indices of SO fills (used by SO guard)
        self.closed = False
        self.close_price = 0
        self.close_bar = 0
        self.close_reason = ""
        self.pnl = 0              # net PnL (fees already deducted) once closed
        self.fees = 0
        self.max_drawdown_pct = 0 # worst adverse excursion vs avg entry, %

    @property
    def avg_entry(self):
        # Quantity-weighted average price across BO + filled SOs.
        total_qty = sum(o["qty"] for o in self.orders)
        total_cost = sum(o["qty"] * o["price"] for o in self.orders)
        return total_cost / total_qty if total_qty > 0 else 0

    @property
    def total_qty(self):
        return sum(o["qty"] for o in self.orders)

    @property
    def total_invested(self):
        # Un-leveraged USD margin committed across all filled orders.
        return sum(o["usd"] for o in self.orders)

    def tick(self, bar_idx, high, low, close, z_score):
        """Advance the deal by one bar; returns True once the deal is closed."""
        if self.closed:
            return True
        leverage = self.config["leverage"]
        # Check SO fills
        for i, so in enumerate(self.so_triggers):
            if so["filled"]:
                continue
            filled = False
            if self.side == "LONG" and low <= so["price"]:
                filled = True
            elif self.side == "SHORT" and high >= so["price"]:
                filled = True
            if filled:
                qty = (so["usd"] * leverage) / so["price"]
                self.orders.append({
                    "price": so["price"],
                    "usd": so["usd"],
                    "qty": qty,
                    "type": f"SO{i+1}",
                })
                so["filled"] = True
                self.so_filled += 1
                self.so_fill_times.append(bar_idx)
                # SOs are treated as resting limit orders -> maker fee at fill time.
                self.fees += so["usd"] * leverage * MAKER_FEE
        avg = self.avg_entry
        # Max drawdown (adverse excursion against the averaged entry)
        if self.side == "LONG":
            dd_pct = (avg - low) / avg * 100
        else:
            dd_pct = (high - avg) / avg * 100
        self.max_drawdown_pct = max(self.max_drawdown_pct, dd_pct)
        # SO Guard: 3 SOs in 15 bars (75 min on 5m) -> force close
        if len(self.so_fill_times) >= 3:
            last3 = self.so_fill_times[-3:]
            if last3[-1] - last3[0] <= 15:
                self.close_price = close
                self._close(bar_idx, "SO_GUARD")
                return True
        # Time Stop (progressive): the more SOs filled, the shorter the
        # time budget before the deal is force-closed at market.
        bars_open = bar_idx - self.entry_bar
        hours_open = bars_open * 5 / 60  # 5m bars
        time_limits = {1: 3.0, 2: 2.0, 3: 1.5}
        for min_sos, max_hours in time_limits.items():
            if self.so_filled >= min_sos and hours_open >= max_hours:
                self.close_price = close
                self._close(bar_idx, f"TIME_STOP({self.so_filled}SO/{hours_open:.1f}h)")
                return True
        # TP (fixed-percentage target from the averaged entry)
        tp_price = avg * (1 + self.config["take_profit_pct"] / 100) if self.side == "LONG" else \
            avg * (1 - self.config["take_profit_pct"] / 100)
        tp_hit = False
        if self.side == "LONG" and high >= tp_price:
            tp_hit = True
            self.close_price = tp_price
        elif self.side == "SHORT" and low <= tp_price:
            tp_hit = True
            self.close_price = tp_price
        # Z-reversion TP: Z has mean-reverted AND the close is already profitable.
        z_tp = False
        if self.side == "LONG" and z_score >= -self.config["z_tp_threshold"]:
            if close > avg:
                z_tp = True
                self.close_price = close
        elif self.side == "SHORT" and z_score <= self.config["z_tp_threshold"]:
            if close < avg:
                z_tp = True
                self.close_price = close
        # SL (fixed-percentage stop from the averaged entry)
        sl_price = avg * (1 - self.config["stop_loss_pct"] / 100) if self.side == "LONG" else \
            avg * (1 + self.config["stop_loss_pct"] / 100)
        sl_hit = False
        if self.side == "LONG" and low <= sl_price:
            sl_hit = True
            self.close_price = sl_price
        elif self.side == "SHORT" and high >= sl_price:
            sl_hit = True
            self.close_price = sl_price
        # Precedence within a single bar: SL beats TP beats Z-TP (pessimistic).
        if sl_hit:
            self._close(bar_idx, "SL")
            return True
        elif tp_hit:
            self._close(bar_idx, "TP%")
            return True
        elif z_tp:
            self._close(bar_idx, "Z-TP")
            return True
        return False

    def _close(self, bar_idx, reason):
        """Finalize the deal at self.close_price and compute net PnL."""
        self.closed = True
        self.close_bar = bar_idx
        self.close_reason = reason
        total_qty = self.total_qty
        avg = self.avg_entry
        if self.side == "LONG":
            self.pnl = total_qty * (self.close_price - avg)
        else:
            self.pnl = total_qty * (avg - self.close_price)
        # Exit is a market order (taker); the base order is also charged taker
        # here, while SO maker fees were already accrued at fill time in tick().
        exit_fee = total_qty * self.close_price * TAKER_FEE
        entry_fee = self.orders[0]["qty"] * self.orders[0]["price"] * TAKER_FEE
        self.fees += exit_fee + entry_fee
        self.pnl -= self.fees
# ============================================================
# BACKTEST ENGINE (multi-symbol concurrent deals)
# ============================================================
def run_backtest(symbols_data, config):
    """Run backtest across all symbols with concurrent deal limit.

    Pipeline:
      1. Fetch klines per symbol; drop symbols with too little history
         or NATR below MIN_NATR_PCT.
      2. Pre-compute Z-VWAP and EMA series per symbol.
      3. Replay a merged timestamp timeline: tick active deals first,
         then open new ones (|Z| entry threshold + EMA trend filter),
         capped at MAX_DEALS_CONCURRENT simultaneous deals, with a
         per-symbol cooldown after each close.

    Returns (all_deals, signals_generated, signals_skipped_max_deals,
    signals_skipped_ema, all_kline_data).
    """
    print(f"\n{'='*70}")
    print(f" Running {DAYS}-day backtest on {len(symbols_data)} symbols")
    print(f" Filters: NATR >= {MIN_NATR_PCT}%, Vol >= ${MIN_VOLUME_24H/1e6:.0f}M")
    print(f" Blacklist: {sorted(BLACKLIST)}")
    print(f" Max concurrent deals: {MAX_DEALS_CONCURRENT}")
    print(f"{'='*70}")
    print(f" BO=${config['base_order_usd']}, SO=${config['safety_order_usd']}, "
          f"MaxSO={config['max_safety_orders']}, SL={config['stop_loss_pct']}%, "
          f"TP={config['take_profit_pct']}%, Lev={config['leverage']}x")
    print(f" Z-entry={config['z_entry_threshold']}, Z-TP={config['z_tp_threshold']}, "
          f"EMA trend={config['ema_period']} (trade WITH trend only)")
    print()
    # Fetch data for all symbols
    all_kline_data = {}
    skipped_natr = []
    for idx, sd in enumerate(symbols_data):
        sym = sd["symbol"]
        print(f" [{idx+1}/{len(symbols_data)}] {sym} (Vol ${sd['volume_24h']/1e6:.1f}M)...", end=" ", flush=True)
        time.sleep(0.15)  # rate limit
        klines = fetch_klines(sym, TIMEFRAME, days=DAYS)
        if len(klines) < VWAP_PERIOD + 50:
            print(f"โ ๏ธ {len(klines)} bars, skip")
            continue
        # Calculate NATR on fetched data
        closes = np.array([k["close"] for k in klines])
        highs = np.array([k["high"] for k in klines])
        lows = np.array([k["low"] for k in klines])
        natr = calc_natr(highs, lows, closes, 14)
        if natr < MIN_NATR_PCT:
            print(f"โ ๏ธ NATR {natr:.2f}% < {MIN_NATR_PCT}%, skip")
            skipped_natr.append((sym, natr))
            continue
        print(f"โ {len(klines)} bars, NATR={natr:.2f}%")
        all_kline_data[sym] = {
            "klines": klines,
            "natr": natr,
            "volume": sd["volume_24h"],
        }
    print(f"\n ๐ {len(all_kline_data)} symbols ready for backtest")
    if skipped_natr:
        print(f" โญ๏ธ Skipped (low NATR): {', '.join(f'{s}({n:.2f}%)' for s,n in skipped_natr[:10])}")
    # Align all data to common timeline using bar index
    # For simplicity: run each symbol independently but track concurrent deals globally
    # Build unified timeline: merge all bars from all symbols
    all_timestamps = set()
    for sym, data in all_kline_data.items():
        for k in data["klines"]:
            all_timestamps.add(k["ts"])
    timeline = sorted(all_timestamps)
    ts_to_idx = {ts: i for i, ts in enumerate(timeline)}
    # Index klines by timestamp for each symbol
    sym_bars = {}
    for sym, data in all_kline_data.items():
        bars_by_ts = {}
        for k in data["klines"]:
            bars_by_ts[k["ts"]] = k
        sym_bars[sym] = bars_by_ts
    # Pre-compute Z-scores and EMA for each symbol (keyed by timestamp)
    sym_z = {}
    sym_ema = {}
    sym_np = {}
    ema_period = config.get("ema_period", 50)
    for sym, data in all_kline_data.items():
        klines = data["klines"]
        closes = np.array([k["close"] for k in klines])
        highs = np.array([k["high"] for k in klines])
        lows = np.array([k["low"] for k in klines])
        volumes = np.array([k["volume"] for k in klines])
        z_scores = calc_zvwap(highs, lows, closes, volumes, VWAP_PERIOD)
        ema_arr = calc_ema(closes, ema_period)
        sym_z[sym] = {klines[i]["ts"]: z_scores[i] for i in range(len(klines))}
        sym_ema[sym] = {klines[i]["ts"]: ema_arr[i] for i in range(len(klines))}
        sym_np[sym] = {"highs": highs, "lows": lows, "closes": closes, "volumes": volumes,
                       "timestamps": [k["ts"] for k in klines]}
    # Simulate with global concurrent deal limit
    active_deals = {}  # symbol -> DCADeal
    all_deals = []
    cooldowns = {}  # symbol -> cooldown_until_ts
    signals_generated = 0
    signals_skipped_max_deals = 0
    signals_skipped_ema = 0
    for ts in timeline:
        # Check active deals (tick before considering new entries this bar)
        for sym in list(active_deals.keys()):
            deal = active_deals[sym]
            if ts not in sym_bars.get(sym, {}):
                continue  # symbol has no bar at this timestamp
            bar = sym_bars[sym][ts]
            z = sym_z[sym].get(ts, 0)
            closed = deal.tick(
                ts_to_idx[ts],
                bar["high"], bar["low"], bar["close"], z
            )
            if closed:
                all_deals.append(deal)
                cooldowns[sym] = ts + config["cooldown_bars"] * 5 * 60 * 1000  # cooldown in ms
                del active_deals[sym]
        # Check for new entries
        for sym, data in all_kline_data.items():
            if sym in active_deals:
                continue
            if ts not in sym_bars.get(sym, {}):
                continue
            if sym in cooldowns and ts < cooldowns[sym]:
                continue
            z = sym_z[sym].get(ts, 0)
            bar = sym_bars[sym][ts]
            ema_val = sym_ema[sym].get(ts, np.nan)
            # Check Z thresholds
            if abs(z) <= config["z_entry_threshold"]:
                continue
            signals_generated += 1
            # EMA trend filter: only trade WITH trend
            # Price above EMA -> uptrend -> only LONG allowed
            # Price below EMA -> downtrend -> only SHORT allowed
            side = "LONG" if z < -config["z_entry_threshold"] else "SHORT"
            if not np.isnan(ema_val):
                if side == "LONG" and bar["close"] < ema_val:
                    signals_skipped_ema += 1
                    continue
                if side == "SHORT" and bar["close"] > ema_val:
                    signals_skipped_ema += 1
                    continue
            if len(active_deals) >= MAX_DEALS_CONCURRENT:
                signals_skipped_max_deals += 1
                continue
            natr = data["natr"]
            deal = DCADeal(sym, side, bar["close"], ts_to_idx[ts], config, natr)
            active_deals[sym] = deal
    # Force-close remaining deals at their last available close
    for sym, deal in active_deals.items():
        klines = all_kline_data[sym]["klines"]
        deal.close_price = klines[-1]["close"]
        deal._close(ts_to_idx[klines[-1]["ts"]], "END")
        all_deals.append(deal)
    return all_deals, signals_generated, signals_skipped_max_deals, signals_skipped_ema, all_kline_data
# ============================================================
# DISPLAY RESULTS
# ============================================================
def print_results(all_deals, signals_gen, skip_max, skip_ema, kline_data):
    """Print aggregate stats, close-reason breakdown, LONG/SHORT split,
    per-symbol table and an ex-blacklist comparison.

    Returns the per-symbol stats dict, or None when there are no deals
    (main() handles the None case via `sym_stats or {}`).
    """
    if not all_deals:
        print("\nโ No deals generated!")
        return
    total_pnl = sum(d.pnl for d in all_deals)
    wins = [d for d in all_deals if d.pnl > 0]
    losses = [d for d in all_deals if d.pnl <= 0]
    wr = len(wins) / len(all_deals) * 100
    gross_profit = sum(d.pnl for d in wins) if wins else 0
    gross_loss = abs(sum(d.pnl for d in losses)) if losses else 0.001  # avoid div-by-zero in PF
    pf = gross_profit / gross_loss if gross_loss > 0 else float('inf')
    avg_win = gross_profit / len(wins) if wins else 0
    avg_loss = gross_loss / len(losses) if losses else 0
    max_dd = max(d.max_drawdown_pct for d in all_deals)
    total_fees = sum(d.fees for d in all_deals)
    avg_sos = sum(d.so_filled for d in all_deals) / len(all_deals)
    avg_bars = sum(d.close_bar - d.entry_bar for d in all_deals) / len(all_deals)
    longs = [d for d in all_deals if d.side == "LONG"]
    shorts = [d for d in all_deals if d.side == "SHORT"]
    # Close reasons
    reasons = {}
    for d in all_deals:
        r = d.close_reason.split("(")[0]  # group TIME_STOP variants
        reasons[r] = reasons.get(r, 0) + 1
    print(f"\n{'='*70}")
    print(f" RESULTS โ {DAYS}-day backtest, {len(kline_data)} symbols")
    print(f"{'='*70}")
    pnl_emoji = "๐ข" if total_pnl > 0 else "๐ด"
    print(f"\n {pnl_emoji} Total PnL: ${total_pnl:+.2f}")
    print(f" ๐ Profit Factor: {pf:.2f}")
    print(f" ๐ฏ Win Rate: {wr:.1f}% ({len(wins)}W / {len(losses)}L)")
    print(f" ๐ Avg Win: ${avg_win:.3f}")
    print(f" ๐ Avg Loss: ${avg_loss:.3f}")
    print(f" ๐ฐ Deals: {len(all_deals)} (L:{len(longs)} / S:{len(shorts)})")
    print(f" ๐ Avg SOs: {avg_sos:.1f}")
    print(f" โฑ๏ธ Avg Duration: {avg_bars:.0f} bars ({avg_bars*5/60:.1f}h)")
    print(f" ๐ Max DD: {max_dd:.1f}%")
    print(f" ๐ธ Total Fees: ${total_fees:.2f}")
    print(f" ๐ก Signals: {signals_gen} (skipped: {skip_ema} EMA-trend, {skip_max} max-deals)")
    print(f"\n Close Reasons:")
    for r, cnt in sorted(reasons.items(), key=lambda x: -x[1]):
        pct = cnt / len(all_deals) * 100
        print(f" {r:20s} {cnt:4d} ({pct:.0f}%)")
    # Long vs Short
    long_pnl = sum(d.pnl for d in longs)
    short_pnl = sum(d.pnl for d in shorts)
    long_wr = sum(1 for d in longs if d.pnl > 0) / len(longs) * 100 if longs else 0
    short_wr = sum(1 for d in shorts if d.pnl > 0) / len(shorts) * 100 if shorts else 0
    print(f"\n LONG: ${long_pnl:+.2f} ({len(longs)} deals, WR={long_wr:.0f}%)")
    print(f" SHORT: ${short_pnl:+.2f} ({len(shorts)} deals, WR={short_wr:.0f}%)")
    # Per-symbol breakdown (top 10 by deal count)
    sym_stats = {}
    for d in all_deals:
        if d.symbol not in sym_stats:
            sym_stats[d.symbol] = {"deals": 0, "pnl": 0, "wins": 0}
        sym_stats[d.symbol]["deals"] += 1
        sym_stats[d.symbol]["pnl"] += d.pnl
        if d.pnl > 0:
            sym_stats[d.symbol]["wins"] += 1
    print(f"\n {'โ'*60}")
    print(f" Per-Symbol Breakdown (sorted by PnL):")
    print(f" {'Symbol':15s} {'Deals':>6s} {'PnL':>10s} {'WR':>6s} {'AvgPnL':>10s}")
    print(f" {'โ'*60}")
    sorted_syms = sorted(sym_stats.items(), key=lambda x: x[1]["pnl"], reverse=True)
    for sym, st in sorted_syms:
        wr_sym = st["wins"] / st["deals"] * 100 if st["deals"] > 0 else 0
        avg_pnl = st["pnl"] / st["deals"]
        emoji = "๐ข" if st["pnl"] > 0 else "๐ด"
        print(f" {emoji} {sym:13s} {st['deals']:>5d} ${st['pnl']:>+8.2f} {wr_sym:>5.0f}% ${avg_pnl:>+8.3f}")
    # Would-be blacklist comparison: what the previously-blacklisted coins contributed
    old_blacklist_coins = {"RAVEUSDT", "FARTCOINUSDT", "TAOUSDT", "ARIAUSDT", "SIRENUSDT", "MAGMAUSDT"}
    old_bl_deals = [d for d in all_deals if d.symbol in old_blacklist_coins]
    if old_bl_deals:
        old_bl_pnl = sum(d.pnl for d in old_bl_deals)
        old_bl_wins = sum(1 for d in old_bl_deals if d.pnl > 0)
        old_bl_wr = old_bl_wins / len(old_bl_deals) * 100
        print(f"\n {'โ'*60}")
        print(f" โ ๏ธ Ex-blacklist coins (RAVE/FART/TAO/ARIA/SIREN/MAGMA):")
        print(f" {len(old_bl_deals)} deals, PnL=${old_bl_pnl:+.2f}, WR={old_bl_wr:.0f}%")
        pnl_without = total_pnl - old_bl_pnl
        print(f" Without them: PnL=${pnl_without:+.2f} (delta ${-old_bl_pnl:+.2f})")
    return sym_stats
# ============================================================
# MAIN
# ============================================================
def main():
    """Entry point: screen symbols, run the backtest, print results and
    save a JSON report next to this script."""
    print("=" * 70)
    print(" DCA Z-VWAP Backtest โ SIMPLIFIED FILTERS v2")
    print(" NATR >= 0.75% | No max NATR | No CHOP | No Z-max")
    print(" + EMA50 trend filter (trade WITH trend)")
    print(" Blacklist: BTC, ETH, TRX only")
    print("=" * 70)
    symbols_data = get_screener_symbols()
    if not symbols_data:
        print("โ No symbols found")
        return
    # Limit to top 50 by volume to keep backtest manageable
    symbols_data = symbols_data[:50]
    print(f" Testing top {len(symbols_data)} by volume")
    # NOTE(review): `skip_zmax` actually receives run_backtest's 4th return
    # value, the EMA-trend skip count -- the name is stale.
    all_deals, sig_gen, skip_max, skip_zmax, kline_data = run_backtest(symbols_data, DCA_CONFIG)
    sym_stats = print_results(all_deals, sig_gen, skip_max, skip_zmax, kline_data)
    # Save results
    output_path = os.path.join(os.path.dirname(__file__), "results_simplified_filters.json")
    save_data = {
        "config": DCA_CONFIG,
        "filters": {
            "min_natr_pct": MIN_NATR_PCT,
            "max_natr_pct": "NONE (removed)",
            "min_chop": "NONE (removed)",
            "min_volume_24h": MIN_VOLUME_24H,
            "blacklist": sorted(BLACKLIST),
        },
        "summary": {
            "total_deals": len(all_deals),
            "total_pnl": round(sum(d.pnl for d in all_deals), 4),
            "win_rate": round(sum(1 for d in all_deals if d.pnl > 0) / len(all_deals) * 100, 1) if all_deals else 0,
            "symbols_tested": len(kline_data),
            "signals_generated": sig_gen,
        },
        # print_results returns None when there were no deals, hence `or {}`.
        "per_symbol": {sym: {"deals": st["deals"], "pnl": round(st["pnl"], 4),
                             "wr": round(st["wins"]/st["deals"]*100, 1) if st["deals"] > 0 else 0}
                       for sym, st in (sym_stats or {}).items()},
        "deals": [
            {
                "symbol": d.symbol, "side": d.side,
                "entry": round(d.orders[0]["price"], 6),
                "avg": round(d.avg_entry, 6),
                "exit": round(d.close_price, 6),
                "pnl": round(d.pnl, 4),
                "sos": d.so_filled,
                "natr": round(d.natr_pct, 2),
                "reason": d.close_reason,
                "invested": round(d.total_invested, 2),
                "dd": round(d.max_drawdown_pct, 2),
            }
            for d in all_deals
        ],
    }
    with open(output_path, "w") as f:
        json.dump(save_data, f, indent=2)
    print(f"\n๐พ Results saved to {output_path}")


if __name__ == "__main__":
    main()