โ† ะะฐะทะฐะด
"""
Session Grid Backtest v2 — Fixed Model
========================================
Proper grid simulation with inventory tracking.

Grid mechanics:
- Grid spacing is a fixed percentage of the session entry price
- Buy limit orders below the price, sell limit orders above
- When price crosses DOWN through a level → BUY fills (we go LONG)
- When price crosses UP through a level → SELL fills (we go SHORT)
- Each complete buy-then-sell or sell-then-buy = one round trip

Inventory risk:
- If price drifts up: we accumulate short inventory (sells filled, buys not)
- If price drifts down: we accumulate long inventory (buys filled, sells not)
- At session close, we CLOSE inventory at market → realized P&L

Usage:
    python backtest_session_grid_v2.py
"""

import ccxt
import pandas as pd
import numpy as np
import json
import time
from datetime import datetime, timedelta, timezone

# ============================================================
# CONFIG
# ============================================================

# Symbols to backtest (passed as-is to the exchange client).
COINS = [
    "ENA/USDT", "PENGU/USDT", "NEAR/USDT", "WLD/USDT",
    "UNI/USDT", "VIRTUAL/USDT", "SOL/USDT", "AVAX/USDT",
    "1000PEPE/USDT", "DOT/USDT", "1000SHIB/USDT", "ICP/USDT",
    "ETH/USDT", "TON/USDT", "LINK/USDT", "DOGE/USDT",
    "XRP/USDT", "AAVE/USDT", "ADA/USDT", "FIL/USDT",
    "ONDO/USDT", "SUI/USDT", "OP/USDT", "ARB/USDT",
]

# Candle intervals to test, and how many days of history to download.
TIMEFRAMES = ["15m", "1h"]
LOOKBACK_DAYS = 30

# Grid params
GRID_SPACING_PCT = 0.1   # 0.1% between levels
GRID_LEVELS = 8          # 8 per side (16 total orders)
POSITION_SIZE = 3.0      # $3 notional per level
LEVERAGE = 10            # effective position = $30 per level
MAKER_FEE = 0.0002       # 0.02% (limit order fill)
TAKER_FEE = 0.0004       # 0.04% (market close)

# Screener params (BB/ADX on the SAME timeframe as grid)
BB_PERIOD = 20
BB_STD = 2.0
BB_WIDTH_MIN = 0.3       # Bollinger band width bounds, in percent of the mid band
BB_WIDTH_MAX = 1.5
ADX_MAX_ENTRY = 25       # a session may only start while ADX is below this
ADX_PERIOD = 14

# Breakout exit
BREAKOUT_BB_MULT = 1.5   # exit needs BB width > BB_WIDTH_MAX * this multiplier ...
BREAKOUT_ADX = 30        # ... and ADX above this

# Session limits
MIN_SESSION_BARS = 10    # breakout exit is ignored before this many bars
MAX_SESSION_BARS = {"15m": 192, "1h": 72}  # ~2d for 15m, 3d for 1h

# Risk
DEPOSIT = 50.0           # NOTE(review): not referenced by the code visible in this file
MAX_LOSS_PER_SESSION = 3.0  # $3 max loss per session (hard stop)


def fetch_klines(exchange, symbol, timeframe, days):
    """Download the last `days` days of OHLCV candles for one symbol.

    Pages through `exchange.fetch_ohlcv` 1000 candles at a time, advancing
    `since` to one millisecond past the last candle received.

    Args:
        exchange: client object exposing `parse8601` and `fetch_ohlcv`
            (`main` passes a `ccxt.binanceusdm` instance).
        symbol: market symbol, e.g. "ETH/USDT".
        timeframe: candle interval string, e.g. "15m".
        days: how far back from "now" (UTC) to start.

    Returns:
        DataFrame with columns ts, open, high, low, close, volume
        (`ts` converted from epoch milliseconds), or None when a request
        fails or no candles come back.
    """
    start = datetime.now(timezone.utc).replace(microsecond=0) - timedelta(days=days)
    since = exchange.parse8601(start.isoformat())
    limit = 1000
    all_klines = []

    while True:
        try:
            klines = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
        except Exception as e:
            # Deliberate best effort: one failing symbol must not abort the
            # whole run, so report it and let the caller skip the symbol.
            print(f" Error {symbol} {timeframe}: {e}")
            return None
        if not klines:
            break
        all_klines.extend(klines)
        since = klines[-1][0] + 1
        if len(klines) < limit:
            # A short page means we have reached the most recent candle.
            break
        time.sleep(0.12)

    if not all_klines:
        return None

    df = pd.DataFrame(all_klines, columns=['ts', 'open', 'high', 'low', 'close', 'volume'])
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')
    return df


def calc_bb_width(close, period=BB_PERIOD, std=BB_STD):
    """Return Bollinger band width as a percentage of the middle band.

    Width is (upper - lower) / mid * 100, where the bands sit `std` rolling
    standard deviations around the `period`-bar rolling mean of `close`.
    The first `period - 1` values are NaN.
    """
    mid = close.rolling(period).mean()
    s = close.rolling(period).std()
    return ((mid + std * s) - (mid - std * s)) / mid * 100


def calc_adx(df, period=ADX_PERIOD):
    """Return the ADX series computed from the high/low/close columns of `df`.

    Smoothing uses an exponentially weighted mean with alpha = 1/period.

    Both directional movements are filtered against the RAW up/down moves.
    Filtering -DM against an already-zeroed +DM would keep -DM on bars where
    the up-move equals the down-move, whereas both must be zero there.
    """
    high, low, close = df['high'], df['low'], df['close']

    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    prev_close = close.shift(1)
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    atr = tr.ewm(alpha=1/period, min_periods=period).mean()
    plus_di = 100 * (plus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
    minus_di = 100 * (minus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
    # The small epsilon avoids a division by zero when both DIs are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    return dx.ewm(alpha=1/period, min_periods=period).mean()


def is_range_entry(bw, adx):
    """Return True when BB width is inside the entry band and ADX is below the entry cap."""
    return (BB_WIDTH_MIN <= bw <= BB_WIDTH_MAX) and (adx < ADX_MAX_ENTRY)


def is_breakout_exit(bw, adx):
    """Return True when both BB width and ADX exceed their breakout thresholds."""
    return (bw > BB_WIDTH_MAX * BREAKOUT_BB_MULT) and (adx > BREAKOUT_ADX)


def _apply_fill(side, fill_price, inventory_levels, inventory_cost, notional_per_level):
    """Apply one grid fill (`side` is +1 for a buy, -1 for a sell).

    Returns (inventory_levels, inventory_cost, pnl_delta, closed_round_trip).
    """
    fee = notional_per_level * MAKER_FEE

    if inventory_levels * side < 0:
        # The fill is against existing inventory: it closes one level.
        if side < 0:
            gross = notional_per_level * (fill_price - inventory_cost) / inventory_cost
        else:
            gross = notional_per_level * (inventory_cost - fill_price) / inventory_cost
        return inventory_levels + side, inventory_cost, gross - fee, True

    # The fill opens a position or adds to one: update the average cost.
    if inventory_levels == 0:
        new_cost = fill_price
    else:
        held = abs(inventory_levels)
        new_cost = (held * inventory_cost + fill_price) / (held + 1)
    return inventory_levels + side, new_cost, -fee, False


def _unrealized_pnl(inventory_levels, inventory_cost, price, notional_per_level):
    """Return the pre-fee mark-to-market P&L of non-empty inventory at `price`."""
    size = abs(inventory_levels) * notional_per_level
    if inventory_levels > 0:
        return size * (price - inventory_cost) / inventory_cost
    return size * (inventory_cost - price) / inventory_cost


def _liquidation_pnl(inventory_levels, inventory_cost, price, notional_per_level):
    """Return the P&L of closing all inventory at `price`, net of the taker fee."""
    if inventory_levels == 0 or inventory_cost <= 0:
        return 0.0
    close_fee = abs(inventory_levels) * notional_per_level * TAKER_FEE
    return _unrealized_pnl(inventory_levels, inventory_cost, price, notional_per_level) - close_fee


def simulate_grid_session(df_session, center_price):
    """Simulate one grid session with inventory tracking.

    Model: only close-to-close movement is used. Levels sit at integer
    multiples of `spacing` (= center_price * GRID_SPACING_PCT / 100). For each
    bar, the number of levels between the previous and current close is
    counted; an up-move fills that many sells and a down-move that many buys,
    capped at GRID_LEVELS fills per bar. The k-th fill of a bar is priced at
    prev_close ± spacing * (k + 0.5). Each fill pays MAKER_FEE. After every
    bar, if realized + unrealized P&L is at or below -MAX_LOSS_PER_SESSION
    while inventory is open, the inventory is closed at that bar's close.
    Otherwise it is closed at the last close. Closing pays TAKER_FEE.

    NOTE(review): levels are not anchored at `center_price`, and total
    inventory is not capped at GRID_LEVELS (only fills per bar are). Both look
    like modelling simplifications — confirm they are intended.

    Args:
        df_session: DataFrame with a 'close' column covering the session.
        center_price: session entry price; only used to derive the spacing.

    Returns:
        (realized_pnl, inventory_pnl, total_pnl, round_trips, peak_inventory)
        with the P&L values rounded to 4 decimals and `peak_inventory` being
        the largest absolute inventory reached, in grid levels. Returns all
        zeros when the spacing is not positive or the session has no bars.
    """
    closes = df_session['close'].values

    spacing = center_price * GRID_SPACING_PCT / 100
    if spacing <= 0 or len(closes) == 0:
        return 0, 0, 0, 0, 0

    notional_per_level = POSITION_SIZE * LEVERAGE  # $30

    # Inventory is counted in grid levels: positive = long, negative = short.
    inventory_levels = 0
    inventory_cost = 0.0  # average entry price of the open inventory
    realized_pnl = 0.0
    round_trips = 0
    peak_inventory = 0
    exit_price = closes[-1]

    for i in range(1, len(closes)):
        prev_close = closes[i - 1]
        curr_close = closes[i]
        prev_lvl = prev_close / spacing
        curr_lvl = curr_close / spacing

        if curr_lvl > prev_lvl:
            side = -1  # price went UP → sell limit orders filled
            levels_crossed = int(curr_lvl) - int(prev_lvl)
        elif curr_lvl < prev_lvl:
            side = 1   # price went DOWN → buy limit orders filled
            levels_crossed = int(prev_lvl) - int(curr_lvl)
        else:
            side = 0
            levels_crossed = 0

        for k in range(min(levels_crossed, GRID_LEVELS)):
            offset = spacing * (k + 0.5)
            fill_price = prev_close + offset if side < 0 else prev_close - offset
            inventory_levels, inventory_cost, pnl_delta, closed = _apply_fill(
                side, fill_price, inventory_levels, inventory_cost, notional_per_level
            )
            realized_pnl += pnl_delta
            if closed:
                round_trips += 1

        peak_inventory = max(peak_inventory, abs(inventory_levels))

        # Hard stop: only evaluated while inventory is open.
        if inventory_levels != 0 and inventory_cost > 0:
            unrealized = _unrealized_pnl(
                inventory_levels, inventory_cost, curr_close, notional_per_level
            )
            if realized_pnl + unrealized <= -MAX_LOSS_PER_SESSION:
                exit_price = curr_close
                break

    # Close whatever inventory is left at the exit price (market order).
    inventory_pnl = _liquidation_pnl(
        inventory_levels, inventory_cost, exit_price, notional_per_level
    )
    total_pnl = realized_pnl + inventory_pnl
    return (
        round(realized_pnl, 4),
        round(inventory_pnl, 4),
        round(total_pnl, 4),
        round_trips,
        peak_inventory,
    )


def backtest_coin_tf(df, timeframe):
    """Split one symbol's candles into grid sessions and simulate each one.

    A session opens on the first bar where `is_range_entry` holds and closes
    either on a breakout (not before MIN_SESSION_BARS bars) or after the
    per-timeframe maximum number of bars (96 if the timeframe is not in
    MAX_SESSION_BARS). After a close, scanning resumes 3 bars later.

    NOTE(review): a session still open when the data ends is not simulated,
    and the recorded end/exit fields describe the screener exit even when
    `simulate_grid_session` stopped out earlier.

    Returns:
        List of dicts, one per closed session, with timing, prices, P&L
        breakdown, round trips, peak inventory and the indicator values at
        entry and exit.
    """
    bb_width = calc_bb_width(df['close'])
    adx = calc_adx(df)
    max_bars = MAX_SESSION_BARS.get(timeframe, 96)

    sessions = []
    in_session = False
    session_start = 0
    entry_price = 0
    i = BB_PERIOD + ADX_PERIOD  # skip the indicator warm-up bars

    while i < len(df):
        bw = bb_width.iloc[i]
        ax = adx.iloc[i]
        if pd.isna(bw) or pd.isna(ax):
            i += 1
            continue

        if not in_session:
            if is_range_entry(bw, ax):
                in_session = True
                session_start = i
                entry_price = df['close'].iloc[i]
            i += 1
            continue

        bars_in = i - session_start
        if bars_in >= MIN_SESSION_BARS and is_breakout_exit(bw, ax):
            exit_reason = "breakout"
        elif bars_in >= max_bars:
            exit_reason = "max_time"
        else:
            i += 1
            continue

        session_df = df.iloc[session_start:i+1]
        real_pnl, inv_pnl, total_pnl, rts, peak_inv = simulate_grid_session(session_df, entry_price)
        sessions.append({
            "start": str(df['ts'].iloc[session_start]),
            "end": str(df['ts'].iloc[i]),
            "bars": bars_in,
            "entry_price": round(entry_price, 6),
            "exit_price": round(df['close'].iloc[i], 6),
            "realized_pnl": real_pnl,
            "inventory_pnl": inv_pnl,
            "total_pnl": total_pnl,
            "round_trips": rts,
            "peak_inventory": peak_inv,
            "exit_reason": exit_reason,
            "bb_width_entry": round(bb_width.iloc[session_start], 3),
            "adx_entry": round(adx.iloc[session_start], 1),
            "bb_width_exit": round(bw, 3),
            "adx_exit": round(ax, 1),
        })
        in_session = False
        i += 3  # cooldown

    return sessions


def _backtest_timeframe(exchange, tf):
    """Fetch and backtest every symbol in COINS on `tf`; return the per-coin result dicts."""
    tf_results = []
    for symbol in COINS:
        print(f"\n {symbol} ({tf})...", end=" ", flush=True)
        df = fetch_klines(exchange, symbol, tf, LOOKBACK_DAYS)
        if df is None or len(df) < BB_PERIOD + ADX_PERIOD + 20:
            print("SKIP")
            continue

        sessions = backtest_coin_tf(df, tf)
        if not sessions:
            print("0 sessions")
            continue

        total_pnl = sum(s['total_pnl'] for s in sessions)
        real_pnl = sum(s['realized_pnl'] for s in sessions)
        inv_pnl = sum(s['inventory_pnl'] for s in sessions)
        win_sessions = [s for s in sessions if s['total_pnl'] > 0]
        wr = len(win_sessions) / len(sessions) * 100
        total_rts = sum(s['round_trips'] for s in sessions)
        avg_bars = np.mean([s['bars'] for s in sessions])
        avg_peak = np.mean([s['peak_inventory'] for s in sessions])

        print(f"{len(sessions)} sess | PnL ${total_pnl:+.2f} (grid ${real_pnl:+.2f}, inv ${inv_pnl:+.2f}) | "
              f"WR {wr:.0f}% | {total_rts} RTs | avg {avg_bars:.0f} bars, peak_inv {avg_peak:.1f}")

        tf_results.append({
            "symbol": symbol,
            "sessions": len(sessions),
            "total_pnl": round(total_pnl, 2),
            "realized_pnl": round(real_pnl, 2),
            "inventory_pnl": round(inv_pnl, 2),
            "win_rate": round(wr, 1),
            "total_round_trips": total_rts,
            "avg_bars": round(avg_bars, 1),
            "avg_peak_inventory": round(avg_peak, 1),
            "sessions_detail": sessions,
        })
    return tf_results


def _timeframe_totals(tf_data):
    """Sum the per-coin results of one timeframe into (pnl, realized, inventory, sessions, round_trips)."""
    return (
        sum(c['total_pnl'] for c in tf_data),
        sum(c['realized_pnl'] for c in tf_data),
        sum(c['inventory_pnl'] for c in tf_data),
        sum(c['sessions'] for c in tf_data),
        sum(c['total_round_trips'] for c in tf_data),
    )


def _print_summary(results):
    """Print totals and the best/worst coins for each timeframe."""
    print("\n\n" + "=" * 70)
    print("SUMMARY — SESSION GRID v2")
    print("=" * 70)

    for tf in TIMEFRAMES:
        tf_data = results.get(tf, [])
        if not tf_data:
            print(f"\n{tf}: No data")
            continue

        total_pnl, total_real, total_inv, total_sess, total_rts = _timeframe_totals(tf_data)
        coins_profit = sum(1 for c in tf_data if c['total_pnl'] > 0)

        print(f"\n📊 {tf} TIMEFRAME:")
        print(f" Total PnL: ${total_pnl:+.2f} (grid ${total_real:+.2f}, inventory ${total_inv:+.2f})")
        print(f" Sessions: {total_sess} | Round-trips: {total_rts}")
        print(f" Profitable coins: {coins_profit}/{len(tf_data)}")
        if total_sess > 0:
            print(f" Avg PnL/session: ${total_pnl/total_sess:+.2f}")
        print(f" Avg PnL/day (est): ${total_pnl / LOOKBACK_DAYS:+.2f}")

        sorted_coins = sorted(tf_data, key=lambda c: c['total_pnl'], reverse=True)
        print("\n 🟢 Top 5:")
        for c in sorted_coins[:5]:
            print(f" {c['symbol']:15s} ${c['total_pnl']:+8.2f} "
                  f"(grid ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) "
                  f"{c['sessions']} sess WR {c['win_rate']:.0f}% {c['total_round_trips']} RTs")

        if len(sorted_coins) > 5:
            print("\n 🔴 Bottom 5:")
            for c in sorted_coins[-5:]:
                print(f" {c['symbol']:15s} ${c['total_pnl']:+8.2f} "
                      f"(grid ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) "
                      f"{c['sessions']} sess WR {c['win_rate']:.0f}%")


def _print_comparison(results):
    """Print a one-row-per-timeframe comparison table."""
    print("\n\n" + "=" * 70)
    print("15m vs 1h COMPARISON")
    print("=" * 70)
    print(f" {'TF':4s} | {'Total PnL':>10s} | {'Grid PnL':>10s} | {'Inv PnL':>10s} | {'Sess':>5s} | {'RTs':>6s} | {'$/sess':>8s} | {'$/day':>7s}")
    print(f" {'-'*4}-+-{'-'*10}-+-{'-'*10}-+-{'-'*10}-+-{'-'*5}-+-{'-'*6}-+-{'-'*8}-+-{'-'*7}")

    for tf in TIMEFRAMES:
        tp, rp, ip, sess, rts = _timeframe_totals(results.get(tf, []))
        avg = tp / sess if sess > 0 else 0
        day = tp / LOOKBACK_DAYS
        print(f" {tf:4s} | ${tp:+9.2f} | ${rp:+9.2f} | ${ip:+9.2f} | {sess:5d} | {rts:6d} | ${avg:+7.2f} | ${day:+6.2f}")


def _save_results(results, output_file):
    """Write the per-coin results (without per-session detail) to `output_file` as JSON."""
    import os  # function-scope import: the module's import block does not provide `os`

    compact = {
        tf: [{k: v for k, v in c.items() if k != 'sessions_detail'}
             for c in results.get(tf, [])]
        for tf in TIMEFRAMES
    }

    # Create the target directory first so a missing folder does not throw
    # away a finished run at the very last step.
    out_dir = os.path.dirname(output_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(compact, f, indent=2)
    print(f"\n💾 Saved: {output_file}")


def main():
    """Run the backtest for every timeframe and coin, print the reports and save a JSON summary."""
    print("=" * 70)
    print("SESSION GRID BACKTEST v2 — Proper Inventory Model")
    print(f"Period: {LOOKBACK_DAYS}d | Grid: {GRID_SPACING_PCT}% spacing, "
          f"${POSITION_SIZE}×{LEVERAGE}x/level, {GRID_LEVELS} levels/side")
    print(f"Entry: BB {BB_WIDTH_MIN}-{BB_WIDTH_MAX}%, ADX<{ADX_MAX_ENTRY}")
    print(f"Exit: BB>{BB_WIDTH_MAX*BREAKOUT_BB_MULT:.1f}% + ADX>{BREAKOUT_ADX} | "
          f"or max_time ({MAX_SESSION_BARS})")
    print(f"Stop-loss: ${MAX_LOSS_PER_SESSION}/session")
    print("=" * 70)

    exchange = ccxt.binanceusdm({
        'enableRateLimit': True,
        'options': {'defaultType': 'future'},
    })

    results = {}
    for tf in TIMEFRAMES:
        print(f"\n{'='*60}")
        print(f" TIMEFRAME: {tf}")
        print(f"{'='*60}")
        results[tf] = _backtest_timeframe(exchange, tf)

    _print_summary(results)
    _print_comparison(results)

    output_file = "/home/app/trading-bot/grid-bot/backtests/results_session_grid_v2.json"
    _save_results(results, output_file)


if __name__ == "__main__":
    main()