← Back
"""
Adaptive Grid Spacing Backtest — spacing tied to NATR

Idea: spacing = NATR(5m) × multiplier
If a coin is volatile (NATR 2%) → spacing is wider (0.6%)
If it is calm (NATR 0.5%) → spacing is narrower (0.15%)

Variants tested:
  A) Fixed 0.1% (current baseline)
  B) Fixed 0.2%, 0.3% (simple variants)
  C) Adaptive: spacing = NATR_5m × 0.3
  D) Adaptive: spacing = NATR_5m × 0.5
  E) Adaptive: spacing = NATR_5m × 0.7

+ Sideways filter (score >= SCORE_THRESHOLD on 15m) vs. no filter

Usage: python3 backtest_adaptive_spacing.py
"""

import requests
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime, timedelta
from pathlib import Path

# ============================================================
# CONFIG
# ============================================================
# Symbols requested from the Binance futures klines endpoint; each one is
# downloaded and backtested independently.
SYMBOLS = [
    "ETHUSDT", "DOGEUSDT", "PENGUUSDT", "ENAUSDT",
    "NEARUSDT", "WLDUSDT", "SOLUSDT", "ARBUSDT",
    "XRPUSDT", "LINKUSDT", "SUIUSDT", "OPUSDT",
    "ADAUSDT", "UNIUSDT", "AVAXUSDT",
]

DAYS_BACK = 14  # days of history requested per symbol
DEPOSIT = 50.0  # USD; only printed in the report banner
LEVERAGE = 10  # order notional = POSITION_SIZE_USD * LEVERAGE
POSITION_SIZE_USD = 3.0  # USD margin per grid order
FEE_PCT = 0.02 / 100  # maker 0.02%
GRID_LEVELS = 8  # number of buy levels and of sell levels around the mid price
# Stop threshold, in percent of GRID_LEVELS * 2 * POSITION_SIZE_USD: a session
# is closed once its net PnL (realized + unrealized - fees) drops below it.
MAX_LOSS_PCT = 3.0

# Spacing variants to test
# 'fixed':    'value' is the grid spacing in percent of the mid price.
# 'adaptive': spacing = NATR_5m * 'mult', clamped to ['min', 'max'] (percent).
SPACING_CONFIGS = [
    {'name': 'fixed_0.1%', 'type': 'fixed', 'value': 0.1},
    {'name': 'fixed_0.2%', 'type': 'fixed', 'value': 0.2},
    {'name': 'fixed_0.3%', 'type': 'fixed', 'value': 0.3},
    {'name': 'natr×0.3',   'type': 'adaptive', 'mult': 0.3, 'min': 0.08, 'max': 0.8},
    {'name': 'natr×0.5',   'type': 'adaptive', 'mult': 0.5, 'min': 0.10, 'max': 1.0},
    {'name': 'natr×0.7',   'type': 'adaptive', 'mult': 0.7, 'min': 0.12, 'max': 1.2},
]

# Screener params (15m)
SCREENER_TF = '15m'  # kline interval fetched for the sideways screener
BB_PERIOD = 20  # rolling window for the Bollinger mid line / std
BB_STD = 2.0  # band half-width in standard deviations
ADX_PERIOD = 14  # smoothing length for ATR, DI and ADX in the screener
RANGE_LOOKBACK = 24  # candles used for range high/low and direction-change count
SCORE_THRESHOLD = 45  # slightly lower to get more samples

# Session = 1 hour of 1m candles
SESSION_CANDLES = 60
SESSION_COOLDOWN = 5  # candles gap between sessions

# NATR calc on 5m resampled from 1m
NATR_PERIOD = 14


# ============================================================
# DATA FETCH
# ============================================================
def fetch_klines(symbol, interval, days_back, max_retries=5):
    """Download klines for one symbol from the Binance futures REST API.

    Pages forward from ``now - days_back`` days in requests of up to 1500
    candles until the API returns an empty page or the cursor reaches the
    time the download started.

    Args:
        symbol: Market symbol, e.g. ``"ETHUSDT"``.
        interval: Kline interval string accepted by the API (``"1m"``,
            ``"15m"``, ...).
        days_back: Days of history to request, counted back from now.
        max_retries: Number of *consecutive* failed requests tolerated before
            giving up. Previously a persistent network failure made this
            function retry forever.

    Returns:
        DataFrame with one row per candle, de-duplicated and sorted by
        ``timestamp`` (candle open time converted from epoch milliseconds).
        ``open/high/low/close/volume/quote_volume`` are floats. If nothing
        could be downloaded the frame is empty but has the same columns; if
        the download was aborted part-way it holds what was collected so far.
    """
    url = "https://fapi.binance.com/fapi/v1/klines"
    now = time.time()
    end_ts = int(now * 1000)
    start_ts = int((now - days_back * 86400) * 1000)
    all_candles = []
    current_start = start_ts
    failures = 0

    while current_start < end_ts:
        params = {"symbol": symbol, "interval": interval,
                  "startTime": current_start, "limit": 1500}
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # Transport error or a body that is not JSON: retry, but bounded.
            failures += 1
            print(f"    [fetch] Error: {e}")
            if failures >= max_retries:
                print(f"    [fetch] Giving up on {symbol} {interval} "
                      f"after {failures} failed attempts")
                break
            time.sleep(1)
            continue

        failures = 0
        if not isinstance(data, list):
            # The API answered with something other than a kline page
            # (e.g. an error object); surface it instead of hiding it.
            print(f"    [fetch] Unexpected response for {symbol} {interval}: {data}")
            break
        if not data:
            break

        all_candles.extend(data)
        # Resume right after the open time of the last candle received.
        current_start = data[-1][0] + 1
        time.sleep(0.08)

    df = pd.DataFrame(all_candles, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades', 'taker_buy_base',
        'taker_buy_quote', 'ignore'
    ])
    for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
        df[col] = df[col].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    return df


# ============================================================
# RESAMPLE 1m → 5m for NATR
# ============================================================
def calc_natr_5m(df_1m, period=None):
    """Resample 1m candles to 5m bars and compute NATR = ATR / close * 100.

    Each 5m bar is stamped with its *close* time (right edge of the
    ``[t, t + 5min)`` bucket). A caller that looks up "the latest bar with
    timestamp <= ts" therefore only ever sees bars built from 1m candles that
    opened before ``ts``. With open-time labels the same lookup returned a bar
    that still contained up to four future 1m candles (lookahead bias).

    Args:
        df_1m: 1m candles with ``timestamp``, ``open``, ``high``, ``low``,
            ``close`` and ``volume`` columns.
        period: ATR smoothing length (EWM with ``alpha = 1 / period``).
            Defaults to the module-level ``NATR_PERIOD``.

    Returns:
        DataFrame with columns ``timestamp`` (5m bar close time) and
        ``natr_5m`` (percent). Rows inside the ATR warm-up are dropped.
    """
    if period is None:
        period = NATR_PERIOD

    bars = (
        df_1m.set_index('timestamp')
        .resample('5min', label='right', closed='left')
        .agg({
            'open': 'first', 'high': 'max', 'low': 'min',
            'close': 'last', 'volume': 'sum',
        })
        .dropna()  # buckets with no 1m candles at all
        .reset_index()
    )

    high, low, close = bars['high'], bars['low'], bars['close']
    prev_close = close.shift(1)
    # True range: the widest of bar range and the gaps to the previous close.
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    atr = true_range.ewm(alpha=1 / period, min_periods=period).mean()
    bars['natr_5m'] = (atr / close) * 100

    return bars[['timestamp', 'natr_5m']].dropna()


# ============================================================
# SCREENER INDICATORS (15m)
# ============================================================
def calc_screener(df):
    """Add the sideways-screener indicator columns to a candle frame.

    The frame is modified in place and also returned. Columns added:
    ``bb_mid``, ``bb_std``, ``bb_upper``, ``bb_lower``, ``bb_width`` (percent
    of the mid line), ``adx``, ``natr`` (percent), ``range_high``,
    ``range_low``, ``range_pos`` (0..1 position of close inside the rolling
    range), ``candle_dir``, ``dir_change`` and ``dir_changes``.

    Args:
        df: Candles with ``open``, ``high``, ``low`` and ``close`` columns.

    Returns:
        The same DataFrame object, with the indicator columns attached.
    """
    # Bollinger bands and their relative width.
    df['bb_mid'] = df['close'].rolling(BB_PERIOD).mean()
    df['bb_std'] = df['close'].rolling(BB_PERIOD).std()
    df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std']
    df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std']
    df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100

    high, low, close = df['high'], df['low'], df['close']

    # Directional movement. Both masks compare the *raw* moves: masking -DM
    # against the already-zeroed +DM kept -DM on ties (up == down > 0),
    # where both are supposed to be zero.
    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    prev_close = close.shift(1)
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    alpha = 1 / ADX_PERIOD
    atr = tr.ewm(alpha=alpha, min_periods=ADX_PERIOD).mean()
    plus_di = 100 * (plus_dm.ewm(alpha=alpha, min_periods=ADX_PERIOD).mean() / atr)
    minus_di = 100 * (minus_dm.ewm(alpha=alpha, min_periods=ADX_PERIOD).mean() / atr)
    # The small epsilon avoids 0/0 when both DI lines are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    df['adx'] = dx.ewm(alpha=alpha, min_periods=ADX_PERIOD).mean()
    df['natr'] = (atr / close) * 100

    # Position of the close inside the rolling high/low range.
    df['range_high'] = df['high'].rolling(RANGE_LOOKBACK).max()
    df['range_low'] = df['low'].rolling(RANGE_LOOKBACK).min()
    rng = df['range_high'] - df['range_low']
    df['range_pos'] = (df['close'] - df['range_low']) / rng.replace(0, np.nan)

    # Candle colour flips inside the lookback window (close == open counts
    # as a down candle).
    df['candle_dir'] = np.where(df['close'] > df['open'], 1, -1)
    df['dir_change'] = (df['candle_dir'] != df['candle_dir'].shift(1)).astype(int)
    df['dir_changes'] = df['dir_change'].rolling(RANGE_LOOKBACK).sum()

    return df


def sideways_score(row):
    """Rate how range-bound a screener row looks.

    Five components are summed: ADX (up to 25, or -10 above 30), Bollinger
    width (up to 20), position inside the range (up to 20), direction changes
    (up to 20) and NATR (up to 15). The total is rounded to one decimal and
    floored at 0.

    Args:
        row: Mapping-like object with a ``get`` method (a DataFrame row or a
            dict) providing ``adx``, ``bb_width``, ``range_pos``,
            ``dir_changes`` and ``natr``.

    Returns:
        The score, or 0 when any of the five inputs is missing or NaN.
    """
    fields = ('adx', 'bb_width', 'range_pos', 'dir_changes', 'natr')
    adx, bb_w, rp, dc, natr = [row.get(name, np.nan) for name in fields]

    # Indicators still warming up (or absent) → nothing to score.
    for value in (adx, bb_w, rp, dc, natr):
        if pd.isna(value):
            return 0

    max_ch = RANGE_LOOKBACK * 0.7

    # ADX: full marks up to 5, fading linearly to 0 at 20, penalty above 30.
    if adx <= 5:
        adx_pts = 25
    elif adx <= 20:
        adx_pts = 25 * (1 - (adx - 5) / 15)
    elif adx <= 30:
        # The inner term is negative on this band, so this is always 0.
        adx_pts = max(0, -5 * (adx - 20) / 10)
    else:
        adx_pts = -10

    # Bollinger width: triangle over 1.5..4.0 peaking at 2.5; small bonus
    # for 0.5..1.5.
    if 1.5 <= bb_w <= 4.0:
        bb_pts = 20 * (bb_w - 1.5) / 1.0 if bb_w <= 2.5 else 20 * (4.0 - bb_w) / 1.5
    elif 0.5 <= bb_w < 1.5:
        bb_pts = 5
    else:
        bb_pts = 0

    # Range position: best in the middle of the range, small bonus just
    # outside the 0.3..0.7 band.
    if 0.3 <= rp <= 0.7:
        rp_pts = 20 * (1 - abs(rp - 0.5) / 0.2)
    elif 0.2 <= rp < 0.3 or 0.7 < rp <= 0.8:
        rp_pts = 5
    else:
        rp_pts = 0

    # Direction changes: ramps up from 8 flips, capped at 20 points.
    dc_pts = min(20, 20 * (dc - 8) / (max_ch - 8)) if dc >= 8 else 0

    # NATR: triangle over 0.15..0.6 peaking at 0.3; small bonus for 0.1..0.15.
    if 0.15 <= natr <= 0.6:
        natr_pts = 15 * (natr - 0.15) / 0.15 if natr <= 0.3 else 15 * (0.6 - natr) / 0.3
    elif 0.1 <= natr < 0.15:
        natr_pts = 3
    else:
        natr_pts = 0

    score = 0 + adx_pts + bb_pts + rp_pts + dc_pts + natr_pts
    return max(0, round(score, 1))


# ============================================================
# GRID ENGINE with variable spacing
# ============================================================
def run_grid_session(df_1m, start_idx, spacing_pct, duration=SESSION_CANDLES):
    """Simulate one grid session on 1m candles.

    The grid is centred on the close of candle ``start_idx`` with
    ``GRID_LEVELS`` buy levels below and as many sell levels above, spaced
    ``spacing_pct`` percent apart. On each following candle a buy level fills
    when the low reaches it and a sell level fills when the high reaches it.
    When the buy and sell level of the same index are both filled they are
    booked as a round trip and re-armed. The session ends at the stop
    (net PnL below the max-loss threshold, checked at each candle close) or
    at the end of the window, where open positions are closed at the close.

    Args:
        df_1m: 1m candles with ``close``, ``low`` and ``high`` columns.
        start_idx: Positional index of the candle that sets the mid price.
        spacing_pct: Distance between adjacent grid levels, in percent.
        duration: Session length in candles.

    Returns:
        Dict with ``pnl`` (net of fees), ``gross_pnl``, ``trades``,
        ``round_trips``, ``fees``, ``close_reason`` (``'timeout'`` or
        ``'max_loss'``) and ``spacing``; ``None`` when fewer than six candles
        remain after ``start_idx`` or when no level was ever filled.
    """
    n_rows = len(df_1m)
    if start_idx + 5 >= n_rows:
        return None

    close_col = df_1m['close']
    low_col = df_1m['low']
    high_col = df_1m['high']

    end_idx = min(start_idx + duration, n_rows - 1)
    mid_price = close_col.iloc[start_idx]

    level_numbers = range(1, GRID_LEVELS + 1)
    buy_levels = [mid_price * (1 - k * spacing_pct / 100) for k in level_numbers]
    sell_levels = [mid_price * (1 + k * spacing_pct / 100) for k in level_numbers]

    notional = POSITION_SIZE_USD * LEVERAGE  # USD size of every order
    fill_fee = notional * FEE_PCT  # fee charged per fill
    max_loss = GRID_LEVELS * 2 * POSITION_SIZE_USD * MAX_LOSS_PCT / 100

    def position_pnl(side, entry_px, mark_px):
        # PnL of one open position marked at mark_px.
        qty = notional / entry_px
        if side == 'long':
            return qty * (mark_px - entry_px)
        return qty * (entry_px - mark_px)

    buy_fills = [False] * GRID_LEVELS
    sell_fills = [False] * GRID_LEVELS
    positions = []
    pnl = 0.0
    fees = 0.0
    trades = 0
    round_trips = 0
    close_reason = 'timeout'

    for j in range(start_idx + 1, end_idx):
        lo = low_col.iloc[j]
        hi = high_col.iloc[j]
        price = close_col.iloc[j]

        # New long entries.
        for k, level_px in enumerate(buy_levels):
            if lo <= level_px and not buy_fills[k]:
                buy_fills[k] = True
                positions.append(('long', level_px))
                fees += fill_fee
                trades += 1

        # New short entries.
        for k, level_px in enumerate(sell_levels):
            if hi >= level_px and not sell_fills[k]:
                sell_fills[k] = True
                positions.append(('short', level_px))
                fees += fill_fee
                trades += 1

        # Matched level pairs are realized and both levels re-armed.
        for k in range(GRID_LEVELS):
            if not (buy_fills[k] and sell_fills[k]):
                continue
            buy_px, sell_px = buy_levels[k], sell_levels[k]
            pnl += (notional / buy_px) * (sell_px - buy_px)
            fees += fill_fee * 2
            buy_fills[k] = False
            sell_fills[k] = False
            round_trips += 1
            positions = [
                (side, px) for side, px in positions
                if not (
                    (side == 'long' and abs(px - buy_px) < 1e-10)
                    or (side == 'short' and abs(px - sell_px) < 1e-10)
                )
            ]

        # Mark open positions at the candle close and apply the stop.
        unrealized = 0
        for side, entry_px in positions:
            unrealized += position_pnl(side, entry_px, price)

        if positions and pnl + unrealized - fees < -max_loss:
            for side, entry_px in positions:
                pnl += position_pnl(side, entry_px, price)
                fees += fill_fee
            positions = []
            close_reason = 'max_loss'
            break

    if trades == 0:
        return None

    # Whatever is still open is closed at the close of the final candle.
    final_px = close_col.iloc[end_idx]
    for side, entry_px in positions:
        pnl += position_pnl(side, entry_px, final_px)
        fees += fill_fee

    return {
        'pnl': round(pnl - fees, 4),
        'gross_pnl': round(pnl, 4),
        'trades': trades,
        'round_trips': round_trips,
        'fees': round(fees, 4),
        'close_reason': close_reason,
        'spacing': round(spacing_pct, 3),
    }


# ============================================================
# MAIN
# ============================================================
if __name__ == "__main__":
    # Script entry point: for every symbol, cut the 1m history into
    # consecutive session windows, run each spacing config on every window,
    # then print summary tables and save the aggregate numbers as JSON.
    print("=" * 70)
    print("  ADAPTIVE SPACING BACKTEST — NATR-based vs Fixed")
    print(f"  {len(SYMBOLS)} coins | {DAYS_BACK}d | ${DEPOSIT} dep | {LEVERAGE}x")
    print(f"  Grid: {GRID_LEVELS} lvl × ${POSITION_SIZE_USD} | Session: {SESSION_CANDLES}min")
    print("=" * 70)

    # Per spacing config: every session ('sessions') and the subset whose
    # sideways score passed SCORE_THRESHOLD ('filtered').
    all_results = {cfg['name']: {'sessions': [], 'filtered': []} for cfg in SPACING_CONFIGS}
    natr_stats = []

    warmup_1m = 100  # skip first 100 candles
    window_step = SESSION_CANDLES + SESSION_COOLDOWN

    for sym in SYMBOLS:
        print(f"\n  📊 {sym}...")

        # Fetch 1m
        df_1m = fetch_klines(sym, '1m', DAYS_BACK)
        if len(df_1m) < 1000:
            print(f"    skip ({len(df_1m)} candles)")
            continue

        # Calc NATR on 5m
        df_natr5 = calc_natr_5m(df_1m)
        if len(df_natr5) < 50:
            print(f"    skip (not enough 5m data)")
            continue

        # Fetch 15m for screener
        df_15m = fetch_klines(sym, SCREENER_TF, DAYS_BACK)
        if df_15m.empty:
            # Without screener candles no window could be scored.
            print(f"    skip (no {SCREENER_TF} data)")
            continue
        df_15m = calc_screener(df_15m)
        df_15m['sw_score'] = df_15m.apply(sideways_score, axis=1)
        # 'timestamp' is the candle OPEN time. A screener candle may only be
        # used once it has closed; selecting by open time let a candle that
        # was still forming (future data) decide the filter.
        scr_close_times = pd.to_datetime(df_15m['close_time'], unit='ms')

        # Lookup: timestamp → natr_5m
        natr_lookup = df_natr5.set_index('timestamp')['natr_5m']

        sym_count = 0
        for i in range(warmup_1m, len(df_1m) - SESSION_CANDLES, window_step):
            ts = df_1m['timestamp'].iloc[i]

            # Latest NATR value stamped at or before the session start.
            natr_ready = natr_lookup[natr_lookup.index <= ts]
            if natr_ready.empty:
                continue
            current_natr = natr_ready.iloc[-1]
            if pd.isna(current_natr) or current_natr <= 0:
                continue

            # Latest screener candle that has already closed.
            closed_scr = df_15m.loc[scr_close_times <= ts]
            if closed_scr.empty:
                continue
            sw_score = closed_scr['sw_score'].iloc[-1]

            natr_stats.append(current_natr)

            # Run grid for each spacing config
            for cfg in SPACING_CONFIGS:
                if cfg['type'] == 'fixed':
                    spacing = cfg['value']
                else:
                    spacing = current_natr * cfg['mult']
                    spacing = max(cfg['min'], min(cfg['max'], spacing))

                result = run_grid_session(df_1m, i, spacing)
                if result:
                    result['symbol'] = sym
                    result['natr_5m'] = round(current_natr, 3)
                    result['sw_score'] = round(sw_score, 1)
                    result['ts'] = str(ts)

                    all_results[cfg['name']]['sessions'].append(result)
                    if sw_score >= SCORE_THRESHOLD:
                        all_results[cfg['name']]['filtered'].append(result)

            sym_count += 1

        print(f"    {sym_count} windows processed | avg NATR_5m: {np.mean(natr_stats[-sym_count:]) if sym_count else 0:.3f}%")

    # ============================================================
    # REPORTS
    # ============================================================
    print("\n" + "=" * 70)
    print("  🏆 ИТОГО ПО ВСЕМ МОНЕТАМ")
    print("=" * 70)

    # NATR distribution
    if natr_stats:
        print(f"\n  NATR(5m) distribution across all windows:")
        for pct in [10, 25, 50, 75, 90]:
            print(f"    P{pct}: {np.percentile(natr_stats, pct):.3f}%")

    # Summary table
    print(f"\n  {'Config':<15} {'Mode':<10} {'Sess':>6} {'WR':>6} {'PnL':>10} {'Avg PnL':>10} {'RTs':>6} {'Avg Spc':>8} {'MaxLoss':>8}")
    print(f"  {'─'*79}")

    summary_data = {}

    for cfg in SPACING_CONFIGS:
        name = cfg['name']
        for mode, label in [('sessions', 'ALL'), ('filtered', 'FILT')]:
            sess = all_results[name][mode]
            if not sess:
                continue

            total_pnl = sum(s['pnl'] for s in sess)
            wins = sum(1 for s in sess if s['pnl'] > 0)
            wr = 100 * wins / len(sess)
            avg_pnl = total_pnl / len(sess)
            rts = sum(s['round_trips'] for s in sess)
            avg_spc = np.mean([s['spacing'] for s in sess])
            ml = sum(1 for s in sess if s['close_reason'] == 'max_loss')

            print(f"  {name:<15} {label:<10} {len(sess):>6} {wr:>5.0f}% ${total_pnl:>8.2f} ${avg_pnl:>8.4f} {rts:>6} {avg_spc:>7.3f}% {ml:>8}")

            summary_data[f"{name}_{mode}"] = {
                'sessions': len(sess),
                'win_rate': round(wr, 1),
                'total_pnl': round(total_pnl, 4),
                'avg_pnl': round(avg_pnl, 4),
                'round_trips': rts,
                'avg_spacing': round(avg_spc, 3),
                'max_loss_stops': ml,
            }

    # NATR bracket analysis — which NATR ranges are profitable?
    print(f"\n  📊 PnL by NATR bracket (best adaptive config):")

    # Best adaptive config = highest total PnL over all (unfiltered) sessions.
    best_adaptive = None
    best_pnl = float('-inf')
    for cfg in SPACING_CONFIGS:
        if cfg['type'] != 'adaptive':
            continue
        sess = all_results[cfg['name']]['sessions']
        if not sess:
            continue
        tp = sum(s['pnl'] for s in sess)
        if tp > best_pnl:
            best_pnl = tp
            best_adaptive = cfg['name']

    if best_adaptive:
        sess = all_results[best_adaptive]['sessions']
        brackets = [(0, 0.3), (0.3, 0.5), (0.5, 0.8), (0.8, 1.2), (1.2, 2.0), (2.0, 99)]
        print(f"\n  Config: {best_adaptive}")
        print(f"  {'NATR range':<15} {'Sess':>6} {'WR':>6} {'PnL':>10} {'Avg PnL':>10} {'Avg Spc':>8} {'RTs':>6}")
        print(f"  {'─'*61}")

        for lo, hi in brackets:
            subset = [s for s in sess if lo <= s['natr_5m'] < hi]
            if not subset:
                continue
            tp = sum(s['pnl'] for s in subset)
            w = sum(1 for s in subset if s['pnl'] > 0)
            wr = 100 * w / len(subset)
            avg = tp / len(subset)
            avg_s = np.mean([s['spacing'] for s in subset])
            rt = sum(s['round_trips'] for s in subset)
            print(f"  {lo:.1f}-{hi:.1f}%{'':<9} {len(subset):>6} {wr:>5.0f}% ${tp:>8.2f} ${avg:>8.4f} {avg_s:>7.3f}% {rt:>6}")

    # Score bracket analysis for best config
    print(f"\n  📊 PnL by Sideways Score (best adaptive: {best_adaptive}):")
    if best_adaptive:
        sess = all_results[best_adaptive]['sessions']
        brackets = [(0, 20), (20, 35), (35, 50), (50, 65), (65, 100)]
        print(f"  {'Score':<15} {'Sess':>6} {'WR':>6} {'PnL':>10} {'Avg PnL':>10} {'RTs':>6}")
        print(f"  {'─'*53}")

        for lo, hi in brackets:
            subset = [s for s in sess if lo <= s['sw_score'] < hi]
            if not subset:
                continue
            tp = sum(s['pnl'] for s in subset)
            w = sum(1 for s in subset if s['pnl'] > 0)
            wr = 100 * w / len(subset)
            avg = tp / len(subset)
            rt = sum(s['round_trips'] for s in subset)
            print(f"  {lo}-{hi}{'':<11} {len(subset):>6} {wr:>5.0f}% ${tp:>8.2f} ${avg:>8.4f} {rt:>6}")

    # Per-symbol breakdown for best config
    print(f"\n  📊 Per-symbol (best adaptive: {best_adaptive}):")
    if best_adaptive:
        sess = all_results[best_adaptive]['sessions']
        syms = sorted(set(s['symbol'] for s in sess))
        print(f"  {'Symbol':<12} {'Sess':>6} {'WR':>6} {'PnL':>10} {'Avg NATR':>10} {'RTs':>6}")
        print(f"  {'─'*50}")
        for sym in syms:
            subset = [s for s in sess if s['symbol'] == sym]
            tp = sum(s['pnl'] for s in subset)
            w = sum(1 for s in subset if s['pnl'] > 0)
            wr = 100 * w / len(subset)
            an = np.mean([s['natr_5m'] for s in subset])
            rt = sum(s['round_trips'] for s in subset)
            emoji = '🟢' if tp > 0 else '🔴'
            print(f"  {emoji} {sym:<10} {len(subset):>6} {wr:>5.0f}% ${tp:>8.2f} {an:>9.3f}% {rt:>6}")

    # Save the run configuration and the aggregate numbers (not the
    # individual sessions) next to this script.
    output = {
        'config': {
            'symbols': SYMBOLS, 'days_back': DAYS_BACK,
            'grid_levels': GRID_LEVELS, 'position_size': POSITION_SIZE_USD,
            'leverage': LEVERAGE, 'session_candles': SESSION_CANDLES,
            'score_threshold': SCORE_THRESHOLD,
        },
        'spacing_configs': SPACING_CONFIGS,
        'summary': summary_data,
        'natr_percentiles': {
            f'p{p}': round(np.percentile(natr_stats, p), 3)
            for p in [10, 25, 50, 75, 90]
        } if natr_stats else {},
        'tested_at': datetime.now().isoformat(),
    }

    out_path = Path(__file__).parent / 'results_adaptive_spacing.json'
    with open(out_path, 'w') as f:
        json.dump(output, f, indent=2)
    print(f"\n💾 Saved to {out_path}")
    print("Done!")

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...