← Back
ā˜†
"""
Session Grid Backtest v2 — Fixed Model
========================================
Proper grid simulation with inventory tracking.

Grid mechanics:
- Grid centered at entry price
- Buy limit orders below center, sell limit orders above
- When price crosses DOWN through a level → BUY fills (we go LONG)
- When price crosses UP through a level → SELL fills (we go SHORT)
- Each complete buy-then-sell or sell-then-buy = round-trip profit

Inventory risk:
- If price drifts up: we accumulate short inventory (sells filled, buys not)
- If price drifts down: we accumulate long inventory (buys filled, sells not)
- At session close, we CLOSE inventory at market → realized P&L

Usage:
    python backtest_session_grid_v2.py
"""

import ccxt
import pandas as pd
import numpy as np
import json
import time
from datetime import datetime, timedelta, timezone

# ============================================================
# CONFIG
# ============================================================
COINS = [
    "ENA/USDT", "PENGU/USDT", "NEAR/USDT", "WLD/USDT", "UNI/USDT",
    "VIRTUAL/USDT", "SOL/USDT", "AVAX/USDT", "1000PEPE/USDT",
    "DOT/USDT", "1000SHIB/USDT", "ICP/USDT", "ETH/USDT", "TON/USDT",
    "LINK/USDT", "DOGE/USDT", "XRP/USDT", "AAVE/USDT", "ADA/USDT",
    "FIL/USDT", "ONDO/USDT", "SUI/USDT", "OP/USDT", "ARB/USDT",
]

TIMEFRAMES = ["15m", "1h"]
LOOKBACK_DAYS = 30

# Grid params
GRID_SPACING_PCT = 0.1      # 0.1% between levels
GRID_LEVELS = 8             # 8 per side (16 total orders)
POSITION_SIZE = 3.0         # $3 notional per level
LEVERAGE = 10               # effective position = $30 per level
MAKER_FEE = 0.0002          # 0.02% (limit order fill)
TAKER_FEE = 0.0004          # 0.04% (market close)

# Screener params (BB/ADX on the SAME timeframe as grid)
BB_PERIOD = 20
BB_STD = 2.0
BB_WIDTH_MIN = 0.3
BB_WIDTH_MAX = 1.5
ADX_MAX_ENTRY = 25
ADX_PERIOD = 14

# Breakout exit
BREAKOUT_BB_MULT = 1.5
BREAKOUT_ADX = 30

# Session limits
MIN_SESSION_BARS = 10
MAX_SESSION_BARS = {"15m": 192, "1h": 72}  # ~2d for 15m, 3d for 1h

# Risk
DEPOSIT = 50.0
MAX_LOSS_PER_SESSION = 3.0  # $3 max loss per session (hard stop)


def fetch_klines(exchange, symbol, timeframe, days):
    """Fetch historical klines from Binance Futures."""
    all_klines = []
    since = exchange.parse8601(
        datetime.now(timezone.utc).replace(microsecond=0)
        .__sub__(timedelta(days=days)).isoformat()
    )
    limit = 1000

    while True:
        try:
            klines = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
        except Exception as e:
            print(f"  Error {symbol} {timeframe}: {e}")
            return None
        if not klines:
            break
        all_klines.extend(klines)
        since = klines[-1][0] + 1
        if len(klines) < limit:
            break
        time.sleep(0.12)

    if not all_klines:
        return None

    df = pd.DataFrame(all_klines, columns=['ts', 'open', 'high', 'low', 'close', 'volume'])
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')
    return df


def calc_bb_width(close, period=BB_PERIOD, std=BB_STD):
    mid = close.rolling(period).mean()
    s = close.rolling(period).std()
    return ((mid + std * s) - (mid - std * s)) / mid * 100


def calc_adx(df, period=ADX_PERIOD):
    high, low, close = df['high'], df['low'], df['close']
    plus_dm = high.diff()
    minus_dm = -low.diff()
    plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0)
    minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0)
    tr = pd.concat([high - low, (high - close.shift(1)).abs(), (low - close.shift(1)).abs()], axis=1).max(axis=1)
    atr = tr.ewm(alpha=1/period, min_periods=period).mean()
    plus_di = 100 * (plus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
    minus_di = 100 * (minus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    return dx.ewm(alpha=1/period, min_periods=period).mean()


def is_range_entry(bw, adx):
    return (BB_WIDTH_MIN <= bw <= BB_WIDTH_MAX) and (adx < ADX_MAX_ENTRY)


def is_breakout_exit(bw, adx):
    return (bw > BB_WIDTH_MAX * BREAKOUT_BB_MULT) and (adx > BREAKOUT_ADX)


def simulate_grid_session(df_session, center_price):
    """
    Proper grid simulation with inventory tracking.

    Model: price moves between bars. For each bar, we use close-to-close movement.
    When price crosses a grid level, an order fills.

    Returns: (realized_pnl, inventory_pnl, total_pnl, round_trips, peak_inventory_usd)
    """
    closes = df_session['close'].values
    highs = df_session['high'].values
    lows = df_session['low'].values

    spacing = center_price * GRID_SPACING_PCT / 100
    if spacing <= 0:
        return 0, 0, 0, 0, 0

    notional_per_level = POSITION_SIZE * LEVERAGE  # $30

    # Track inventory: positive = long, negative = short (in units of notional)
    # Each grid fill changes inventory by ±1 level
    inventory_levels = 0  # net position in grid levels
    inventory_cost = 0.0  # weighted average cost basis
    realized_pnl = 0.0
    round_trips = 0
    peak_inventory = 0
    max_unrealized_loss = 0
    running_pnl = 0

    # Use close-to-close level tracking
    center_level = center_price / spacing

    for i in range(1, len(closes)):
        prev_close = closes[i-1]
        curr_close = closes[i]

        # What levels were crossed?
        prev_lvl = prev_close / spacing
        curr_lvl = curr_close / spacing

        # Direction of movement
        if curr_lvl > prev_lvl:
            # Price went UP → sell limit orders filled
            levels_crossed = int(curr_lvl) - int(prev_lvl)
            for _ in range(min(levels_crossed, GRID_LEVELS)):
                fill_price_approx = prev_close + spacing * (_ + 0.5)
                fee = notional_per_level * MAKER_FEE

                if inventory_levels > 0:
                    # We're long, this sell CLOSES a long → round-trip
                    rt_pnl = notional_per_level * (fill_price_approx - inventory_cost) / inventory_cost - fee
                    realized_pnl += rt_pnl
                    inventory_levels -= 1
                    round_trips += 1
                else:
                    # We go shorter (open new short or add to short)
                    if inventory_levels == 0:
                        inventory_cost = fill_price_approx
                    else:
                        # Average cost for shorts
                        total_notional = abs(inventory_levels) * inventory_cost + fill_price_approx
                        inventory_cost = total_notional / (abs(inventory_levels) + 1)
                    inventory_levels -= 1
                    realized_pnl -= fee

        elif curr_lvl < prev_lvl:
            # Price went DOWN → buy limit orders filled
            levels_crossed = int(prev_lvl) - int(curr_lvl)
            for _ in range(min(levels_crossed, GRID_LEVELS)):
                fill_price_approx = prev_close - spacing * (_ + 0.5)
                fee = notional_per_level * MAKER_FEE

                if inventory_levels < 0:
                    # We're short, this buy CLOSES a short → round-trip
                    rt_pnl = notional_per_level * (inventory_cost - fill_price_approx) / inventory_cost - fee
                    realized_pnl += rt_pnl
                    inventory_levels += 1
                    round_trips += 1
                else:
                    # We go longer
                    if inventory_levels == 0:
                        inventory_cost = fill_price_approx
                    else:
                        total_notional = abs(inventory_levels) * inventory_cost + fill_price_approx
                        inventory_cost = total_notional / (abs(inventory_levels) + 1)
                    inventory_levels += 1
                    realized_pnl -= fee

        # Track peak inventory
        inv_size = abs(inventory_levels)
        if inv_size > peak_inventory:
            peak_inventory = inv_size

        # Check unrealized on current inventory
        if inventory_levels != 0 and inventory_cost > 0:
            if inventory_levels > 0:
                unrealized = abs(inventory_levels) * notional_per_level * (curr_close - inventory_cost) / inventory_cost
            else:
                unrealized = abs(inventory_levels) * notional_per_level * (inventory_cost - curr_close) / inventory_cost
            total_now = realized_pnl + unrealized
            if total_now < max_unrealized_loss:
                max_unrealized_loss = total_now

            # Hard stop check
            if total_now <= -MAX_LOSS_PER_SESSION:
                # Close inventory at market
                close_fee = abs(inventory_levels) * notional_per_level * TAKER_FEE
                inventory_pnl = unrealized - close_fee
                return realized_pnl, inventory_pnl, realized_pnl + inventory_pnl, round_trips, peak_inventory

    # Session end — close remaining inventory at market
    final_price = closes[-1]
    inventory_pnl = 0.0
    if inventory_levels != 0 and inventory_cost > 0:
        if inventory_levels > 0:
            inventory_pnl = abs(inventory_levels) * notional_per_level * (final_price - inventory_cost) / inventory_cost
        else:
            inventory_pnl = abs(inventory_levels) * notional_per_level * (inventory_cost - final_price) / inventory_cost
        close_fee = abs(inventory_levels) * notional_per_level * TAKER_FEE
        inventory_pnl -= close_fee

    total_pnl = realized_pnl + inventory_pnl
    return round(realized_pnl, 4), round(inventory_pnl, 4), round(total_pnl, 4), round_trips, peak_inventory


def backtest_coin_tf(df, timeframe):
    bb_width = calc_bb_width(df['close'])
    adx = calc_adx(df)
    max_bars = MAX_SESSION_BARS.get(timeframe, 96)
    sessions = []
    i = BB_PERIOD + ADX_PERIOD
    in_session = False
    session_start = 0
    entry_price = 0

    while i < len(df):
        bw = bb_width.iloc[i]
        ax = adx.iloc[i]

        if pd.isna(bw) or pd.isna(ax):
            i += 1
            continue

        if not in_session:
            if is_range_entry(bw, ax):
                in_session = True
                session_start = i
                entry_price = df['close'].iloc[i]
                i += 1
                continue
        else:
            bars_in = i - session_start
            should_exit = False
            exit_reason = ""

            if bars_in >= MIN_SESSION_BARS and is_breakout_exit(bw, ax):
                should_exit = True
                exit_reason = "breakout"
            elif bars_in >= max_bars:
                should_exit = True
                exit_reason = "max_time"

            if should_exit:
                session_df = df.iloc[session_start:i+1]
                real_pnl, inv_pnl, total_pnl, rts, peak_inv = simulate_grid_session(session_df, entry_price)

                sessions.append({
                    "start": str(df['ts'].iloc[session_start]),
                    "end": str(df['ts'].iloc[i]),
                    "bars": bars_in,
                    "entry_price": round(entry_price, 6),
                    "exit_price": round(df['close'].iloc[i], 6),
                    "realized_pnl": real_pnl,
                    "inventory_pnl": inv_pnl,
                    "total_pnl": total_pnl,
                    "round_trips": rts,
                    "peak_inventory": peak_inv,
                    "exit_reason": exit_reason,
                    "bb_width_entry": round(bb_width.iloc[session_start], 3),
                    "adx_entry": round(adx.iloc[session_start], 1),
                    "bb_width_exit": round(bw, 3),
                    "adx_exit": round(ax, 1),
                })

                in_session = False
                i += 3  # cooldown
                continue
        i += 1

    return sessions


def main():
    print("=" * 70)
    print("SESSION GRID BACKTEST v2 — Proper Inventory Model")
    print(f"Period: {LOOKBACK_DAYS}d | Grid: {GRID_SPACING_PCT}% spacing, "
          f"${POSITION_SIZE}Ɨ{LEVERAGE}x/level, {GRID_LEVELS} levels/side")
    print(f"Entry: BB {BB_WIDTH_MIN}-{BB_WIDTH_MAX}%, ADX<{ADX_MAX_ENTRY}")
    print(f"Exit: BB>{BB_WIDTH_MAX*BREAKOUT_BB_MULT:.1f}% + ADX>{BREAKOUT_ADX} | "
          f"or max_time ({MAX_SESSION_BARS})")
    print(f"Stop-loss: ${MAX_LOSS_PER_SESSION}/session")
    print("=" * 70)

    exchange = ccxt.binanceusdm({
        'enableRateLimit': True,
        'options': {'defaultType': 'future'},
    })

    results = {}

    for tf in TIMEFRAMES:
        print(f"\n{'='*60}")
        print(f"  TIMEFRAME: {tf}")
        print(f"{'='*60}")

        tf_results = []

        for symbol in COINS:
            print(f"\n  {symbol} ({tf})...", end=" ", flush=True)
            df = fetch_klines(exchange, symbol, tf, LOOKBACK_DAYS)

            if df is None or len(df) < BB_PERIOD + ADX_PERIOD + 20:
                print("SKIP")
                continue

            sessions = backtest_coin_tf(df, tf)

            if not sessions:
                print(f"0 sessions")
                continue

            total_pnl = sum(s['total_pnl'] for s in sessions)
            real_pnl = sum(s['realized_pnl'] for s in sessions)
            inv_pnl = sum(s['inventory_pnl'] for s in sessions)
            win_sessions = [s for s in sessions if s['total_pnl'] > 0]
            wr = len(win_sessions) / len(sessions) * 100
            total_rts = sum(s['round_trips'] for s in sessions)
            avg_bars = np.mean([s['bars'] for s in sessions])
            avg_peak = np.mean([s['peak_inventory'] for s in sessions])

            print(f"{len(sessions)} sess | PnL ${total_pnl:+.2f} (grid ${real_pnl:+.2f}, inv ${inv_pnl:+.2f}) | "
                  f"WR {wr:.0f}% | {total_rts} RTs | avg {avg_bars:.0f} bars, peak_inv {avg_peak:.1f}")

            tf_results.append({
                "symbol": symbol,
                "sessions": len(sessions),
                "total_pnl": round(total_pnl, 2),
                "realized_pnl": round(real_pnl, 2),
                "inventory_pnl": round(inv_pnl, 2),
                "win_rate": round(wr, 1),
                "total_round_trips": total_rts,
                "avg_bars": round(avg_bars, 1),
                "avg_peak_inventory": round(avg_peak, 1),
                "sessions_detail": sessions,
            })

        results[tf] = tf_results

    # ============================================================
    # SUMMARY
    # ============================================================
    print("\n\n" + "=" * 70)
    print("SUMMARY — SESSION GRID v2")
    print("=" * 70)

    for tf in TIMEFRAMES:
        tf_data = results.get(tf, [])
        if not tf_data:
            print(f"\n{tf}: No data")
            continue

        total_pnl = sum(c['total_pnl'] for c in tf_data)
        total_real = sum(c['realized_pnl'] for c in tf_data)
        total_inv = sum(c['inventory_pnl'] for c in tf_data)
        total_sess = sum(c['sessions'] for c in tf_data)
        total_rts = sum(c['total_round_trips'] for c in tf_data)
        coins_profit = sum(1 for c in tf_data if c['total_pnl'] > 0)

        print(f"\nšŸ“Š {tf} TIMEFRAME:")
        print(f"   Total PnL: ${total_pnl:+.2f} (grid ${total_real:+.2f}, inventory ${total_inv:+.2f})")
        print(f"   Sessions: {total_sess} | Round-trips: {total_rts}")
        print(f"   Profitable coins: {coins_profit}/{len(tf_data)}")
        if total_sess > 0:
            print(f"   Avg PnL/session: ${total_pnl/total_sess:+.2f}")
            print(f"   Avg PnL/day (est): ${total_pnl / LOOKBACK_DAYS:+.2f}")

        sorted_coins = sorted(tf_data, key=lambda c: c['total_pnl'], reverse=True)
        print(f"\n   🟢 Top 5:")
        for c in sorted_coins[:5]:
            print(f"      {c['symbol']:15s} ${c['total_pnl']:+8.2f} "
                  f"(grid ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) "
                  f"{c['sessions']} sess WR {c['win_rate']:.0f}% {c['total_round_trips']} RTs")

        if len(sorted_coins) > 5:
            print(f"\n   šŸ”“ Bottom 5:")
            for c in sorted_coins[-5:]:
                print(f"      {c['symbol']:15s} ${c['total_pnl']:+8.2f} "
                      f"(grid ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) "
                      f"{c['sessions']} sess WR {c['win_rate']:.0f}%")

    # Compare
    print("\n\n" + "=" * 70)
    print("15m vs 1h COMPARISON")
    print("=" * 70)
    print(f"  {'TF':4s} | {'Total PnL':>10s} | {'Grid PnL':>10s} | {'Inv PnL':>10s} | {'Sess':>5s} | {'RTs':>6s} | {'$/sess':>8s} | {'$/day':>7s}")
    print(f"  {'-'*4}-+-{'-'*10}-+-{'-'*10}-+-{'-'*10}-+-{'-'*5}-+-{'-'*6}-+-{'-'*8}-+-{'-'*7}")
    for tf in TIMEFRAMES:
        tf_data = results.get(tf, [])
        tp = sum(c['total_pnl'] for c in tf_data)
        rp = sum(c['realized_pnl'] for c in tf_data)
        ip = sum(c['inventory_pnl'] for c in tf_data)
        sess = sum(c['sessions'] for c in tf_data)
        rts = sum(c['total_round_trips'] for c in tf_data)
        avg = tp / sess if sess > 0 else 0
        day = tp / LOOKBACK_DAYS
        print(f"  {tf:4s} | ${tp:+9.2f} | ${rp:+9.2f} | ${ip:+9.2f} | {sess:5d} | {rts:6d} | ${avg:+7.2f} | ${day:+6.2f}")

    # Save
    output_file = "/home/app/trading-bot/grid-bot/backtests/results_session_grid_v2.json"
    compact = {}
    for tf in TIMEFRAMES:
        compact[tf] = [{k: v for k, v in c.items() if k != 'sessions_detail'} for c in results.get(tf, [])]
    with open(output_file, 'w') as f:
        json.dump(compact, f, indent=2)
    print(f"\nšŸ’¾ Saved: {output_file}")


if __name__ == "__main__":
    main()

šŸ“œ Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...