← Back
"""
Sideways/Range Screener Backtest — 1H vs 15m
Ищем боковики/запилы по параметрам на старшем ТФ,
запускаем грид на 1m данных внутри окна.

Сравниваем:
  A) 1H screener → грид на 1m в часовом окне
  B) 15m screener → грид на 1m в 15-минутном окне
  C) Baseline — грид без фильтра (каждый час)

Скоринг боковика (0-100):
  - ADX(14) < 20         → 25 pts (нет тренда)
  - BB Width 1.5-4%      → 20 pts (активный диапазон)
  - Range Position 0.3-0.7 → 20 pts (цена в середине)
  - Direction Changes    → 20 pts (запил/пила)
  - NATR 0.15-0.6%       → 15 pts (достаточная вола)

Usage: python3 backtest_sideways_screener.py
"""

import requests
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime, timedelta
from pathlib import Path

# ============================================================
# CONFIG
# ============================================================
# Symbols requested from the futures klines endpoint, one backtest per symbol.
SYMBOLS = [
    "ETHUSDT", "DOGEUSDT", "PENGUUSDT", "ENAUSDT",
    "NEARUSDT", "WLDUSDT", "UNIUSDT", "SOLUSDT",
    "ADAUSDT", "XRPUSDT", "AVAXUSDT", "LINKUSDT",
    "PEPEUSDT", "SUIUSDT", "ARBUSDT", "OPUSDT",
]

DAYS_BACK = 14  # history window in days (2 weeks of data)
DEPOSIT = 50.0  # USD; only printed in the run banner, not used in PnL math here
LEVERAGE = 10  # multiplier applied to POSITION_SIZE_USD to get order notional
POSITION_SIZE_USD = 3.0  # margin per grid order, USD
FEE_PCT = 0.02 / 100  # maker 0.02%, stored as a fraction of notional

GRID_LEVELS = 8  # number of buy levels and of sell levels (per side)
GRID_SPACING_PCT = 0.1  # distance between adjacent levels, percent of start price
# Session stop: percent of max capital (GRID_LEVELS * 2 * POSITION_SIZE_USD)
MAX_LOSS_PCT = 3.0

# Screener thresholds
SCORE_THRESHOLD = 50  # min sideways score for a window to count as "filtered"

# Indicator periods (in screener-timeframe candles)
BB_PERIOD = 20
BB_STD = 2.0
ADX_PERIOD = 14
# NOTE(review): ATR_PERIOD is not read anywhere in this file; calc_indicators
# smooths ATR with ADX_PERIOD.
ATR_PERIOD = 14
# Candles used for range position and direction-change counts (1H: 24h, 15m: 6h)
RANGE_LOOKBACK = 24


# ============================================================
# DATA FETCH
# ============================================================
def fetch_klines(symbol, interval, days_back):
    """Download Binance USDT-M futures klines for the last ``days_back`` days.

    Pages forward from ``now - days_back`` in requests of up to 1500 candles,
    advancing ``startTime`` to one millisecond past the last candle received,
    until the API returns an empty page or the cursor reaches "now".

    Args:
        symbol: Exchange symbol, e.g. ``"ETHUSDT"``.
        interval: Kline interval string accepted by the API (``"1m"``, ``"1h"``...).
        days_back: How many days of history to request.

    Returns:
        DataFrame sorted by ``timestamp`` (datetime64, de-duplicated, index
        reset to 0..n-1) with the raw kline columns; price/volume columns are
        cast to float. The frame is empty when nothing could be downloaded.

    Transient network/JSON failures are retried a bounded number of times;
    after that the data collected so far is returned instead of looping
    forever.
    """
    url = "https://fapi.binance.com/fapi/v1/klines"
    now = time.time()
    end_ts = int(now * 1000)
    start_ts = int((now - days_back * 86400) * 1000)
    max_retries = 5  # consecutive failures tolerated before giving up

    all_candles = []
    current_start = start_ts
    failures = 0

    while current_start < end_ts:
        params = {
            "symbol": symbol,
            "interval": interval,
            "startTime": current_start,
            "limit": 1500
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            failures += 1
            if failures > max_retries:
                print(f"  [fetch] Error: {e}, giving up after {max_retries} retries")
                break
            print(f"  [fetch] Error: {e}, retrying...")
            time.sleep(1)
            continue
        failures = 0

        if not isinstance(data, list):
            # API errors arrive as a JSON object rather than a list of candles.
            print(f"  [fetch] Unexpected response for {symbol} {interval}: {data}")
            break
        if not data:
            break

        all_candles.extend(data)
        next_start = data[-1][0] + 1
        if next_start <= current_start:
            # No forward progress; stop rather than request the same page again.
            break
        current_start = next_start
        time.sleep(0.08)  # light throttle between pages

    df = pd.DataFrame(all_candles, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades', 'taker_buy_base',
        'taker_buy_quote', 'ignore'
    ])

    for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
        df[col] = df[col].astype(float)

    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    return df


# ============================================================
# INDICATORS
# ============================================================
def calc_indicators(df):
    """Add the screener indicator columns to ``df`` and return it.

    ``df`` must have float ``open``/``high``/``low``/``close`` columns. The
    frame is modified in place (new columns are assigned on it) and the same
    object is returned.

    Columns added:
        bb_mid, bb_std, bb_upper, bb_lower, bb_width (percent of bb_mid),
        adx, atr, natr (ATR as percent of close),
        range_high, range_low, range_pos (0 = at rolling low, 1 = at rolling high),
        candle_dir (+1 if close > open else -1), dir_change, dir_changes
        (number of direction flips over RANGE_LOOKBACK candles).
    """
    high, low, close = df['high'], df['low'], df['close']

    def _wilder(series):
        # Wilder-style smoothing expressed as an EWM with alpha = 1 / period.
        return series.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean()

    # Bollinger Bands
    df['bb_mid'] = close.rolling(BB_PERIOD).mean()
    df['bb_std'] = close.rolling(BB_PERIOD).std()
    df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std']
    df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std']
    df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100

    # ADX: directional movement. Both masks compare the RAW moves; comparing
    # -DM against an already-zeroed +DM would keep -DM on bars where the two
    # moves are equal, while +DM is dropped on the same bar.
    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    # True range: widest of the bar range and the gaps from the previous close
    prev_close = close.shift(1)
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    atr = _wilder(tr)
    plus_di = 100 * (_wilder(plus_dm) / atr)
    minus_di = 100 * (_wilder(minus_dm) / atr)
    # The small epsilon avoids 0/0 when both DI values are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    df['adx'] = _wilder(dx)

    # ATR & NATR
    df['atr'] = atr
    df['natr'] = (atr / close) * 100

    # Range position (where the close sits inside the recent high-low range)
    df['range_high'] = high.rolling(RANGE_LOOKBACK).max()
    df['range_low'] = low.rolling(RANGE_LOOKBACK).min()
    range_span = df['range_high'] - df['range_low']
    # A zero-width range becomes NaN instead of dividing by zero.
    df['range_pos'] = (close - df['range_low']) / range_span.replace(0, np.nan)

    # Direction changes (how many candles flipped direction within the lookback)
    df['candle_dir'] = np.where(close > df['open'], 1, -1)
    df['dir_change'] = (df['candle_dir'] != df['candle_dir'].shift(1)).astype(int)
    df['dir_changes'] = df['dir_change'].rolling(RANGE_LOOKBACK).sum()

    return df


# ============================================================
# SIDEWAYS SCORER
# ============================================================
def calc_sideways_score(row):
    """Score how "sideways" (range-bound) a screener candle looks.

    Higher is better for running a grid. The nominal scale is 0-100, built
    from five components:

        1. ADX            up to 25 pts (lower ADX = less trend); a linear
                          penalty of 0 to -5 between ADX 20 and 30, and -10
                          above 30
        2. BB width       up to 20 pts, triangular peak at 2.5% within 1.5-4%
        3. Range position up to 20 pts, peak at 0.5 within 0.3-0.7
        4. Direction changes up to 20 pts, starting at 8 flips
        5. NATR           up to 15 pts, triangular peak at 0.3% within 0.15-0.6%

    Args:
        row: Mapping-like object with a ``.get`` method (a pandas row or a
            dict) holding ``adx``, ``bb_width``, ``range_pos``,
            ``dir_changes`` and ``natr``.

    Returns:
        The score rounded to one decimal and floored at 0. Returns 0 when any
        input is missing/NaN.
    """
    adx = row.get('adx', np.nan)
    bb_w = row.get('bb_width', np.nan)
    range_pos = row.get('range_pos', np.nan)
    dir_changes = row.get('dir_changes', np.nan)
    natr = row.get('natr', np.nan)

    if any(pd.isna(x) for x in (adx, bb_w, range_pos, dir_changes, natr)):
        return 0

    score = 0

    # 1. ADX < 20 -> up to 25 pts (lower = more sideways)
    if adx <= 5:
        score += 25
    elif adx <= 20:
        score += 25 * (1 - (adx - 5) / 15)
    elif adx <= 30:
        # Penalty zone: 0 at ADX 20 down to -5 at ADX 30. This must subtract;
        # clamping the negative term at zero would make the branch a no-op.
        score -= 5 * (adx - 20) / 10
    else:
        score -= 10  # trending hard

    # 2. BB Width 1.5-4%, triangle peak at 2.5% -> 20 pts
    if 1.5 <= bb_w <= 4.0:
        if bb_w <= 2.5:
            score += 20 * (bb_w - 1.5) / 1.0
        else:
            score += 20 * (4.0 - bb_w) / 1.5
    elif 0.5 <= bb_w < 1.5:
        score += 5  # too tight but not zero
    # else: too wide (or essentially flat), 0 pts

    # 3. Range Position 0.3-0.7, peak at 0.5 -> 20 pts
    if 0.3 <= range_pos <= 0.7:
        dist_from_center = abs(range_pos - 0.5)
        score += 20 * (1 - dist_from_center / 0.2)
    elif 0.2 <= range_pos < 0.3 or 0.7 < range_pos <= 0.8:
        score += 5  # near the edges, some points
    # else: at an extreme, 0 pts

    # 4. Direction Changes -> 20 pts (more flips = more chop)
    min_changes = 8
    if dir_changes >= min_changes:
        # Full points once 70% of the lookback candles are reversals.
        max_changes = RANGE_LOOKBACK * 0.7
        span = max_changes - min_changes
        if span > 0:
            score += min(20, 20 * (dir_changes - min_changes) / span)
        else:
            # Lookback so short that the minimum already meets the cap.
            score += 20

    # 5. NATR 0.15-0.6%, peak at 0.3% -> 15 pts
    if 0.15 <= natr <= 0.6:
        if natr <= 0.3:
            score += 15 * (natr - 0.15) / 0.15
        else:
            score += 15 * (0.6 - natr) / 0.3
    elif 0.1 <= natr < 0.15:
        score += 3  # tiny vol

    return max(0, round(score, 1))


# ============================================================
# GRID ENGINE (on 1m candles)
# ============================================================
def _position_pnl(side, entry_px, size_usd, price):
    """Return the USD PnL of one leveraged grid position marked at ``price``."""
    qty = (size_usd * LEVERAGE) / entry_px
    if side == 'long':
        return qty * (price - entry_px)
    return qty * (entry_px - price)


def run_grid_session(df_1m, start_idx, duration_candles):
    """
    Simulate one symmetric grid session on 1m candles.

    The grid is centred on the close of candle ``start_idx``: GRID_LEVELS buy
    levels below and GRID_LEVELS sell levels above, GRID_SPACING_PCT apart.
    On each following candle a buy level fills when the low touches it and a
    sell level fills when the high touches it. When both the buy and the sell
    level with the same index are filled, the pair is booked as a round-trip
    and both levels are re-armed. The session stops early when net PnL
    (realized + unrealized - fees, marked at the candle close) drops below
    MAX_LOSS_PCT of the maximum grid capital; otherwise open positions are
    closed at the close of the final candle.

    Args:
        df_1m: DataFrame with float ``close``, ``low`` and ``high`` columns.
        start_idx: Positional index of the candle the grid is centred on.
        duration_candles: Maximum session length in 1m candles.

    Returns:
        None if fewer than six candles remain from ``start_idx`` or if no
        order was filled. Otherwise a dict with ``pnl`` (net of fees),
        ``trades`` (entry fills), ``round_trips``, ``fees`` and
        ``close_reason`` (``'timeout'`` or ``'max_loss'``).
    """
    if start_idx + 5 >= len(df_1m):
        return None

    # Pull plain arrays once instead of doing positional lookups per candle.
    closes = df_1m['close'].to_numpy()
    lows = df_1m['low'].to_numpy()
    highs = df_1m['high'].to_numpy()

    end_idx = min(start_idx + duration_candles, len(df_1m) - 1)
    mid_price = closes[start_idx]

    # Build grid levels
    buy_levels = [mid_price * (1 - lvl * GRID_SPACING_PCT / 100)
                  for lvl in range(1, GRID_LEVELS + 1)]
    sell_levels = [mid_price * (1 + lvl * GRID_SPACING_PCT / 100)
                   for lvl in range(1, GRID_LEVELS + 1)]

    fill_fee = POSITION_SIZE_USD * LEVERAGE * FEE_PCT  # fee on one order's notional
    buy_fills = [False] * GRID_LEVELS
    sell_fills = [False] * GRID_LEVELS
    # Open positions as (side, level_index, entry_px, size_usd). The level
    # index identifies a position exactly, so matching never depends on
    # comparing float prices (which breaks when levels are closer than the
    # comparison tolerance on very low-priced symbols).
    positions = []
    session_pnl = 0.0
    session_fees = 0.0
    session_trades = 0
    round_trips = 0
    max_capital = GRID_LEVELS * 2 * POSITION_SIZE_USD
    max_loss = max_capital * MAX_LOSS_PCT / 100
    close_reason = 'timeout'

    for j in range(start_idx + 1, end_idx):
        price = closes[j]
        low_px = lows[j]
        high_px = highs[j]

        # Check buy fills
        for lvl in range(GRID_LEVELS):
            if not buy_fills[lvl] and low_px <= buy_levels[lvl]:
                buy_fills[lvl] = True
                positions.append(('long', lvl, buy_levels[lvl], POSITION_SIZE_USD))
                session_fees += fill_fee
                session_trades += 1

        # Check sell fills
        for lvl in range(GRID_LEVELS):
            if not sell_fills[lvl] and high_px >= sell_levels[lvl]:
                sell_fills[lvl] = True
                positions.append(('short', lvl, sell_levels[lvl], POSITION_SIZE_USD))
                session_fees += fill_fee
                session_trades += 1

        # Realize round-trips: a filled buy and sell at the same level index
        for lvl in range(GRID_LEVELS):
            if buy_fills[lvl] and sell_fills[lvl]:
                spread = sell_levels[lvl] - buy_levels[lvl]
                qty = (POSITION_SIZE_USD * LEVERAGE) / buy_levels[lvl]
                session_pnl += qty * spread
                session_fees += fill_fee * 2  # close both sides
                buy_fills[lvl] = False
                sell_fills[lvl] = False
                round_trips += 1
                # Drop the matched long and short for this level
                positions = [p for p in positions if p[1] != lvl]

        # Unrealized PnL of what is still open, marked at the candle close
        unrealized = 0
        for side, _lvl, entry_px, size_usd in positions:
            unrealized += _position_pnl(side, entry_px, size_usd, price)

        net_pnl = session_pnl + unrealized - session_fees

        # Max loss stop: flatten everything at this close and end the session
        if net_pnl < -max_loss and positions:
            for side, _lvl, entry_px, size_usd in positions:
                session_pnl += _position_pnl(side, entry_px, size_usd, price)
                session_fees += size_usd * LEVERAGE * FEE_PCT
            positions = []
            close_reason = 'max_loss'
            break

    # Close remaining positions at the close of the final candle
    if positions:
        price = closes[end_idx]
        for side, _lvl, entry_px, size_usd in positions:
            session_pnl += _position_pnl(side, entry_px, size_usd, price)
            session_fees += size_usd * LEVERAGE * FEE_PCT

    if session_trades == 0:
        return None

    return {
        'pnl': round(session_pnl - session_fees, 4),
        'trades': session_trades,
        'round_trips': round_trips,
        'fees': round(session_fees, 4),
        'close_reason': close_reason,
    }


# ============================================================
# MAIN BACKTEST LOGIC
# ============================================================
def backtest_symbol(symbol, screener_interval, df_screener, df_1m):
    """
    Run the baseline and score-filtered grid backtests for one symbol.

    For every screener window after the indicator warm-up:
    1. Take the sideways score of the PREVIOUS (already closed) screener
       candle as the entry signal.
    2. Run a grid session on the 1m data starting at this window's timestamp.
    3. Record the session as baseline; also record it as filtered when the
       signal score is >= SCORE_THRESHOLD.

    The signal comes from the previous candle because a candle's indicators
    depend on its own close/high/low, which are only known at the END of the
    window. Scoring a window with its own candle would let the filter see the
    price action it is about to trade (look-ahead bias).

    Args:
        symbol: Symbol name (not used in the calculation).
        screener_interval: ``'1h'`` (60 one-minute candles per window) or
            ``'15m'`` (15); any other value falls back to 60.
        df_screener: Screener-timeframe candles with a ``timestamp`` column;
            indicator columns are added to it.
        df_1m: 1m candles sorted ascending by ``timestamp`` with a default
            0..n-1 index (as returned by ``fetch_klines``).

    Returns:
        ``(filtered_sessions, baseline_sessions, score_distribution)``.
        Session dicts are those from ``run_grid_session`` plus ``score``,
        ``ts``, ``adx``, ``bb_width`` and ``range_pos`` of the signal candle;
        a session that passes the filter is the same dict object in both
        lists. ``score_distribution`` has one entry per screener candle with
        that candle's own score and indicator values.
    """
    # How many 1m candles per screener candle
    candles_per_window = {'1h': 60, '15m': 15}.get(screener_interval, 60)

    # Calculate screener scores
    df_screener = calc_indicators(df_screener)
    df_screener['sideways_score'] = df_screener.apply(calc_sideways_score, axis=1)

    ts_1m = df_1m['timestamp']
    n_1m = len(ts_1m)

    def _rounded(value, digits):
        # NaN indicators are reported as None so the output stays JSON-friendly.
        return None if pd.isna(value) else round(value, digits)

    filtered_sessions = []
    baseline_sessions = []
    score_distribution = []

    warmup = max(BB_PERIOD, ADX_PERIOD, RANGE_LOOKBACK) + 5

    for i in range(warmup, len(df_screener)):
        row = df_screener.iloc[i]
        ts = row['timestamp']

        score_distribution.append({
            'ts': str(ts),
            'score': row['sideways_score'],
            'adx': _rounded(row['adx'], 1),
            'bb_width': _rounded(row['bb_width'], 2),
            'range_pos': _rounded(row['range_pos'], 2),
            'dir_changes': int(row['dir_changes']) if not pd.isna(row['dir_changes']) else None,
            'natr': _rounded(row['natr'], 3),
        })

        # Entry signal: the last candle that had fully closed by time `ts`.
        signal = df_screener.iloc[i - 1]
        score = signal['sideways_score']

        # Position of the first 1m candle at or after `ts` (binary search on
        # the sorted timestamps instead of scanning a boolean mask per window).
        start_idx = int(ts_1m.searchsorted(ts, side='left'))
        if start_idx >= n_1m:
            continue

        # Run grid session
        result = run_grid_session(df_1m, start_idx, candles_per_window)
        if result is None:
            continue

        result['score'] = score
        result['ts'] = str(ts)
        result['adx'] = _rounded(signal['adx'], 1)
        result['bb_width'] = _rounded(signal['bb_width'], 2)
        result['range_pos'] = _rounded(signal['range_pos'], 2)

        # Baseline: every window
        baseline_sessions.append(result)

        # Filtered: only high-score windows
        if score >= SCORE_THRESHOLD:
            filtered_sessions.append(result)

    return filtered_sessions, baseline_sessions, score_distribution


def summarize(sessions, label):
    """Print aggregate statistics for ``sessions`` and return them as a dict.

    Each session is a dict with ``pnl``, ``round_trips``, ``trades`` and
    ``close_reason`` (``score`` is optional and counted as 0 when absent).
    A session with ``pnl <= 0`` counts as a loss. Returns ``{}`` (after
    printing a one-line notice) when ``sessions`` is empty.
    """
    if not sessions:
        print(f"  {label}: нет сессий")
        return {}

    n_sessions = len(sessions)
    pnls = [s['pnl'] for s in sessions]
    win_pnls = [p for p in pnls if p > 0]
    loss_pnls = [p for p in pnls if p <= 0]

    total_pnl = sum(pnls)
    wr = 100 * len(win_pnls) / n_sessions
    avg_pnl = total_pnl / n_sessions
    total_rt = sum(s['round_trips'] for s in sessions)
    total_trades = sum(s['trades'] for s in sessions)
    avg_score = np.mean([s.get('score', 0) for s in sessions])

    # Longest run of consecutive losing sessions
    max_streak = 0
    current_run = 0
    for p in pnls:
        current_run = current_run + 1 if p <= 0 else 0
        if current_run > max_streak:
            max_streak = current_run

    rule = '─' * 55
    print(f"\n  {rule}")
    print(f"  {label}")
    print(f"  {rule}")
    print(f"  Sessions:       {n_sessions}")
    print(f"  Win Rate:       {wr:.0f}% ({len(win_pnls)}W / {len(loss_pnls)}L)")
    print(f"  Total PnL:      ${total_pnl:.4f}")
    print(f"  Avg PnL/session:${avg_pnl:.4f}")
    print(f"  Total trades:   {total_trades} | Round-trips: {total_rt}")
    print(f"  Avg score:      {avg_score:.1f}")
    print(f"  Max loss streak:{max_streak}")

    if win_pnls:
        print(f"  Avg win:        ${sum(win_pnls)/len(win_pnls):.4f}")
    if loss_pnls:
        print(f"  Avg loss:       ${sum(loss_pnls)/len(loss_pnls):.4f}")

    # Tally of how sessions ended, keyed in order of first appearance
    reasons = {}
    for s in sessions:
        key = s['close_reason']
        reasons[key] = reasons.get(key, 0) + 1
    print(f"  Close reasons:  {reasons}")

    return {
        'sessions': n_sessions,
        'win_rate': round(wr, 1),
        'total_pnl': round(total_pnl, 4),
        'avg_pnl': round(avg_pnl, 4),
        'total_trades': total_trades,
        'round_trips': total_rt,
        'avg_score': round(avg_score, 1),
        'max_loss_streak': max_streak,
    }


# ============================================================
# SCORE BRACKET ANALYSIS
# ============================================================
def analyze_score_brackets(all_sessions):
    """Print a PnL table with sessions grouped into fixed score brackets.

    A session belongs to a bracket when ``lo <= score < hi`` (a missing
    ``score`` counts as 0). Brackets with no sessions are shown with a dash.
    Nothing is returned.
    """
    bracket_defs = (
        (0, 20, "0-20 (trending)"),
        (20, 40, "20-40 (mixed)"),
        (40, 60, "40-60 (sideways-ish)"),
        (60, 80, "60-80 (sideways)"),
        (80, 101, "80-100 (strong sideways)"),
    )

    print(f"\n  {'Score Bracket':<25} {'Sessions':>8} {'WR':>6} {'Total PnL':>12} {'Avg PnL':>10} {'RTs':>6}")
    print(f"  {'─'*67}")

    for lower, upper, title in bracket_defs:
        bucket = [s for s in all_sessions if lower <= s.get('score', 0) < upper]
        count = len(bucket)
        if count == 0:
            print(f"  {title:<25} {'—':>8}")
            continue

        pnl_sum = sum(s['pnl'] for s in bucket)
        n_wins = sum(1 for s in bucket if s['pnl'] > 0)
        win_rate = 100 * n_wins / count
        pnl_mean = pnl_sum / count
        trips = sum(s['round_trips'] for s in bucket)
        print(f"  {title:<25} {count:>8} {win_rate:>5.0f}% ${pnl_sum:>10.4f} ${pnl_mean:>9.4f} {trips:>6}")


# ============================================================
# MAIN
# ============================================================
if __name__ == "__main__":
    # Script entry point: for every symbol, download 1m candles once, then for
    # each screener timeframe download its candles, run the backtest, print a
    # per-symbol summary, and finally print cross-symbol totals and write a
    # JSON report next to this file.
    print("=" * 65)
    print("  SIDEWAYS SCREENER BACKTEST — 1H vs 15m")
    print(f"  {len(SYMBOLS)} coins | {DAYS_BACK} days | ${DEPOSIT} dep | {LEVERAGE}x")
    print(f"  Grid: {GRID_LEVELS} levels × {GRID_SPACING_PCT}% spacing × ${POSITION_SIZE_USD}")
    print(f"  Score threshold: {SCORE_THRESHOLD}")
    print("=" * 65)

    # Sessions accumulated across all symbols, per screener timeframe
    all_results = {
        '1h': {'filtered': [], 'baseline': []},
        '15m': {'filtered': [], 'baseline': []},
    }

    for sym in SYMBOLS:
        print(f"\n{'─'*65}")
        print(f"  📊 {sym}")
        print(f"{'─'*65}")

        # Fetch the 1m data once; it is shared by both screener timeframes
        print(f"  [fetch] 1m data...")
        df_1m = fetch_klines(sym, '1m', DAYS_BACK)
        if len(df_1m) < 500:
            # Too little history (or the download failed) - skip the symbol
            print(f"  [skip] Not enough 1m data ({len(df_1m)})")
            continue
        print(f"  [fetch] Got {len(df_1m)} 1m candles")

        for tf, interval in [('1h', '1h'), ('15m', '15m')]:
            print(f"\n  [{tf}] Fetching screener data...")
            df_scr = fetch_klines(sym, interval, DAYS_BACK)
            if len(df_scr) < 50:
                print(f"  [{tf}] Not enough data ({len(df_scr)})")
                continue
            print(f"  [{tf}] Got {len(df_scr)} candles")

            # `scores` (the per-candle score distribution) is not used below
            filtered, baseline, scores = backtest_symbol(sym, interval, df_scr, df_1m)

            # Per-symbol summary
            print(f"\n  [{tf}] BASELINE (all windows):")
            if baseline:
                bl_pnl = sum(s['pnl'] for s in baseline)
                bl_wr = 100 * len([s for s in baseline if s['pnl'] > 0]) / len(baseline)
                print(f"    {len(baseline)} sessions | WR {bl_wr:.0f}% | PnL ${bl_pnl:.4f}")

            print(f"  [{tf}] FILTERED (score >= {SCORE_THRESHOLD}):")
            if filtered:
                fi_pnl = sum(s['pnl'] for s in filtered)
                fi_wr = 100 * len([s for s in filtered if s['pnl'] > 0]) / len(filtered)
                fi_avg_score = np.mean([s['score'] for s in filtered])
                print(f"    {len(filtered)} sessions | WR {fi_wr:.0f}% | PnL ${fi_pnl:.4f} | avg score {fi_avg_score:.0f}")
            else:
                print(f"    Нет сессий с score >= {SCORE_THRESHOLD}")

            # Tag every session with its symbol, then add to the totals
            for s in filtered:
                s['symbol'] = sym
            for s in baseline:
                s['symbol'] = sym
            all_results[tf]['filtered'].extend(filtered)
            all_results[tf]['baseline'].extend(baseline)

    # ============================================================
    # FINAL COMPARISON
    # ============================================================
    print("\n" + "=" * 65)
    print("  🏆 ИТОГО ПО ВСЕМ МОНЕТАМ")
    print("=" * 65)

    # summary[tf] = {'baseline': {...}, 'filtered': {...}} as returned by summarize()
    summary = {}

    for tf in ['1h', '15m']:
        print(f"\n{'='*65}")
        print(f"  ⏰ ТАЙМФРЕЙМ: {tf.upper()}")
        print(f"{'='*65}")

        s_bl = summarize(all_results[tf]['baseline'], f"📦 BASELINE ({tf}) — все окна")
        s_fi = summarize(all_results[tf]['filtered'], f"🎯 FILTERED ({tf}) — score >= {SCORE_THRESHOLD}")

        # Bracket table is built from the baseline set, i.e. all sessions
        print(f"\n  📊 Score Bracket Analysis ({tf}):")
        analyze_score_brackets(all_results[tf]['baseline'])

        summary[tf] = {'baseline': s_bl, 'filtered': s_fi}

    # Head-to-head table of the filtered results for the two timeframes
    print(f"\n{'='*65}")
    print(f"  ⚡ HEAD-TO-HEAD: 1H vs 15m (FILTERED)")
    print(f"{'='*65}")
    print(f"  {'Metric':<22} {'1H':>15} {'15m':>15}")
    print(f"  {'─'*52}")

    for key, label in [
        ('sessions', 'Sessions'),
        ('win_rate', 'Win Rate %'),
        ('total_pnl', 'Total PnL $'),
        ('avg_pnl', 'Avg PnL/sess $'),
        ('round_trips', 'Round-trips'),
        ('avg_score', 'Avg Score'),
        ('max_loss_streak', 'Max Loss Streak'),
    ]:
        # A missing metric (summarize() returned {} for no sessions) shows as a dash
        v1h = summary.get('1h', {}).get('filtered', {}).get(key, '—')
        v15m = summary.get('15m', {}).get('filtered', {}).get(key, '—')
        # PnL metrics get 4 decimals; other floats are printed as-is
        if isinstance(v1h, float):
            v1h = f"{v1h:.4f}" if 'pnl' in key.lower() else f"{v1h}"
        if isinstance(v15m, float):
            v15m = f"{v15m:.4f}" if 'pnl' in key.lower() else f"{v15m}"
        print(f"  {label:<22} {str(v1h):>15} {str(v15m):>15}")

    # Save results
    output = {
        'config': {
            'symbols': SYMBOLS,
            'days_back': DAYS_BACK,
            'grid_levels': GRID_LEVELS,
            'grid_spacing_pct': GRID_SPACING_PCT,
            'position_size_usd': POSITION_SIZE_USD,
            'leverage': LEVERAGE,
            'score_threshold': SCORE_THRESHOLD,
        },
        'summary': summary,
        # First 50 sessions in accumulation (symbol) order - the list is not ranked
        '1h_filtered_sessions': all_results['1h']['filtered'][:50],
        '15m_filtered_sessions': all_results['15m']['filtered'][:50],
        # NOTE(review): utcnow() yields a naive datetime (no UTC offset in the
        # string) and is deprecated since Python 3.12.
        'tested_at': datetime.utcnow().isoformat(),
    }

    # Report is written next to this script
    out_path = Path(__file__).parent / 'results_sideways_screener.json'
    with open(out_path, 'w') as f:
        json.dump(output, f, indent=2)
    print(f"\n💾 Results saved to {out_path}")
    print("Done!")

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...