← Назад
""" Sideways/Range Screener Backtest — 1H vs 15m Ищем боковики/запилы по параметрам на старшем ТФ, запускаем грид на 1m данных внутри окна. Сравниваем: A) 1H screener → грид на 1m в часовом окне B) 15m screener → грид на 1m в 15-минутном окне C) Baseline — грид без фильтра (каждый час) Скоринг боковика (0-100): - ADX(14) < 20 → 25 pts (нет тренда) - BB Width 1.5-4% → 20 pts (активный диапазон) - Range Position 0.3-0.7 → 20 pts (цена в середине) - Direction Changes → 20 pts (запил/пила) - NATR 0.15-0.6% → 15 pts (достаточная вола) Usage: python3 backtest_sideways_screener.py """ import requests import pandas as pd import numpy as np import time import json from datetime import datetime, timedelta from pathlib import Path # ============================================================ # CONFIG # ============================================================ SYMBOLS = [ "ETHUSDT", "DOGEUSDT", "PENGUUSDT", "ENAUSDT", "NEARUSDT", "WLDUSDT", "UNIUSDT", "SOLUSDT", "ADAUSDT", "XRPUSDT", "AVAXUSDT", "LINKUSDT", "PEPEUSDT", "SUIUSDT", "ARBUSDT", "OPUSDT", ] DAYS_BACK = 14 # 2 weeks of data DEPOSIT = 50.0 LEVERAGE = 10 POSITION_SIZE_USD = 3.0 FEE_PCT = 0.02 / 100 # maker 0.02% GRID_LEVELS = 8 GRID_SPACING_PCT = 0.1 MAX_LOSS_PCT = 3.0 # Screener thresholds SCORE_THRESHOLD = 50 # min score to enter grid # Indicator periods (on screener TF) BB_PERIOD = 20 BB_STD = 2.0 ADX_PERIOD = 14 ATR_PERIOD = 14 RANGE_LOOKBACK = 24 # candles for range position (1H: 24h, 15m: 6h) # ============================================================ # DATA FETCH # ============================================================ def fetch_klines(symbol, interval, days_back): """Fetch klines from Binance Futures API""" url = "https://fapi.binance.com/fapi/v1/klines" end_ts = int(time.time() * 1000) start_ts = int((time.time() - days_back * 86400) * 1000) all_candles = [] current_start = start_ts while current_start < end_ts: params = { "symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500 } try: resp = requests.get(url, params=params, timeout=10) data = resp.json() if not isinstance(data, list) or len(data) == 0: break all_candles.extend(data) current_start = data[-1][0] + 1 time.sleep(0.08) except Exception as e: print(f" [fetch] Error: {e}, retrying...") time.sleep(1) continue df = pd.DataFrame(all_candles, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']: df[col] = df[col].astype(float) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True) return df # ============================================================ # INDICATORS # ============================================================ def calc_indicators(df): """Calculate all indicators for screener""" # Bollinger Bands df['bb_mid'] = df['close'].rolling(BB_PERIOD).mean() df['bb_std'] = df['close'].rolling(BB_PERIOD).std() df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std'] df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std'] df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100 # ADX high, low, close = df['high'], df['low'], df['close'] plus_dm = high.diff() minus_dm = -low.diff() plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0) minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0) tr1 = high - low tr2 = (high - close.shift(1)).abs() tr3 = (low - close.shift(1)).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() plus_di = 100 * (plus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr) minus_di = 100 * (minus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr) dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10) df['adx'] = dx.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() # ATR & NATR df['atr'] = atr df['natr'] = (atr / df['close']) * 100 # Range position (price position in recent high-low range) df['range_high'] = df['high'].rolling(RANGE_LOOKBACK).max() df['range_low'] = df['low'].rolling(RANGE_LOOKBACK).min() range_span = df['range_high'] - df['range_low'] df['range_pos'] = (df['close'] - df['range_low']) / range_span.replace(0, np.nan) # Direction changes (how many candles changed direction in lookback) df['candle_dir'] = np.where(df['close'] > df['open'], 1, -1) df['dir_change'] = (df['candle_dir'] != df['candle_dir'].shift(1)).astype(int) df['dir_changes'] = df['dir_change'].rolling(RANGE_LOOKBACK).sum() return df # ============================================================ # SIDEWAYS SCORER # ============================================================ def calc_sideways_score(row): """ Score 0-100 how "sideways" the market is. Higher = better for grid. """ score = 0 adx = row.get('adx', np.nan) bb_w = row.get('bb_width', np.nan) range_pos = row.get('range_pos', np.nan) dir_changes = row.get('dir_changes', np.nan) natr = row.get('natr', np.nan) if any(pd.isna(x) for x in [adx, bb_w, range_pos, dir_changes, natr]): return 0 # 1. ADX < 20 → 25 pts (lower = more sideways) if adx <= 5: score += 25 elif adx <= 20: score += 25 * (1 - (adx - 5) / 15) elif adx <= 30: score += max(0, -5 * (adx - 20) / 10) # penalty zone else: score -= 10 # trending hard # 2. BB Width 1.5-4%, peak at 2.5% → 20 pts if 1.5 <= bb_w <= 4.0: # Triangle peak at 2.5% if bb_w <= 2.5: score += 20 * (bb_w - 1.5) / 1.0 else: score += 20 * (4.0 - bb_w) / 1.5 elif 0.5 <= bb_w < 1.5: score += 5 # too tight but not zero # else: too wide, 0 pts # 3. Range Position 0.3-0.7, peak at 0.5 → 20 pts if 0.3 <= range_pos <= 0.7: # Peak at 0.5 dist_from_center = abs(range_pos - 0.5) score += 20 * (1 - dist_from_center / 0.2) elif 0.2 <= range_pos < 0.3 or 0.7 < range_pos <= 0.8: score += 5 # edges, some points # else: at extreme, 0 pts # 4. Direction Changes → 20 pts (more changes = more chop) # max useful = RANGE_LOOKBACK * 0.7 (70% of candles are reversals) max_changes = RANGE_LOOKBACK * 0.7 if dir_changes >= 8: score += min(20, 20 * (dir_changes - 8) / (max_changes - 8)) # 5. NATR 0.15-0.6%, peak at 0.3% → 15 pts if 0.15 <= natr <= 0.6: if natr <= 0.3: score += 15 * (natr - 0.15) / 0.15 else: score += 15 * (0.6 - natr) / 0.3 elif 0.1 <= natr < 0.15: score += 3 # tiny vol return max(0, round(score, 1)) # ============================================================ # GRID ENGINE (on 1m candles) # ============================================================ def run_grid_session(df_1m, start_idx, duration_candles): """ Run a single grid session on 1m data. Returns: dict with pnl, trades, fees, close_reason """ if start_idx + 5 >= len(df_1m): return None end_idx = min(start_idx + duration_candles, len(df_1m) - 1) mid_price = df_1m['close'].iloc[start_idx] # Build grid levels buy_levels = [] sell_levels = [] for lvl in range(1, GRID_LEVELS + 1): buy_levels.append(mid_price * (1 - lvl * GRID_SPACING_PCT / 100)) sell_levels.append(mid_price * (1 + lvl * GRID_SPACING_PCT / 100)) buy_fills = [False] * GRID_LEVELS sell_fills = [False] * GRID_LEVELS positions = [] session_pnl = 0.0 session_fees = 0.0 session_trades = 0 round_trips = 0 max_capital = GRID_LEVELS * 2 * POSITION_SIZE_USD max_loss = max_capital * MAX_LOSS_PCT / 100 close_reason = 'timeout' for j in range(start_idx + 1, end_idx): price = df_1m['close'].iloc[j] low_px = df_1m['low'].iloc[j] high_px = df_1m['high'].iloc[j] # Check buy fills for lvl in range(GRID_LEVELS): if not buy_fills[lvl] and low_px <= buy_levels[lvl]: buy_fills[lvl] = True positions.append(('long', buy_levels[lvl], POSITION_SIZE_USD)) session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT session_trades += 1 # Check sell fills for lvl in range(GRID_LEVELS): if not sell_fills[lvl] and high_px >= sell_levels[lvl]: sell_fills[lvl] = True positions.append(('short', sell_levels[lvl], POSITION_SIZE_USD)) session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT session_trades += 1 # Realize round-trips for lvl in range(GRID_LEVELS): if buy_fills[lvl] and sell_fills[lvl]: spread = sell_levels[lvl] - buy_levels[lvl] qty = (POSITION_SIZE_USD * LEVERAGE) / buy_levels[lvl] session_pnl += qty * spread session_fees += POSITION_SIZE_USD * LEVERAGE * FEE_PCT * 2 # close both sides buy_fills[lvl] = False sell_fills[lvl] = False round_trips += 1 # Remove matched from positions positions = [p for p in positions if not (p[0] == 'long' and abs(p[1] - buy_levels[lvl]) < 1e-10)] positions = [p for p in positions if not (p[0] == 'short' and abs(p[1] - sell_levels[lvl]) < 1e-10)] # Unrealized PnL unrealized = 0 for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': unrealized += qty * (price - entry_px) else: unrealized += qty * (entry_px - price) net_pnl = session_pnl + unrealized - session_fees # Max loss stop if net_pnl < -max_loss and len(positions) > 0: for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': session_pnl += qty * (price - entry_px) else: session_pnl += qty * (entry_px - price) session_fees += size_usd * LEVERAGE * FEE_PCT positions = [] close_reason = 'max_loss' break # Close remaining positions at end if positions: price = df_1m['close'].iloc[min(end_idx, len(df_1m) - 1)] for side, entry_px, size_usd in positions: qty = (size_usd * LEVERAGE) / entry_px if side == 'long': session_pnl += qty * (price - entry_px) else: session_pnl += qty * (entry_px - price) session_fees += size_usd * LEVERAGE * FEE_PCT net = session_pnl - session_fees if session_trades == 0: return None return { 'pnl': round(net, 4), 'trades': session_trades, 'round_trips': round_trips, 'fees': round(session_fees, 4), 'close_reason': close_reason, } # ============================================================ # MAIN BACKTEST LOGIC # ============================================================ def backtest_symbol(symbol, screener_interval, df_screener, df_1m): """ For each screener candle: 1. Calculate sideways score 2. If score >= threshold, run grid on 1m for that window 3. Also run baseline (every window, no filter) """ # How many 1m candles per screener candle if screener_interval == '1h': candles_per_window = 60 elif screener_interval == '15m': candles_per_window = 15 else: candles_per_window = 60 # Calculate screener scores df_screener = calc_indicators(df_screener) df_screener['sideways_score'] = df_screener.apply(calc_sideways_score, axis=1) # Build timestamp index for 1m data df_1m_ts = df_1m.set_index('timestamp') filtered_sessions = [] baseline_sessions = [] score_distribution = [] warmup = max(BB_PERIOD, ADX_PERIOD, RANGE_LOOKBACK) + 5 for i in range(warmup, len(df_screener)): row = df_screener.iloc[i] score = row['sideways_score'] ts = row['timestamp'] score_distribution.append({ 'ts': str(ts), 'score': score, 'adx': round(row['adx'], 1) if not pd.isna(row['adx']) else None, 'bb_width': round(row['bb_width'], 2) if not pd.isna(row['bb_width']) else None, 'range_pos': round(row['range_pos'], 2) if not pd.isna(row['range_pos']) else None, 'dir_changes': int(row['dir_changes']) if not pd.isna(row['dir_changes']) else None, 'natr': round(row['natr'], 3) if not pd.isna(row['natr']) else None, }) # Find this timestamp in 1m data mask = df_1m['timestamp'] >= ts if mask.sum() == 0: continue start_idx = mask.idxmax() # Run grid session result = run_grid_session(df_1m, start_idx, candles_per_window) if result is not None: result['score'] = score result['ts'] = str(ts) result['adx'] = round(row['adx'], 1) if not pd.isna(row['adx']) else None result['bb_width'] = round(row['bb_width'], 2) if not pd.isna(row['bb_width']) else None result['range_pos'] = round(row['range_pos'], 2) if not pd.isna(row['range_pos']) else None # Baseline: every window baseline_sessions.append(result) # Filtered: only high-score windows if score >= SCORE_THRESHOLD: filtered_sessions.append(result) return filtered_sessions, baseline_sessions, score_distribution def summarize(sessions, label): """Print summary for a set of sessions""" if not sessions: print(f" {label}: нет сессий") return {} total_pnl = sum(s['pnl'] for s in sessions) wins = [s for s in sessions if s['pnl'] > 0] losses = [s for s in sessions if s['pnl'] <= 0] wr = 100 * len(wins) / len(sessions) if sessions else 0 avg_pnl = total_pnl / len(sessions) total_rt = sum(s['round_trips'] for s in sessions) total_trades = sum(s['trades'] for s in sessions) avg_score = np.mean([s.get('score', 0) for s in sessions]) # Max drawdown streak streak = 0 max_streak = 0 for s in sessions: if s['pnl'] <= 0: streak += 1 max_streak = max(max_streak, streak) else: streak = 0 print(f"\n {'─'*55}") print(f" {label}") print(f" {'─'*55}") print(f" Sessions: {len(sessions)}") print(f" Win Rate: {wr:.0f}% ({len(wins)}W / {len(losses)}L)") print(f" Total PnL: ${total_pnl:.4f}") print(f" Avg PnL/session:${avg_pnl:.4f}") print(f" Total trades: {total_trades} | Round-trips: {total_rt}") print(f" Avg score: {avg_score:.1f}") print(f" Max loss streak:{max_streak}") if wins: print(f" Avg win: ${sum(s['pnl'] for s in wins)/len(wins):.4f}") if losses: print(f" Avg loss: ${sum(s['pnl'] for s in losses)/len(losses):.4f}") # Close reasons reasons = {} for s in sessions: r = s['close_reason'] reasons[r] = reasons.get(r, 0) + 1 print(f" Close reasons: {reasons}") return { 'sessions': len(sessions), 'win_rate': round(wr, 1), 'total_pnl': round(total_pnl, 4), 'avg_pnl': round(avg_pnl, 4), 'total_trades': total_trades, 'round_trips': total_rt, 'avg_score': round(avg_score, 1), 'max_loss_streak': max_streak, } # ============================================================ # SCORE BRACKET ANALYSIS # ============================================================ def analyze_score_brackets(all_sessions): """Разбиваем сессии по скору и смотрим PnL""" brackets = [ (0, 20, "0-20 (trending)"), (20, 40, "20-40 (mixed)"), (40, 60, "40-60 (sideways-ish)"), (60, 80, "60-80 (sideways)"), (80, 101, "80-100 (strong sideways)"), ] print(f"\n {'Score Bracket':<25} {'Sessions':>8} {'WR':>6} {'Total PnL':>12} {'Avg PnL':>10} {'RTs':>6}") print(f" {'─'*67}") for lo, hi, label in brackets: subset = [s for s in all_sessions if lo <= s.get('score', 0) < hi] if not subset: print(f" {label:<25} {'—':>8}") continue total = sum(s['pnl'] for s in subset) wins = len([s for s in subset if s['pnl'] > 0]) wr = 100 * wins / len(subset) avg = total / len(subset) rts = sum(s['round_trips'] for s in subset) print(f" {label:<25} {len(subset):>8} {wr:>5.0f}% ${total:>10.4f} ${avg:>9.4f} {rts:>6}") # ============================================================ # MAIN # ============================================================ if __name__ == "__main__": print("=" * 65) print(" SIDEWAYS SCREENER BACKTEST — 1H vs 15m") print(f" {len(SYMBOLS)} coins | {DAYS_BACK} days | ${DEPOSIT} dep | {LEVERAGE}x") print(f" Grid: {GRID_LEVELS} levels × {GRID_SPACING_PCT}% spacing × ${POSITION_SIZE_USD}") print(f" Score threshold: {SCORE_THRESHOLD}") print("=" * 65) all_results = { '1h': {'filtered': [], 'baseline': []}, '15m': {'filtered': [], 'baseline': []}, } for sym in SYMBOLS: print(f"\n{'─'*65}") print(f" 📊 {sym}") print(f"{'─'*65}") # Fetch data print(f" [fetch] 1m data...") df_1m = fetch_klines(sym, '1m', DAYS_BACK) if len(df_1m) < 500: print(f" [skip] Not enough 1m data ({len(df_1m)})") continue print(f" [fetch] Got {len(df_1m)} 1m candles") for tf, interval in [('1h', '1h'), ('15m', '15m')]: print(f"\n [{tf}] Fetching screener data...") df_scr = fetch_klines(sym, interval, DAYS_BACK) if len(df_scr) < 50: print(f" [{tf}] Not enough data ({len(df_scr)})") continue print(f" [{tf}] Got {len(df_scr)} candles") filtered, baseline, scores = backtest_symbol(sym, interval, df_scr, df_1m) # Per-symbol summary print(f"\n [{tf}] BASELINE (all windows):") if baseline: bl_pnl = sum(s['pnl'] for s in baseline) bl_wr = 100 * len([s for s in baseline if s['pnl'] > 0]) / len(baseline) print(f" {len(baseline)} sessions | WR {bl_wr:.0f}% | PnL ${bl_pnl:.4f}") print(f" [{tf}] FILTERED (score >= {SCORE_THRESHOLD}):") if filtered: fi_pnl = sum(s['pnl'] for s in filtered) fi_wr = 100 * len([s for s in filtered if s['pnl'] > 0]) / len(filtered) fi_avg_score = np.mean([s['score'] for s in filtered]) print(f" {len(filtered)} sessions | WR {fi_wr:.0f}% | PnL ${fi_pnl:.4f} | avg score {fi_avg_score:.0f}") else: print(f" Нет сессий с score >= {SCORE_THRESHOLD}") # Add to totals for s in filtered: s['symbol'] = sym for s in baseline: s['symbol'] = sym all_results[tf]['filtered'].extend(filtered) all_results[tf]['baseline'].extend(baseline) # ============================================================ # FINAL COMPARISON # ============================================================ print("\n" + "=" * 65) print(" 🏆 ИТОГО ПО ВСЕМ МОНЕТАМ") print("=" * 65) summary = {} for tf in ['1h', '15m']: print(f"\n{'='*65}") print(f" ⏰ ТАЙМФРЕЙМ: {tf.upper()}") print(f"{'='*65}") s_bl = summarize(all_results[tf]['baseline'], f"📦 BASELINE ({tf}) — все окна") s_fi = summarize(all_results[tf]['filtered'], f"🎯 FILTERED ({tf}) — score >= {SCORE_THRESHOLD}") print(f"\n 📊 Score Bracket Analysis ({tf}):") analyze_score_brackets(all_results[tf]['baseline']) summary[tf] = {'baseline': s_bl, 'filtered': s_fi} # Head-to-head print(f"\n{'='*65}") print(f" ⚡ HEAD-TO-HEAD: 1H vs 15m (FILTERED)") print(f"{'='*65}") print(f" {'Metric':<22} {'1H':>15} {'15m':>15}") print(f" {'─'*52}") for key, label in [ ('sessions', 'Sessions'), ('win_rate', 'Win Rate %'), ('total_pnl', 'Total PnL $'), ('avg_pnl', 'Avg PnL/sess $'), ('round_trips', 'Round-trips'), ('avg_score', 'Avg Score'), ('max_loss_streak', 'Max Loss Streak'), ]: v1h = summary.get('1h', {}).get('filtered', {}).get(key, '—') v15m = summary.get('15m', {}).get('filtered', {}).get(key, '—') if isinstance(v1h, float): v1h = f"{v1h:.4f}" if 'pnl' in key.lower() else f"{v1h}" if isinstance(v15m, float): v15m = f"{v15m:.4f}" if 'pnl' in key.lower() else f"{v15m}" print(f" {label:<22} {str(v1h):>15} {str(v15m):>15}") # Save results output = { 'config': { 'symbols': SYMBOLS, 'days_back': DAYS_BACK, 'grid_levels': GRID_LEVELS, 'grid_spacing_pct': GRID_SPACING_PCT, 'position_size_usd': POSITION_SIZE_USD, 'leverage': LEVERAGE, 'score_threshold': SCORE_THRESHOLD, }, 'summary': summary, '1h_filtered_sessions': all_results['1h']['filtered'][:50], # top 50 '15m_filtered_sessions': all_results['15m']['filtered'][:50], 'tested_at': datetime.utcnow().isoformat(), } out_path = Path(__file__).parent / 'results_sideways_screener.json' with open(out_path, 'w') as f: json.dump(output, f, indent=2) print(f"\n💾 Results saved to {out_path}") print("Done!")