# (source-viewer navigation residue: "← Back" — not part of the script)
"""
Grid Backtest — Adaptive Spacing based on NATR
================================================
Tests coins that traded live and lost money.
Compares fixed spacing (0.1%, 0.2%, 0.3%) vs adaptive spacing = NATR × multiplier.

Hypothesis: RIVER, PLAY, TRU are too volatile for 0.1% spacing.
When NATR is high, need wider spacing to avoid getting steamrolled.

Usage: python3 backtest_grid_adaptive.py
"""

import requests
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime
from pathlib import Path

# ============================================================
# CONFIG
# ============================================================
# Futures symbols to backtest; the main script fetches and simulates each one
# independently.
SYMBOLS = ["RIVERUSDT", "PLAYUSDT", "TRUUSDT", "ZECUSDT", "DOGEUSDT", "SOLUSDT"]
# Kline interval and look-back window passed to fetch_klines().
INTERVAL = "1m"
DAYS_BACK = 7
# NOTE(review): DEPOSIT is not referenced by any code visible in this file.
DEPOSIT = 50.0
# Each grid order has a notional of POSITION_SIZE_USD * LEVERAGE.
LEVERAGE = 10
POSITION_SIZE_USD = 3.0
# Fee as a fraction of notional, charged by run_grid() on every fill.
FEE_PCT = 0.02 / 100       # maker 0.02%

# Number of buy levels below and sell levels above the session's mid price.
GRID_LEVELS = 8
# Session stop, as a percent of GRID_LEVELS * 2 * POSITION_SIZE_USD.
MAX_LOSS_PCT = 3.0

# Fixed spacing for comparison (percent of price between adjacent levels)
FIXED_SPACING = 0.1

# Adaptive spacing: spacing = NATR * MULTIPLIER
# NATR(14) on 1m ≈ volatility per candle as % of price
# If NATR = 0.05% → spacing = 0.05 * 2 = 0.1% (same as fixed for low vol)
# If NATR = 0.15% → spacing = 0.15 * 2 = 0.3% (wider for high vol)
# run_grid() additionally clamps the adaptive spacing to [0.05%, 1.0%].
ADAPTIVE_MULTIPLIERS = [1.5, 2.0, 2.5, 3.0]
# Smoothing period of the ATR behind NATR (ewm alpha = 1 / NATR_PERIOD).
NATR_PERIOD = 14

# BB/ADX for reference
# These are only reported by the main script; together with NATR_PERIOD they
# also set the warm-up length in run_grid().
BB_PERIOD = 20
BB_STD = 2.0
ADX_PERIOD = 14


# ============================================================
# DATA FETCH
# ============================================================
def fetch_klines(symbol, interval, days_back):
    """
    Download recent klines from the Binance futures REST API.

    Pages forward from ``now - days_back`` days in requests of up to 1500
    candles until the current time is reached or the API returns no more rows.

    Args:
        symbol: Symbol sent as the ``symbol`` query parameter, e.g. "SOLUSDT".
        interval: Kline interval string, e.g. "1m".
        days_back: Number of days of history to request, counted back from now.

    Returns:
        DataFrame sorted by ``timestamp`` (converted from epoch milliseconds,
        de-duplicated) with ``open``, ``high``, ``low``, ``close``, ``volume``
        and ``quote_volume`` cast to float. The frame may be empty or partial
        when the API rejects the request or keeps failing, so callers should
        check its length.
    """
    url = "https://fapi.binance.com/fapi/v1/klines"
    max_retries = 5  # consecutive failures tolerated before giving up

    now = time.time()
    end_ts = int(now * 1000)
    start_ts = int((now - days_back * 86400) * 1000)

    all_candles = []
    current_start = start_ts
    failures = 0

    while current_start < end_ts:
        params = {
            "symbol": symbol, "interval": interval,
            "startTime": current_start, "limit": 1500
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # Bounded retry: a persistent outage must not loop forever.
            failures += 1
            if failures > max_retries:
                print(f"  Error: {e}, giving up after {max_retries} retries")
                break
            print(f"  Error: {e}, retrying...")
            time.sleep(1)
            continue

        failures = 0
        if not isinstance(data, list) or not data:
            # A non-list payload is an API error object; surface it instead of
            # stopping silently. An empty list simply means no more candles.
            if not isinstance(data, list):
                print(f"  API error for {symbol}: {data}")
            break

        all_candles.extend(data)
        next_start = data[-1][0] + 1
        if next_start <= current_start:
            # The cursor did not advance; stop rather than re-request forever.
            break
        current_start = next_start
        time.sleep(0.1)  # stay gentle on the rate limit

    df = pd.DataFrame(all_candles, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades', 'taker_buy_base',
        'taker_buy_quote', 'ignore'
    ])
    for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
        df[col] = df[col].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    return df


# ============================================================
# INDICATORS
# ============================================================
def calc_indicators(df):
    """
    Add ATR/NATR, Bollinger Band and ADX columns to ``df`` in place.

    Args:
        df: DataFrame with numeric ``high``, ``low`` and ``close`` columns,
            one row per candle in chronological order.

    Returns:
        The same DataFrame object with these columns added:
        ``atr``, ``natr`` (ATR as a percentage of close), ``bb_mid``,
        ``bb_std``, ``bb_upper``, ``bb_lower``, ``bb_width`` (band width as a
        percentage of the mid band) and ``adx``. Leading rows are NaN until
        the rolling / ``min_periods`` windows are filled.
    """
    high, low, close = df['high'], df['low'], df['close']
    prev_close = close.shift(1)

    # True range: the largest of the candle range and the two gaps against the
    # previous close (row-wise max skips the NaN gaps on the first row).
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    # ATR / NATR (exponential smoothing with alpha = 1 / period)
    df['atr'] = tr.ewm(alpha=1/NATR_PERIOD, min_periods=NATR_PERIOD).mean()
    df['natr'] = (df['atr'] / df['close']) * 100  # as percentage

    # BB
    df['bb_mid'] = close.rolling(BB_PERIOD).mean()
    df['bb_std'] = close.rolling(BB_PERIOD).std()
    df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std']
    df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std']
    df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100

    # ADX
    # Both directional movements are masked against the RAW moves. Masking
    # -DM against an already-zeroed +DM would keep -DM when the two moves are
    # equal and positive, whereas both must be zero in that case.
    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    atr_adx = tr.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean()
    plus_di = 100 * (plus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr_adx)
    minus_di = 100 * (minus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr_adx)
    # The small epsilon avoids 0/0 when both directional indicators are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    df['adx'] = dx.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean()

    return df


# ============================================================
# GRID ENGINE — supports both fixed and adaptive spacing
# ============================================================
def _mark_open_positions(buy_fills, sell_fills, buy_fill_prices, sell_fill_prices,
                         price, notional):
    """Value every one-sided (unpaired) grid fill at ``price``.

    Returns ``(pnl, open_count)``: the summed mark-to-market PnL of all levels
    where only the buy or only the sell has filled, and how many such levels
    there are.
    """
    pnl = 0
    open_count = 0
    for bought, sold, buy_px, sell_px in zip(buy_fills, sell_fills,
                                             buy_fill_prices, sell_fill_prices):
        if bought and not sold:
            pnl += notional / buy_px * (price - buy_px)
            open_count += 1
        elif sold and not bought:
            pnl += notional / sell_px * (sell_px - price)
            open_count += 1
    return pnl, open_count


def run_grid(df, spacing_mode="fixed", spacing_value=0.1, adaptive_mult=2.0):
    """
    Grid engine: simulate back-to-back grid sessions over ``df``.

    Each session places GRID_LEVELS buy levels below and GRID_LEVELS sell
    levels above the close of its starting candle and never recenters them.
    On every following candle a buy level fills when the low touches it and a
    sell level fills when the high touches it. When buy level k and sell
    level k are both filled, that pair is booked as a round trip and the level
    is freed for re-use.

    A session ends when realized + unrealized PnL minus fees falls below the
    max-loss threshold (open positions closed at the candle close with the
    taker fee), after 60 candles (closed with the maker fee), or when the data
    runs out (open positions marked at the last close with the maker fee).
    The next session starts 5 candles after the index following the closing
    candle.

    Args:
        df: DataFrame with ``close``, ``low`` and ``high`` columns, plus
            ``natr`` when ``spacing_mode`` is "adaptive".
        spacing_mode: "adaptive" uses NATR * ``adaptive_mult`` clamped to
            [0.05, 1.0] percent; any other value uses ``spacing_value``.
        spacing_value: Fixed spacing between levels, in percent of price.
        adaptive_mult: Multiplier applied to NATR in adaptive mode.

    Returns:
        dict with ``sessions`` (one dict per session that had at least one
        fill: id, spacing_pct, trades, round_trips, pnl net of fees, fees,
        close_reason, duration_min) and the aggregates ``total_pnl``,
        ``total_trades``, ``total_fees``, ``total_round_trips`` and
        ``max_loss_stops``. ``close_reason`` is 'max_loss', 'timeout' or
        'end_of_data'.
    """
    results = {
        'sessions': [],
        'total_pnl': 0,
        'total_trades': 0,
        'total_fees': 0,
        'total_round_trips': 0,
        'max_loss_stops': 0,
    }

    # Pull the columns out once; per-row .iloc lookups dominate the runtime.
    closes = df['close'].to_numpy()
    lows = df['low'].to_numpy()
    highs = df['high'].to_numpy()
    adaptive = spacing_mode == "adaptive"
    natrs = df['natr'].to_numpy() if adaptive else None
    n_rows = len(df)

    # Session-independent quantities.
    notional = POSITION_SIZE_USD * LEVERAGE
    maker_fee = notional * FEE_PCT
    taker_fee = notional * 0.04 / 100  # taker 0.04%, used for the max-loss exit
    max_capital = GRID_LEVELS * 2 * POSITION_SIZE_USD
    max_loss = max_capital * MAX_LOSS_PCT / 100
    session_timeout = 60  # candles
    cooldown = 5          # candles skipped between sessions

    warmup = max(BB_PERIOD, ADX_PERIOD, NATR_PERIOD) + 10
    i = warmup
    session_id = 0

    while i < n_rows:
        mid_price = closes[i]

        # Determine spacing
        if adaptive:
            natr = natrs[i]
            if pd.isna(natr) or natr <= 0:
                i += 1
                continue
            # Clamp: min 0.05%, max 1.0%
            spacing_pct = max(0.05, min(1.0, natr * adaptive_mult))
        else:
            spacing_pct = spacing_value

        spacing_abs = mid_price * spacing_pct / 100

        # Create grid levels
        buy_levels = [mid_price - k * spacing_abs for k in range(1, GRID_LEVELS + 1)]
        sell_levels = [mid_price + k * spacing_abs for k in range(1, GRID_LEVELS + 1)]

        buy_fills = [False] * GRID_LEVELS
        sell_fills = [False] * GRID_LEVELS
        # Track individual fill prices for round-trip matching
        buy_fill_prices = [0.0] * GRID_LEVELS
        sell_fill_prices = [0.0] * GRID_LEVELS

        session_pnl = 0.0
        session_fees = 0.0
        session_trades = 0
        session_rts = 0
        session_start = i

        j = i + 1
        session_closed = False
        close_reason = 'timeout'

        while j < n_rows and not session_closed:
            price = closes[j]
            low_px = lows[j]
            high_px = highs[j]

            # Fill buy levels, then sell levels (each fill pays the maker fee)
            for lvl in range(GRID_LEVELS):
                if not buy_fills[lvl] and low_px <= buy_levels[lvl]:
                    buy_fills[lvl] = True
                    buy_fill_prices[lvl] = buy_levels[lvl]
                    session_fees += maker_fee
                    session_trades += 1

            for lvl in range(GRID_LEVELS):
                if not sell_fills[lvl] and high_px >= sell_levels[lvl]:
                    sell_fills[lvl] = True
                    sell_fill_prices[lvl] = sell_levels[lvl]
                    session_fees += maker_fee
                    session_trades += 1

            # Check round-trips (buy + sell pair both filled)
            for lvl in range(GRID_LEVELS):
                if buy_fills[lvl] and sell_fills[lvl]:
                    buy_px = buy_fill_prices[lvl]
                    sell_px = sell_fill_prices[lvl]
                    qty = notional / buy_px
                    session_pnl += qty * (sell_px - buy_px)
                    session_fees += maker_fee  # close fee
                    session_rts += 1
                    # Reset level for re-use
                    buy_fills[lvl] = False
                    sell_fills[lvl] = False

            # Unrealized PnL of the remaining one-sided fills at this close
            unrealized, open_count = _mark_open_positions(
                buy_fills, sell_fills, buy_fill_prices, sell_fill_prices,
                price, notional)
            net_pnl = session_pnl + unrealized - session_fees

            # Exactly one exit may fire per candle. With two independent
            # checks, a candle that both breaches max loss and reaches the
            # timeout would liquidate the same positions twice and relabel
            # the stop as a timeout.
            if net_pnl < -max_loss:
                exit_fee = taker_fee
                close_reason = 'max_loss'
                session_closed = True
            elif j - session_start >= session_timeout:
                exit_fee = maker_fee
                close_reason = 'timeout'
                session_closed = True

            if session_closed:
                # Closing at this candle's close realizes exactly `unrealized`.
                session_pnl += unrealized
                session_fees += open_count * exit_fee

            j += 1

        if not session_closed:
            # Data ran out mid-session: mark the leftover inventory at the
            # last close so it is not silently dropped from the session PnL.
            final_pnl, open_count = _mark_open_positions(
                buy_fills, sell_fills, buy_fill_prices, sell_fill_prices,
                closes[n_rows - 1], notional)
            session_pnl += final_pnl
            session_fees += open_count * maker_fee
            close_reason = 'end_of_data'

        net_session = session_pnl - session_fees

        # Sessions without a single fill are not recorded.
        if session_trades > 0:
            session_id += 1
            results['sessions'].append({
                'id': session_id,
                'spacing_pct': round(spacing_pct, 4),
                'trades': session_trades,
                'round_trips': session_rts,
                'pnl': round(net_session, 4),
                'fees': round(session_fees, 4),
                'close_reason': close_reason,
                'duration_min': j - session_start,
            })
            results['total_pnl'] += net_session
            results['total_trades'] += session_trades
            results['total_fees'] += session_fees
            results['total_round_trips'] += session_rts
            if close_reason == 'max_loss':
                results['max_loss_stops'] += 1

        i = j + cooldown  # cooldown

    return results


# ============================================================
# REPORT
# ============================================================
def print_compact_report(symbol, natr_avg, results_dict):
    """
    Print a per-symbol table with one row per tested spacing mode.

    Args:
        symbol: Symbol shown in the table title.
        natr_avg: Average NATR (percent) shown in the table title.
        results_dict: Mapping of mode label -> result dict with the keys
            ``total_pnl``, ``total_round_trips``, ``max_loss_stops`` and
            ``sessions`` (each session carrying ``spacing_pct``).
    """
    rule = '=' * 80

    print(f"\n{rule}")
    print(f"  {symbol}  |  Avg NATR(14): {natr_avg:.4f}%  |  7 days 1m")
    print(rule)
    print(f"  {'Mode':<22} {'PnL':>8} {'RTs':>5} {'MaxLoss':>8} {'Sessions':>9} {'Avg Spc':>8}")
    print(f"  {'─'*66}")

    for mode_label in results_dict:
        stats = results_dict[mode_label]
        total = stats['total_pnl']
        round_trips = stats['total_round_trips']
        stops = stats['max_loss_stops']
        sessions = stats['sessions']
        session_count = len(sessions)

        # Mean spacing over the recorded sessions; 0 when there were none.
        mean_spacing = 0
        if sessions:
            mean_spacing = sum(s['spacing_pct'] for s in sessions) / len(sessions)

        # Green only for a strictly positive total.
        marker = '🟢' if total > 0 else '🔴'
        print(f"  {marker} {mode_label:<20} ${total:>+7.2f} {round_trips:>5} "
              f"{stops:>8} {session_count:>9} {mean_spacing:>7.3f}%")

    print(rule)


# ============================================================
# MAIN
# ============================================================
if __name__ == "__main__":
    # Script entry point: for every symbol, fetch candles, compute indicators,
    # run each fixed and adaptive grid configuration, print a per-symbol
    # report, then print a cross-symbol summary and save it as JSON.

    # Every tested configuration is declared once, as
    # (results label, summary column heading, parameter). Both the runs and
    # the summary table are built from these lists, so editing FIXED_SPACING
    # or ADAPTIVE_MULTIPLIERS can no longer leave the summary looking up
    # labels that were never run (which used to print a silent $+0.00).
    fixed_spacings = list(dict.fromkeys([FIXED_SPACING, 0.2, 0.3]))
    fixed_runs = [(f"Fixed {sp}%", f"Fix{sp}", sp) for sp in fixed_spacings]
    adaptive_runs = [(f"Adaptive ×{mult}", f"Adp×{mult}", mult)
                     for mult in ADAPTIVE_MULTIPLIERS]

    print("=" * 80)
    print("  GRID BACKTEST — ADAPTIVE SPACING (NATR-based)")
    print(f"  Symbols: {', '.join(SYMBOLS)}")
    print(f"  {INTERVAL} | {DAYS_BACK} days | ${POSITION_SIZE_USD}×{LEVERAGE}x | {GRID_LEVELS} levels")
    print("=" * 80)

    all_results = {}

    for symbol in SYMBOLS:
        print(f"\n{'─'*40}")
        print(f"  Fetching {symbol}...")
        df = fetch_klines(symbol, INTERVAL, DAYS_BACK)
        if len(df) < 500:
            print(f"  ⚠️ Too few candles ({len(df)}), skipping")
            continue

        df = calc_indicators(df)

        # NATR stats
        valid_natr = df['natr'].dropna()
        natr_avg = valid_natr.mean()
        natr_p50 = valid_natr.median()
        natr_p90 = valid_natr.quantile(0.9)
        print(f"  NATR(14): avg={natr_avg:.4f}% median={natr_p50:.4f}% p90={natr_p90:.4f}%")
        print(f"  BB width avg: {df['bb_width'].dropna().mean():.4f}%")
        print(f"  ADX avg: {df['adx'].dropna().mean():.1f}")
        print(f"  Price: ${df['close'].iloc[-1]:.6f}")

        # Run tests
        results_dict = {}

        # Fixed spacings (the first one is the FIXED_SPACING baseline)
        for label, _, spacing in fixed_runs:
            print(f"  Testing fixed {spacing}%...")
            results_dict[label] = run_grid(df, "fixed", spacing)

        # Adaptive with different multipliers
        for label, _, mult in adaptive_runs:
            print(f"  Testing {label}...")
            results_dict[label] = run_grid(df, "adaptive", adaptive_mult=mult)

        print_compact_report(symbol, natr_avg, results_dict)
        all_results[symbol] = {
            'natr_avg': round(natr_avg, 4),
            'natr_p50': round(natr_p50, 4),
            'natr_p90': round(natr_p90, 4),
            'results': {k: {'pnl': round(v['total_pnl'], 4), 'rts': v['total_round_trips'],
                            'max_loss': v['max_loss_stops'], 'sessions': len(v['sessions'])}
                        for k, v in results_dict.items()}
        }

    # Summary table
    summary_columns = fixed_runs + adaptive_runs
    header_cells = ' '.join(f"{heading:>8}" for _, heading, _ in summary_columns)
    print(f"\n\n{'='*80}")
    print(f"  SUMMARY — ALL COINS")
    print(f"{'='*80}")
    print(f"  {'Symbol':<14} {'NATR':>6} {header_cells}")
    print(f"  {'─'*78}")
    for sym, data in all_results.items():
        natr = data['natr_avg']
        vals = []
        for label, _, _ in summary_columns:
            pnl = data['results'].get(label, {}).get('pnl', 0)
            vals.append(f"${pnl:>+6.2f}")
        print(f"  {sym:<14} {natr:>5.3f}% {' '.join(vals)}")
    print(f"{'='*80}")

    # Save next to this script
    out_path = Path(__file__).parent / 'results_grid_adaptive.json'
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, indent=2)
    print(f"\n💾 Saved to {out_path}")

# Git history (source-viewer residue, kept as a comment):
#   c6f6bd5  chore: initial commit — version control setup  (5 weeks ago)