← Back
☆
"""
Hybrid Grid Backtest v4b — Tuned spacing + relaxed inventory
==============================================================
Changes from v4:
- Spacing: fixed ~0.4% center (0.35-0.50 range)
- Inventory warn: 3 → 8 (let grid breathe, fewer forced partial closes)
- Inventory cap: 5 → 12
- Unstuck threshold: 4 → 10

Usage:
    python backtest_hybrid_v4b.py
"""

import ccxt
import pandas as pd
import numpy as np
import json
import time
from datetime import datetime, timedelta, timezone

# ============================================================
# COINS
# ============================================================
COINS = [
    "ENA/USDT",
    "PENGU/USDT",
    "NEAR/USDT",
    "WLD/USDT",
    "UNI/USDT",
    "VIRTUAL/USDT",
    "SOL/USDT",
    "AVAX/USDT",
    "1000PEPE/USDT",
    "DOT/USDT",
    "ICP/USDT",
    "ETH/USDT",
    "TON/USDT",
    "LINK/USDT",
    "DOGE/USDT",
    "XRP/USDT",
    "AAVE/USDT",
    "FIL/USDT",
    "SUI/USDT",
    "OP/USDT",
    "ARB/USDT",
]

# Timeframes the screener is evaluated on; the grid itself always runs on 1m.
SCREENER_TFS = ["15m", "1h"]
GRID_TF = "1m"
# History window in days (kept short because 1m history is large).
LOOKBACK_DAYS = 14

# ------------------------------------------------------------
# Grid parameters
# ------------------------------------------------------------
# Grid spacing as a percentage of price: ATR% * ATR_SPACING_MULT, clamped
# into [SPACING_MIN_PCT, SPACING_MAX_PCT] by get_spacing().
SPACING_MIN_PCT = 0.35
SPACING_MAX_PCT = 0.50
ATR_SPACING_MULT = 0.55
ATR_PERIOD = 14

# Maximum number of directional fills processed per 1m bar.
GRID_LEVELS = 8

# Margin per level and leverage; their product is the per-level notional.
LEVERAGE = 5
POSITION_SIZE = 3.0

# Fee rates as fractions of notional (0.02% maker, 0.04% taker).
MAKER_FEE = 0.0002
TAKER_FEE = 0.0004

# ------------------------------------------------------------
# Inventory management (relaxed in v4b, see module docstring)
# ------------------------------------------------------------
INVENTORY_WARN_LEVELS = 8     # partial close starts at this imbalance (was 3)
INVENTORY_MAX_LEVELS = 12     # no further adds beyond this imbalance (was 5)
UNSTUCK_THRESHOLD = 10        # imbalance at which unstucking may trigger (was 4)
UNSTUCK_CLOSE_PCT = 0.20      # fraction of inventory closed per unstuck event

# ------------------------------------------------------------
# Trailing centre
# ------------------------------------------------------------
EMA_CENTER_PERIOD = 20
CENTER_UPDATE_MINS = 5        # centre refreshed every 5 one-minute bars

# ------------------------------------------------------------
# Screener thresholds
# ------------------------------------------------------------
# Bollinger-band width (percent of the middle band) accepted for entry.
BB_PERIOD = 20
BB_STD = 2.0
BB_WIDTH_MIN = 0.3
BB_WIDTH_MAX = 1.5

# Entry requires ADX below ADX_MAX_ENTRY and CHOP above CHOP_MIN_ENTRY.
ADX_MAX_ENTRY = 25
ADX_PERIOD = 14
CHOP_PERIOD = 14
CHOP_MIN_ENTRY = 55

# Exit ("breakout") thresholds.
CHOP_MAX_EXIT = 40
BREAKOUT_BB_MULT = 1.5
BREAKOUT_ADX = 28

# ------------------------------------------------------------
# Session length limits, in minutes, per screener timeframe
# ------------------------------------------------------------
# NOTE(review): these two tables are not referenced by the code in this file;
# find_sessions() hard-codes bar counts that correspond to the same durations
# (10 bars minimum; 192 x 15m / 72 x 1h maximum).
MIN_SESSION_MINS = {
    "15m": 150,    # 2.5 hours
    "1h": 600,     # 10 hours
}
MAX_SESSION_MINS = {
    "15m": 2880,   # 2 days
    "1h": 4320,    # 3 days
}

# ------------------------------------------------------------
# Risk
# ------------------------------------------------------------
# A session is stopped once realized + unrealized PnL falls to -MAX_LOSS_PER_SESSION.
MAX_LOSS_PER_SESSION = 5.0
# NOTE(review): DEPOSIT is not referenced elsewhere in the visible file.
DEPOSIT = 50.0


# ============================================================
# INDICATORS
# ============================================================

def fetch_klines(exchange, symbol, timeframe, days):
    """Download the last `days` days of OHLCV candles as a DataFrame.

    Candles are requested page by page (1000 per request) starting at
    "now minus `days`" in UTC; paging stops at the first empty or short page.

    Args:
        exchange: object exposing ``parse8601`` and ``fetch_ohlcv`` (a ccxt
            exchange instance in this script).
        symbol: market symbol passed through to ``fetch_ohlcv``.
        timeframe: candle timeframe passed through to ``fetch_ohlcv``.
        days: size of the lookback window in days.

    Returns:
        DataFrame with columns ts, open, high, low, close, volume (``ts``
        converted from epoch milliseconds to datetimes), or None when a
        request raises or no candles were returned at all.
    """
    page_size = 1000
    window_start = datetime.now(timezone.utc) - timedelta(days=days)
    cursor = exchange.parse8601(window_start.isoformat())
    rows = []

    while True:
        try:
            page = exchange.fetch_ohlcv(symbol, timeframe, since=cursor, limit=page_size)
        except Exception as e:
            # Any failure discards what was collected so far; the caller skips the symbol.
            print(f"  Error {symbol} {timeframe}: {e}")
            return None

        if page:
            rows.extend(page)
        if not page or len(page) < page_size:
            # Empty or short page: nothing more to download.
            break

        # Resume one millisecond after the last candle received.
        cursor = page[-1][0] + 1
        time.sleep(0.12)

    if not rows:
        return None

    frame = pd.DataFrame(rows, columns=['ts', 'open', 'high', 'low', 'close', 'volume'])
    frame['ts'] = pd.to_datetime(frame['ts'], unit='ms')
    return frame


def calc_atr(df, period=ATR_PERIOD):
    """Return the average true range of `df` as a Series.

    True range per bar is the largest of high-low, |high - previous close|
    and |low - previous close|; it is smoothed with an exponentially weighted
    mean using alpha = 1/period. The first `period - 1` values are NaN.
    """
    prev_close = df['close'].shift(1)
    candidates = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    )
    true_range = candidates.max(axis=1)
    smoothed = true_range.ewm(alpha=1/period, min_periods=period)
    return smoothed.mean()


def calc_bb_width(close, period=BB_PERIOD, std=BB_STD):
    """Return Bollinger-band width as a percentage of the middle band.

    The bands are the rolling mean of `close` plus/minus `std` rolling
    standard deviations over `period` bars.
    """
    middle = close.rolling(period).mean()
    deviation = close.rolling(period).std()
    upper = middle + std * deviation
    lower = middle - std * deviation
    return (upper - lower) / middle * 100


def calc_adx(df, period=ADX_PERIOD):
    """Return the Average Directional Index (ADX) of `df` as a Series.

    Args:
        df: DataFrame with 'high', 'low' and 'close' columns.
        period: smoothing length; every smoothing step uses an exponentially
            weighted mean with alpha = 1/period.

    Returns:
        Series of ADX values (0-100 scale); leading values are NaN until
        enough bars are available.
    """
    high, low, close = df['high'], df['low'], df['close']

    # Raw directional moves of the current bar versus the previous one.
    up_move = high.diff()
    down_move = -low.diff()

    # +DM / -DM: only the strictly larger, positive move counts. Both masks
    # are built from the *raw* moves so that a tie (up_move == down_move)
    # zeroes both sides; masking -DM against the already-zeroed +DM would
    # let -DM through on ties.
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    prev_close = close.shift(1)
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    atr = true_range.ewm(alpha=1/period, min_periods=period).mean()

    plus_di = 100 * (plus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)
    minus_di = 100 * (minus_dm.ewm(alpha=1/period, min_periods=period).mean() / atr)

    # The small epsilon avoids division by zero when both DIs are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    return dx.ewm(alpha=1/period, min_periods=period).mean()


def calc_chop(df, period=CHOP_PERIOD):
    """Return the Choppiness Index of `df` as a Series.

    Computed as 100 * log10(sum of true range over `period` bars divided by
    the high-low span of the same window) / log10(period). Windows whose
    span is zero yield NaN.
    """
    prev_close = df['close'].shift(1)
    true_range = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    ).max(axis=1)

    tr_total = true_range.rolling(period).sum()
    window_high = df['high'].rolling(period).max()
    window_low = df['low'].rolling(period).min()
    # A zero span would divide by zero; turn it into NaN instead.
    span = (window_high - window_low).replace(0, np.nan)

    ratio = tr_total / span
    return 100 * np.log10(ratio) / np.log10(period)


def calc_ema(series, period):
    """Return the exponential moving average of `series` with span `period`.

    Uses the recursive (adjust=False) form, so the first output equals the
    first input value.
    """
    weighted = series.ewm(span=period, adjust=False)
    return weighted.mean()


def get_spacing(atr_val, price):
    """Return the grid spacing in percent of price.

    The spacing is ATR as a percentage of `price`, scaled by
    ATR_SPACING_MULT and clamped to [SPACING_MIN_PCT, SPACING_MAX_PCT].
    A non-positive `price` yields SPACING_MIN_PCT.
    """
    if price <= 0:
        return SPACING_MIN_PCT
    atr_pct = (atr_val / price) * 100
    scaled = atr_pct * ATR_SPACING_MULT
    return np.clip(scaled, SPACING_MIN_PCT, SPACING_MAX_PCT)


# ============================================================
# SCREENER — finds session windows on higher TF
# ============================================================

def find_sessions(df_htf, screener_tf):
    """Scan higher-timeframe candles and return the grid session windows.

    A session opens on the first bar where BB width lies inside
    [BB_WIDTH_MIN, BB_WIDTH_MAX], ADX is below ADX_MAX_ENTRY and CHOP is
    above CHOP_MIN_ENTRY. It closes either on a "breakout" (after the minimum
    hold: BB width above BB_WIDTH_MAX * BREAKOUT_BB_MULT together with ADX
    above BREAKOUT_ADX or CHOP below CHOP_MAX_EXIT) or on "max_time" once the
    maximum hold is reached. After a close, scanning resumes three bars later.
    A session still open when the data ends is not reported.

    Args:
        df_htf: DataFrame with ts/high/low/close columns on the screener TF.
        screener_tf: "15m" or "1h"; any other value raises KeyError.

    Returns:
        List of dicts with keys entry_ts, exit_ts, bars_htf, reason,
        bw_entry, adx_entry and chop_entry.
    """
    width = calc_bb_width(df_htf['close'])
    trend = calc_adx(df_htf)
    choppiness = calc_chop(df_htf)

    # Hold limits expressed in screener bars.
    min_hold = {"15m": 10, "1h": 10}[screener_tf]
    max_hold = {"15m": 192, "1h": 72}[screener_tf]

    found = []
    open_idx = None  # bar index where the current session opened, None when flat
    total_bars = len(df_htf)
    idx = max(BB_PERIOD, ADX_PERIOD, CHOP_PERIOD) + 5  # skip indicator warm-up

    while idx < total_bars:
        bw_now = width.iloc[idx]
        adx_now = trend.iloc[idx]
        chop_now = choppiness.iloc[idx]

        # Bars with an undefined indicator are ignored entirely.
        if pd.isna(bw_now) or pd.isna(adx_now) or pd.isna(chop_now):
            idx += 1
            continue

        if open_idx is None:
            ranging = (
                BB_WIDTH_MIN <= bw_now <= BB_WIDTH_MAX
                and adx_now < ADX_MAX_ENTRY
                and chop_now > CHOP_MIN_ENTRY
            )
            if ranging:
                open_idx = idx
            idx += 1
            continue

        held = idx - open_idx
        broke_out = (
            held >= min_hold
            and bw_now > BB_WIDTH_MAX * BREAKOUT_BB_MULT
            and (adx_now > BREAKOUT_ADX or chop_now < CHOP_MAX_EXIT)
        )
        if broke_out:
            why = "breakout"
        elif held >= max_hold:
            why = "max_time"
        else:
            # Session stays open.
            idx += 1
            continue

        found.append({
            "entry_ts": df_htf['ts'].iloc[open_idx],
            "exit_ts": df_htf['ts'].iloc[idx],
            "bars_htf": held,
            "reason": why,
            "bw_entry": round(width.iloc[open_idx], 3),
            "adx_entry": round(trend.iloc[open_idx], 1),
            "chop_entry": round(choppiness.iloc[open_idx], 1),
        })
        open_idx = None
        idx += 3  # cool-down before looking for the next entry

    return found


# ============================================================
# GRID SIMULATION on 1m data
# ============================================================

def simulate_grid_on_1m(df_1m, entry_ts, exit_ts):
    """Simulate the grid on 1m bars between `entry_ts` and `exit_ts` (inclusive).

    Per bar the model:
      1. derives the grid spacing from ATR (see get_spacing),
      2. books "internal" round-trips implied by the bar's high/low range
         beyond its net close-to-close move (profit = one spacing minus two
         maker fees, no inventory change),
      3. applies the net close-to-close move as directional fills that open,
         add to, or close inventory (at most GRID_LEVELS fills per bar),
      4. partially closes inventory at the warn level, optionally "unstucks"
         near the EMA, and stops the session at MAX_LOSS_PER_SESSION.

    Args:
        df_1m: DataFrame of 1m candles with ts/open/high/low/close columns.
        entry_ts: first timestamp of the session window.
        exit_ts: last timestamp of the session window.

    Returns:
        None when the window holds fewer than 10 bars, otherwise a dict with:
        bars_1m (bars in the window), realized, inventory_pnl, total_pnl,
        rts (round-trips), peak_inv (largest absolute inventory), partial and
        unstuck (levels force-closed), fees (fees of executed fills and
        forced closes), avg_spacing (mean spacing % over the simulated bars)
        and stopped (True when the loss limit ended the session early).
    """
    in_window = (df_1m['ts'] >= entry_ts) & (df_1m['ts'] <= exit_ts)
    df = df_1m[in_window].reset_index(drop=True)

    if len(df) < 10:
        return None

    closes = df['close'].values
    highs = df['high'].values
    lows = df['low'].values
    n = len(closes)

    # Pre-calc indicators once; plain arrays keep per-bar lookups cheap.
    atr_vals = calc_atr(df, ATR_PERIOD).values
    ema_vals = calc_ema(df['close'], EMA_CENTER_PERIOD).values

    notional = POSITION_SIZE * LEVERAGE  # per-level position value
    maker_fee = notional * MAKER_FEE     # fee of one grid fill

    center = closes[0]   # only used as EMA fallback in the unstuck check
    inv_levels = 0       # signed inventory: >0 long levels, <0 short levels
    inv_cost = 0.0       # average entry price of the open inventory
    realized = 0.0
    rts = 0
    peak_inv = 0
    partial_cls = 0
    unstuck_cls = 0
    fees_total = 0.0
    spacing_sum = 0.0    # running total for the reported mean spacing
    spacing_bars = 0

    for i in range(1, n):
        # Trailing center every N bars
        if i % CENTER_UPDATE_MINS == 0 and not pd.isna(ema_vals[i]):
            center = ema_vals[i]

        # ATR spacing; before ATR has warmed up fall back to the last
        # close-to-close move.
        atr_v = atr_vals[i]
        if pd.isna(atr_v):
            atr_v = abs(closes[i]-closes[i-1])
        sp_pct = get_spacing(atr_v, closes[i])
        spacing_sum += sp_pct
        spacing_bars += 1
        spacing = closes[i] * sp_pct / 100
        if spacing <= 0: continue

        prev_c = closes[i-1]
        curr_c = closes[i]

        # Use high/low for intra-bar crossings, not just close-to-close.
        bar_low = lows[i]
        bar_high = highs[i]

        # Grid level indices (price measured in units of the current spacing).
        low_lvl = int(bar_low / spacing)
        high_lvl = int(bar_high / spacing)
        prev_lvl = int(prev_c / spacing)
        curr_lvl = int(curr_c / spacing)

        # Total levels traversed ≈ (high - low) / spacing
        # Net move = curr_lvl - prev_lvl
        # Round trips within bar ≈ (total_traversed - abs(net)) / 2
        total_traversed = high_lvl - low_lvl
        net_move = curr_lvl - prev_lvl
        abs_net = abs(net_move)

        # Internal round-trips (price went back and forth within bar)
        internal_rts = max(0, (total_traversed - abs_net)) // 2

        # Process internal round-trips (clean profit, no inventory change)
        if internal_rts > 0:
            rt_gross = notional * sp_pct / 100
            rt_fee = notional * MAKER_FEE * 2
            rt_net = rt_gross - rt_fee
            realized += internal_rts * rt_net
            fees_total += internal_rts * rt_fee
            rts += internal_rts

        # Process net directional move (changes inventory)
        direction = 1 if net_move > 0 else -1 if net_move < 0 else 0
        for lc in range(min(abs_net, GRID_LEVELS)):
            fee = maker_fee

            if direction > 0:  # price UP → sells fill
                if inv_levels <= -INVENTORY_MAX_LEVELS:
                    break  # short side is full: no further sells this bar
                fill_p = prev_c + spacing * (lc + 0.5)
                # Fees are only accounted for orders that actually fill.
                fees_total += fee
                if inv_levels > 0:  # close long
                    rt_pnl = notional * (fill_p - inv_cost) / inv_cost - fee
                    realized += rt_pnl; inv_levels -= 1; rts += 1
                else:  # open/add short
                    if inv_levels == 0: inv_cost = fill_p
                    else:
                        inv_cost = (abs(inv_levels)*inv_cost + fill_p) / (abs(inv_levels)+1)
                    inv_levels -= 1; realized -= fee

            elif direction < 0:  # price DOWN → buys fill
                fill_p = prev_c - spacing * (lc + 0.5)
                if fill_p <= 0: continue  # impossible price: no fill, no fee
                if inv_levels >= INVENTORY_MAX_LEVELS:
                    break  # long side is full: no further buys this bar
                fees_total += fee
                if inv_levels < 0:  # close short
                    rt_pnl = notional * (inv_cost - fill_p) / inv_cost - fee
                    realized += rt_pnl; inv_levels += 1; rts += 1
                else:  # open/add long
                    if inv_levels == 0: inv_cost = fill_p
                    else:
                        inv_cost = (abs(inv_levels)*inv_cost + fill_p) / (abs(inv_levels)+1)
                    inv_levels += 1; realized -= fee

        if abs(inv_levels) > peak_inv:
            peak_inv = abs(inv_levels)

        # Partial close at warn level: market-close half of the excess
        # (at least one level) at the bar close, paying taker fees.
        if abs(inv_levels) >= INVENTORY_WARN_LEVELS and inv_cost > 0:
            excess = abs(inv_levels) - INVENTORY_WARN_LEVELS + 1
            to_close = max(1, excess // 2)
            if inv_levels > 0:
                pnl_per = notional * (curr_c - inv_cost) / inv_cost
            else:
                pnl_per = notional * (inv_cost - curr_c) / inv_cost
            cl_fee = to_close * notional * TAKER_FEE
            realized += to_close * pnl_per - cl_fee
            fees_total += cl_fee
            if inv_levels > 0: inv_levels -= to_close
            else: inv_levels += to_close
            partial_cls += to_close

        # Unstucking near EMA: with deep inventory and price within two
        # spacings of the EMA, close a fraction of the inventory.
        if abs(inv_levels) >= UNSTUCK_THRESHOLD and inv_cost > 0:
            ema_v = ema_vals[i] if not pd.isna(ema_vals[i]) else center
            dist = abs(curr_c - ema_v) / ema_v * 100
            if dist < sp_pct * 2:
                to_unstuck = max(1, int(abs(inv_levels) * UNSTUCK_CLOSE_PCT))
                if inv_levels > 0:
                    pnl_per = notional * (curr_c - inv_cost) / inv_cost
                else:
                    pnl_per = notional * (inv_cost - curr_c) / inv_cost
                u_fee = to_unstuck * notional * TAKER_FEE
                realized += to_unstuck * pnl_per - u_fee
                fees_total += u_fee
                if inv_levels > 0: inv_levels -= to_unstuck
                else: inv_levels += to_unstuck
                unstuck_cls += to_unstuck

        # Hard stop: liquidate everything once realized + unrealized PnL
        # reaches the per-session loss limit.
        if inv_levels != 0 and inv_cost > 0:
            if inv_levels > 0:
                unreal = abs(inv_levels) * notional * (curr_c - inv_cost) / inv_cost
            else:
                unreal = abs(inv_levels) * notional * (inv_cost - curr_c) / inv_cost
            if realized + unreal <= -MAX_LOSS_PER_SESSION:
                cl_fee = abs(inv_levels) * notional * TAKER_FEE
                total = realized + unreal - cl_fee
                return {
                    "bars_1m": n, "realized": round(realized+unreal-cl_fee, 4),
                    "inventory_pnl": 0, "total_pnl": round(total, 4),
                    "rts": rts, "peak_inv": peak_inv,
                    "partial": partial_cls, "unstuck": unstuck_cls,
                    "fees": round(fees_total+cl_fee, 4),
                    "avg_spacing": round(spacing_sum / spacing_bars, 3),
                    "stopped": True,
                }

    # Session end — close remaining inventory at the last close (taker fee).
    inv_pnl = 0.0
    if inv_levels != 0 and inv_cost > 0:
        final = closes[-1]
        if inv_levels > 0:
            inv_pnl = abs(inv_levels) * notional * (final - inv_cost) / inv_cost
        else:
            inv_pnl = abs(inv_levels) * notional * (inv_cost - final) / inv_cost
        cl_fee = abs(inv_levels) * notional * TAKER_FEE
        inv_pnl -= cl_fee
        fees_total += cl_fee

    avg_spacing = spacing_sum / spacing_bars if spacing_bars else 0.0

    return {
        "bars_1m": n,
        "realized": round(realized, 4),
        "inventory_pnl": round(inv_pnl, 4),
        "total_pnl": round(realized + inv_pnl, 4),
        "rts": rts,
        "peak_inv": peak_inv,
        "partial": partial_cls,
        "unstuck": unstuck_cls,
        "fees": round(fees_total, 4),
        "avg_spacing": round(avg_spacing, 3),
        "stopped": False,
    }


# ============================================================
# MAIN
# ============================================================

def main():
    """Run the hybrid backtest and report the results.

    For every screener timeframe and every coin: fetch screener candles,
    detect sessions with find_sessions(), simulate each session on 1m data
    with simulate_grid_on_1m(), and print per-coin totals. Afterwards print a
    per-timeframe summary and a comparison table, then write the per-coin
    aggregates (without per-session detail) to a JSON file.
    """
    import os  # local import: only needed for the save step at the end

    print("=" * 70)
    print("HYBRID GRID BACKTEST v4")
    print("Screener: 15m / 1h  |  Grid simulation: 1m")
    print("=" * 70)
    print(f"Period: {LOOKBACK_DAYS}d | Spacing: ATR {SPACING_MIN_PCT}-{SPACING_MAX_PCT}%")
    print(f"Leverage: {LEVERAGE}x | ${POSITION_SIZE}/level (notional ${POSITION_SIZE*LEVERAGE})")
    print(f"Inv mgmt: warn@{INVENTORY_WARN_LEVELS} cap@{INVENTORY_MAX_LEVELS} unstuck@{UNSTUCK_THRESHOLD}")
    print(f"Screener: BB+ADX+CHOP | Center: EMA({EMA_CENTER_PERIOD}) trailing")
    print("=" * 70)

    exchange = ccxt.binanceusdm({
        'enableRateLimit': True,
        'options': {'defaultType': 'future'},
    })

    all_results = {}
    # 1m history per symbol, shared by all screener timeframes so the same
    # (large) download is not repeated for every timeframe.
    ohlcv_1m_cache = {}

    for stf in SCREENER_TFS:
        print(f"\n{'='*60}")
        print(f"  SCREENER TF: {stf}  |  GRID TF: 1m")
        print(f"{'='*60}")

        stf_results = []

        for symbol in COINS:
            print(f"\n  {symbol}...", end=" ", flush=True)

            # Fetch screener TF
            df_htf = fetch_klines(exchange, symbol, stf, LOOKBACK_DAYS)
            if df_htf is None or len(df_htf) < 50:
                print("SKIP (no HTF data)")
                continue

            # Find sessions
            sessions = find_sessions(df_htf, stf)
            if not sessions:
                print("0 sessions")
                continue

            print(f"{len(sessions)} sessions found, fetching 1m...", end=" ", flush=True)

            # Fetch 1m data (or reuse it from an earlier screener timeframe).
            # Failed downloads are not cached, so they are retried next time.
            df_1m = ohlcv_1m_cache.get(symbol)
            if df_1m is None:
                df_1m = fetch_klines(exchange, symbol, GRID_TF, LOOKBACK_DAYS)
                if df_1m is not None:
                    ohlcv_1m_cache[symbol] = df_1m
            if df_1m is None or len(df_1m) < 100:
                print("SKIP (no 1m data)")
                continue

            # Simulate grid for each session
            coin_sessions = []
            for sess in sessions:
                result = simulate_grid_on_1m(df_1m, sess['entry_ts'], sess['exit_ts'])
                if result is None:
                    continue
                result.update({
                    "entry_ts": str(sess['entry_ts']),
                    "exit_ts": str(sess['exit_ts']),
                    "reason": sess['reason'],
                    "bw_entry": sess['bw_entry'],
                    "adx_entry": sess['adx_entry'],
                    "chop_entry": sess['chop_entry'],
                })
                coin_sessions.append(result)

            if not coin_sessions:
                print("0 valid sims")
                continue

            total = sum(s['total_pnl'] for s in coin_sessions)
            real = sum(s['realized'] for s in coin_sessions)
            inv = sum(s['inventory_pnl'] for s in coin_sessions)
            wins = sum(1 for s in coin_sessions if s['total_pnl'] > 0)
            wr = wins / len(coin_sessions) * 100
            total_rts = sum(s['rts'] for s in coin_sessions)
            total_fees = sum(s['fees'] for s in coin_sessions)
            avg_1m_bars = np.mean([s['bars_1m'] for s in coin_sessions])
            stops = sum(1 for s in coin_sessions if s['stopped'])

            print(f"PnL ${total:+.2f} (real ${real:+.2f} inv ${inv:+.2f}) | "
                  f"WR {wr:.0f}% ({wins}/{len(coin_sessions)}) | "
                  f"{total_rts} RTs | fees ${total_fees:.2f} | "
                  f"avg {avg_1m_bars:.0f} 1m bars | stops:{stops}")

            stf_results.append({
                "symbol": symbol,
                "sessions": len(coin_sessions),
                "total_pnl": round(total, 2),
                "realized_pnl": round(real, 2),
                "inventory_pnl": round(inv, 2),
                "win_rate": round(wr, 1),
                "wins": wins,
                "total_rts": total_rts,
                "total_fees": round(total_fees, 2),
                "avg_bars_1m": round(avg_1m_bars, 0),
                "stopped_sessions": stops,
                "detail": coin_sessions,
            })

        all_results[stf] = stf_results

    # ============================================================
    # SUMMARY
    # ============================================================
    print("\n\n" + "=" * 70)
    print("SUMMARY — HYBRID v4 (screener HTF + grid 1m)")
    print("=" * 70)

    for stf in SCREENER_TFS:
        data = all_results.get(stf, [])
        if not data: print(f"\n{stf}: No data"); continue

        tp = sum(c['total_pnl'] for c in data)
        rp = sum(c['realized_pnl'] for c in data)
        ip = sum(c['inventory_pnl'] for c in data)
        sess = sum(c['sessions'] for c in data)
        wins = sum(c['wins'] for c in data)
        rts = sum(c['total_rts'] for c in data)
        fees = sum(c['total_fees'] for c in data)
        coins_up = sum(1 for c in data if c['total_pnl'] > 0)
        wr = wins/sess*100 if sess > 0 else 0

        print(f"\n📊 Screener {stf} + Grid 1m:")
        print(f"   Total PnL: ${tp:+.2f} (realized ${rp:+.2f}, inventory ${ip:+.2f})")
        print(f"   Fees paid: ${fees:.2f}")
        print(f"   Sessions: {sess} (wins {wins}, WR {wr:.1f}%)")
        print(f"   Round-trips: {rts}")
        print(f"   Profitable coins: {coins_up}/{len(data)}")
        if sess > 0:
            print(f"   Avg PnL/session: ${tp/sess:+.3f}")
            print(f"   Avg PnL/day: ${tp/LOOKBACK_DAYS:+.2f}")

        # Coins ranked by total PnL, best first.
        srt = sorted(data, key=lambda c: c['total_pnl'], reverse=True)
        print(f"\n   🟢 Top 5:")
        for c in srt[:5]:
            print(f"      {c['symbol']:15s} ${c['total_pnl']:+8.2f} "
                  f"(real ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) "
                  f"WR {c['win_rate']:.0f}% {c['total_rts']} RTs "
                  f"{c['sessions']} sess fees ${c['total_fees']:.2f}")
        if len(srt) > 5:
            print(f"\n   🔴 Bottom 5:")
            for c in srt[-5:]:
                print(f"      {c['symbol']:15s} ${c['total_pnl']:+8.2f} "
                      f"(real ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) "
                      f"WR {c['win_rate']:.0f}% {c['sessions']} sess")

    # Comparison table
    print("\n\n" + "=" * 70)
    print("SCREENER TF COMPARISON (both use 1m for grid)")
    print("=" * 70)
    hdr = f"  {'Scr':4s} | {'Total':>9s} | {'Real':>9s} | {'Inv':>9s} | {'Fees':>7s} | {'Sess':>5s} | {'WR':>5s} | {'RTs':>6s} | {'$/sess':>7s} | {'$/day':>7s}"
    print(hdr)
    print("  " + "-" * 90)
    for stf in SCREENER_TFS:
        d = all_results.get(stf, [])
        tp = sum(c['total_pnl'] for c in d)
        rp = sum(c['realized_pnl'] for c in d)
        ip = sum(c['inventory_pnl'] for c in d)
        fe = sum(c['total_fees'] for c in d)
        ss = sum(c['sessions'] for c in d)
        wi = sum(c['wins'] for c in d)
        rt = sum(c['total_rts'] for c in d)
        wr = wi/ss*100 if ss > 0 else 0
        av = tp/ss if ss > 0 else 0
        dy = tp/LOOKBACK_DAYS
        print(f"  {stf:4s} | ${tp:+8.2f} | ${rp:+8.2f} | ${ip:+8.2f} | ${fe:6.2f} | {ss:5d} | {wr:4.0f}% | {rt:6d} | ${av:+6.2f} | ${dy:+6.2f}")

    print("\n\nPrevious results for reference:")
    print("  v2 (15m screener+grid): $-2542, 0% WR")
    print("  v3 (15m screener+grid): $-689, 2.5% WR")
    print("  v4 (hybrid) results above ↑↑↑")

    # Save per-coin aggregates; the per-session 'detail' lists are left out.
    # NOTE(review): the file name says "v4" although this script is v4b —
    # confirm that overwriting the v4 results file is intended.
    out = "/home/app/trading-bot/grid-bot/backtests/results_hybrid_v4.json"
    compact = {}
    for stf in SCREENER_TFS:
        compact[stf] = [{k:v for k,v in c.items() if k != 'detail'} for c in all_results.get(stf, [])]
    try:
        # The path is absolute and machine-specific: create the directory if
        # possible, and report a failure instead of crashing after the run.
        os.makedirs(os.path.dirname(out), exist_ok=True)
        with open(out, 'w') as f:
            json.dump(compact, f, indent=2)
    except OSError as e:
        print(f"\nCould not save results to {out}: {e}")
    else:
        print(f"\n💾 Saved: {out}")


if __name__ == "__main__":
    main()

📜 Git History

c6f6bd5 · chore: initial commit — version control setup · 5 weeks ago
Show last diff
Loading...