โ† ะะฐะทะฐะด
""" Hybrid Grid Backtest v4 โ€” Screener on 15m/1h, Grid on 1m ========================================================== The correct approach: - SCREENER runs on higher TF (15m or 1h) to detect range/breakout - GRID SIMULATION runs on 1m data to capture all micro round-trips This gives realistic RT counts since grids operate on tick/1m level. Usage: python backtest_hybrid_v4.py """ import ccxt import pandas as pd import numpy as np import json import time from datetime import datetime, timedelta, timezone # ============================================================ # COINS # ============================================================ COINS = [ "ENA/USDT", "PENGU/USDT", "NEAR/USDT", "WLD/USDT", "UNI/USDT", "VIRTUAL/USDT", "SOL/USDT", "AVAX/USDT", "1000PEPE/USDT", "DOT/USDT", "ICP/USDT", "ETH/USDT", "TON/USDT", "LINK/USDT", "DOGE/USDT", "XRP/USDT", "AAVE/USDT", "FIL/USDT", "SUI/USDT", "OP/USDT", "ARB/USDT", ] SCREENER_TFS = ["15m", "1h"] GRID_TF = "1m" # grid always simulated on 1m LOOKBACK_DAYS = 14 # 14 days (1m data is heavy) # ============================================================ # GRID CONFIG (v3 settings) # ============================================================ SPACING_MIN_PCT = 0.3 SPACING_MAX_PCT = 0.5 ATR_SPACING_MULT = 0.5 ATR_PERIOD = 14 GRID_LEVELS = 8 LEVERAGE = 5 POSITION_SIZE = 3.0 MAKER_FEE = 0.0002 TAKER_FEE = 0.0004 # Inventory management INVENTORY_WARN_LEVELS = 3 INVENTORY_MAX_LEVELS = 5 UNSTUCK_THRESHOLD = 4 UNSTUCK_CLOSE_PCT = 0.25 # Trailing center EMA_CENTER_PERIOD = 20 CENTER_UPDATE_MINS = 5 # update center every 5 min (5 x 1m bars) # ============================================================ # SCREENER CONFIG # ============================================================ BB_PERIOD = 20 BB_STD = 2.0 BB_WIDTH_MIN = 0.3 BB_WIDTH_MAX = 1.5 ADX_MAX_ENTRY = 25 ADX_PERIOD = 14 CHOP_PERIOD = 14 CHOP_MIN_ENTRY = 55 CHOP_MAX_EXIT = 40 BREAKOUT_BB_MULT = 1.5 BREAKOUT_ADX = 28 # Session MIN_SESSION_MINS = {"15m": 150, "1h": 600} # min 2.5h for 15m, 10h for 1h MAX_SESSION_MINS = {"15m": 2880, "1h": 4320} # max 2d for 15m, 3d for 1h # Risk MAX_LOSS_PER_SESSION = 5.0 DEPOSIT = 50.0 # ============================================================ # INDICATORS # ============================================================ def fetch_klines(exchange, symbol, timeframe, days): all_klines = [] since = exchange.parse8601( (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() ) limit = 1000 while True: try: klines = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit) except Exception as e: print(f" Error {symbol} {timeframe}: {e}") return None if not klines: break all_klines.extend(klines) since = klines[-1][0] + 1 if len(klines) < limit: break time.sleep(0.12) if not all_klines: return None df = pd.DataFrame(all_klines, columns=['ts', 'open', 'high', 'low', 'close', 'volume']) df['ts'] = pd.to_datetime(df['ts'], unit='ms') return df def calc_atr(df, period=ATR_PERIOD): tr = pd.concat([ df['high'] - df['low'], (df['high'] - df['close'].shift(1)).abs(), (df['low'] - df['close'].shift(1)).abs() ], axis=1).max(axis=1) return tr.ewm(alpha=1/period, min_periods=period).mean() def calc_bb_width(close, period=BB_PERIOD, std=BB_STD): mid = close.rolling(period).mean() s = close.rolling(period).std() return ((mid + std * s) - (mid - std * s)) / mid * 100 def calc_adx(df, period=ADX_PERIOD): h, l, c = df['high'], df['low'], df['close'] pdm = h.diff(); mdm = -l.diff() pdm = pdm.where((pdm > mdm) & (pdm > 0), 0.0) mdm = mdm.where((mdm > pdm) & (mdm > 0), 0.0) tr = pd.concat([h-l, (h-c.shift(1)).abs(), (l-c.shift(1)).abs()], axis=1).max(axis=1) atr = tr.ewm(alpha=1/period, min_periods=period).mean() pdi = 100*(pdm.ewm(alpha=1/period, min_periods=period).mean()/atr) mdi = 100*(mdm.ewm(alpha=1/period, min_periods=period).mean()/atr) dx = 100*(pdi-mdi).abs()/(pdi+mdi+1e-10) return dx.ewm(alpha=1/period, min_periods=period).mean() def calc_chop(df, period=CHOP_PERIOD): tr = pd.concat([ df['high']-df['low'], (df['high']-df['close'].shift(1)).abs(), (df['low']-df['close'].shift(1)).abs() ], axis=1).max(axis=1) atr_sum = tr.rolling(period).sum() hl = (df['high'].rolling(period).max() - df['low'].rolling(period).min()).replace(0, np.nan) return 100 * np.log10(atr_sum / hl) / np.log10(period) def calc_ema(series, period): return series.ewm(span=period, adjust=False).mean() def get_spacing(atr_val, price): if price <= 0: return SPACING_MIN_PCT raw = (atr_val / price) * 100 * ATR_SPACING_MULT return np.clip(raw, SPACING_MIN_PCT, SPACING_MAX_PCT) # ============================================================ # SCREENER โ€” finds session windows on higher TF # ============================================================ def find_sessions(df_htf, screener_tf): """Find entry/exit timestamps on higher TF data.""" bw = calc_bb_width(df_htf['close']) adx = calc_adx(df_htf) chop = calc_chop(df_htf) min_bars_htf = {"15m": 10, "1h": 10}[screener_tf] max_bars_htf = {"15m": 192, "1h": 72}[screener_tf] sessions = [] i = max(BB_PERIOD, ADX_PERIOD, CHOP_PERIOD) + 5 in_session = False session_start_i = 0 while i < len(df_htf): b, a, c = bw.iloc[i], adx.iloc[i], chop.iloc[i] if pd.isna(b) or pd.isna(a) or pd.isna(c): i += 1; continue if not in_session: bb_ok = BB_WIDTH_MIN <= b <= BB_WIDTH_MAX adx_ok = a < ADX_MAX_ENTRY chop_ok = c > CHOP_MIN_ENTRY if bb_ok and adx_ok and chop_ok: in_session = True session_start_i = i else: bars_in = i - session_start_i exit_now = False reason = "" bb_break = b > BB_WIDTH_MAX * BREAKOUT_BB_MULT adx_break = a > BREAKOUT_ADX chop_trend = c < CHOP_MAX_EXIT if bars_in >= min_bars_htf and bb_break and (adx_break or chop_trend): exit_now = True; reason = "breakout" elif bars_in >= max_bars_htf: exit_now = True; reason = "max_time" if exit_now: sessions.append({ "entry_ts": df_htf['ts'].iloc[session_start_i], "exit_ts": df_htf['ts'].iloc[i], "bars_htf": bars_in, "reason": reason, "bw_entry": round(bw.iloc[session_start_i], 3), "adx_entry": round(adx.iloc[session_start_i], 1), "chop_entry": round(chop.iloc[session_start_i], 1), }) in_session = False i += 3; continue i += 1 return sessions # ============================================================ # GRID SIMULATION on 1m data # ============================================================ def simulate_grid_on_1m(df_1m, entry_ts, exit_ts): """ Run grid sim on 1m bars between entry_ts and exit_ts. Returns detailed PnL breakdown. """ mask = (df_1m['ts'] >= entry_ts) & (df_1m['ts'] <= exit_ts) df = df_1m[mask].reset_index(drop=True) if len(df) < 10: return None closes = df['close'].values highs = df['high'].values lows = df['low'].values n = len(closes) # Pre-calc atr_s = calc_atr(df, ATR_PERIOD) ema_s = calc_ema(df['close'], EMA_CENTER_PERIOD) notional = POSITION_SIZE * LEVERAGE # $15 center = closes[0] inv_levels = 0 inv_cost = 0.0 realized = 0.0 rts = 0 peak_inv = 0 partial_cls = 0 unstuck_cls = 0 fees_total = 0.0 for i in range(1, n): # Trailing center every N bars if i % CENTER_UPDATE_MINS == 0 and not pd.isna(ema_s.iloc[i]): center = ema_s.iloc[i] # ATR spacing atr_v = atr_s.iloc[i] if not pd.isna(atr_s.iloc[i]) else abs(closes[i]-closes[i-1]) if pd.isna(atr_v): atr_v = abs(closes[i]-closes[i-1]) sp_pct = get_spacing(atr_v, closes[i]) spacing = closes[i] * sp_pct / 100 if spacing <= 0: continue prev_c = closes[i-1] curr_c = closes[i] # Also use high/low for intra-bar crossings (more realistic) bar_low = lows[i] bar_high = highs[i] # Estimate crossings using full bar range, not just close-to-close # Price path: prev_close โ†’ bar extremes โ†’ curr_close # Conservative: count levels between bar_low and bar_high low_lvl = int(bar_low / spacing) high_lvl = int(bar_high / spacing) prev_lvl = int(prev_c / spacing) curr_lvl = int(curr_c / spacing) # Total levels traversed โ‰ˆ (high - low) / spacing # But net = curr_lvl - prev_lvl # Round trips within bar โ‰ˆ (total_traversed - abs(net)) / 2 total_traversed = high_lvl - low_lvl net_move = curr_lvl - prev_lvl abs_net = abs(net_move) # Internal round-trips (price went back and forth within bar) internal_rts = max(0, (total_traversed - abs_net)) // 2 # Process internal round-trips (clean profit, no inventory change) if internal_rts > 0: rt_gross = notional * sp_pct / 100 rt_fee = notional * MAKER_FEE * 2 rt_net = rt_gross - rt_fee realized += internal_rts * rt_net fees_total += internal_rts * rt_fee rts += internal_rts # Process net directional move (changes inventory) direction = 1 if net_move > 0 else -1 if net_move < 0 else 0 for lc in range(min(abs_net, GRID_LEVELS)): fee = notional * MAKER_FEE fees_total += fee if direction > 0: # price UP โ†’ sells fill fill_p = prev_c + spacing * (lc + 0.5) if inv_levels <= -INVENTORY_MAX_LEVELS: break if inv_levels > 0: # close long rt_pnl = notional * (fill_p - inv_cost) / inv_cost - fee realized += rt_pnl; inv_levels -= 1; rts += 1 else: # open/add short if inv_levels == 0: inv_cost = fill_p else: inv_cost = (abs(inv_levels)*inv_cost + fill_p) / (abs(inv_levels)+1) inv_levels -= 1; realized -= fee elif direction < 0: # price DOWN โ†’ buys fill fill_p = prev_c - spacing * (lc + 0.5) if fill_p <= 0: continue if inv_levels >= INVENTORY_MAX_LEVELS: break if inv_levels < 0: # close short rt_pnl = notional * (inv_cost - fill_p) / inv_cost - fee realized += rt_pnl; inv_levels += 1; rts += 1 else: # open/add long if inv_levels == 0: inv_cost = fill_p else: inv_cost = (abs(inv_levels)*inv_cost + fill_p) / (abs(inv_levels)+1) inv_levels += 1; realized -= fee if abs(inv_levels) > peak_inv: peak_inv = abs(inv_levels) # Partial close at warn level if abs(inv_levels) >= INVENTORY_WARN_LEVELS and inv_cost > 0: excess = abs(inv_levels) - INVENTORY_WARN_LEVELS + 1 to_close = max(1, excess // 2) if inv_levels > 0: pnl_per = notional * (curr_c - inv_cost) / inv_cost else: pnl_per = notional * (inv_cost - curr_c) / inv_cost cl_fee = to_close * notional * TAKER_FEE realized += to_close * pnl_per - cl_fee fees_total += cl_fee if inv_levels > 0: inv_levels -= to_close else: inv_levels += to_close partial_cls += to_close # Unstucking near EMA if abs(inv_levels) >= UNSTUCK_THRESHOLD and inv_cost > 0: ema_v = ema_s.iloc[i] if not pd.isna(ema_s.iloc[i]) else center dist = abs(curr_c - ema_v) / ema_v * 100 if dist < sp_pct * 2: to_unstuck = max(1, int(abs(inv_levels) * UNSTUCK_CLOSE_PCT)) if inv_levels > 0: pnl_per = notional * (curr_c - inv_cost) / inv_cost else: pnl_per = notional * (inv_cost - curr_c) / inv_cost u_fee = to_unstuck * notional * TAKER_FEE realized += to_unstuck * pnl_per - u_fee fees_total += u_fee if inv_levels > 0: inv_levels -= to_unstuck else: inv_levels += to_unstuck unstuck_cls += to_unstuck # Hard stop if inv_levels != 0 and inv_cost > 0: if inv_levels > 0: unreal = abs(inv_levels) * notional * (curr_c - inv_cost) / inv_cost else: unreal = abs(inv_levels) * notional * (inv_cost - curr_c) / inv_cost if realized + unreal <= -MAX_LOSS_PER_SESSION: cl_fee = abs(inv_levels) * notional * TAKER_FEE total = realized + unreal - cl_fee return { "bars_1m": n, "realized": round(realized+unreal-cl_fee, 4), "inventory_pnl": 0, "total_pnl": round(total, 4), "rts": rts, "peak_inv": peak_inv, "partial": partial_cls, "unstuck": unstuck_cls, "fees": round(fees_total+cl_fee, 4), "avg_spacing": round(sp_pct, 3), "stopped": True, } # Session end โ€” close inventory inv_pnl = 0.0 if inv_levels != 0 and inv_cost > 0: final = closes[-1] if inv_levels > 0: inv_pnl = abs(inv_levels) * notional * (final - inv_cost) / inv_cost else: inv_pnl = abs(inv_levels) * notional * (inv_cost - final) / inv_cost cl_fee = abs(inv_levels) * notional * TAKER_FEE inv_pnl -= cl_fee fees_total += cl_fee return { "bars_1m": n, "realized": round(realized, 4), "inventory_pnl": round(inv_pnl, 4), "total_pnl": round(realized + inv_pnl, 4), "rts": rts, "peak_inv": peak_inv, "partial": partial_cls, "unstuck": unstuck_cls, "fees": round(fees_total, 4), "avg_spacing": round(sp_pct, 3), "stopped": False, } # ============================================================ # MAIN # ============================================================ def main(): print("=" * 70) print("HYBRID GRID BACKTEST v4") print("Screener: 15m / 1h | Grid simulation: 1m") print("=" * 70) print(f"Period: {LOOKBACK_DAYS}d | Spacing: ATR {SPACING_MIN_PCT}-{SPACING_MAX_PCT}%") print(f"Leverage: {LEVERAGE}x | ${POSITION_SIZE}/level (notional ${POSITION_SIZE*LEVERAGE})") print(f"Inv mgmt: warn@{INVENTORY_WARN_LEVELS} cap@{INVENTORY_MAX_LEVELS} unstuck@{UNSTUCK_THRESHOLD}") print(f"Screener: BB+ADX+CHOP | Center: EMA({EMA_CENTER_PERIOD}) trailing") print("=" * 70) exchange = ccxt.binanceusdm({ 'enableRateLimit': True, 'options': {'defaultType': 'future'}, }) all_results = {} for stf in SCREENER_TFS: print(f"\n{'='*60}") print(f" SCREENER TF: {stf} | GRID TF: 1m") print(f"{'='*60}") stf_results = [] for symbol in COINS: print(f"\n {symbol}...", end=" ", flush=True) # Fetch screener TF df_htf = fetch_klines(exchange, symbol, stf, LOOKBACK_DAYS) if df_htf is None or len(df_htf) < 50: print("SKIP (no HTF data)") continue # Find sessions sessions = find_sessions(df_htf, stf) if not sessions: print("0 sessions") continue print(f"{len(sessions)} sessions found, fetching 1m...", end=" ", flush=True) # Fetch 1m data df_1m = fetch_klines(exchange, symbol, GRID_TF, LOOKBACK_DAYS) if df_1m is None or len(df_1m) < 100: print("SKIP (no 1m data)") continue # Simulate grid for each session coin_sessions = [] for sess in sessions: result = simulate_grid_on_1m(df_1m, sess['entry_ts'], sess['exit_ts']) if result is None: continue result.update({ "entry_ts": str(sess['entry_ts']), "exit_ts": str(sess['exit_ts']), "reason": sess['reason'], "bw_entry": sess['bw_entry'], "adx_entry": sess['adx_entry'], "chop_entry": sess['chop_entry'], }) coin_sessions.append(result) if not coin_sessions: print("0 valid sims") continue total = sum(s['total_pnl'] for s in coin_sessions) real = sum(s['realized'] for s in coin_sessions) inv = sum(s['inventory_pnl'] for s in coin_sessions) wins = sum(1 for s in coin_sessions if s['total_pnl'] > 0) wr = wins / len(coin_sessions) * 100 total_rts = sum(s['rts'] for s in coin_sessions) total_fees = sum(s['fees'] for s in coin_sessions) avg_1m_bars = np.mean([s['bars_1m'] for s in coin_sessions]) stops = sum(1 for s in coin_sessions if s['stopped']) print(f"PnL ${total:+.2f} (real ${real:+.2f} inv ${inv:+.2f}) | " f"WR {wr:.0f}% ({wins}/{len(coin_sessions)}) | " f"{total_rts} RTs | fees ${total_fees:.2f} | " f"avg {avg_1m_bars:.0f} 1m bars | stops:{stops}") stf_results.append({ "symbol": symbol, "sessions": len(coin_sessions), "total_pnl": round(total, 2), "realized_pnl": round(real, 2), "inventory_pnl": round(inv, 2), "win_rate": round(wr, 1), "wins": wins, "total_rts": total_rts, "total_fees": round(total_fees, 2), "avg_bars_1m": round(avg_1m_bars, 0), "stopped_sessions": stops, "detail": coin_sessions, }) all_results[stf] = stf_results # ============================================================ # SUMMARY # ============================================================ print("\n\n" + "=" * 70) print("SUMMARY โ€” HYBRID v4 (screener HTF + grid 1m)") print("=" * 70) for stf in SCREENER_TFS: data = all_results.get(stf, []) if not data: print(f"\n{stf}: No data"); continue tp = sum(c['total_pnl'] for c in data) rp = sum(c['realized_pnl'] for c in data) ip = sum(c['inventory_pnl'] for c in data) sess = sum(c['sessions'] for c in data) wins = sum(c['wins'] for c in data) rts = sum(c['total_rts'] for c in data) fees = sum(c['total_fees'] for c in data) coins_up = sum(1 for c in data if c['total_pnl'] > 0) wr = wins/sess*100 if sess > 0 else 0 print(f"\n๐Ÿ“Š Screener {stf} + Grid 1m:") print(f" Total PnL: ${tp:+.2f} (realized ${rp:+.2f}, inventory ${ip:+.2f})") print(f" Fees paid: ${fees:.2f}") print(f" Sessions: {sess} (wins {wins}, WR {wr:.1f}%)") print(f" Round-trips: {rts}") print(f" Profitable coins: {coins_up}/{len(data)}") if sess > 0: print(f" Avg PnL/session: ${tp/sess:+.3f}") print(f" Avg PnL/day: ${tp/LOOKBACK_DAYS:+.2f}") srt = sorted(data, key=lambda c: c['total_pnl'], reverse=True) print(f"\n ๐ŸŸข Top 5:") for c in srt[:5]: print(f" {c['symbol']:15s} ${c['total_pnl']:+8.2f} " f"(real ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) " f"WR {c['win_rate']:.0f}% {c['total_rts']} RTs " f"{c['sessions']} sess fees ${c['total_fees']:.2f}") if len(srt) > 5: print(f"\n ๐Ÿ”ด Bottom 5:") for c in srt[-5:]: print(f" {c['symbol']:15s} ${c['total_pnl']:+8.2f} " f"(real ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) " f"WR {c['win_rate']:.0f}% {c['sessions']} sess") # Comparison table print("\n\n" + "=" * 70) print("SCREENER TF COMPARISON (both use 1m for grid)") print("=" * 70) hdr = f" {'Scr':4s} | {'Total':>9s} | {'Real':>9s} | {'Inv':>9s} | {'Fees':>7s} | {'Sess':>5s} | {'WR':>5s} | {'RTs':>6s} | {'$/sess':>7s} | {'$/day':>7s}" print(hdr) print(" " + "-" * 90) for stf in SCREENER_TFS: d = all_results.get(stf, []) tp = sum(c['total_pnl'] for c in d) rp = sum(c['realized_pnl'] for c in d) ip = sum(c['inventory_pnl'] for c in d) fe = sum(c['total_fees'] for c in d) ss = sum(c['sessions'] for c in d) wi = sum(c['wins'] for c in d) rt = sum(c['total_rts'] for c in d) wr = wi/ss*100 if ss > 0 else 0 av = tp/ss if ss > 0 else 0 dy = tp/LOOKBACK_DAYS print(f" {stf:4s} | ${tp:+8.2f} | ${rp:+8.2f} | ${ip:+8.2f} | ${fe:6.2f} | {ss:5d} | {wr:4.0f}% | {rt:6d} | ${av:+6.2f} | ${dy:+6.2f}") print("\n\nPrevious results for reference:") print(" v2 (15m screener+grid): $-2542, 0% WR") print(" v3 (15m screener+grid): $-689, 2.5% WR") print(" v4 (hybrid) results above โ†‘โ†‘โ†‘") # Save out = "/home/app/trading-bot/grid-bot/backtests/results_hybrid_v4.json" compact = {} for stf in SCREENER_TFS: compact[stf] = [{k:v for k,v in c.items() if k != 'detail'} for c in all_results.get(stf, [])] with open(out, 'w') as f: json.dump(compact, f, indent=2) print(f"\n๐Ÿ’พ Saved: {out}") if __name__ == "__main__": main()