"""
Grid Backtest — Adaptive Spacing based on NATR
================================================
Backtests coins that lost money in live trading.
Compares fixed 0.1% spacing vs adaptive spacing = NATR × multiplier.
Hypothesis: RIVER, PLAY, TRU are too volatile for 0.1% spacing.
When NATR is high, the grid needs wider spacing to avoid getting steamrolled.
Usage: python3 backtest_grid_adaptive.py
"""
import requests
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime
from pathlib import Path
# ============================================================
# CONFIG
# ============================================================
SYMBOLS = ["RIVERUSDT", "PLAYUSDT", "TRUUSDT", "ZECUSDT", "DOGEUSDT", "SOLUSDT"]
INTERVAL = "1m"
DAYS_BACK = 7
DEPOSIT = 50.0
LEVERAGE = 10
POSITION_SIZE_USD = 3.0
FEE_PCT = 0.02 / 100  # maker 0.02%
TAKER_FEE_PCT = 0.04 / 100  # taker 0.04%, paid on forced market closes
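# Worked example: one maker fill on a $3 × 10x = $30 notional costs 30 * 0.0002 = $0.006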
GRID_LEVELS = 8
MAX_LOSS_PCT = 3.0
# Fixed spacing for comparison
FIXED_SPACING = 0.1
# Adaptive spacing: spacing = NATR * MULTIPLIER
# NATR(14) on 1m ≈ volatility per candle as % of price
# If NATR = 0.05% → spacing = 0.05 * 2 = 0.1% (same as fixed for low vol)
# If NATR = 0.15% → spacing = 0.15 * 2 = 0.3% (wider for high vol)
ADAPTIVE_MULTIPLIERS = [1.5, 2.0, 2.5, 3.0]
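# Note: run_grid later clamps adaptive spacing to [0.05%, 1.0%],
# so e.g. NATR = 0.5% with multiplier 3.0 is capped at 1.0%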
NATR_PERIOD = 14
# BB/ADX for reference
BB_PERIOD = 20
BB_STD = 2.0
ADX_PERIOD = 14
# ============================================================
# DATA FETCH
# ============================================================
def fetch_klines(symbol, interval, days_back):
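    """Fetch klines from Binance USDT-M futures, paginating by startTime.

    Each request returns at most 1500 candles, so startTime is advanced
    past the open time of the last candle until the window is covered.
    """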
url = "https://fapi.binance.com/fapi/v1/klines"
end_ts = int(time.time() * 1000)
start_ts = int((time.time() - days_back * 86400) * 1000)
all_candles = []
current_start = start_ts
while current_start < end_ts:
params = {
"symbol": symbol, "interval": interval,
"startTime": current_start, "limit": 1500
}
try:
resp = requests.get(url, params=params, timeout=10)
data = resp.json()
if not isinstance(data, list) or len(data) == 0:
break
all_candles.extend(data)
current_start = data[-1][0] + 1
time.sleep(0.1)
except Exception as e:
print(f" Error: {e}, retrying...")
time.sleep(1)
continue
df = pd.DataFrame(all_candles, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
df[col] = df[col].astype(float)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
return df
# ============================================================
# INDICATORS
# ============================================================
def calc_indicators(df):
# ATR / NATR
high, low, close = df['high'], df['low'], df['close']
tr1 = high - low
tr2 = (high - close.shift(1)).abs()
tr3 = (low - close.shift(1)).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
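    # ewm(alpha=1/N) approximates Wilder's recursive smoothing used in the classic ATR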
df['atr'] = tr.ewm(alpha=1/NATR_PERIOD, min_periods=NATR_PERIOD).mean()
df['natr'] = (df['atr'] / df['close']) * 100 # as percentage
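    # e.g. ATR = 0.0005 at price $1.00 → NATR = 0.05% (cf. the multiplier examples in CONFIG)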
# BB
df['bb_mid'] = close.rolling(BB_PERIOD).mean()
df['bb_std'] = close.rolling(BB_PERIOD).std()
df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std']
df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std']
df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100
# ADX
plus_dm = high.diff()
minus_dm = -low.diff()
plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0)
minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0)
atr_adx = tr.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean()
plus_di = 100 * (plus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr_adx)
minus_di = 100 * (minus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr_adx)
dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
df['adx'] = dx.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean()
return df
# ============================================================
# GRID ENGINE — supports both fixed and adaptive spacing
# ============================================================
def run_grid(df, spacing_mode="fixed", spacing_value=0.1, adaptive_mult=2.0):
"""
Grid engine.
spacing_mode: "fixed" = constant % spacing, "adaptive" = NATR * multiplier
    No recentering: the grid stays where it is placed; PnL comes from
    round-trips, with a max-loss stop and a 60-minute session timeout.
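
    Example (the same calls used in __main__):
        run_grid(df, "fixed", 0.1)
        run_grid(df, "adaptive", adaptive_mult=2.0)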
"""
results = {
'sessions': [],
'total_pnl': 0,
'total_trades': 0,
'total_fees': 0,
'total_round_trips': 0,
'max_loss_stops': 0,
}
warmup = max(BB_PERIOD, ADX_PERIOD, NATR_PERIOD) + 10
i = warmup
session_id = 0
while i < len(df):
mid_price = df['close'].iloc[i]
# Determine spacing
if spacing_mode == "adaptive":
natr = df['natr'].iloc[i]
if pd.isna(natr) or natr <= 0:
i += 1
continue
spacing_pct = natr * adaptive_mult
# Clamp: min 0.05%, max 1.0%
spacing_pct = max(0.05, min(1.0, spacing_pct))
else:
spacing_pct = spacing_value
spacing_abs = mid_price * spacing_pct / 100
# Create grid levels
buy_levels = [mid_price - k * spacing_abs for k in range(1, GRID_LEVELS + 1)]
sell_levels = [mid_price + k * spacing_abs for k in range(1, GRID_LEVELS + 1)]
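        # e.g. mid = $1.00 at 0.1% spacing → buys at 0.999, 0.998, ..., 0.992
        # and sells at 1.001, 1.002, ..., 1.008 (level k sits k × spacing from mid)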
buy_fills = [False] * GRID_LEVELS
sell_fills = [False] * GRID_LEVELS
# Track individual fill prices for round-trip matching
buy_fill_prices = [0.0] * GRID_LEVELS
sell_fill_prices = [0.0] * GRID_LEVELS
session_pnl = 0.0
session_fees = 0.0
session_trades = 0
session_rts = 0
session_start = i
max_capital = GRID_LEVELS * 2 * POSITION_SIZE_USD
max_loss = max_capital * MAX_LOSS_PCT / 100
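        # e.g. 8 levels × 2 sides × $3 = $48 of margin → stop at -$48 * 3% = -$1.44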
j = i + 1
session_closed = False
close_reason = 'timeout'
while j < len(df) and not session_closed:
price = df['close'].iloc[j]
low_px = df['low'].iloc[j]
high_px = df['high'].iloc[j]
# Fill buy levels
for lvl in range(GRID_LEVELS):
if not buy_fills[lvl] and low_px <= buy_levels[lvl]:
buy_fills[lvl] = True
buy_fill_prices[lvl] = buy_levels[lvl]
session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT
session_trades += 1
# Fill sell levels
for lvl in range(GRID_LEVELS):
if not sell_fills[lvl] and high_px >= sell_levels[lvl]:
sell_fills[lvl] = True
sell_fill_prices[lvl] = sell_levels[lvl]
session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT
session_trades += 1
# Check round-trips (buy + sell pair both filled)
for lvl in range(GRID_LEVELS):
if buy_fills[lvl] and sell_fills[lvl]:
# Round trip!
buy_px = buy_fill_prices[lvl]
sell_px = sell_fill_prices[lvl]
qty = (POSITION_SIZE_USD * LEVERAGE) / buy_px
rt_pnl = qty * (sell_px - buy_px)
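                    # e.g. level 1, mid $1.00, 0.1% spacing: buy 0.999, sell 1.001;
                    # qty = 30 / 0.999 ≈ 30.03 → pnl ≈ 30.03 * 0.002 ≈ $0.06 gross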
rt_fee = POSITION_SIZE_USD * LEVERAGE * FEE_PCT # close fee
session_pnl += rt_pnl
session_fees += rt_fee
session_rts += 1
# Reset level for re-use
buy_fills[lvl] = False
sell_fills[lvl] = False
            # Unrealized PnL from levels where only one side filled (position still open)
unrealized = 0
for lvl in range(GRID_LEVELS):
if buy_fills[lvl] and not sell_fills[lvl]:
qty = (POSITION_SIZE_USD * LEVERAGE) / buy_fill_prices[lvl]
unrealized += qty * (price - buy_fill_prices[lvl])
elif sell_fills[lvl] and not buy_fills[lvl]:
qty = (POSITION_SIZE_USD * LEVERAGE) / sell_fill_prices[lvl]
unrealized += qty * (sell_fill_prices[lvl] - price)
net_pnl = session_pnl + unrealized - session_fees
# Max loss stop
if net_pnl < -max_loss:
# Close all open positions at market
close_pnl = 0
for lvl in range(GRID_LEVELS):
if buy_fills[lvl] and not sell_fills[lvl]:
qty = (POSITION_SIZE_USD * LEVERAGE) / buy_fill_prices[lvl]
close_pnl += qty * (price - buy_fill_prices[lvl])
                        session_fees += POSITION_SIZE_USD * LEVERAGE * TAKER_FEE_PCT  # taker
elif sell_fills[lvl] and not buy_fills[lvl]:
qty = (POSITION_SIZE_USD * LEVERAGE) / sell_fill_prices[lvl]
close_pnl += qty * (sell_fill_prices[lvl] - price)
                        session_fees += POSITION_SIZE_USD * LEVERAGE * TAKER_FEE_PCT  # taker
session_pnl += close_pnl
session_closed = True
close_reason = 'max_loss'
# Session timeout — 60 min
if j - session_start >= 60:
close_pnl = 0
for lvl in range(GRID_LEVELS):
if buy_fills[lvl] and not sell_fills[lvl]:
qty = (POSITION_SIZE_USD * LEVERAGE) / buy_fill_prices[lvl]
close_pnl += qty * (price - buy_fill_prices[lvl])
                        session_fees += POSITION_SIZE_USD * LEVERAGE * TAKER_FEE_PCT  # taker, market close
elif sell_fills[lvl] and not buy_fills[lvl]:
qty = (POSITION_SIZE_USD * LEVERAGE) / sell_fill_prices[lvl]
close_pnl += qty * (sell_fill_prices[lvl] - price)
                        session_fees += POSITION_SIZE_USD * LEVERAGE * TAKER_FEE_PCT  # taker, market close
session_pnl += close_pnl
session_closed = True
close_reason = 'timeout'
j += 1
net_session = session_pnl - session_fees
if session_trades > 0:
session_id += 1
results['sessions'].append({
'id': session_id,
'spacing_pct': round(spacing_pct, 4),
'trades': session_trades,
'round_trips': session_rts,
'pnl': round(net_session, 4),
'fees': round(session_fees, 4),
'close_reason': close_reason,
'duration_min': j - session_start,
})
results['total_pnl'] += net_session
results['total_trades'] += session_trades
results['total_fees'] += session_fees
results['total_round_trips'] += session_rts
if close_reason == 'max_loss':
results['max_loss_stops'] += 1
i = j + 5 # cooldown
return results
# ============================================================
# REPORT
# ============================================================
def print_compact_report(symbol, natr_avg, results_dict):
print(f"\n{'='*80}")
print(f" {symbol} | Avg NATR(14): {natr_avg:.4f}% | 7 days 1m")
print(f"{'='*80}")
print(f" {'Mode':<22} {'PnL':>8} {'RTs':>5} {'MaxLoss':>8} {'Sessions':>9} {'Avg Spc':>8}")
print(f" {'─'*66}")
for name, res in results_dict.items():
pnl = res['total_pnl']
rts = res['total_round_trips']
ml = res['max_loss_stops']
sess = len(res['sessions'])
# Average spacing used
if res['sessions']:
avg_sp = sum(s['spacing_pct'] for s in res['sessions']) / len(res['sessions'])
else:
avg_sp = 0
emoji = '🟢' if pnl > 0 else '🔴'
print(f" {emoji} {name:<20} ${pnl:>+7.2f} {rts:>5} {ml:>8} {sess:>9} {avg_sp:>7.3f}%")
print(f"{'='*80}")
# ============================================================
# MAIN
# ============================================================
if __name__ == "__main__":
print("=" * 80)
print(" GRID BACKTEST — ADAPTIVE SPACING (NATR-based)")
print(f" Symbols: {', '.join(SYMBOLS)}")
print(f" {INTERVAL} | {DAYS_BACK} days | ${POSITION_SIZE_USD}×{LEVERAGE}x | {GRID_LEVELS} levels")
print("=" * 80)
all_results = {}
for symbol in SYMBOLS:
print(f"\n{'─'*40}")
print(f" Fetching {symbol}...")
df = fetch_klines(symbol, INTERVAL, DAYS_BACK)
if len(df) < 500:
print(f" ⚠️ Too few candles ({len(df)}), skipping")
continue
df = calc_indicators(df)
# NATR stats
valid_natr = df['natr'].dropna()
natr_avg = valid_natr.mean()
natr_p50 = valid_natr.median()
natr_p90 = valid_natr.quantile(0.9)
print(f" NATR(14): avg={natr_avg:.4f}% median={natr_p50:.4f}% p90={natr_p90:.4f}%")
print(f" BB width avg: {df['bb_width'].dropna().mean():.4f}%")
print(f" ADX avg: {df['adx'].dropna().mean():.1f}")
print(f" Price: ${df['close'].iloc[-1]:.6f}")
# Run tests
results_dict = {}
        # Fixed spacings (0.1% is current production)
        for sp in (0.1, 0.2, 0.3):
            label = f"Fixed {sp}%"
            print(f" Testing {label}...")
            results_dict[label] = run_grid(df, "fixed", sp)
        # Adaptive with different multipliers
for mult in ADAPTIVE_MULTIPLIERS:
label = f"Adaptive ×{mult}"
print(f" Testing {label}...")
results_dict[label] = run_grid(df, "adaptive", adaptive_mult=mult)
print_compact_report(symbol, natr_avg, results_dict)
all_results[symbol] = {
'natr_avg': round(natr_avg, 4),
'natr_p50': round(natr_p50, 4),
'natr_p90': round(natr_p90, 4),
'results': {k: {'pnl': round(v['total_pnl'], 4), 'rts': v['total_round_trips'],
'max_loss': v['max_loss_stops'], 'sessions': len(v['sessions'])}
for k, v in results_dict.items()}
}
# Summary table
print(f"\n\n{'='*80}")
print(f" SUMMARY — ALL COINS")
print(f"{'='*80}")
print(f" {'Symbol':<14} {'NATR':>6} {'Fix0.1':>8} {'Fix0.2':>8} {'Fix0.3':>8} {'Adp×1.5':>8} {'Adp×2.0':>8} {'Adp×2.5':>8} {'Adp×3.0':>8}")
print(f" {'─'*78}")
for sym, data in all_results.items():
natr = data['natr_avg']
vals = []
for key in ["Fixed 0.1%", "Fixed 0.2%", "Fixed 0.3%",
"Adaptive ×1.5", "Adaptive ×2.0", "Adaptive ×2.5", "Adaptive ×3.0"]:
pnl = data['results'].get(key, {}).get('pnl', 0)
vals.append(f"${pnl:>+6.2f}")
print(f" {sym:<14} {natr:>5.3f}% {' '.join(vals)}")
print(f"{'='*80}")
# Save
out_path = Path(__file__).parent / 'results_grid_adaptive.json'
with open(out_path, 'w') as f:
json.dump(all_results, f, indent=2)
print(f"\n💾 Saved to {out_path}")