← Back
☆
"""
Smart Grid Backtest — Range Detection + Structural Breakout Exit
Tests Rick's approach: find sideways → measure range → grid inside → exit on breakout

Key differences from old backtests:
- Entry: ADX<25 + BB width < threshold → ranging market confirmed
- Grid bounds: actual high/low of lookback period (not fixed % from center)
- Exit: ADX>30 OR price outside range >1% for 5+ candles → breakout confirmed
- NO session timeout, NO max_loss hard stop
- Infinite grid within range (orders re-placed after round-trips)

Usage: python3 backtest_grid_smart.py
"""

import requests
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime
from pathlib import Path

# ============================================================
# CONFIG
# ============================================================
# Markets to backtest; each symbol is fetched and simulated independently.
SYMBOLS = ["DOGEUSDT", "ETHUSDT"]
# Candle interval requested from the API. The indicator windows used further
# down are expressed in candles and assume one candle per minute.
INTERVAL = "1m"
DAYS_BACK = 30              # days of history to download
DEPOSIT = 50.0              # starting equity in USD
LEVERAGE = 10               # multiplier applied to ORDER_SIZE_USD to get notional
ORDER_SIZE_USD = 5.0        # $5 per grid order × 10x = $50 notional
FEE_PCT = 0.02 / 100        # maker fee 0.02%, charged on the notional of every fill

# Grid params
GRID_COUNT = 10              # number of grid lines inside the range

# Range detection
LOOKBACK_HOURS = 48          # measure range over last 48h
# NOTE(review): BB_PERIOD is not referenced by the code visible in this file;
# add_indicators() uses a fixed 60-candle window instead.
BB_PERIOD = 20
BB_STD = 2.0                 # band half-width in standard deviations
ADX_PERIOD = 14              # base ADX period; add_indicators() multiplies it by 4

# Entry conditions (on 1h candles, but we compute from 1m)
ADX_ENTRY = 25               # ADX < 25 = ranging
BB_WIDTH_ENTRY = 3.0         # BB width < 3% = squeezed (1h equivalent)
# We'll use 60-candle rolling for 1h equivalent on 1m data

# Exit conditions
ADX_EXIT = 30                # ADX > 30 = trend started
BREAKOUT_PCTS = [1.0, 1.5]   # price outside range by X% (one backtest run per value)
BREAKOUT_CONFIRM_CANDLES = 5  # must stay outside for N consecutive candles

# Liquidation protection
# The session is closed once equity falls below (100 - 80)% of DEPOSIT,
# i.e. once the running loss exceeds 80% of the deposit.
LIQUIDATION_MARGIN_PCT = 80


# ============================================================
# DATA FETCH
# ============================================================
def fetch_klines(symbol, interval, days_back, max_retries=5):
    """Download recent klines (candles) for one symbol from Binance futures.

    Pages through ``/fapi/v1/klines`` in chunks of up to 1500 candles,
    starting ``days_back`` days ago and continuing until the API returns an
    empty page or the start cursor reaches "now".

    Args:
        symbol: Market symbol, e.g. ``"DOGEUSDT"``.
        interval: Kline interval string passed straight to the API, e.g. ``"1m"``.
        days_back: How many days of history to request, counted back from now.
        max_retries: Number of consecutive failed requests that are retried
            before giving up. The counter resets after every successful page.

    Returns:
        DataFrame sorted by ``timestamp`` with duplicates removed. ``timestamp``
        is parsed from epoch milliseconds; ``open``/``high``/``low``/``close``/
        ``volume`` are floats; the remaining columns are left as delivered.

    Raises:
        RuntimeError: If requests keep failing beyond ``max_retries``, or if
            no candles at all were returned.
    """
    url = "https://fapi.binance.com/fapi/v1/klines"
    now = time.time()
    end_ts = int(now * 1000)
    start_ts = int((now - days_back * 86400) * 1000)
    all_candles = []
    current_start = start_ts
    failures = 0

    print(f"[fetch] {symbol} {interval} — {days_back} days...")
    while current_start < end_ts:
        params = {"symbol": symbol, "interval": interval,
                  "startTime": current_start, "limit": 1500}
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # Transient network / decoding problem: retry, but never forever.
            failures += 1
            print(f"[fetch] Error: {e}")
            if failures > max_retries:
                raise RuntimeError(
                    f"[fetch] Giving up on {symbol} after {failures} consecutive failures"
                ) from e
            time.sleep(1)
            continue

        failures = 0
        if not isinstance(data, list):
            # The API answered with something other than a list of candles
            # (typically an error object). Keep what we have, but say so.
            print(f"[fetch] Unexpected response for {symbol}: {data}")
            break
        if not data:
            break

        all_candles.extend(data)
        # Each row starts with its open time; resume just after the last one.
        current_start = data[-1][0] + 1
        time.sleep(0.1)

    if not all_candles:
        raise RuntimeError(f"[fetch] No candles returned for {symbol} {interval}")

    df = pd.DataFrame(all_candles, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades', 'taker_buy_base',
        'taker_buy_quote', 'ignore'
    ])
    for col in ['open', 'high', 'low', 'close', 'volume']:
        df[col] = df[col].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    print(f"[fetch] Got {len(df)} candles: {df['timestamp'].iloc[0]} → {df['timestamp'].iloc[-1]}")
    return df


# ============================================================
# INDICATORS (computed on 1m, but we use rolling windows for 1h equiv)
# ============================================================
def add_indicators(df):
    """Add Bollinger-band, ADX and rolling-range columns to ``df``.

    The frame is modified in place and also returned. Columns added:
    ``bb_mid``, ``bb_std``, ``bb_upper``, ``bb_lower``, ``bb_width`` (percent
    of the mid line), ``adx``, ``range_high``, ``range_low`` and ``range_pct``
    (range as a percentage of the current close).

    Args:
        df: Candle frame with float ``high``, ``low`` and ``close`` columns.
            Windows are counted in candles and sized for 1-minute data.

    Returns:
        The same DataFrame object, with the indicator columns attached.
        Rows inside the warm-up windows hold NaN.
    """
    # Bollinger Bands on a 60-candle rolling window (≈1h on 1m data)
    period = 60
    df['bb_mid'] = df['close'].rolling(period).mean()
    df['bb_std'] = df['close'].rolling(period).std()
    df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std']
    df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std']
    df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100

    # Directional movement. Both sides are derived from the *raw* moves so
    # that a tie (up move == down move) counts for neither direction; comparing
    # against an already-zeroed +DM would wrongly credit ties to -DM.
    high, low, close = df['high'], df['low'], df['close']
    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    # True range: the largest of the three candidate spans per candle.
    prev_close = close.shift(1)
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    # ADX smoothed with alpha = 1/p, where p = ADX_PERIOD * 4
    # (longer smoothing because the input is 1m data).
    p = ADX_PERIOD * 4
    atr = tr.ewm(alpha=1/p, min_periods=p).mean()
    plus_di = 100 * (plus_dm.ewm(alpha=1/p, min_periods=p).mean() / atr)
    minus_di = 100 * (minus_dm.ewm(alpha=1/p, min_periods=p).mean() / atr)
    # The small epsilon avoids 0/0 when both DI lines are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    df['adx'] = dx.ewm(alpha=1/p, min_periods=p).mean()

    # Range: rolling high/low over LOOKBACK_HOURS
    lb = LOOKBACK_HOURS * 60  # candles
    df['range_high'] = df['high'].rolling(lb).max()
    df['range_low'] = df['low'].rolling(lb).min()
    df['range_pct'] = ((df['range_high'] - df['range_low']) / df['close']) * 100

    return df


# ============================================================
# SMART GRID ENGINE
# ============================================================
def _apply_fill(net_position, avg_entry, side, qty, fill_price):
    """Apply one grid fill to the running net position.

    Args:
        net_position: Signed position in base units (positive = long).
        avg_entry: Average entry price of ``net_position``.
        side: ``+1`` for a buy fill, ``-1`` for a sell fill.
        qty: Unsigned fill quantity in base units.
        fill_price: Price at which the order filled.

    Returns:
        ``(net_position, avg_entry, realized_pnl, closed)`` where ``closed`` is
        True when the fill fully closed (and possibly flipped) the position.
    """
    held = abs(net_position)
    if net_position * side >= 0:
        # Flat, or adding in the same direction: blend into the average entry.
        total_cost = held * avg_entry + qty * fill_price
        net_position = net_position + side * qty
        avg_entry = total_cost / abs(net_position) if net_position != 0 else 0
        return net_position, avg_entry, 0.0, False

    # The fill opposes the open position: realise PnL on the part it closes.
    pnl_per_unit = (avg_entry - fill_price) if side > 0 else (fill_price - avg_entry)
    if qty >= held:
        remaining = qty - held
        # Any excess quantity opens a fresh position at the fill price.
        return side * remaining, (fill_price if remaining > 0 else 0), held * pnl_per_unit, True
    # Partial close: the average entry of what is left does not change.
    return net_position + side * qty, avg_entry, qty * pnl_per_unit, False


def run_smart_grid(df, breakout_pct=1.0):
    """
    Simulate the range-bound grid strategy on an indicator-enriched frame.

    1. Scan for ranging conditions (ADX<25, BB_width<3%)
    2. Measure range = high/low of last 48h
    3. Place grid: GRID_COUNT equally spaced levels
    4. Run infinite grid (re-place orders after round-trips)
    5. Exit when: ADX>30 OR price breaks range by breakout_pct% for 5 candles

    Fills are modelled per candle: a buy fills when the candle low touches its
    level, a sell when the candle high does. Buys are processed before sells,
    so a sell armed by a buy can fill within the same candle.

    Args:
        df: Frame with ``timestamp``, ``high``, ``low``, ``close``, ``adx``,
            ``bb_width``, ``range_high`` and ``range_low`` columns.
        breakout_pct: Percent beyond the grid bounds that counts as "outside".

    Returns:
        Dict with ``sessions`` (one summary dict per grid session),
        ``total_pnl`` (net of fees), ``total_round_trips``, ``total_fees`` and
        ``max_drawdown`` (largest per-session drop from session-start equity).
        ``equity_curve`` is initialised but not populated.
    """
    results = {
        'sessions': [],
        'equity_curve': [],
        'total_pnl': 0,
        'total_round_trips': 0,
        'total_fees': 0,
        'max_drawdown': 0,
    }

    # Pull the columns out once; per-candle .iloc lookups are needlessly slow.
    highs = df['high'].to_numpy()
    lows = df['low'].to_numpy()
    closes = df['close'].to_numpy()
    adx_vals = df['adx'].to_numpy()
    bb_widths = df['bb_width'].to_numpy()
    range_highs = df['range_high'].to_numpy()
    range_lows = df['range_low'].to_numpy()
    timestamps = df['timestamp']
    n = len(df)

    notional = ORDER_SIZE_USD * LEVERAGE   # USD notional of every grid order
    fill_fee = notional * FEE_PCT          # fee charged per grid fill

    warmup = LOOKBACK_HOURS * 60 + 100
    i = warmup
    cooldown_until = 0
    equity = DEPOSIT

    while i < n:
        if i < cooldown_until:
            i += 1
            continue

        # Check entry conditions
        bb_w = bb_widths[i]
        adx = adx_vals[i]
        r_high = range_highs[i]
        r_low = range_lows[i]

        if pd.isna(bb_w) or pd.isna(adx) or pd.isna(r_high) or pd.isna(r_low):
            i += 1
            continue

        if bb_w > BB_WIDTH_ENTRY or adx > ADX_ENTRY:
            i += 1
            continue

        # RANGING DETECTED → start grid
        entry_price = closes[i]
        grid_upper = r_high
        grid_lower = r_low
        grid_range = grid_upper - grid_lower

        if grid_range <= 0 or grid_range / entry_price * 100 < 0.3:
            # Range too tight, skip
            i += 1
            continue

        # Create grid levels strictly inside the range
        step = grid_range / (GRID_COUNT + 1)
        grid_levels = [grid_lower + step * (k + 1) for k in range(GRID_COUNT)]

        # Classify levels: below price = buy, at/above price = sell
        buy_orders = {idx: lvl for idx, lvl in enumerate(grid_levels) if lvl < entry_price}
        sell_orders = {idx: lvl for idx, lvl in enumerate(grid_levels) if lvl >= entry_price}

        # State
        net_position = 0.0       # in base asset units (positive = long)
        avg_entry = 0.0          # avg entry for net position
        session_pnl = 0.0
        session_fees = 0.0
        session_rts = 0
        filled_buys = set()      # buy levels that filled and are not yet re-armed
        filled_sells = set()     # sell levels that filled and are not yet re-armed
        session_start = i
        start_equity = equity
        min_equity = equity
        outside_count = 0        # consecutive candles closed outside the range

        session_active = True
        close_reason = 'end_of_data'
        j = i + 1

        while j < n and session_active:
            candle_high = highs[j]
            candle_low = lows[j]
            price = closes[j]
            curr_adx = 0 if pd.isna(adx_vals[j]) else adx_vals[j]

            # === FILL BUY ORDERS ===
            for idx, lvl in list(buy_orders.items()):
                if candle_low <= lvl and idx not in filled_buys:
                    session_fees += fill_fee
                    net_position, avg_entry, realized, closed = _apply_fill(
                        net_position, avg_entry, 1, notional / lvl, lvl)
                    session_pnl += realized
                    if closed:
                        session_rts += 1

                    filled_buys.add(idx)
                    # Re-place the sell one level up. This must happen even if
                    # that level already exists in sell_orders: a previously
                    # filled sell stays in the dict, and skipping it would
                    # leave the level dead for the rest of the session.
                    next_sell_idx = idx + 1
                    if next_sell_idx < GRID_COUNT:
                        sell_orders[next_sell_idx] = grid_levels[next_sell_idx]
                        filled_sells.discard(next_sell_idx)

            # === FILL SELL ORDERS ===
            for idx, lvl in list(sell_orders.items()):
                if candle_high >= lvl and idx not in filled_sells:
                    session_fees += fill_fee
                    net_position, avg_entry, realized, closed = _apply_fill(
                        net_position, avg_entry, -1, notional / lvl, lvl)
                    session_pnl += realized
                    if closed:
                        session_rts += 1

                    filled_sells.add(idx)
                    # Re-place the buy one level down (same reasoning as above).
                    next_buy_idx = idx - 1
                    if next_buy_idx >= 0:
                        buy_orders[next_buy_idx] = grid_levels[next_buy_idx]
                        filled_buys.discard(next_buy_idx)

            # === UNREALIZED PnL (marked at this candle's close) ===
            if net_position > 0:
                unrealized = net_position * (price - avg_entry)
            elif net_position < 0:
                unrealized = abs(net_position) * (avg_entry - price)
            else:
                unrealized = 0

            current_equity = start_equity + session_pnl + unrealized - session_fees
            min_equity = min(min_equity, current_equity)

            # === EXIT CONDITIONS ===
            # All three are evaluated every candle; if several trigger at once
            # the last one checked determines close_reason.

            # 1. ADX breakout
            if curr_adx > ADX_EXIT:
                close_reason = f'adx_breakout (ADX={curr_adx:.1f})'
                session_active = False

            # 2. Price breakout (outside range for N candles)
            if price > grid_upper * (1 + breakout_pct / 100) or \
               price < grid_lower * (1 - breakout_pct / 100):
                outside_count += 1
                if outside_count >= BREAKOUT_CONFIRM_CANDLES:
                    direction = "UP" if price > grid_upper else "DOWN"
                    close_reason = f'price_breakout_{direction} ({outside_count} candles)'
                    session_active = False
            else:
                outside_count = 0

            # 3. Liquidation protection
            if current_equity < DEPOSIT * (1 - LIQUIDATION_MARGIN_PCT / 100):
                close_reason = f'liquidation_protect (equity=${current_equity:.2f})'
                session_active = False

            j += 1

        # === CLOSE SESSION: liquidate net position at market ===
        # j already points one past the candle that triggered the exit, so the
        # position is flattened at the close of the following candle when one
        # exists, otherwise at the last candle of the data.
        if net_position != 0:
            final_price = closes[min(j, n - 1)]
            if net_position > 0:
                close_pnl = net_position * (final_price - avg_entry)
            else:
                close_pnl = abs(net_position) * (avg_entry - final_price)
            session_pnl += close_pnl
            close_fee = abs(net_position) * final_price * FEE_PCT
            session_fees += close_fee

        net_session = session_pnl - session_fees
        equity += net_session
        duration_hours = (j - session_start) / 60

        drawdown = start_equity - min_equity
        results['max_drawdown'] = max(results['max_drawdown'], drawdown)

        # Express range and spacing relative to the price at entry, which is
        # the price the grid was actually built around.
        spacing_pct = (step / entry_price) * 100

        session_data = {
            'id': len(results['sessions']) + 1,
            'start': str(timestamps.iloc[session_start]),
            'end': str(timestamps.iloc[min(j - 1, n - 1)]),
            'duration_hours': round(duration_hours, 1),
            'grid_range': f"${grid_lower:.4f} — ${grid_upper:.4f}",
            'range_pct': round(grid_range / entry_price * 100, 2),
            'spacing_pct': round(spacing_pct, 3),
            'entry_adx': round(float(adx), 1),
            'entry_bb_width': round(float(bb_w), 2),
            'round_trips': session_rts,
            'net_position_at_close': round(net_position, 4),
            'pnl': round(net_session, 4),
            'fees': round(session_fees, 4),
            'close_reason': close_reason,
            'equity_after': round(equity, 2),
            'max_drawdown': round(drawdown, 2),
        }

        results['sessions'].append(session_data)
        results['total_pnl'] += net_session
        results['total_round_trips'] += session_rts
        results['total_fees'] += session_fees

        # Cooldown: 2 hours after exit
        cooldown_until = j + 120
        i = j + 1

    return results


# ============================================================
# REPORT
# ============================================================
def print_report(symbol, results, df, breakout_pct):
    """Print a human-readable report for one symbol / breakout threshold.

    Args:
        symbol: Market symbol shown in the header.
        results: Dict returned by ``run_smart_grid`` (reads ``sessions``,
            ``total_pnl``, ``total_round_trips``, ``total_fees``,
            ``max_drawdown``).
        df: Candle frame the backtest ran on; only its first and last
            ``timestamp`` are used, to show the covered period.
        breakout_pct: Breakout threshold (percent) used for this run.

    Returns:
        None. Everything is written to stdout.
    """
    sessions = results['sessions']
    # A session that nets exactly zero is counted as a loss.
    wins = [s for s in sessions if s['pnl'] > 0]
    losses = [s for s in sessions if s['pnl'] <= 0]

    thick_rule = '=' * 70
    thin_rule = '─' * 70

    print(f"\n{thick_rule}")
    print(f"  SMART GRID — {symbol} | Breakout exit: {breakout_pct}%")
    print(thick_rule)
    print(f"  Period:        {df['timestamp'].iloc[0]} → {df['timestamp'].iloc[-1]}")
    print(f"  Deposit:       ${DEPOSIT} × {LEVERAGE}x")
    print(f"  Grid lines:    {GRID_COUNT} | Order size: ${ORDER_SIZE_USD}")
    print(f"  Entry:         ADX<{ADX_ENTRY}, BB_width<{BB_WIDTH_ENTRY}%")
    print(f"  Exit:          ADX>{ADX_EXIT} OR price>{breakout_pct}% outside for {BREAKOUT_CONFIRM_CANDLES} candles")
    print(thin_rule)
    print(f"  Sessions:      {len(sessions)}")
    # max(..., 1) keeps the win-rate division safe when there are no sessions.
    print(f"  Win/Loss:      {len(wins)}W / {len(losses)}L ({100*len(wins)/max(len(sessions),1):.0f}% WR)")
    print(f"  Round trips:   {results['total_round_trips']}")
    print(thin_rule)
    print(f"  Total PnL:     ${results['total_pnl']:.4f}")
    print(f"  Total fees:    ${results['total_fees']:.4f}")
    print(f"  Final equity:  ${DEPOSIT + results['total_pnl']:.2f}")
    print(f"  Max drawdown:  ${results['max_drawdown']:.2f}")
    print(f"  ROI:           {results['total_pnl']/DEPOSIT*100:.1f}%")

    if wins:
        print(f"  Avg win:       ${sum(s['pnl'] for s in wins)/len(wins):.4f}")
    if losses:
        print(f"  Avg loss:      ${sum(s['pnl'] for s in losses)/len(losses):.4f}")

    # Close reasons, grouped by the leading keyword (details in parentheses
    # after the first space are dropped).
    reasons = {}
    for s in sessions:
        r = s['close_reason'].split(' ')[0]
        reasons[r] = reasons.get(r, 0) + 1
    print(thin_rule)
    print("  Exit reasons:")
    for r, cnt in sorted(reasons.items(), key=lambda x: -x[1]):
        print(f"    {r}: {cnt}")

    # Duration and spacing stats
    if sessions:
        durations = [s['duration_hours'] for s in sessions]
        print(thin_rule)
        print(f"  Session duration: avg {np.mean(durations):.1f}h | min {min(durations):.1f}h | max {max(durations):.1f}h")

        spacings = [s['spacing_pct'] for s in sessions]
        print(f"  Grid spacing:     avg {np.mean(spacings):.3f}% | min {min(spacings):.3f}% | max {max(spacings):.3f}%")

    # All sessions detail
    print(thin_rule)
    print(f"  {'#':>3} {'Start':>16} {'Dur(h)':>7} {'RTs':>4} {'PnL':>10} {'Equity':>8} {'DD':>6} {'Exit reason'}")
    print(f"  {'─'*90}")
    for s in sessions:
        emoji = '🟢' if s['pnl'] > 0 else '🔴'
        # start[5:16] trims the year and the seconds from the timestamp string.
        print(f"  {emoji}{s['id']:>2} {s['start'][5:16]:>16} {s['duration_hours']:>6.1f}h {s['round_trips']:>4} "
              f"${s['pnl']:>8.4f} ${s['equity_after']:>7.2f} ${s['max_drawdown']:>5.2f} {s['close_reason'][:35]}")

    print(f"{thick_rule}\n")


# ============================================================
# MAIN
# ============================================================
if __name__ == "__main__":
    # Script entry point: for every symbol, download candles, attach
    # indicators, run one backtest per breakout threshold, print the reports,
    # then print a combined summary and save it as JSON next to this file.
    print("=" * 70)
    print("  SMART GRID BACKTEST — Range Detection + Structural Exit")
    print(f"  Symbols: {', '.join(SYMBOLS)} | {INTERVAL} | {DAYS_BACK} days")
    print("=" * 70)

    all_results = {}

    for symbol in SYMBOLS:
        df = fetch_klines(symbol, INTERVAL, DAYS_BACK)
        df = add_indicators(df)

        # Stats: how often the entry conditions hold, plus indicator extremes.
        valid = df.dropna(subset=['bb_width', 'adx'])
        ranging = valid[(valid['bb_width'] < BB_WIDTH_ENTRY) & (valid['adx'] < ADX_ENTRY)]
        print(f"\n[{symbol}] Ranging candles: {len(ranging)}/{len(valid)} ({100*len(ranging)/max(len(valid),1):.1f}%)")
        print(f"[{symbol}] BB width: {valid['bb_width'].min():.2f}% — {valid['bb_width'].max():.2f}%")
        print(f"[{symbol}] ADX: {valid['adx'].min():.1f} — {valid['adx'].max():.1f}")
        if len(valid) > 0:
            print(f"[{symbol}] Range (48h): {valid['range_pct'].min():.2f}% — {valid['range_pct'].max():.2f}%")

        symbol_results = {}
        for bp in BREAKOUT_PCTS:
            print(f"\n[{symbol}] Running smart grid (breakout={bp}%)...")
            result = run_smart_grid(df, breakout_pct=bp)
            print_report(symbol, result, df, bp)

            session_count = len(result['sessions'])
            win_count = sum(1 for s in result['sessions'] if s['pnl'] > 0)
            symbol_results[f"breakout_{bp}"] = {
                'total_pnl': round(result['total_pnl'], 4),
                'total_rts': result['total_round_trips'],
                'total_fees': round(result['total_fees'], 4),
                'sessions': session_count,
                'max_drawdown': round(result['max_drawdown'], 2),
                # max(..., 1) avoids dividing by zero when no session was opened.
                'win_rate': round(100 * win_count / max(session_count, 1), 1),
            }

        all_results[symbol] = symbol_results

    # Summary table
    print("\n" + "=" * 70)
    print("  SUMMARY — ALL SYMBOLS × BREAKOUT THRESHOLDS")
    print("=" * 70)
    print(f"  {'Symbol':<12} {'Breakout':>8} {'Sessions':>8} {'RTs':>6} {'PnL':>10} {'WR':>6} {'MaxDD':>8} {'ROI':>8}")
    print(f"  {'─'*66}")
    for sym, sym_res in all_results.items():
        for key, r in sym_res.items():
            # Keys look like "breakout_1.0"; the part after "_" is the threshold.
            bp = key.split('_')[1]
            roi = r['total_pnl'] / DEPOSIT * 100
            print(f"  {sym:<12} {bp+'%':>8} {r['sessions']:>8} {r['total_rts']:>6} ${r['total_pnl']:>8.2f} {r['win_rate']:>5.0f}% ${r['max_drawdown']:>6.2f} {roi:>6.1f}%")
    print("=" * 70)

    # Save
    out_path = Path(__file__).parent / 'results_grid_smart.json'
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, indent=2)
    print(f"\n💾 Saved to {out_path}")

📜 Git History

c6f6bd5 · chore: initial commit — version control setup · 5 weeks ago
Show last diff
Loading...