# "← Back" — navigation link left over from the web export; not part of the script.
"""Grid Backtest — Adaptive Spacing based on NATR
================================================

Tests coins that traded live and lost money.
Compares fixed 0.1% spacing vs adaptive spacing = NATR × multiplier.

Hypothesis: RIVER, PLAY, TRU are too volatile for 0.1% spacing.
When NATR is high, need wider spacing to avoid getting steamrolled.

Usage: python3 backtest_grid_adaptive.py
"""

import json
import time
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
import requests

# ============================================================
# CONFIG
# ============================================================
SYMBOLS = ["RIVERUSDT", "PLAYUSDT", "TRUUSDT", "ZECUSDT", "DOGEUSDT", "SOLUSDT"]
INTERVAL = "1m"
DAYS_BACK = 7
DEPOSIT = 50.0
LEVERAGE = 10
POSITION_SIZE_USD = 3.0
FEE_PCT = 0.02 / 100  # maker 0.02%
GRID_LEVELS = 8
MAX_LOSS_PCT = 3.0

# Fixed spacing for comparison
FIXED_SPACING = 0.1

# Adaptive spacing: spacing = NATR * MULTIPLIER
# NATR(14) on 1m ≈ volatility per candle as % of price
# If NATR = 0.05% → spacing = 0.05 * 2 = 0.1% (same as fixed for low vol)
# If NATR = 0.15% → spacing = 0.15 * 2 = 0.3% (wider for high vol)
ADAPTIVE_MULTIPLIERS = [1.5, 2.0, 2.5, 3.0]
NATR_PERIOD = 14

# BB/ADX for reference
BB_PERIOD = 20
BB_STD = 2.0
ADX_PERIOD = 14


# ============================================================
# DATA FETCH
# ============================================================
def fetch_klines(symbol, interval, days_back, max_retries=5):
    """Download the last ``days_back`` days of futures klines for ``symbol``.

    Pages through the Binance USDⓈ-M futures klines endpoint, 1500 candles
    per request, starting ``days_back`` days ago and advancing the start
    time past the last candle received.

    Args:
        symbol: Futures symbol, e.g. ``"SOLUSDT"``.
        interval: Kline interval string, e.g. ``"1m"``.
        days_back: How many days of history to request.
        max_retries: Consecutive failed requests tolerated before giving up.
            The original retried forever, which hung the script whenever
            the network or the endpoint stayed down.

    Returns:
        DataFrame sorted by ``timestamp`` with duplicates removed. Price and
        volume columns are floats and ``timestamp`` is a datetime. The frame
        is empty or partial when the download failed; the caller is expected
        to check its length.
    """
    url = "https://fapi.binance.com/fapi/v1/klines"
    now = time.time()
    end_ts = int(now * 1000)
    start_ts = int((now - days_back * 86400) * 1000)

    all_candles = []
    current_start = start_ts
    failures = 0

    while current_start < end_ts:
        params = {
            "symbol": symbol,
            "interval": interval,
            "startTime": current_start,
            "limit": 1500,
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # Network failure or a non-JSON body: retry, but only a bounded
            # number of times in a row.
            failures += 1
            if failures > max_retries:
                print(f" Error: {e}, giving up after {max_retries} retries")
                break
            print(f" Error: {e}, retrying...")
            time.sleep(1)
            continue

        failures = 0
        if not isinstance(data, list):
            # An API error comes back as a JSON object, not a list of candles.
            # Report it instead of ending the download silently.
            print(f" Unexpected response for {symbol}: {data}")
            break
        if not data:
            break

        all_candles.extend(data)
        # Element 0 of a kline is its open time in ms; resume just after it.
        current_start = data[-1][0] + 1
        time.sleep(0.1)

    df = pd.DataFrame(all_candles, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades',
        'taker_buy_base', 'taker_buy_quote', 'ignore',
    ])
    for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
        df[col] = df[col].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    return df


# ============================================================
# INDICATORS
# ============================================================
def calc_indicators(df):
    """Add ATR/NATR, Bollinger Band and ADX columns to ``df``.

    The frame is modified in place and also returned. Columns added:
    ``atr``, ``natr`` (ATR as a percentage of close), ``bb_mid``, ``bb_std``,
    ``bb_upper``, ``bb_lower``, ``bb_width`` (band width as a percentage of
    the middle band) and ``adx``.

    Args:
        df: DataFrame with float ``high``, ``low`` and ``close`` columns.

    Returns:
        The same DataFrame with the indicator columns attached.
    """
    high, low, close = df['high'], df['low'], df['close']
    prev_close = close.shift(1)

    # ATR / NATR — true range is the largest of the three candidate ranges.
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    df['atr'] = tr.ewm(alpha=1 / NATR_PERIOD, min_periods=NATR_PERIOD).mean()
    df['natr'] = (df['atr'] / df['close']) * 100  # as percentage

    # BB
    df['bb_mid'] = close.rolling(BB_PERIOD).mean()
    df['bb_std'] = close.rolling(BB_PERIOD).std()
    df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std']
    df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std']
    df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100

    # ADX
    # Both directional movements are filtered against the RAW moves. The
    # original filtered -DM against the already-zeroed +DM, so a tie
    # (up move == down move > 0) was wrongly counted as a down move.
    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    adx_alpha = 1 / ADX_PERIOD
    atr_adx = tr.ewm(alpha=adx_alpha, min_periods=ADX_PERIOD).mean()
    plus_di = 100 * (plus_dm.ewm(alpha=adx_alpha, min_periods=ADX_PERIOD).mean() / atr_adx)
    minus_di = 100 * (minus_dm.ewm(alpha=adx_alpha, min_periods=ADX_PERIOD).mean() / atr_adx)
    # The small epsilon keeps DX finite when both DI lines are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    df['adx'] = dx.ewm(alpha=adx_alpha, min_periods=ADX_PERIOD).mean()

    return df


# ============================================================
# GRID ENGINE —
# supports both fixed and adaptive spacing
# ============================================================
def _mark_open_levels(buy_fills, sell_fills, buy_fill_prices, sell_fill_prices,
                      price, notional):
    """Mark every one-sided (still open) grid level to ``price``.

    Returns ``(pnl, open_count)``: the summed PnL of all open longs and
    shorts valued at ``price``, and how many levels are open.
    """
    pnl = 0.0
    open_count = 0
    for bought, sold, buy_px, sell_px in zip(
            buy_fills, sell_fills, buy_fill_prices, sell_fill_prices):
        if bought and not sold:
            pnl += notional / buy_px * (price - buy_px)
            open_count += 1
        elif sold and not bought:
            pnl += notional / sell_px * (sell_px - price)
            open_count += 1
    return pnl, open_count


def run_grid(df, spacing_mode="fixed", spacing_value=0.1, adaptive_mult=2.0, *,
             grid_levels=None, position_size_usd=None, leverage=None,
             fee_pct=None, taker_fee_pct=0.04 / 100, max_loss_pct=None,
             warmup=None, session_minutes=60, cooldown=5):
    """Simulate consecutive grid sessions over ``df`` and total the results.

    No recenter — infinite grid, just round-trips + max loss stop.

    Each session centres a symmetric grid on the close of its opening candle.
    On every following candle, a buy level fills when the low touches it and
    a sell level fills when the high touches it. When the buy and the sell
    with the same index are both filled, the pair is booked as a round trip
    and the level is re-armed. A session ends on the first of: the max-loss
    stop, the session timeout, or the end of the data. The next session
    starts ``cooldown`` candles after the candle that follows the close.

    Args:
        df: Candles with ``close``, ``low`` and ``high`` columns, plus
            ``natr`` when ``spacing_mode`` is ``"adaptive"``.
        spacing_mode: ``"adaptive"`` uses ``natr * adaptive_mult`` clamped to
            [0.05, 1.0] percent; any other value uses ``spacing_value``.
        spacing_value: Grid spacing in percent for the fixed mode.
        adaptive_mult: NATR multiplier for the adaptive mode.
        grid_levels: Levels per side. Defaults to ``GRID_LEVELS``.
        position_size_usd: Margin per level. Defaults to ``POSITION_SIZE_USD``.
        leverage: Defaults to ``LEVERAGE``.
        fee_pct: Maker fee as a fraction of notional. Defaults to ``FEE_PCT``.
        taker_fee_pct: Fee fraction charged per position closed by the
            max-loss stop.
        max_loss_pct: Stop threshold as a percentage of the session's maximum
            capital. Defaults to ``MAX_LOSS_PCT``.
        warmup: First candle index eligible to open a session. Defaults to
            the longest indicator period plus 10.
        session_minutes: Candles after which a session is closed.
        cooldown: Candles skipped between sessions.

    Returns:
        Dict with ``sessions`` (one record per session that traded) and the
        totals ``total_pnl``, ``total_trades``, ``total_fees``,
        ``total_round_trips`` and ``max_loss_stops``.
    """
    # Module-level config is read only when the caller gave no override.
    grid_levels = GRID_LEVELS if grid_levels is None else grid_levels
    position_size_usd = POSITION_SIZE_USD if position_size_usd is None else position_size_usd
    leverage = LEVERAGE if leverage is None else leverage
    fee_pct = FEE_PCT if fee_pct is None else fee_pct
    max_loss_pct = MAX_LOSS_PCT if max_loss_pct is None else max_loss_pct
    if warmup is None:
        warmup = max(BB_PERIOD, ADX_PERIOD, NATR_PERIOD) + 10

    results = {
        'sessions': [],
        'total_pnl': 0,
        'total_trades': 0,
        'total_fees': 0,
        'total_round_trips': 0,
        'max_loss_stops': 0,
    }

    notional = position_size_usd * leverage
    maker_fee = notional * fee_pct
    taker_fee = notional * taker_fee_pct
    max_capital = grid_levels * 2 * position_size_usd
    max_loss = max_capital * max_loss_pct / 100

    # Plain lists avoid a pandas .iloc lookup on every candle.
    closes = df['close'].tolist()
    lows = df['low'].tolist()
    highs = df['high'].tolist()
    adaptive = spacing_mode == "adaptive"
    natrs = df['natr'].tolist() if adaptive else None
    n = len(closes)

    i = warmup
    session_id = 0

    while i < n:
        mid_price = closes[i]

        # Determine spacing
        if adaptive:
            natr = natrs[i]
            if pd.isna(natr) or natr <= 0:
                i += 1
                continue
            # Clamp: min 0.05%, max 1.0%
            spacing_pct = max(0.05, min(1.0, natr * adaptive_mult))
        else:
            spacing_pct = spacing_value

        spacing_abs = mid_price * spacing_pct / 100

        # Create grid levels
        buy_levels = [mid_price - k * spacing_abs for k in range(1, grid_levels + 1)]
        sell_levels = [mid_price + k * spacing_abs for k in range(1, grid_levels + 1)]
        buy_fills = [False] * grid_levels
        sell_fills = [False] * grid_levels
        # Track individual fill prices for round-trip matching
        buy_fill_prices = [0.0] * grid_levels
        sell_fill_prices = [0.0] * grid_levels

        session_pnl = 0.0
        session_fees = 0.0
        session_trades = 0
        session_rts = 0
        session_start = i
        close_reason = None
        close_idx = None

        j = i + 1
        while j < n and close_reason is None:
            price = closes[j]
            low_px = lows[j]
            high_px = highs[j]

            for lvl in range(grid_levels):
                # Fill buy level
                if not buy_fills[lvl] and low_px <= buy_levels[lvl]:
                    buy_fills[lvl] = True
                    buy_fill_prices[lvl] = buy_levels[lvl]
                    session_fees += maker_fee
                    session_trades += 1
                # Fill sell level
                if not sell_fills[lvl] and high_px >= sell_levels[lvl]:
                    sell_fills[lvl] = True
                    sell_fill_prices[lvl] = sell_levels[lvl]
                    session_fees += maker_fee
                    session_trades += 1
                # Round trip: the buy and the sell of this level both filled.
                if buy_fills[lvl] and sell_fills[lvl]:
                    buy_px = buy_fill_prices[lvl]
                    sell_px = sell_fill_prices[lvl]
                    session_pnl += notional / buy_px * (sell_px - buy_px)
                    session_fees += maker_fee  # close fee
                    session_rts += 1
                    # Reset level for re-use
                    buy_fills[lvl] = False
                    sell_fills[lvl] = False

            # Unrealized PnL of the one-sided fills, marked to this close.
            unrealized, open_count = _mark_open_levels(
                buy_fills, sell_fills, buy_fill_prices, sell_fill_prices,
                price, notional)
            net_pnl = session_pnl + unrealized - session_fees

            # The stop and the timeout are mutually exclusive. The original
            # used two independent ifs, so a candle that hit both closed the
            # positions twice and relabelled the stop as a timeout.
            if net_pnl < -max_loss:
                close_reason = 'max_loss'
                exit_fee = taker_fee  # closed at market
            elif j - session_start >= session_minutes:
                close_reason = 'timeout'
                exit_fee = maker_fee
            else:
                j += 1
                continue

            session_pnl += unrealized
            session_fees += open_count * exit_fee
            close_idx = j

        if close_reason is None:
            # Data ran out mid-session: value the remaining inventory at the
            # last close instead of silently dropping it from the PnL.
            close_reason = 'end_of_data'
            close_idx = n - 1
            unrealized, open_count = _mark_open_levels(
                buy_fills, sell_fills, buy_fill_prices, sell_fill_prices,
                closes[close_idx], notional)
            session_pnl += unrealized
            session_fees += open_count * maker_fee

        net_session = session_pnl - session_fees

        if session_trades > 0:
            session_id += 1
            results['sessions'].append({
                'id': session_id,
                'spacing_pct': round(spacing_pct, 4),
                'trades': session_trades,
                'round_trips': session_rts,
                'pnl': round(net_session, 4),
                'fees': round(session_fees, 4),
                'close_reason': close_reason,
                # Candles between open and close (was one too many).
                'duration_min': close_idx - session_start,
            })
            results['total_pnl'] += net_session
            results['total_trades'] += session_trades
            results['total_fees'] += session_fees
            results['total_round_trips'] += session_rts
            if close_reason == 'max_loss':
                results['max_loss_stops'] += 1

        i = close_idx + 1 + cooldown  # cooldown

    return results


# ============================================================
# REPORT
# ============================================================
def print_compact_report(symbol, natr_avg, results_dict):
    """Print one table row per tested spacing mode for ``symbol``.

    Args:
        symbol: Symbol shown in the table header.
        natr_avg: Average NATR in percent, shown in the header.
        results_dict: Maps a mode label to the dict returned by ``run_grid``.
    """
    print(f"\n{'='*80}")
    # Period and interval come from the config instead of a hard-coded "7 days 1m".
    print(f" {symbol} | Avg NATR(14): {natr_avg:.4f}% | {DAYS_BACK} days {INTERVAL}")
    print(f"{'='*80}")
    print(f" {'Mode':<22} {'PnL':>8} {'RTs':>5} {'MaxLoss':>8} {'Sessions':>9} {'Avg Spc':>8}")
    print(f" {'─'*66}")

    for name, res in results_dict.items():
        pnl = res['total_pnl']
        rts = res['total_round_trips']
        ml = res['max_loss_stops']
        sessions = res['sessions']
        sess = len(sessions)

        # Average spacing used
        avg_sp = sum(s['spacing_pct'] for s in sessions) / sess if sessions else 0

        emoji = '🟢' if pnl > 0 else '🔴'
        print(f" {emoji} {name:<20} ${pnl:>+7.2f} {rts:>5} {ml:>8} {sess:>9} {avg_sp:>7.3f}%")

    print(f"{'='*80}")


# ============================================================
# MAIN
# ============================================================
def main():
    """Fetch every symbol, backtest all spacing modes, print and save results."""
    print("=" * 80)
    print(" GRID BACKTEST — ADAPTIVE SPACING (NATR-based)")
    print(f" Symbols: {', '.join(SYMBOLS)}")
    print(f" {INTERVAL} | {DAYS_BACK} days | ${POSITION_SIZE_USD}×{LEVERAGE}x | {GRID_LEVELS} levels")
    print("=" * 80)

    # The first fixed spacing is the current production value.
    fixed_spacings = (FIXED_SPACING, 0.2, 0.3)
    # (result key, summary column title), derived from the config so the
    # summary table cannot drift from the modes that were actually run.
    columns = [(f"Fixed {s}%", f"Fix{s}") for s in fixed_spacings]
    columns += [(f"Adaptive ×{m}", f"Adp×{m}") for m in ADAPTIVE_MULTIPLIERS]

    all_results = {}

    for symbol in SYMBOLS:
        print(f"\n{'─'*40}")
        print(f" Fetching {symbol}...")
        df = fetch_klines(symbol, INTERVAL, DAYS_BACK)

        if len(df) < 500:
            print(f" ⚠️ Too few candles ({len(df)}), skipping")
            continue

        df = calc_indicators(df)

        # NATR stats
        valid_natr = df['natr'].dropna()
        natr_avg = valid_natr.mean()
        natr_p50 = valid_natr.median()
        natr_p90 = valid_natr.quantile(0.9)

        print(f" NATR(14): avg={natr_avg:.4f}% median={natr_p50:.4f}% p90={natr_p90:.4f}%")
        print(f" BB width avg: {df['bb_width'].dropna().mean():.4f}%")
        print(f" ADX avg: {df['adx'].dropna().mean():.1f}")
        print(f" Price: ${df['close'].iloc[-1]:.6f}")

        # Run tests
        results_dict = {}

        # Fixed spacings
        for spacing in fixed_spacings:
            print(f" Testing fixed {spacing}%...")
            results_dict[f"Fixed {spacing}%"] = run_grid(df, "fixed", spacing)

        # Adaptive with different multipliers
        for mult in ADAPTIVE_MULTIPLIERS:
            label = f"Adaptive ×{mult}"
            print(f" Testing {label}...")
            results_dict[label] = run_grid(df, "adaptive", adaptive_mult=mult)

        print_compact_report(symbol, natr_avg, results_dict)

        all_results[symbol] = {
            'natr_avg': round(float(natr_avg), 4),
            'natr_p50': round(float(natr_p50), 4),
            'natr_p90': round(float(natr_p90), 4),
            'results': {
                k: {
                    'pnl': round(float(v['total_pnl']), 4),
                    'rts': v['total_round_trips'],
                    'max_loss': v['max_loss_stops'],
                    'sessions': len(v['sessions']),
                }
                for k, v in results_dict.items()
            },
        }

    # Summary table
    header_cells = ' '.join(f"{title:>8}" for _, title in columns)
    print(f"\n\n{'='*80}")
    print(f" SUMMARY — ALL COINS")
    print(f"{'='*80}")
    print(f" {'Symbol':<14} {'NATR':>6} {header_cells}")
    print(f" {'─'*78}")

    for sym, data in all_results.items():
        natr = data['natr_avg']
        vals = []
        for key, _ in columns:
            pnl = data['results'].get(key, {}).get('pnl', 0)
            vals.append(f"${pnl:>+6.2f}")
        print(f" {sym:<14} {natr:>5.3f}% {' '.join(vals)}")

    print(f"{'='*80}")

    # Save
    out_path = Path(__file__).parent / 'results_grid_adaptive.json'
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, indent=2)
    print(f"\n💾 Saved to {out_path}")


if __name__ == "__main__":
    main()