← Назад
""" Range Grid v2 — вход по боковику, выход по индикаторам Логика: ВХОД: sideways score >= порог, цена в середине диапазона РАБОТА: грид крутит round-trips на 1m свечах ВЫХОД: мониторим индикаторы каждые 15m/1H: - ADX пошёл вверх (> exit_adx) → пробой назревает → фиксим - BB width резко вырос → волатильность растёт → фиксим - score упал ниже exit_score → уже не боковик → фиксим РОТАЦИЯ: закрыли → ищем след монету Сравниваем мониторинг: A) 15m — проверяем каждые 15 минут B) 1H — проверяем каждый час Usage: python3 backtest_range_grid_v2.py """ import requests import pandas as pd import numpy as np import time import json from datetime import datetime from pathlib import Path # ============================================================ # CONFIG # ============================================================ SYMBOLS = [ "ETHUSDT", "DOGEUSDT", "PENGUUSDT", "ENAUSDT", "NEARUSDT", "WLDUSDT", "SOLUSDT", "ARBUSDT", "XRPUSDT", "LINKUSDT", "SUIUSDT", "OPUSDT", "ADAUSDT", "UNIUSDT", "AVAXUSDT", ] DAYS_BACK = 30 LEVERAGE = 10 DEPOSIT = 50.0 FEE_PCT = 0.02 / 100 # Grid GRID_LEVELS = 30 POS_USD = 1.5 # $1.5 × 30 = $45 of $50 used MAX_LOSS_PCT = 5.0 MAX_SESSION_HOURS = 168 # 7 days max # Range detection RANGE_LOOKBACK_H = 48 # hours lookback for range MIN_RANGE_PCT = 3.0 MAX_RANGE_PCT = 20.0 # Entry filters ENTRY_SCORE_MIN = 40 ENTRY_RANGE_POS_MIN = 0.25 # not at edges ENTRY_RANGE_POS_MAX = 0.75 ENTRY_ADX_MAX = 25 # no trend # Exit triggers (ANY of these → close) EXIT_ADX_THRESHOLD = 28 # ADX rising above this EXIT_BB_EXPANSION = 1.8 # BB width > entry BB width × this multiplier EXIT_SCORE_MIN = 15 # score dropped below this # Screener indicator params BB_PERIOD = 20 BB_STD = 2.0 ADX_PERIOD = 14 RANGE_LOOKBACK_SCORE = 24 # candles for score calc # ============================================================ # DATA # ============================================================ def fetch_klines(symbol, interval, days_back): url = "https://fapi.binance.com/fapi/v1/klines" end_ts = int(time.time() * 1000) start_ts = int((time.time() - days_back * 86400) * 1000) all_candles = [] current_start = start_ts while current_start < end_ts: params = {"symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500} try: resp = requests.get(url, params=params, timeout=10) data = resp.json() if not isinstance(data, list) or len(data) == 0: break all_candles.extend(data) current_start = data[-1][0] + 1 time.sleep(0.08) except Exception as e: time.sleep(1) continue df = pd.DataFrame(all_candles, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']: df[col] = df[col].astype(float) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True) return df # ============================================================ # INDICATORS + SCORE # ============================================================ def calc_indicators(df): df['bb_mid'] = df['close'].rolling(BB_PERIOD).mean() df['bb_std'] = df['close'].rolling(BB_PERIOD).std() df['bb_upper'] = df['bb_mid'] + BB_STD * df['bb_std'] df['bb_lower'] = df['bb_mid'] - BB_STD * df['bb_std'] df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100 h, l, c = df['high'], df['low'], df['close'] plus_dm = h.diff() minus_dm = -l.diff() plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0) minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0) tr = pd.concat([h - l, (h - c.shift(1)).abs(), (l - c.shift(1)).abs()], axis=1).max(axis=1) atr = tr.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() plus_di = 100 * (plus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr) minus_di = 100 * (minus_dm.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() / atr) dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10) df['adx'] = dx.ewm(alpha=1/ADX_PERIOD, min_periods=ADX_PERIOD).mean() df['natr'] = (atr / c) * 100 df['range_high'] = df['high'].rolling(RANGE_LOOKBACK_SCORE).max() df['range_low'] = df['low'].rolling(RANGE_LOOKBACK_SCORE).min() rng = df['range_high'] - df['range_low'] df['range_pos'] = (df['close'] - df['range_low']) / rng.replace(0, np.nan) df['candle_dir'] = np.where(df['close'] > df['open'], 1, -1) df['dir_change'] = (df['candle_dir'] != df['candle_dir'].shift(1)).astype(int) df['dir_changes'] = df['dir_change'].rolling(RANGE_LOOKBACK_SCORE).sum() return df def sideways_score(row): score = 0 adx, bb_w = row.get('adx', np.nan), row.get('bb_width', np.nan) rp, dc = row.get('range_pos', np.nan), row.get('dir_changes', np.nan) natr = row.get('natr', np.nan) if any(pd.isna(x) for x in [adx, bb_w, rp, dc, natr]): return 0 if adx <= 5: score += 25 elif adx <= 20: score += 25 * (1 - (adx - 5) / 15) elif adx <= 30: score += max(0, -5 * (adx - 20) / 10) else: score -= 10 if 1.5 <= bb_w <= 4.0: score += 20 * (bb_w - 1.5) / 1.0 if bb_w <= 2.5 else 20 * (4.0 - bb_w) / 1.5 elif 0.5 <= bb_w < 1.5: score += 5 if 0.3 <= rp <= 0.7: score += 20 * (1 - abs(rp - 0.5) / 0.2) elif 0.2 <= rp < 0.3 or 0.7 < rp <= 0.8: score += 5 max_ch = RANGE_LOOKBACK_SCORE * 0.7 if dc >= 8: score += min(20, 20 * (dc - 8) / (max_ch - 8)) if 0.15 <= natr <= 0.6: score += 15 * (natr - 0.15) / 0.15 if natr <= 0.3 else 15 * (0.6 - natr) / 0.3 elif 0.1 <= natr < 0.15: score += 3 return max(0, round(score, 1)) # ============================================================ # GRID SESSION — runs on 1m, monitors on screener TF # ============================================================ def run_session(df_1m, df_monitor, start_ts, range_high, range_low, monitor_interval_min): """ Grid across range_high/range_low. Monitor df_monitor for exit signals. """ range_pct = ((range_high - range_low) / range_low) * 100 # Grid levels across range step = (range_high - range_low) / (GRID_LEVELS + 1) grid_levels = [range_low + step * (k + 1) for k in range(GRID_LEVELS)] # Find start in 1m data mask_1m = df_1m['timestamp'] >= start_ts if mask_1m.sum() < 10: return None start_idx = mask_1m.idxmax() mid_price = df_1m['close'].iloc[start_idx] # Entry BB width (for exit comparison) mon_mask = df_monitor['timestamp'] <= start_ts if mon_mask.sum() == 0: return None entry_mon = df_monitor.loc[mon_mask].iloc[-1] entry_bb_width = entry_mon['bb_width'] if not pd.isna(entry_mon['bb_width']) else 2.0 # Grid state: pending orders at each level level_state = {} for lv in grid_levels: level_state[lv] = 'pending' # pending → filled_buy/filled_sell → pending (after RT) positions = [] # (side, entry_px) total_pnl = 0.0 total_fees = 0.0 total_trades = 0 round_trips = 0 close_reason = 'timeout' max_end_idx = min(start_idx + MAX_SESSION_HOURS * 60, len(df_1m) - 1) last_monitor_check = start_ts max_capital = GRID_LEVELS * POS_USD max_loss = max_capital * MAX_LOSS_PCT / 100 for j in range(start_idx + 1, max_end_idx): price = df_1m['close'].iloc[j] lo = df_1m['low'].iloc[j] hi = df_1m['high'].iloc[j] ts = df_1m['timestamp'].iloc[j] # Fill grid orders for lv in grid_levels: if level_state[lv] == 'pending': if lo <= lv and price >= lv: # Price touched level from above → buy if lv < mid_price: level_state[lv] = 'filled_buy' positions.append(('long', lv)) total_fees += POS_USD * LEVERAGE * FEE_PCT total_trades += 1 else: level_state[lv] = 'filled_sell' positions.append(('short', lv)) total_fees += POS_USD * LEVERAGE * FEE_PCT total_trades += 1 elif lo <= lv: level_state[lv] = 'filled_buy' positions.append(('long', lv)) total_fees += POS_USD * LEVERAGE * FEE_PCT total_trades += 1 elif hi >= lv: level_state[lv] = 'filled_sell' positions.append(('short', lv)) total_fees += POS_USD * LEVERAGE * FEE_PCT total_trades += 1 # Match round-trips: buy below + sell above longs = sorted([(i, p) for i, (s, p) in enumerate(positions) if s == 'long'], key=lambda x: x[1]) shorts = sorted([(i, p) for i, (s, p) in enumerate(positions) if s == 'short'], key=lambda x: x[1]) matched_indices = set() for li, buy_px in longs: if li in matched_indices: continue for si, sell_px in shorts: if si in matched_indices: continue if sell_px > buy_px: qty = (POS_USD * LEVERAGE) / buy_px total_pnl += qty * (sell_px - buy_px) total_fees += POS_USD * LEVERAGE * FEE_PCT * 2 round_trips += 1 matched_indices.add(li) matched_indices.add(si) # Reset these levels level_state[buy_px] = 'pending' level_state[sell_px] = 'pending' break if matched_indices: positions = [p for i, p in enumerate(positions) if i not in matched_indices] # ── MONITOR CHECK (every N minutes) ── elapsed_min = (ts - last_monitor_check).total_seconds() / 60 if elapsed_min >= monitor_interval_min: last_monitor_check = ts # Get latest monitor data mon_mask = df_monitor['timestamp'] <= ts if mon_mask.sum() > 0: mon_row = df_monitor.loc[mon_mask].iloc[-1] curr_adx = mon_row['adx'] if not pd.isna(mon_row['adx']) else 0 curr_bb_w = mon_row['bb_width'] if not pd.isna(mon_row['bb_width']) else 0 curr_score = mon_row['sw_score'] if 'sw_score' in mon_row and not pd.isna(mon_row['sw_score']) else 50 # EXIT TRIGGERS exit_now = False # 1. ADX spiked — trend forming if curr_adx > EXIT_ADX_THRESHOLD: exit_now = True close_reason = f'adx_{curr_adx:.0f}' # 2. BB width expanded — volatility spike elif entry_bb_width > 0 and curr_bb_w > entry_bb_width * EXIT_BB_EXPANSION: exit_now = True close_reason = f'bb_expand_{curr_bb_w:.1f}' # 3. Score collapsed elif curr_score < EXIT_SCORE_MIN: exit_now = True close_reason = f'score_{curr_score:.0f}' if exit_now: # Close all positions at market for side, entry_px in positions: qty = (POS_USD * LEVERAGE) / entry_px if side == 'long': total_pnl += qty * (price - entry_px) else: total_pnl += qty * (entry_px - price) total_fees += POS_USD * LEVERAGE * FEE_PCT positions = [] break # Max loss check unrealized = 0 for side, entry_px in positions: qty = (POS_USD * LEVERAGE) / entry_px if side == 'long': unrealized += qty * (price - entry_px) else: unrealized += qty * (entry_px - price) if total_pnl + unrealized - total_fees < -max_loss and positions: for side, entry_px in positions: qty = (POS_USD * LEVERAGE) / entry_px if side == 'long': total_pnl += qty * (price - entry_px) else: total_pnl += qty * (entry_px - price) total_fees += POS_USD * LEVERAGE * FEE_PCT positions = [] close_reason = 'max_loss' break # Close remaining if positions: price = df_1m['close'].iloc[min(j if 'j' in dir() else max_end_idx - 1, len(df_1m) - 1)] for side, entry_px in positions: qty = (POS_USD * LEVERAGE) / entry_px if side == 'long': total_pnl += qty * (price - entry_px) else: total_pnl += qty * (entry_px - price) total_fees += POS_USD * LEVERAGE * FEE_PCT net = total_pnl - total_fees duration_h = (j - start_idx) / 60 if 'j' in dir() else 0 if total_trades == 0: return None return { 'pnl': round(net, 4), 'gross_pnl': round(total_pnl, 4), 'trades': total_trades, 'round_trips': round_trips, 'fees': round(total_fees, 4), 'close_reason': close_reason, 'range_pct': round(range_pct, 2), 'duration_h': round(duration_h, 1), 'spacing_pct': round(range_pct / (GRID_LEVELS + 1), 3), } # ============================================================ # MAIN # ============================================================ if __name__ == "__main__": print("=" * 75) print(" RANGE GRID v2 — indicator-based exit") print(f" {len(SYMBOLS)} coins | {DAYS_BACK}d | {GRID_LEVELS} levels × ${POS_USD}") print(f" Entry: score>={ENTRY_SCORE_MIN}, ADX<{ENTRY_ADX_MAX}, range_pos {ENTRY_RANGE_POS_MIN}-{ENTRY_RANGE_POS_MAX}") print(f" Exit: ADX>{EXIT_ADX_THRESHOLD} OR BB×{EXIT_BB_EXPANSION} OR score<{EXIT_SCORE_MIN}") print("=" * 75) results = { '15m': [], '1h': [], } for sym in SYMBOLS: print(f"\n 📊 {sym}...", end=" ", flush=True) # Fetch 1m df_1m = fetch_klines(sym, '1m', DAYS_BACK) if len(df_1m) < 2000: print("skip") continue for tf, interval, check_min in [('15m', '15m', 15), ('1h', '1h', 60)]: # Fetch monitor TF df_mon = fetch_klines(sym, interval, DAYS_BACK) if len(df_mon) < 80: continue df_mon = calc_indicators(df_mon) df_mon['sw_score'] = df_mon.apply(sideways_score, axis=1) # Also need 1H for range detection (always 1H) if tf == '1h': df_1h = df_mon else: df_1h = fetch_klines(sym, '1h', DAYS_BACK) if len(df_1h) < 80: continue df_1h = calc_indicators(df_1h) df_1h['sw_score'] = df_1h.apply(sideways_score, axis=1) # Scan for entry points every 4 hours on 1H data warmup = max(RANGE_LOOKBACK_H, 50) i = warmup cooldown_until = 0 while i < len(df_1h) - 2: if i < cooldown_until: i += 1 continue row = df_1h.iloc[i] score = row['sw_score'] if 'sw_score' in row else 0 adx = row['adx'] if not pd.isna(row['adx']) else 99 rp = row['range_pos'] if not pd.isna(row['range_pos']) else 0.5 ts = row['timestamp'] # Entry conditions if (score < ENTRY_SCORE_MIN or adx > ENTRY_ADX_MAX or rp < ENTRY_RANGE_POS_MIN or rp > ENTRY_RANGE_POS_MAX): i += 4 continue # Detect range from 1H lookback lb_start = max(0, i - RANGE_LOOKBACK_H) window = df_1h.iloc[lb_start:i] range_high = window['high'].max() range_low = window['low'].min() range_pct = ((range_high - range_low) / range_low) * 100 if range_pct < MIN_RANGE_PCT or range_pct > MAX_RANGE_PCT: i += 4 continue # RUN SESSION — grid on 1m, monitor on tf result = run_session(df_1m, df_mon, ts, range_high, range_low, check_min) if result: result['symbol'] = sym result['sw_score'] = round(score, 1) result['adx_entry'] = round(adx, 1) result['range_pos'] = round(rp, 2) result['ts'] = str(ts) result['monitor_tf'] = tf results[tf].append(result) # Cooldown: skip duration + 2h skip_hours = max(int(result['duration_h']), 2) + 2 cooldown_until = i + skip_hours else: cooldown_until = i + 2 i += 4 print("done") # ============================================================ # REPORTS # ============================================================ print("\n" + "=" * 75) print(" 🏆 РЕЗУЛЬТАТЫ") print("=" * 75) for tf in ['15m', '1h']: sess = results[tf] if not sess: print(f"\n {tf.upper()}: нет сессий") continue total_pnl = sum(s['pnl'] for s in sess) wins = [s for s in sess if s['pnl'] > 0] losses = [s for s in sess if s['pnl'] <= 0] wr = 100 * len(wins) / len(sess) rts = sum(s['round_trips'] for s in sess) print(f"\n {'='*65}") print(f" ⏰ MONITOR: {tf.upper()} (check every {15 if tf=='15m' else 60}min)") print(f" {'='*65}") print(f" Sessions: {len(sess)} | WR: {wr:.0f}% ({len(wins)}W / {len(losses)}L)") print(f" Total PnL: ${total_pnl:.2f}") print(f" Per day: ${total_pnl/DAYS_BACK:.2f} | Per week: ${total_pnl/DAYS_BACK*7:.2f} | Per month: ${total_pnl/DAYS_BACK*30:.2f}") print(f" Round-trips: {rts} | Total trades: {sum(s['trades'] for s in sess)}") if wins: print(f" Avg win: ${sum(s['pnl'] for s in wins)/len(wins):.3f} | Best: ${max(s['pnl'] for s in wins):.3f}") if losses: print(f" Avg loss: ${sum(s['pnl'] for s in losses)/len(losses):.3f} | Worst: ${min(s['pnl'] for s in losses):.3f}") gw = sum(s['pnl'] for s in wins) if wins else 0 gl = abs(sum(s['pnl'] for s in losses)) if losses else 1 print(f" Profit factor: {gw/gl:.2f}" if gl > 0 else " Profit factor: ∞") avg_dur = np.mean([s['duration_h'] for s in sess]) avg_rng = np.mean([s['range_pct'] for s in sess]) avg_spc = np.mean([s['spacing_pct'] for s in sess]) print(f" Avg duration: {avg_dur:.1f}h | Avg range: {avg_rng:.1f}% | Avg spacing: {avg_spc:.3f}%") # Equity curve equity = [0] for s in sess: equity.append(equity[-1] + s['pnl']) eq = np.array(equity) peak = np.maximum.accumulate(eq) dd = eq - peak print(f" Max drawdown: ${dd.min():.3f}") # Close reasons reasons = {} for s in sess: # Normalize reason r = s['close_reason'] if r.startswith('adx_'): r = 'adx_exit' elif r.startswith('bb_'): r = 'bb_exit' elif r.startswith('score_'): r = 'score_exit' reasons[r] = reasons.get(r, 0) + 1 print(f"\n Close reasons:") for r, cnt in sorted(reasons.items(), key=lambda x: -x[1]): sub = [s for s in sess if s['close_reason'].startswith(r.split('_')[0])] tp = sum(s['pnl'] for s in sub) avg = tp / len(sub) if sub else 0 print(f" {r:<18} {cnt:>4} sess | ${tp:>8.2f} total | ${avg:>7.3f} avg") # By range size print(f"\n By range size:") print(f" {'Range':<10} {'Sess':>5} {'WR':>6} {'PnL':>10} {'Avg':>9} {'RTs':>5} {'AvgDur':>7}") print(f" {'─'*54}") for lo, hi in [(3, 5), (5, 8), (8, 12), (12, 20)]: sub = [s for s in sess if lo <= s['range_pct'] < hi] if not sub: continue tp = sum(s['pnl'] for s in sub) w = len([s for s in sub if s['pnl'] > 0]) wr2 = 100 * w / len(sub) rt = sum(s['round_trips'] for s in sub) ad = np.mean([s['duration_h'] for s in sub]) e = '🟢' if tp > 0 else '🔴' print(f" {e} {lo}-{hi}%{'':<5} {len(sub):>5} {wr2:>5.0f}% ${tp:>8.2f} ${tp/len(sub):>7.3f} {rt:>5} {ad:>5.1f}h") # Per symbol print(f"\n Per symbol:") print(f" {'Symbol':<12} {'Sess':>5} {'WR':>6} {'PnL':>10} {'RTs':>5} {'AvgDur':>7}") print(f" {'─'*47}") syms = sorted(set(s['symbol'] for s in sess)) for sym in syms: sub = [s for s in sess if s['symbol'] == sym] tp = sum(s['pnl'] for s in sub) w = len([s for s in sub if s['pnl'] > 0]) wr2 = 100 * w / len(sub) rt = sum(s['round_trips'] for s in sub) ad = np.mean([s['duration_h'] for s in sub]) e = '🟢' if tp > 0 else '🔴' print(f" {e} {sym:<10} {len(sub):>5} {wr2:>5.0f}% ${tp:>8.2f} {rt:>5} {ad:>5.1f}h") # HEAD TO HEAD print(f"\n{'='*75}") print(f" ⚡ HEAD-TO-HEAD: 15m vs 1H monitor") print(f"{'='*75}") print(f" {'Metric':<22} {'15m':>15} {'1H':>15}") print(f" {'─'*52}") for tf in ['15m', '1h']: sess = results[tf] if not sess: continue tp = sum(s['pnl'] for s in sess) w = len([s for s in sess if s['pnl'] > 0]) wr = 100 * w / len(sess) metrics = [] for tf in ['15m', '1h']: s = results[tf] if s: metrics.append({ 'sessions': len(s), 'wr': f"{100*len([x for x in s if x['pnl']>0])/len(s):.0f}%", 'pnl': f"${sum(x['pnl'] for x in s):.2f}", 'per_day': f"${sum(x['pnl'] for x in s)/DAYS_BACK:.2f}", 'avg_pnl': f"${sum(x['pnl'] for x in s)/len(s):.3f}", 'round_trips': sum(x['round_trips'] for x in s), 'avg_dur': f"{np.mean([x['duration_h'] for x in s]):.1f}h", }) else: metrics.append({k: '—' for k in ['sessions', 'wr', 'pnl', 'per_day', 'avg_pnl', 'round_trips', 'avg_dur']}) for key, label in [ ('sessions', 'Sessions'), ('wr', 'Win Rate'), ('pnl', 'Total PnL'), ('per_day', '$/day'), ('avg_pnl', 'Avg PnL/sess'), ('round_trips', 'Round-trips'), ('avg_dur', 'Avg Duration'), ]: v15 = str(metrics[0].get(key, '—')) v1h = str(metrics[1].get(key, '—')) print(f" {label:<22} {v15:>15} {v1h:>15}") # Top sessions across both all_sess = results['15m'] + results['1h'] if all_sess: print(f"\n 📈 Top 10 sessions (all):") top = sorted(all_sess, key=lambda x: x['pnl'], reverse=True)[:10] for s in top: e = '🟢' if s['pnl'] > 0 else '🔴' print(f" {e} {s['symbol']:<10} [{s['monitor_tf']}] {s['ts'][:13]} | rng {s['range_pct']}% | {s['duration_h']}h | {s['round_trips']}rt | ${s['pnl']:.3f} | {s['close_reason']}") # Save output = { 'config': { 'grid_levels': GRID_LEVELS, 'pos_usd': POS_USD, 'entry_score_min': ENTRY_SCORE_MIN, 'entry_adx_max': ENTRY_ADX_MAX, 'exit_adx': EXIT_ADX_THRESHOLD, 'exit_bb_mult': EXIT_BB_EXPANSION, 'exit_score_min': EXIT_SCORE_MIN, 'range_lookback_h': RANGE_LOOKBACK_H, }, 'results_15m': { 'count': len(results['15m']), 'pnl': round(sum(s['pnl'] for s in results['15m']), 4) if results['15m'] else 0, }, 'results_1h': { 'count': len(results['1h']), 'pnl': round(sum(s['pnl'] for s in results['1h']), 4) if results['1h'] else 0, }, 'sessions_15m': results['15m'], 'sessions_1h': results['1h'], 'tested_at': datetime.now().isoformat(), } out_path = Path(__file__).parent / 'results_range_grid_v2.json' with open(out_path, 'w') as f: json.dump(output, f, indent=2, default=str) print(f"\n💾 {out_path}")