← Назад
""" Adaptive Grid Spacing Backtest — spacing tied to NATR Идея: spacing = NATR(5m) × multiplier Если монета волатильная (NATR 2%) → spacing шире (0.6%) Если спокойная (NATR 0.5%) → spacing уже (0.15%) Тестируем: A) Fixed 0.1% (текущий baseline) B) Fixed 0.2%, 0.3% (простые варианты) C) Adaptive: spacing = NATR_5m × 0.3 D) Adaptive: spacing = NATR_5m × 0.5 E) Adaptive: spacing = NATR_5m × 0.7 + Sideways filter (score >= 50 на 15m) vs без фильтра Usage: python3 backtest_adaptive_spacing.py """ import requests import pandas as pd import numpy as np import time import json from datetime import datetime, timedelta from pathlib import Path # ============================================================ # CONFIG # ============================================================ SYMBOLS = [ "ETHUSDT", "DOGEUSDT", "PENGUUSDT", "ENAUSDT", "NEARUSDT", "WLDUSDT", "SOLUSDT", "ARBUSDT", "XRPUSDT", "LINKUSDT", "SUIUSDT", "OPUSDT", "ADAUSDT", "UNIUSDT", "AVAXUSDT", ] DAYS_BACK = 14 DEPOSIT = 50.0 LEVERAGE = 10 POSITION_SIZE_USD = 3.0 FEE_PCT = 0.02 / 100 # maker 0.02% GRID_LEVELS = 8 MAX_LOSS_PCT = 3.0 # Spacing variants to test SPACING_CONFIGS = [ {'name': 'fixed_0.1%', 'type': 'fixed', 'value': 0.1}, {'name': 'fixed_0.2%', 'type': 'fixed', 'value': 0.2}, {'name': 'fixed_0.3%', 'type': 'fixed', 'value': 0.3}, {'name': 'natr×0.3', 'type': 'adaptive', 'mult': 0.3, 'min': 0.08, 'max': 0.8}, {'name': 'natr×0.5', 'type': 'adaptive', 'mult': 0.5, 'min': 0.10, 'max': 1.0}, {'name': 'natr×0.7', 'type': 'adaptive', 'mult': 0.7, 'min': 0.12, 'max': 1.2}, ] # Screener params (15m) SCREENER_TF = '15m' BB_PERIOD = 20 BB_STD = 2.0 ADX_PERIOD = 14 RANGE_LOOKBACK = 24 SCORE_THRESHOLD = 45 # slightly lower to get more samples # Session = 1 hour of 1m candles SESSION_CANDLES = 60 SESSION_COOLDOWN = 5 # candles gap between sessions # NATR calc on 5m resampled from 1m NATR_PERIOD = 14 # ============================================================ # DATA FETCH # ============================================================ def fetch_klines(symbol, interval, days_back): url = "https://fapi.binance.com/fapi/v1/klines" end_ts = int(time.time() * 1000) start_ts = int((time.time() - days_back * 86400) * 1000) all_candles = [] current_start = start_ts while current_start < end_ts: params = {"symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500} try: resp = requests.get(url, params=params, timeout=10) data = resp.json() if not isinstance(data, list) or len(data) == 0: break all_candles.extend(data) current_start = data[-1][0] + 1 time.sleep(0.08) except Exception as e: print(f" [fetch] Error: {e}") time.sleep(1) continue df = pd.DataFrame(all_candles, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']: df[col] = df[col].astype(float) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True) return df # ============================================================ # RESAMPLE 1m → 5m for NATR # ============================================================ def calc_natr_5m(df_1m): """Resample 1m to 5m and calc NATR(14) = ATR/close * 100""" df = df_1m.set_index('timestamp').resample('5min').agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' }).dropna().reset_index() high, low, close = df['high'], df['low'], df['close'] tr1 = high - low tr2 = (high - close.shift(1)).abs() tr3 = (low - close.shift(1)).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.ewm(alpha=1/NATR_PERIOD, min_periods=NATR_PERIOD).mean() df['natr_5m'] = (atr / close) * 100 return df[['timestamp', 'natr_5m']].dropna() # ============================================================ # SCREENER INDICATORS (15m) # ============================================================ def calc_screener(df): df['bb_mid'] = df['close'].rolling(BB_PERIOD).mean() df['bb_std'] = df['close'].rolling(BB_PERIOD).std() df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std'] df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std'] df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100 high, low, close = df['high'], df['low'], df['close'] plus_dm = high.diff() minus_dm = -low.diff() plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0) minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0) tr1 = high - low tr2 = (high - close.shift(1)).abs() tr3 = (low - close.shift(1)).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() plus_di = 100 * (plus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr) minus_di = 100 * (minus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr) dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10) df['adx'] = dx.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() df['natr'] = (atr / close) * 100 df['range_high'] = df['high'].rolling(RANGE_LOOKBACK).max() df['range_low'] = df['low'].rolling(RANGE_LOOKBACK).min() rng = df['range_high'] - df['range_low'] df['range_pos'] = (df['close'] - df['range_low']) / rng.replace(0, np.nan) df['candle_dir'] = np.where(df['close'] > df['open'], 1, -1) df['dir_change'] = (df['candle_dir'] != df['candle_dir'].shift(1)).astype(int) df['dir_changes'] = df['dir_change'].rolling(RANGE_LOOKBACK).sum() return df def sideways_score(row): score = 0 adx = row.get('adx', np.nan) bb_w = row.get('bb_width', np.nan) rp = row.get('range_pos', np.nan) dc = row.get('dir_changes', np.nan) natr = row.get('natr', np.nan) if any(pd.isna(x) for x in [adx, bb_w, rp, dc, natr]): return 0 # ADX → 25 pts if adx <= 5: score += 25 elif adx <= 20: score += 25 * (1 - (adx - 5) / 15) elif adx <= 30: score += max(0, -5 * (adx - 20) / 10) else: score -= 10 # BB width 1.5-4% → 20 pts if 1.5 <= bb_w <= 4.0: if bb_w <= 2.5: score += 20 * (bb_w - 1.5) / 1.0 else: score += 20 * (4.0 - bb_w) / 1.5 elif 0.5 <= bb_w < 1.5: score += 5 # Range pos 0.3-0.7 → 20 pts if 0.3 <= rp <= 0.7: score += 20 * (1 - abs(rp - 0.5) / 0.2) elif 0.2 <= rp < 0.3 or 0.7 < rp <= 0.8: score += 5 # Direction changes → 20 pts max_ch = RANGE_LOOKBACK * 0.7 if dc >= 8: score += min(20, 20 * (dc - 8) / (max_ch - 8)) # NATR 0.15-0.6% → 15 pts if 0.15 <= natr <= 0.6: if natr <= 0.3: score += 15 * (natr - 0.15) / 0.15 else: score += 15 * (0.6 - natr) / 0.3 elif 0.1 <= natr < 0.15: score += 3 return max(0, round(score, 1)) # ============================================================ # GRID ENGINE with variable spacing # ============================================================ def run_grid_session(df_1m, start_idx, spacing_pct, duration=SESSION_CANDLES): """Grid session with given spacing %""" if start_idx + 5 >= len(df_1m): return None end_idx = min(start_idx + duration, len(df_1m) - 1) mid_price = df_1m['close'].iloc[start_idx] buy_levels = [mid_price * (1 - lvl * spacing_pct / 100) for lvl in range(1, GRID_LEVELS + 1)] sell_levels = [mid_price * (1 + lvl * spacing_pct / 100) for lvl in range(1, GRID_LEVELS + 1)] buy_fills = [False] * GRID_LEVELS sell_fills = [False] * GRID_LEVELS positions = [] pnl = 0.0 fees = 0.0 trades = 0 round_trips = 0 max_capital = GRID_LEVELS * 2 * POSITION_SIZE_USD max_loss = max_capital * MAX_LOSS_PCT / 100 close_reason = 'timeout' for j in range(start_idx + 1, end_idx): price = df_1m['close'].iloc[j] lo = df_1m['low'].iloc[j] hi = df_1m['high'].iloc[j] for lvl in range(GRID_LEVELS): if not buy_fills[lvl] and lo <= buy_levels[lvl]: buy_fills[lvl] = True positions.append(('long', buy_levels[lvl])) fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT trades += 1 for lvl in range(GRID_LEVELS): if not sell_fills[lvl] and hi >= sell_levels[lvl]: sell_fills[lvl] = True positions.append(('short', sell_levels[lvl])) fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT trades += 1 # Round-trips for lvl in range(GRID_LEVELS): if buy_fills[lvl] and sell_fills[lvl]: spread = sell_levels[lvl] - buy_levels[lvl] qty = (POSITION_SIZE_USD * LEVERAGE) / buy_levels[lvl] pnl += qty * spread fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT * 2 buy_fills[lvl] = False sell_fills[lvl] = False round_trips += 1 positions = [p for p in positions if not (p[0] == 'long' and abs(p[1] - buy_levels[lvl]) < 1e-10)] positions = [p for p in positions if not (p[0] == 'short' and abs(p[1] - sell_levels[lvl]) < 1e-10)] # Unrealized check unrealized = 0 for side, entry_px in positions: qty = (POSITION_SIZE_USD * LEVERAGE) / entry_px if side == 'long': unrealized += qty * (price - entry_px) else: unrealized += qty * (entry_px - price) if pnl + unrealized - fees < -max_loss and positions: for side, entry_px in positions: qty = (POSITION_SIZE_USD * LEVERAGE) / entry_px if side == 'long': pnl += qty * (price - entry_px) else: pnl += qty * (entry_px - price) fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT positions = [] close_reason = 'max_loss' break # Close remaining if positions: price = df_1m['close'].iloc[min(end_idx, len(df_1m) - 1)] for side, entry_px in positions: qty = (POSITION_SIZE_USD * LEVERAGE) / entry_px if side == 'long': pnl += qty * (price - entry_px) else: pnl += qty * (entry_px - price) fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT net = pnl - fees if trades == 0: return None return { 'pnl': round(net, 4), 'gross_pnl': round(pnl, 4), 'trades': trades, 'round_trips': round_trips, 'fees': round(fees, 4), 'close_reason': close_reason, 'spacing': round(spacing_pct, 3), } # ============================================================ # MAIN # ============================================================ if __name__ == "__main__": print("=" * 70) print(" ADAPTIVE SPACING BACKTEST — NATR-based vs Fixed") print(f" {len(SYMBOLS)} coins | {DAYS_BACK}d | ${DEPOSIT} dep | {LEVERAGE}x") print(f" Grid: {GRID_LEVELS} lvl × ${POSITION_SIZE_USD} | Session: {SESSION_CANDLES}min") print("=" * 70) # Results per spacing config all_results = {cfg['name']: {'sessions': [], 'filtered': []} for cfg in SPACING_CONFIGS} natr_stats = [] for sym in SYMBOLS: print(f"\n 📊 {sym}...") # Fetch 1m df_1m = fetch_klines(sym, '1m', DAYS_BACK) if len(df_1m) < 1000: print(f" skip ({len(df_1m)} candles)") continue # Calc NATR on 5m df_natr5 = calc_natr_5m(df_1m) if len(df_natr5) < 50: print(f" skip (not enough 5m data)") continue # Fetch 15m for screener df_15m = fetch_klines(sym, SCREENER_TF, DAYS_BACK) df_15m = calc_screener(df_15m) df_15m['sw_score'] = df_15m.apply(sideways_score, axis=1) # Build lookup: timestamp → natr_5m (nearest) natr_lookup = df_natr5.set_index('timestamp')['natr_5m'] # Process each hour window # Step through 1m data in SESSION_CANDLES chunks warmup_1m = 100 # skip first 100 candles i = warmup_1m sym_count = 0 while i + SESSION_CANDLES < len(df_1m): ts = df_1m['timestamp'].iloc[i] # Get current NATR from 5m (closest timestamp <=) natr_mask = natr_lookup.index <= ts if natr_mask.sum() == 0: i += SESSION_CANDLES + SESSION_COOLDOWN continue current_natr = natr_lookup.loc[natr_mask].iloc[-1] if pd.isna(current_natr) or current_natr <= 0: i += SESSION_CANDLES + SESSION_COOLDOWN continue # Get screener score (closest 15m candle <=) scr_mask = df_15m['timestamp'] <= ts if scr_mask.sum() == 0: i += SESSION_CANDLES + SESSION_COOLDOWN continue scr_row = df_15m.loc[scr_mask].iloc[-1] sw_score = scr_row['sw_score'] natr_stats.append(current_natr) # Run grid for each spacing config for cfg in SPACING_CONFIGS: if cfg['type'] == 'fixed': spacing = cfg['value'] else: spacing = current_natr * cfg['mult'] spacing = max(cfg['min'], min(cfg['max'], spacing)) result = run_grid_session(df_1m, i, spacing) if result: result['symbol'] = sym result['natr_5m'] = round(current_natr, 3) result['sw_score'] = round(sw_score, 1) result['ts'] = str(ts) all_results[cfg['name']]['sessions'].append(result) if sw_score >= SCORE_THRESHOLD: all_results[cfg['name']]['filtered'].append(result) sym_count += 1 i += SESSION_CANDLES + SESSION_COOLDOWN print(f" {sym_count} windows processed | avg NATR_5m: {np.mean(natr_stats[-sym_count:]) if sym_count else 0:.3f}%") # ============================================================ # REPORTS # ============================================================ print("\n" + "=" * 70) print(" 🏆 ИТОГО ПО ВСЕМ МОНЕТАМ") print("=" * 70) # NATR distribution if natr_stats: print(f"\n NATR(5m) distribution across all windows:") for pct in [10, 25, 50, 75, 90]: print(f" P{pct}: {np.percentile(natr_stats, pct):.3f}%") # Summary table print(f"\n {'Config':<15} {'Mode':<10} {'Sess':>6} {'WR':>6} {'PnL':>10} {'Avg PnL':>10} {'RTs':>6} {'Avg Spc':>8} {'MaxLoss':>8}") print(f" {'─'*79}") summary_data = {} for cfg in SPACING_CONFIGS: name = cfg['name'] for mode, label in [('sessions', 'ALL'), ('filtered', 'FILT')]: sess = all_results[name][mode] if not sess: continue total_pnl = sum(s['pnl'] for s in sess) wins = len([s for s in sess if s['pnl'] > 0]) wr = 100 * wins / len(sess) avg_pnl = total_pnl / len(sess) rts = sum(s['round_trips'] for s in sess) avg_spc = np.mean([s['spacing'] for s in sess]) ml = len([s for s in sess if s['close_reason'] == 'max_loss']) print(f" {name:<15} {label:<10} {len(sess):>6} {wr:>5.0f}% ${total_pnl:>8.2f} ${avg_pnl:>8.4f} {rts:>6} {avg_spc:>7.3f}% {ml:>8}") summary_data[f"{name}_{mode}"] = { 'sessions': len(sess), 'win_rate': round(wr, 1), 'total_pnl': round(total_pnl, 4), 'avg_pnl': round(avg_pnl, 4), 'round_trips': rts, 'avg_spacing': round(avg_spc, 3), 'max_loss_stops': ml, } # NATR bracket analysis — which NATR ranges are profitable? print(f"\n 📊 PnL by NATR bracket (best adaptive config):") # Find best adaptive config best_adaptive = None best_pnl = -9999 for cfg in SPACING_CONFIGS: if cfg['type'] == 'adaptive': sess = all_results[cfg['name']]['sessions'] if sess: tp = sum(s['pnl'] for s in sess) if tp > best_pnl: best_pnl = tp best_adaptive = cfg['name'] if best_adaptive: sess = all_results[best_adaptive]['sessions'] brackets = [(0, 0.3), (0.3, 0.5), (0.5, 0.8), (0.8, 1.2), (1.2, 2.0), (2.0, 99)] print(f"\n Config: {best_adaptive}") print(f" {'NATR range':<15} {'Sess':>6} {'WR':>6} {'PnL':>10} {'Avg PnL':>10} {'Avg Spc':>8} {'RTs':>6}") print(f" {'─'*61}") for lo, hi in brackets: subset = [s for s in sess if lo <= s['natr_5m'] < hi] if not subset: continue tp = sum(s['pnl'] for s in subset) w = len([s for s in subset if s['pnl'] > 0]) wr = 100 * w / len(subset) avg = tp / len(subset) avg_s = np.mean([s['spacing'] for s in subset]) rt = sum(s['round_trips'] for s in subset) print(f" {lo:.1f}-{hi:.1f}%{'':<9} {len(subset):>6} {wr:>5.0f}% ${tp:>8.2f} ${avg:>8.4f} {avg_s:>7.3f}% {rt:>6}") # Score bracket analysis for best config print(f"\n 📊 PnL by Sideways Score (best adaptive: {best_adaptive}):") if best_adaptive: sess = all_results[best_adaptive]['sessions'] brackets = [(0, 20), (20, 35), (35, 50), (50, 65), (65, 100)] print(f" {'Score':<15} {'Sess':>6} {'WR':>6} {'PnL':>10} {'Avg PnL':>10} {'RTs':>6}") print(f" {'─'*53}") for lo, hi in brackets: subset = [s for s in sess if lo <= s['sw_score'] < hi] if not subset: continue tp = sum(s['pnl'] for s in subset) w = len([s for s in subset if s['pnl'] > 0]) wr = 100 * w / len(subset) avg = tp / len(subset) rt = sum(s['round_trips'] for s in subset) print(f" {lo}-{hi}{'':<11} {len(subset):>6} {wr:>5.0f}% ${tp:>8.2f} ${avg:>8.4f} {rt:>6}") # Per-symbol breakdown for best config print(f"\n 📊 Per-symbol (best adaptive: {best_adaptive}):") if best_adaptive: sess = all_results[best_adaptive]['sessions'] syms = sorted(set(s['symbol'] for s in sess)) print(f" {'Symbol':<12} {'Sess':>6} {'WR':>6} {'PnL':>10} {'Avg NATR':>10} {'RTs':>6}") print(f" {'─'*50}") for sym in syms: subset = [s for s in sess if s['symbol'] == sym] tp = sum(s['pnl'] for s in subset) w = len([s for s in subset if s['pnl'] > 0]) wr = 100 * w / len(subset) an = np.mean([s['natr_5m'] for s in subset]) rt = sum(s['round_trips'] for s in subset) emoji = '🟢' if tp > 0 else '🔴' print(f" {emoji} {sym:<10} {len(subset):>6} {wr:>5.0f}% ${tp:>8.2f} {an:>9.3f}% {rt:>6}") # Save output = { 'config': { 'symbols': SYMBOLS, 'days_back': DAYS_BACK, 'grid_levels': GRID_LEVELS, 'position_size': POSITION_SIZE_USD, 'leverage': LEVERAGE, 'session_candles': SESSION_CANDLES, 'score_threshold': SCORE_THRESHOLD, }, 'spacing_configs': SPACING_CONFIGS, 'summary': summary_data, 'natr_percentiles': { f'p{p}': round(np.percentile(natr_stats, p), 3) for p in [10, 25, 50, 75, 90] } if natr_stats else {}, 'tested_at': datetime.now().isoformat(), } out_path = Path(__file__).parent / 'results_adaptive_spacing.json' with open(out_path, 'w') as f: json.dump(output, f, indent=2) print(f"\n💾 Saved to {out_path}") print("Done!")