"""
Grid Strategy Backtest — Classic vs Smart (BB Squeeze Filter)
Binance Futures klines. Originally written for BTCUSDT 1m, 7 days; the symbol,
interval and look-back actually used are set in the CONFIG section below
(SYMBOL, INTERVAL, DAYS_BACK).
Two strategies compared:
A) Classic Grid — fixed levels, full stop on breakout
B) Smart Grid — BB squeeze activates grid, BB expansion → gradual unwind
Usage: python3 backtest_grid.py
"""
import requests
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime, timedelta
from pathlib import Path
# ============================================================
# CONFIG
# ============================================================
# Market and data window used by the fetch step.
SYMBOL = "PENGUUSDT"
INTERVAL = "1m"
DAYS_BACK = 30
# NOTE(review): DEPOSIT is not referenced anywhere else in the visible code.
DEPOSIT = 50.0 # total deposit
LEVERAGE = 10
POSITION_SIZE_USD = 3.0 # margin per grid level ($3 × 10x = $30 notional)
FEE_PCT = 0.02 / 100 # maker 0.02% (limit orders = grid standard), stored as a fraction
# Grid params
# Classic grid: GRID_LEVELS levels on each side of mid (16 total, max $48 margin of $50).
# Smart grid: GRID_LEVELS // 2 levels per side, placed inside the Bollinger Bands.
GRID_LEVELS = 8
# NOTE(review): spacing was described as "tight for DOGE 1m"; SYMBOL is now PENGUUSDT — confirm it still fits.
GRID_SPACING_PCT = 0.1 # 0.1% between levels (classic grid only)
MAX_LOSS_PCT = 3.0 # max loss cap per grid session (% of total grid capital)
# BB params (for Smart Grid)
BB_PERIOD = 20
BB_STD = 2.0
# NOTE(review): threshold was raised because "DOGE is more volatile"; revisit for the current SYMBOL.
BB_SQUEEZE_THRESHOLD = 1.5 # BB width %, squeeze when width is below this
ADX_PERIOD = 14
ADX_THRESHOLD = 25 # ADX < 25 = ranging market
# ============================================================
# DATA FETCH — Binance Futures
# ============================================================
def fetch_klines(symbol, interval, days_back):
    """Download klines from the Binance Futures REST API into a DataFrame.

    Pages forward from ``now - days_back`` days, requesting up to 1500
    candles per call, until the start cursor reaches the current time or the
    API returns no more rows.

    Args:
        symbol: Futures symbol passed to the API as ``symbol``.
        interval: Kline interval string passed to the API as ``interval``.
        days_back: Number of days of history to request.

    Returns:
        DataFrame with one row per candle, de-duplicated and sorted by
        ``timestamp`` (converted from epoch milliseconds to datetime), with
        the open/high/low/close/volume/quote_volume columns cast to float.

    Raises:
        RuntimeError: if not a single candle could be downloaded.
    """
    url = "https://fapi.binance.com/fapi/v1/klines"
    max_retries = 5  # consecutive failures tolerated before giving up
    now = time.time()
    end_ts = int(now * 1000)
    start_ts = int((now - days_back * 86400) * 1000)
    all_candles = []
    current_start = start_ts
    failures = 0
    print(f"[fetch] {symbol} {interval} — {days_back} days...")
    while current_start < end_ts:
        params = {
            "symbol": symbol,
            "interval": interval,
            "startTime": current_start,
            "limit": 1500,
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # Bounded retry: a persistent network/JSON failure must not spin forever.
            failures += 1
            if failures > max_retries:
                print(f"[fetch] Error: {e}, giving up after {max_retries} retries")
                break
            print(f"[fetch] Error: {e}, retrying...")
            time.sleep(1)
            continue
        failures = 0
        if not isinstance(data, list):
            # A non-list payload is not kline data; surface it instead of
            # stopping silently.
            print(f"[fetch] Unexpected API response: {data}")
            break
        if not data:
            break
        all_candles.extend(data)
        # Resume one millisecond after the open time of the last candle received.
        current_start = data[-1][0] + 1
        time.sleep(0.1)

    if not all_candles:
        raise RuntimeError(f"[fetch] No candles downloaded for {symbol} {interval}")

    df = pd.DataFrame(all_candles, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades', 'taker_buy_base',
        'taker_buy_quote', 'ignore',
    ])
    for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
        df[col] = df[col].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    print(f"[fetch] Got {len(df)} candles: {df['timestamp'].iloc[0]} → {df['timestamp'].iloc[-1]}")
    return df
# ============================================================
# INDICATORS
# ============================================================
def calc_bollinger(df, period=20, std=2.0):
    """Add Bollinger Band columns to ``df`` in place and return it.

    Columns written: ``bb_mid`` (rolling mean of close), ``bb_std`` (rolling
    standard deviation of close), ``bb_upper`` / ``bb_lower`` (mid ± ``std``
    deviations) and ``bb_width`` (band width as a percentage of ``bb_mid``).
    """
    window = df['close'].rolling(period)
    middle = window.mean()
    deviation = window.std()

    df['bb_mid'] = middle
    df['bb_std'] = deviation

    offset = std * deviation
    df['bb_upper'] = middle + offset
    df['bb_lower'] = middle - offset

    band_span = df['bb_upper'] - df['bb_lower']
    df['bb_width'] = (band_span / middle) * 100
    return df
def calc_adx(df, period=14):
    """Add an ``adx`` column (Average Directional Index) to ``df`` in place.

    Directional movement, true range and DX are each smoothed with an
    exponentially weighted mean using ``alpha = 1 / period`` and
    ``min_periods = period``.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.
        period: Smoothing period.

    Returns:
        The same DataFrame, with the ``adx`` column written.
    """
    high = df['high']
    low = df['low']
    prev_close = df['close'].shift(1)

    # Keep the raw moves separate so each side is compared against the
    # other's *raw* value. Comparing -DM against an already-zeroed +DM would
    # count a bar with equal up and down moves as pure downward movement.
    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    alpha = 1 / period
    atr = true_range.ewm(alpha=alpha, min_periods=period).mean()
    plus_di = 100 * (plus_dm.ewm(alpha=alpha, min_periods=period).mean() / atr)
    minus_di = 100 * (minus_dm.ewm(alpha=alpha, min_periods=period).mean() / atr)
    # The small epsilon avoids 0/0 when both directional indicators are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    df['adx'] = dx.ewm(alpha=alpha, min_periods=period).mean()
    return df
def calc_atr(df, period=14):
    """Add an ``atr`` column (Average True Range) to ``df`` in place and return it.

    True range is the largest of: high - low, |high - previous close| and
    |low - previous close|. It is smoothed with an exponentially weighted
    mean using ``alpha = 1 / period`` and ``min_periods = period``.
    """
    prev_close = df['close'].shift(1)
    candidates = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    )
    true_range = candidates.max(axis=1)
    smoother = true_range.ewm(alpha=1/period, min_periods=period)
    df['atr'] = smoother.mean()
    return df
# ============================================================
# GRID ENGINE — Classic
# ============================================================
def run_classic_grid(df):
    """Backtest the classic fixed-spacing grid over ``df``.

    Sessions run back to back. Each session centres a grid on the current
    close, with GRID_LEVELS buy levels below and GRID_LEVELS sell levels
    above, spaced GRID_SPACING_PCT apart. A candle whose low/high touches an
    unfilled level opens a long/short of POSITION_SIZE_USD margin at that
    level price. When the buy and sell level with the same index are both
    filled, the pair is realized as a round trip: its two legs are removed
    from the open positions and the levels are re-armed.

    A session ends when:
      * net PnL (realized + unrealized - fees) falls below the max-loss cap
        (MAX_LOSS_PCT of the grid's total margin)  -> 'max_loss'
      * 60 candles have elapsed                     -> 'timeout'
      * the data runs out                           -> 'end'
    In every case the remaining positions are closed at the candle close.
    The next session starts 5 candles after the previous one stopped.

    Args:
        df: DataFrame with ``timestamp``, ``close``, ``low`` and ``high``.

    Returns:
        dict with ``sessions`` (list of per-session dicts; only sessions with
        at least one fill are recorded), ``total_pnl`` (net of fees),
        ``total_trades`` and ``total_fees``.
    """
    results = {
        'sessions': [],
        'total_pnl': 0,
        'total_trades': 0,
        'total_fees': 0,
    }

    # Pull columns out once; per-candle .iloc lookups dominate the runtime.
    closes = df['close'].to_numpy()
    lows = df['low'].to_numpy()
    highs = df['high'].to_numpy()
    n_candles = len(df)

    notional = POSITION_SIZE_USD * LEVERAGE
    level_fee = notional * FEE_PCT
    max_capital = GRID_LEVELS * 2 * POSITION_SIZE_USD
    max_loss = max_capital * MAX_LOSS_PCT / 100
    session_timeout = 60  # candles
    cooldown = 5          # candles between sessions

    def open_pnl(open_positions, price):
        """Mark-to-market PnL of the open positions at ``price``."""
        total = 0
        for side, entry_px, size_usd in open_positions:
            qty = (size_usd * LEVERAGE) / entry_px
            if side == 'long':
                total += qty * (price - entry_px)
            else:
                total += qty * (entry_px - price)
        return total

    def closing_fees(open_positions):
        """Fees charged for closing every open position."""
        return sum(size_usd * LEVERAGE * FEE_PCT for _, _, size_usd in open_positions)

    i = BB_PERIOD + ADX_PERIOD  # skip warmup
    session_id = 0
    while i < n_candles:
        # Start a new grid session at the current price.
        mid_price = closes[i]
        buy_levels = [mid_price * (1 - lvl * GRID_SPACING_PCT / 100)
                      for lvl in range(1, GRID_LEVELS + 1)]   # below mid
        sell_levels = [mid_price * (1 + lvl * GRID_SPACING_PCT / 100)
                       for lvl in range(1, GRID_LEVELS + 1)]  # above mid
        buy_fills = [False] * GRID_LEVELS
        sell_fills = [False] * GRID_LEVELS
        positions = []  # list of (side, entry_price, size_usd)
        session_pnl = 0
        session_fees = 0
        session_trades = 0
        session_start = i
        session_closed = False
        close_reason = 'end'

        j = i + 1
        while j < n_candles and not session_closed:
            price = closes[j]
            low = lows[j]
            high = highs[j]

            # Buy levels fill when the candle dips to them.
            for lvl in range(GRID_LEVELS):
                if not buy_fills[lvl] and low <= buy_levels[lvl]:
                    buy_fills[lvl] = True
                    positions.append(('long', buy_levels[lvl], POSITION_SIZE_USD))
                    session_fees += level_fee
                    session_trades += 1
            # Sell levels fill when the candle rises to them.
            for lvl in range(GRID_LEVELS):
                if not sell_fills[lvl] and high >= sell_levels[lvl]:
                    sell_fills[lvl] = True
                    positions.append(('short', sell_levels[lvl], POSITION_SIZE_USD))
                    session_fees += level_fee
                    session_trades += 1

            # Realize completed round trips. The two legs MUST leave
            # `positions`; otherwise their profit is counted a second time
            # (plus extra fees) when the session is finally closed.
            for lvl in range(GRID_LEVELS):
                if buy_fills[lvl] and sell_fills[lvl]:
                    qty = notional / buy_levels[lvl]
                    session_pnl += qty * (sell_levels[lvl] - buy_levels[lvl])
                    session_fees += level_fee  # close fee
                    buy_fills[lvl] = False
                    sell_fills[lvl] = False
                    # A set fill flag guarantees exactly one matching leg exists.
                    positions.remove(('long', buy_levels[lvl], POSITION_SIZE_USD))
                    positions.remove(('short', sell_levels[lvl], POSITION_SIZE_USD))

            net_pnl = session_pnl + open_pnl(positions, price) - session_fees

            if positions and net_pnl < -max_loss:
                # MAX LOSS stop: close everything at the candle close.
                session_pnl += open_pnl(positions, price)
                session_fees += closing_fees(positions)
                positions = []
                session_closed = True
                close_reason = 'max_loss'
            elif j - session_start >= session_timeout:
                # Session timeout: close whatever is still open.
                session_pnl += open_pnl(positions, price)
                session_fees += closing_fees(positions)
                positions = []
                session_closed = True
                close_reason = 'timeout'
            j += 1

        if not session_closed and positions:
            # Data ran out mid-session: mark the leftovers to the last close
            # so their PnL is not silently dropped.
            last_price = closes[n_candles - 1]
            session_pnl += open_pnl(positions, last_price)
            session_fees += closing_fees(positions)
            positions = []

        net_session = session_pnl - session_fees
        if session_trades > 0:
            session_id += 1
            results['sessions'].append({
                'id': session_id,
                'start': str(df['timestamp'].iloc[session_start]),
                'trades': session_trades,
                'pnl': round(net_session, 4),
                'fees': round(session_fees, 4),
                'close_reason': close_reason,
                'duration_min': j - session_start,
            })
            results['total_pnl'] += net_session
            results['total_trades'] += session_trades
            results['total_fees'] += session_fees

        # Next session starts after the cooldown.
        i = j + cooldown
    return results
# ============================================================
# GRID ENGINE — Smart (BB Squeeze + ADX filter)
# ============================================================
def run_smart_grid(df):
    """Backtest the squeeze-filtered grid over ``df``.

    A session starts only on a candle where ``bb_width`` is at or below
    BB_SQUEEZE_THRESHOLD and ``adx`` is at or below ADX_THRESHOLD. Grid
    levels are placed inside the Bollinger Bands of that candle:
    GRID_LEVELS // 2 buy levels stepping up from the lower band and the same
    number of sell levels stepping down from the upper band.

    While the squeeze holds, levels fill and same-index buy/sell pairs are
    realized as round trips. Once ``bb_width`` exceeds 1.5x the squeeze
    threshold or ``adx`` exceeds ADX_THRESHOLD the session switches to
    unwinding: one position is closed per candle at the candle close, and
    anything still open more than 10 candles after the breakout is closed at
    once.

    Close reasons: 'unwind_complete', 'unwind_forced', 'max_loss' (net PnL
    below MAX_LOSS_PCT of the grid margin), 'timeout' (120 candles) and
    'end' (data ran out; leftovers are marked to the last close).

    NOTE: no ATR-based trailing stop is implemented here; the ``atr`` column
    is not used by this function.

    Args:
        df: DataFrame with ``timestamp``, ``close``, ``low``, ``high``,
            ``bb_width``, ``bb_upper``, ``bb_lower`` and ``adx`` columns.

    Returns:
        dict with ``sessions`` (only sessions with at least one fill),
        ``total_pnl`` (net of fees), ``total_trades`` and ``total_fees``.
    """
    results = {
        'sessions': [],
        'total_pnl': 0,
        'total_trades': 0,
        'total_fees': 0,
    }

    # Pull columns out once; per-candle .iloc lookups dominate the runtime.
    closes = df['close'].to_numpy()
    lows = df['low'].to_numpy()
    highs = df['high'].to_numpy()
    bb_widths = df['bb_width'].to_numpy()
    bb_uppers = df['bb_upper'].to_numpy()
    bb_lowers = df['bb_lower'].to_numpy()
    adx_values = df['adx'].to_numpy()
    n_candles = len(df)

    notional = POSITION_SIZE_USD * LEVERAGE
    level_fee = notional * FEE_PCT
    session_timeout = 120   # candles (longer than the classic grid)
    unwind_limit = 10       # candles of gradual unwinding before a forced close
    cooldown = 10           # candles after a session

    def open_pnl(open_positions, price):
        """Mark-to-market PnL of the open positions at ``price``."""
        total = 0
        for side, entry_px, size_usd in open_positions:
            qty = (size_usd * LEVERAGE) / entry_px
            if side == 'long':
                total += qty * (price - entry_px)
            else:
                total += qty * (entry_px - price)
        return total

    def closing_fees(open_positions):
        """Fees charged for closing every open position."""
        return sum(size_usd * LEVERAGE * FEE_PCT for _, _, size_usd in open_positions)

    warmup = max(BB_PERIOD, ADX_PERIOD) + 10
    i = warmup
    session_id = 0
    cooldown_until = 0
    while i < n_candles:
        if i < cooldown_until:
            i += 1
            continue
        bb_width = bb_widths[i]
        adx = adx_values[i]
        # Wait for the squeeze condition. NaN indicators never qualify
        # (a bare `>` comparison would let NaN through).
        if (np.isnan(bb_width) or np.isnan(adx)
                or bb_width > BB_SQUEEZE_THRESHOLD or adx > ADX_THRESHOLD):
            i += 1
            continue

        # SQUEEZE DETECTED: start a grid session.
        bb_upper = bb_uppers[i]
        bb_lower = bb_lowers[i]
        bb_range = bb_upper - bb_lower
        if bb_range <= 0 or np.isnan(bb_range):
            i += 1
            continue

        # Grid levels within the BB bands.
        step = bb_range / (GRID_LEVELS + 1)
        per_side = GRID_LEVELS // 2
        buy_levels = [bb_lower + step * k for k in range(1, per_side + 1)]
        sell_levels = [bb_upper - step * k for k in range(1, per_side + 1)]
        sell_levels.reverse()
        n_buy = len(buy_levels)
        n_sell = len(sell_levels)
        buy_fills = [False] * n_buy
        sell_fills = [False] * n_sell
        positions = []  # list of (side, entry_price, size_usd)
        session_pnl = 0
        session_fees = 0
        session_trades = 0
        session_start = i
        unwinding = False
        unwind_start = 0
        max_capital = (n_buy + n_sell) * POSITION_SIZE_USD
        max_loss = max_capital * MAX_LOSS_PCT / 100
        session_closed = False
        close_reason = 'end'

        j = i + 1
        while j < n_candles and not session_closed:
            price = closes[j]
            low_px = lows[j]
            high_px = highs[j]
            curr_bb_width = bb_widths[j]
            curr_adx = adx_values[j]

            if not unwinding:
                # Fill buy levels.
                for lvl in range(n_buy):
                    if not buy_fills[lvl] and low_px <= buy_levels[lvl]:
                        buy_fills[lvl] = True
                        positions.append(('long', buy_levels[lvl], POSITION_SIZE_USD))
                        session_fees += level_fee
                        session_trades += 1
                # Fill sell levels.
                for lvl in range(n_sell):
                    if not sell_fills[lvl] and high_px >= sell_levels[lvl]:
                        sell_fills[lvl] = True
                        positions.append(('short', sell_levels[lvl], POSITION_SIZE_USD))
                        session_fees += level_fee
                        session_trades += 1
                # Realize round-trip profits.
                for lvl in range(min(n_buy, n_sell)):
                    if buy_fills[lvl] and sell_fills[lvl]:
                        qty = notional / buy_levels[lvl]
                        session_pnl += qty * (sell_levels[lvl] - buy_levels[lvl])
                        session_fees += level_fee
                        buy_fills[lvl] = False
                        sell_fills[lvl] = False
                        # Remove exactly the two matched legs. An absolute
                        # price tolerance would, on a low-priced asset, match
                        # every open position and wipe them all out.
                        positions.remove(('long', buy_levels[lvl], POSITION_SIZE_USD))
                        positions.remove(('short', sell_levels[lvl], POSITION_SIZE_USD))
                # BREAKOUT DETECTION: BB expanding or ADX above threshold.
                if curr_bb_width > BB_SQUEEZE_THRESHOLD * 1.5 or curr_adx > ADX_THRESHOLD:
                    unwinding = True
                    unwind_start = j
            else:
                # UNWINDING MODE: close one position per candle (gradual, not a dump).
                if positions:
                    oldest = positions.pop(0)
                    session_pnl += open_pnl([oldest], price)
                    session_fees += closing_fees([oldest])
                else:
                    session_closed = True
                    close_reason = 'unwind_complete'
                # If unwinding takes too long, close everything that is left.
                if j - unwind_start > unwind_limit and positions:
                    session_pnl += open_pnl(positions, price)
                    session_fees += closing_fees(positions)
                    positions = []
                    session_closed = True
                    close_reason = 'unwind_forced'

            # Max loss cap on realized + unrealized PnL net of fees.
            net_pnl = session_pnl + open_pnl(positions, price) - session_fees
            if positions and net_pnl < -max_loss:
                session_pnl += open_pnl(positions, price)
                session_fees += closing_fees(positions)
                positions = []
                session_closed = True
                close_reason = 'max_loss'

            # Session timeout. Only applies to a session that is still open,
            # so it cannot overwrite the real reason of a session that
            # already closed on this candle.
            if not session_closed and j - session_start >= session_timeout:
                session_pnl += open_pnl(positions, price)
                session_fees += closing_fees(positions)
                positions = []
                session_closed = True
                close_reason = 'timeout'
            j += 1

        if not session_closed and positions:
            # Data ran out mid-session: mark the leftovers to the last close
            # so their PnL is not silently dropped.
            last_price = closes[n_candles - 1]
            session_pnl += open_pnl(positions, last_price)
            session_fees += closing_fees(positions)
            positions = []

        net_session = session_pnl - session_fees
        if session_trades > 0:
            session_id += 1
            results['sessions'].append({
                'id': session_id,
                'start': str(df['timestamp'].iloc[session_start]),
                'bb_width': round(bb_width, 4),
                'adx': round(adx, 2),
                'trades': session_trades,
                'pnl': round(net_session, 4),
                'fees': round(session_fees, 4),
                'close_reason': close_reason,
                'duration_min': j - session_start,
            })
            results['total_pnl'] += net_session
            results['total_trades'] += session_trades
            results['total_fees'] += session_fees

        # Cooldown after the session.
        cooldown_until = j + cooldown
        i = j + 1
    return results
# ============================================================
# REPORT
# ============================================================
def print_report(name, results, df):
    """Print a human-readable summary of one backtest run.

    Args:
        name: Title printed at the top of the report.
        results: dict with ``sessions``, ``total_pnl``, ``total_trades`` and
            ``total_fees`` as returned by the grid engines. ``total_pnl`` is
            already net of fees, so the gross figure is reconstructed as
            ``total_pnl + total_fees``.
        df: Candle DataFrame the backtest ran on; used for the period and
            candle count. Must contain at least one row.
    """
    sessions = results['sessions']
    total_pnl = results['total_pnl']
    total_trades = results['total_trades']
    total_fees = results['total_fees']
    # The engines accumulate per-session PnL after fees; add the fees back to
    # show a gross figure instead of printing the net value twice.
    gross_pnl = total_pnl + total_fees
    wins = [s for s in sessions if s['pnl'] > 0]
    losses = [s for s in sessions if s['pnl'] <= 0]
    session_count = max(len(sessions), 1)  # avoid division by zero
    heavy_rule = '=' * 60
    light_rule = '─' * 60

    def session_line(s):
        """Format one session as a single report line."""
        emoji = '🟢' if s['pnl'] > 0 else '🔴'
        return (f" {emoji} #{s['id']} {s['start'][:16]} | {s['trades']}t | "
                f"${s['pnl']:.4f} | {s['duration_min']}min | {s['close_reason']}")

    print(f"\n{heavy_rule}")
    print(f" {name}")
    print(heavy_rule)
    print(f" Symbol: {SYMBOL}")
    print(f" Period: {df['timestamp'].iloc[0]} → {df['timestamp'].iloc[-1]}")
    print(f" Candles: {len(df)}")
    print(f" Grid levels: {GRID_LEVELS} (${POSITION_SIZE_USD} × {LEVERAGE}x per level)")
    print(f" Grid spacing: {GRID_SPACING_PCT}%")
    print(light_rule)
    print(f" 📊 Sessions: {len(sessions)}")
    print(f" ✅ Winning: {len(wins)} ({100*len(wins)/session_count:.0f}%)")
    print(f" ❌ Losing: {len(losses)} ({100*len(losses)/session_count:.0f}%)")
    print(f" 🔁 Total trades: {total_trades}")
    print(light_rule)
    print(f" 💰 Gross PnL: ${gross_pnl:.4f}")
    print(f" 💸 Total fees: ${total_fees:.4f}")
    print(f" 📈 Net PnL: ${total_pnl:.4f}")
    if wins:
        avg_win = sum(s['pnl'] for s in wins) / len(wins)
        print(f" 🟢 Avg win: ${avg_win:.4f}")
    if losses:
        avg_loss = sum(s['pnl'] for s in losses) / len(losses)
        print(f" 🔴 Avg loss: ${avg_loss:.4f}")

    # Close reasons, most frequent first.
    reasons = {}
    for s in sessions:
        r = s.get('close_reason', 'unknown')
        reasons[r] = reasons.get(r, 0) + 1
    print(light_rule)
    print(" Close reasons:")
    for r, cnt in sorted(reasons.items(), key=lambda x: -x[1]):
        print(f" {r}: {cnt}")

    if sessions:
        print(light_rule)
        print(" Top 5 sessions (by PnL):")
        for s in sorted(sessions, key=lambda x: x['pnl'], reverse=True)[:5]:
            print(session_line(s))
        # The worst list is shown only when it would differ from the top list.
        if len(sessions) > 5:
            print(" Worst 5 sessions:")
            for s in sorted(sessions, key=lambda x: x['pnl'])[:5]:
                print(session_line(s))
    print(f"{heavy_rule}\n")
# ============================================================
# MAIN
# ============================================================
def main():
    """Fetch candles, compute indicators, run both grids, report and save.

    Writes ``results_grid.json`` next to this script.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    rule = "=" * 60
    print(rule)
    print(" GRID STRATEGY BACKTEST")
    print(f" {SYMBOL} | {INTERVAL} | {DAYS_BACK} days")
    print(f" ${POSITION_SIZE_USD} × {LEVERAGE}x per level | {GRID_LEVELS} levels")
    print(rule)

    # 1. Fetch data
    df = fetch_klines(SYMBOL, INTERVAL, DAYS_BACK)

    # 2. Calculate indicators
    print("[indicators] Calculating BB, ADX, ATR...")
    df = calc_bollinger(df, BB_PERIOD, BB_STD)
    df = calc_adx(df, ADX_PERIOD)
    df = calc_atr(df)

    # BB squeeze stats
    valid = df.dropna(subset=['bb_width', 'adx'])
    squeeze_mask = (valid['bb_width'] < BB_SQUEEZE_THRESHOLD) & (valid['adx'] < ADX_THRESHOLD)
    squeeze_candles = len(valid[squeeze_mask])
    print(f"[indicators] BB squeeze candles: {squeeze_candles}/{len(valid)} ({100*squeeze_candles/max(len(valid),1):.1f}%)")
    print(f"[indicators] BB width range: {valid['bb_width'].min():.4f} — {valid['bb_width'].max():.4f}")
    print(f"[indicators] ADX range: {valid['adx'].min():.1f} — {valid['adx'].max():.1f}")

    # 3. Run Classic Grid
    print("\n[backtest] Running Classic Grid...")
    classic = run_classic_grid(df)

    # 4. Run Smart Grid
    print("[backtest] Running Smart Grid (BB squeeze filter)...")
    smart = run_smart_grid(df)

    # 5. Reports
    print_report("🟦 CLASSIC GRID (baseline)", classic, df)
    print_report("🧠 SMART GRID (BB squeeze + gradual unwind)", smart, df)

    # 6. Comparison
    def win_rate(res):
        """Percentage of sessions with positive PnL (0 when there are none)."""
        sessions = res['sessions']
        return 100 * len([s for s in sessions if s['pnl'] > 0]) / max(len(sessions), 1)

    print(rule)
    print(" 📊 COMPARISON")
    print(rule)
    print(f" {'Metric':<25} {'Classic':>12} {'Smart':>12}")
    print(f" {'─'*49}")
    print(f" {'Sessions':<25} {len(classic['sessions']):>12} {len(smart['sessions']):>12}")
    print(f" {'Total trades':<25} {classic['total_trades']:>12} {smart['total_trades']:>12}")
    c_pnl = f"${classic['total_pnl']:.4f}"
    s_pnl = f"${smart['total_pnl']:.4f}"
    c_fee = f"${classic['total_fees']:.4f}"
    s_fee = f"${smart['total_fees']:.4f}"
    print(f" {'Total PnL':<25} {c_pnl:>12} {s_pnl:>12}")
    print(f" {'Total fees':<25} {c_fee:>12} {s_fee:>12}")
    c_wr = f"{win_rate(classic):.0f}%"
    s_wr = f"{win_rate(smart):.0f}%"
    print(f" {'Win rate':<25} {c_wr:>12} {s_wr:>12}")
    print(rule)

    # 7. Save results
    output = {
        'config': {
            'symbol': SYMBOL,
            'interval': INTERVAL,
            'days_back': DAYS_BACK,
            'grid_levels': GRID_LEVELS,
            'grid_spacing_pct': GRID_SPACING_PCT,
            'position_size_usd': POSITION_SIZE_USD,
            'leverage': LEVERAGE,
            'max_loss_pct': MAX_LOSS_PCT,
            'bb_squeeze_threshold': BB_SQUEEZE_THRESHOLD,
            'adx_threshold': ADX_THRESHOLD,
        },
        'classic': {
            'total_pnl': round(classic['total_pnl'], 4),
            'total_trades': classic['total_trades'],
            'total_fees': round(classic['total_fees'], 4),
            'sessions': classic['sessions'],
        },
        'smart': {
            'total_pnl': round(smart['total_pnl'], 4),
            'total_trades': smart['total_trades'],
            'total_fees': round(smart['total_fees'], 4),
            'sessions': smart['sessions'],
        },
        # datetime.utcnow() is deprecated; use an aware UTC timestamp, which
        # serializes with an explicit +00:00 offset.
        'tested_at': datetime.now(timezone.utc).isoformat(),
    }
    out_path = Path(__file__).parent / 'results_grid.json'
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=2)
    print(f"\n💾 Results saved to {out_path}")


if __name__ == "__main__":
    main()