โ† ะะฐะทะฐะด
""" Grid Strategy Backtest โ€” Classic vs Smart (BB Squeeze Filter) BTCUSDT 1m, 7 days, Binance Futures Two strategies compared: A) Classic Grid โ€” fixed levels, full stop on breakout B) Smart Grid โ€” BB squeeze activates grid, BB expansion โ†’ gradual unwind Usage: python3 backtest_grid.py """ import requests import pandas as pd import numpy as np import time import json from datetime import datetime, timedelta from pathlib import Path # ============================================================ # CONFIG # ============================================================ SYMBOL = "PENGUUSDT" INTERVAL = "1m" DAYS_BACK = 30 DEPOSIT = 50.0 # total deposit LEVERAGE = 10 POSITION_SIZE_USD = 3.0 # per grid level ($3 ร— 10x = $30 notional) FEE_PCT = 0.02 / 100 # maker 0.02% (limit orders = grid standard) # Grid params GRID_LEVELS = 8 # levels above + below mid (total 16, max $48 used of $50) GRID_SPACING_PCT = 0.1 # 0.1% between levels (tight for DOGE 1m) MAX_LOSS_PCT = 3.0 # max loss cap per grid session (% of total grid capital) # BB params (for Smart Grid) BB_PERIOD = 20 BB_STD = 2.0 BB_SQUEEZE_THRESHOLD = 1.5 # BB width % โ€” DOGE more volatile, higher threshold ADX_PERIOD = 14 ADX_THRESHOLD = 25 # ADX < 25 = ranging market # ============================================================ # DATA FETCH โ€” Binance Futures # ============================================================ def fetch_klines(symbol, interval, days_back): """Fetch klines from Binance Futures API""" url = "https://fapi.binance.com/fapi/v1/klines" end_ts = int(time.time() * 1000) start_ts = int((time.time() - days_back * 86400) * 1000) all_candles = [] current_start = start_ts print(f"[fetch] {symbol} {interval} โ€” {days_back} days...") while current_start < end_ts: params = { "symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500 } try: resp = requests.get(url, params=params, timeout=10) data = resp.json() if not isinstance(data, list) or len(data) == 0: break all_candles.extend(data) current_start = data[-1][0] + 1 time.sleep(0.1) except Exception as e: print(f"[fetch] Error: {e}, retrying...") time.sleep(1) continue df = pd.DataFrame(all_candles, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']: df[col] = df[col].astype(float) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True) print(f"[fetch] Got {len(df)} candles: {df['timestamp'].iloc[0]} โ†’ {df['timestamp'].iloc[-1]}") return df # ============================================================ # INDICATORS # ============================================================ def calc_bollinger(df, period=20, std=2.0): """Bollinger Bands + BB Width %""" df['bb_mid'] = df['close'].rolling(period).mean() df['bb_std'] = df['close'].rolling(period).std() df['bb_upper'] = df['bb_mid'] + std * df['bb_std'] df['bb_lower'] = df['bb_mid'] - std * df['bb_std'] df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100 return df def calc_adx(df, period=14): """ADX indicator""" high = df['high'] low = df['low'] close = df['close'] plus_dm = high.diff() minus_dm = -low.diff() plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0) minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0) tr1 = high - low tr2 = (high - close.shift(1)).abs() tr3 = (low - close.shift(1)).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.ewm(alpha=1/period, min_periods=period).mean() plus_di = 100 * (plus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr) minus_di = 100 * (minus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr) dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10) df['adx'] = dx.ewm(alpha=1/period, min_periods=period).mean() return df def calc_atr(df, period=14): """ATR for trailing stop""" high = df['high'] low = df['low'] close = df['close'] tr1 = high - low tr2 = (high - close.shift(1)).abs() tr3 = (low - close.shift(1)).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) df['atr'] = tr.ewm(alpha=1/period, min_periods=period).mean() return df # ============================================================ # GRID ENGINE โ€” Classic # ============================================================ def run_classic_grid(df): """ Classic Grid: fixed levels around entry price. When price hits a level โ€” open small position. Stop loss on entire accumulated position at MAX_LOSS_PCT. """ results = { 'sessions': [], 'total_pnl': 0, 'total_trades': 0, 'total_fees': 0, } i = BB_PERIOD + ADX_PERIOD # skip warmup session_id = 0 while i < len(df): # Start new grid session at current price mid_price = df['close'].iloc[i] # Create grid levels buy_levels = [] # below mid sell_levels = [] # above mid for lvl in range(1, GRID_LEVELS + 1): buy_levels.append(mid_price * (1 - lvl * GRID_SPACING_PCT / 100)) sell_levels.append(mid_price * (1 + lvl * GRID_SPACING_PCT / 100)) # Track fills buy_fills = [False] * GRID_LEVELS sell_fills = [False] * GRID_LEVELS positions = [] # list of (price, side, size_usd) session_pnl = 0 session_fees = 0 session_trades = 0 session_start = i max_capital = GRID_LEVELS * 2 * POSITION_SIZE_USD max_loss = max_capital * MAX_LOSS_PCT / 100 # Run session j = i + 1 session_closed = False while j < len(df) and not session_closed: price = df['close'].iloc[j] low = df['low'].iloc[j] high = df['high'].iloc[j] # Check buy level fills (price dipped to level) for lvl in range(GRID_LEVELS): if not buy_fills[lvl] and low <= buy_levels[lvl]: buy_fills[lvl] = True fill_price = buy_levels[lvl] positions.append(('long', fill_price, POSITION_SIZE_USD)) session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT session_trades += 1 # Check sell level fills (price rose to level) for lvl in range(GRID_LEVELS): if not sell_fills[lvl] and high >= sell_levels[lvl]: sell_fills[lvl] = True fill_price = sell_levels[lvl] positions.append(('short', fill_price, POSITION_SIZE_USD)) session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT session_trades += 1 # Calculate unrealized PnL unrealized = 0 for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': unrealized += qty * (price - entry_px) else: unrealized += qty * (entry_px - price) net_pnl = unrealized - session_fees # Close completed grid pairs (buy filled + corresponding sell filled) # Realize profit from completed round-trips realized = 0 remaining = [] for lvl in range(GRID_LEVELS): if buy_fills[lvl] and sell_fills[lvl]: # Round trip complete spread = sell_levels[lvl] - buy_levels[lvl] qty = (POSITION_SIZE_USD * LEVERAGE) / buy_levels[lvl] realized += qty * spread buy_fills[lvl] = False sell_fills[lvl] = False session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT # close fee session_pnl += realized # MAX LOSS stop โ€” close everything if net_pnl < -max_loss and len(positions) > 0: # Close all positions at current price close_pnl = 0 for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': close_pnl += qty * (price - entry_px) else: close_pnl += qty * (entry_px - price) session_fees += size_usd * LEVERAGE * FEE_PCT session_pnl += close_pnl positions = [] session_closed = True close_reason = 'max_loss' # Session timeout โ€” 60 min max if j - session_start >= 60: # Close remaining positions close_pnl = 0 for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': close_pnl += qty * (price - entry_px) else: close_pnl += qty * (entry_px - price) session_fees += size_usd * LEVERAGE * FEE_PCT session_pnl += close_pnl positions = [] session_closed = True close_reason = 'timeout' j += 1 net_session = session_pnl - session_fees if session_trades > 0: session_id += 1 results['sessions'].append({ 'id': session_id, 'start': str(df['timestamp'].iloc[session_start]), 'trades': session_trades, 'pnl': round(net_session, 4), 'fees': round(session_fees, 4), 'close_reason': close_reason if session_closed else 'end', 'duration_min': j - session_start, }) results['total_pnl'] += net_session results['total_trades'] += session_trades results['total_fees'] += session_fees # Next session starts after cooldown (5 min) i = j + 5 return results # ============================================================ # GRID ENGINE โ€” Smart (BB Squeeze + ADX filter) # ============================================================ def run_smart_grid(df): """ Smart Grid: - Only activates when BB squeeze detected (BB width < threshold AND ADX < 25) - Grid levels based on BB bands (not fixed %) - When BB expands (breakout) โ†’ gradually unwind, don't hard stop - Trail remaining positions with ATR-based stop """ results = { 'sessions': [], 'total_pnl': 0, 'total_trades': 0, 'total_fees': 0, } warmup = max(BB_PERIOD, ADX_PERIOD) + 10 i = warmup session_id = 0 cooldown_until = 0 while i < len(df): if i < cooldown_until: i += 1 continue bb_width = df['bb_width'].iloc[i] adx = df['adx'].iloc[i] # Wait for squeeze condition if bb_width > BB_SQUEEZE_THRESHOLD or adx > ADX_THRESHOLD: i += 1 continue # SQUEEZE DETECTED โ€” start grid session mid_price = df['bb_mid'].iloc[i] bb_upper = df['bb_upper'].iloc[i] bb_lower = df['bb_lower'].iloc[i] bb_range = bb_upper - bb_lower if bb_range <= 0 or np.isnan(bb_range): i += 1 continue # Grid levels within BB bands step = bb_range / (GRID_LEVELS + 1) buy_levels = [bb_lower + step * k for k in range(1, GRID_LEVELS // 2 + 1)] sell_levels = [bb_upper - step * k for k in range(1, GRID_LEVELS // 2 + 1)] sell_levels.reverse() n_buy = len(buy_levels) n_sell = len(sell_levels) buy_fills = [False] * n_buy sell_fills = [False] * n_sell positions = [] session_pnl = 0 session_fees = 0 session_trades = 0 session_start = i unwinding = False unwind_start = 0 max_capital = (n_buy + n_sell) * POSITION_SIZE_USD max_loss = max_capital * MAX_LOSS_PCT / 100 j = i + 1 session_closed = False close_reason = 'end' while j < len(df) and not session_closed: price = df['close'].iloc[j] low_px = df['low'].iloc[j] high_px = df['high'].iloc[j] curr_bb_width = df['bb_width'].iloc[j] curr_adx = df['adx'].iloc[j] curr_atr = df['atr'].iloc[j] if not np.isnan(df['atr'].iloc[j]) else 0 if not unwinding: # Fill buy levels for lvl in range(n_buy): if not buy_fills[lvl] and low_px <= buy_levels[lvl]: buy_fills[lvl] = True positions.append(('long', buy_levels[lvl], POSITION_SIZE_USD)) session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT session_trades += 1 # Fill sell levels for lvl in range(n_sell): if not sell_fills[lvl] and high_px >= sell_levels[lvl]: sell_fills[lvl] = True positions.append(('short', sell_levels[lvl], POSITION_SIZE_USD)) session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT session_trades += 1 # Realize round-trip profits for lvl in range(min(n_buy, n_sell)): if lvl < n_buy and lvl < n_sell and buy_fills[lvl] and sell_fills[lvl]: spread = sell_levels[lvl] - buy_levels[lvl] qty = (POSITION_SIZE_USD * LEVERAGE) / buy_levels[lvl] session_pnl += qty * spread session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT buy_fills[lvl] = False sell_fills[lvl] = False # Remove matched positions positions = [(s, p, sz) for s, p, sz in positions if not (s == 'long' and abs(p - buy_levels[lvl]) < 0.01) and not (s == 'short' and abs(p - sell_levels[lvl]) < 0.01)] # BREAKOUT DETECTION โ€” BB expanding + ADX rising if curr_bb_width > BB_SQUEEZE_THRESHOLD * 1.5 or curr_adx > ADX_THRESHOLD: unwinding = True unwind_start = j else: # UNWINDING MODE โ€” close positions gradually with trailing # Close 1 position per candle (gradual, not dump) if len(positions) > 0: side, entry_px, size_usd = positions.pop(0) qty = (size_usd * LEVERAGE) / entry_px if side == 'long': session_pnl += qty * (price - entry_px) else: session_pnl += qty * (entry_px - price) session_fees += size_usd * LEVERAGE * FEE_PCT else: session_closed = True close_reason = 'unwind_complete' # If unwinding takes too long (>10 candles), close all if j - unwind_start > 10 and len(positions) > 0: for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': session_pnl += qty * (price - entry_px) else: session_pnl += qty * (entry_px - price) session_fees += size_usd * LEVERAGE * FEE_PCT positions = [] session_closed = True close_reason = 'unwind_forced' # Unrealized PnL check unrealized = 0 for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': unrealized += qty * (price - entry_px) else: unrealized += qty * (entry_px - price) net_pnl = session_pnl + unrealized - session_fees # Max loss cap if net_pnl < -max_loss and len(positions) > 0: for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': session_pnl += qty * (price - entry_px) else: session_pnl += qty * (entry_px - price) session_fees += size_usd * LEVERAGE * FEE_PCT positions = [] session_closed = True close_reason = 'max_loss' # Session timeout โ€” 120 min for smart (longer than classic) if j - session_start >= 120: for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': session_pnl += qty * (price - entry_px) else: session_pnl += qty * (entry_px - price) session_fees += size_usd * LEVERAGE * FEE_PCT positions = [] session_closed = True close_reason = 'timeout' j += 1 net_session = session_pnl - session_fees if session_trades > 0: session_id += 1 results['sessions'].append({ 'id': session_id, 'start': str(df['timestamp'].iloc[session_start]), 'bb_width': round(bb_width, 4), 'adx': round(adx, 2), 'trades': session_trades, 'pnl': round(net_session, 4), 'fees': round(session_fees, 4), 'close_reason': close_reason, 'duration_min': j - session_start, }) results['total_pnl'] += net_session results['total_trades'] += session_trades results['total_fees'] += session_fees # Cooldown โ€” 10 min after session cooldown_until = j + 10 i = j + 1 return results # ============================================================ # REPORT # ============================================================ def print_report(name, results, df): sessions = results['sessions'] total_pnl = results['total_pnl'] total_trades = results['total_trades'] total_fees = results['total_fees'] wins = [s for s in sessions if s['pnl'] > 0] losses = [s for s in sessions if s['pnl'] <= 0] print(f"\n{'='*60}") print(f" {name}") print(f"{'='*60}") print(f" Symbol: {SYMBOL}") print(f" Period: {df['timestamp'].iloc[0]} โ†’ {df['timestamp'].iloc[-1]}") print(f" Candles: {len(df)}") print(f" Grid levels: {GRID_LEVELS} (${POSITION_SIZE_USD} ร— {LEVERAGE}x per level)") print(f" Grid spacing: {GRID_SPACING_PCT}%") print(f"{'โ”€'*60}") print(f" ๐Ÿ“Š Sessions: {len(sessions)}") print(f" โœ… Winning: {len(wins)} ({100*len(wins)/max(len(sessions),1):.0f}%)") print(f" โŒ Losing: {len(losses)} ({100*len(losses)/max(len(sessions),1):.0f}%)") print(f" ๐Ÿ“ˆ Total trades: {total_trades}") print(f"{'โ”€'*60}") print(f" ๐Ÿ’ฐ Total PnL: ${total_pnl:.4f}") print(f" ๐Ÿ’ธ Total fees: ${total_fees:.4f}") print(f" ๐Ÿ“Š Net PnL: ${total_pnl:.4f}") if len(wins) > 0: avg_win = sum(s['pnl'] for s in wins) / len(wins) print(f" ๐ŸŸข Avg win: ${avg_win:.4f}") if len(losses) > 0: avg_loss = sum(s['pnl'] for s in losses) / len(losses) print(f" ๐Ÿ”ด Avg loss: ${avg_loss:.4f}") # Close reasons reasons = {} for s in sessions: r = s.get('close_reason', 'unknown') reasons[r] = reasons.get(r, 0) + 1 print(f"{'โ”€'*60}") print(f" Close reasons:") for r, cnt in sorted(reasons.items(), key=lambda x: -x[1]): print(f" {r}: {cnt}") # Top 5 sessions if sessions: print(f"{'โ”€'*60}") print(f" Top 5 sessions (by PnL):") sorted_s = sorted(sessions, key=lambda x: x['pnl'], reverse=True)[:5] for s in sorted_s: emoji = '๐ŸŸข' if s['pnl'] > 0 else '๐Ÿ”ด' print(f" {emoji} #{s['id']} {s['start'][:16]} | {s['trades']}t | ${s['pnl']:.4f} | {s['duration_min']}min | {s['close_reason']}") # Worst 5 if len(sessions) > 5: print(f" Worst 5 sessions:") sorted_s = sorted(sessions, key=lambda x: x['pnl'])[:5] for s in sorted_s: emoji = '๐ŸŸข' if s['pnl'] > 0 else '๐Ÿ”ด' print(f" {emoji} #{s['id']} {s['start'][:16]} | {s['trades']}t | ${s['pnl']:.4f} | {s['duration_min']}min | {s['close_reason']}") print(f"{'='*60}\n") # ============================================================ # MAIN # ============================================================ if __name__ == "__main__": print("=" * 60) print(" GRID STRATEGY BACKTEST") print(f" {SYMBOL} | {INTERVAL} | {DAYS_BACK} days") print(f" ${POSITION_SIZE_USD} ร— {LEVERAGE}x per level | {GRID_LEVELS} levels") print("=" * 60) # 1. Fetch data df = fetch_klines(SYMBOL, INTERVAL, DAYS_BACK) # 2. Calculate indicators print("[indicators] Calculating BB, ADX, ATR...") df = calc_bollinger(df, BB_PERIOD, BB_STD) df = calc_adx(df, ADX_PERIOD) df = calc_atr(df) # BB squeeze stats valid = df.dropna(subset=['bb_width', 'adx']) squeeze_candles = len(valid[(valid['bb_width'] < BB_SQUEEZE_THRESHOLD) & (valid['adx'] < ADX_THRESHOLD)]) print(f"[indicators] BB squeeze candles: {squeeze_candles}/{len(valid)} ({100*squeeze_candles/max(len(valid),1):.1f}%)") print(f"[indicators] BB width range: {valid['bb_width'].min():.4f} โ€” {valid['bb_width'].max():.4f}") print(f"[indicators] ADX range: {valid['adx'].min():.1f} โ€” {valid['adx'].max():.1f}") # 3. Run Classic Grid print("\n[backtest] Running Classic Grid...") classic = run_classic_grid(df) # 4. Run Smart Grid print("[backtest] Running Smart Grid (BB squeeze filter)...") smart = run_smart_grid(df) # 5. Reports print_report("๐Ÿ“ฆ CLASSIC GRID (baseline)", classic, df) print_report("๐Ÿง  SMART GRID (BB squeeze + gradual unwind)", smart, df) # 6. Comparison print("=" * 60) print(" ๐Ÿ“Š COMPARISON") print("=" * 60) print(f" {'Metric':<25} {'Classic':>12} {'Smart':>12}") print(f" {'โ”€'*49}") print(f" {'Sessions':<25} {len(classic['sessions']):>12} {len(smart['sessions']):>12}") print(f" {'Total trades':<25} {classic['total_trades']:>12} {smart['total_trades']:>12}") c_pnl = f"${classic['total_pnl']:.4f}" s_pnl = f"${smart['total_pnl']:.4f}" c_fee = f"${classic['total_fees']:.4f}" s_fee = f"${smart['total_fees']:.4f}" print(f" {'Total PnL':<25} {c_pnl:>12} {s_pnl:>12}") print(f" {'Total fees':<25} {c_fee:>12} {s_fee:>12}") c_wr = 100 * len([s for s in classic['sessions'] if s['pnl'] > 0]) / max(len(classic['sessions']), 1) s_wr = 100 * len([s for s in smart['sessions'] if s['pnl'] > 0]) / max(len(smart['sessions']), 1) print(f" {'Win rate':<25} {f'{c_wr:.0f}%':>12} {f'{s_wr:.0f}%':>12}") print("=" * 60) # 7. Save results output = { 'config': { 'symbol': SYMBOL, 'interval': INTERVAL, 'days_back': DAYS_BACK, 'grid_levels': GRID_LEVELS, 'grid_spacing_pct': GRID_SPACING_PCT, 'position_size_usd': POSITION_SIZE_USD, 'leverage': LEVERAGE, 'max_loss_pct': MAX_LOSS_PCT, 'bb_squeeze_threshold': BB_SQUEEZE_THRESHOLD, 'adx_threshold': ADX_THRESHOLD, }, 'classic': { 'total_pnl': round(classic['total_pnl'], 4), 'total_trades': classic['total_trades'], 'total_fees': round(classic['total_fees'], 4), 'sessions': classic['sessions'], }, 'smart': { 'total_pnl': round(smart['total_pnl'], 4), 'total_trades': smart['total_trades'], 'total_fees': round(smart['total_fees'], 4), 'sessions': smart['sessions'], }, 'tested_at': datetime.utcnow().isoformat(), } out_path = Path(__file__).parent / 'results_grid.json' with open(out_path, 'w') as f: json.dump(output, f, indent=2) print(f"\n๐Ÿ’พ Results saved to {out_path}")