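# NOTE(review): stray "← Back" navigation text (originally "← Назад"), apparently a web-export artifact; not part of the program.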
""" Range/Chop Finder β€” ΠΈΡ‰Π΅ΠΌ ΠΈΠ΄Π΅Π°Π»ΡŒΠ½Ρ‹Π΅ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Ρ‹ для опрСдСлСния Π±ΠΎΠΊΠΎΠ²ΠΈΠΊΠ° ВСстируСм Π½Π° 1h Π΄Π°Π½Π½Ρ‹Ρ…, Ρ€Π°Π·Π½Ρ‹Π΅ ΠΊΠΎΠΌΠ±ΠΈΠ½Π°Ρ†ΠΈΠΈ ΠΈΠ½Π΄ΠΈΠΊΠ°Ρ‚ΠΎΡ€ΠΎΠ². Π˜Π½Π΄ΠΈΠΊΠ°Ρ‚ΠΎΡ€Ρ‹ для опрСдСлСния Π±ΠΎΠΊΠΎΠ²ΠΈΠΊΠ°: 1. ADX β€” сила Ρ‚Ρ€Π΅Π½Π΄Π° (Π½ΠΈΠΆΠ΅ = Π±ΠΎΠ»Π΅Π΅ Π±ΠΎΠΊΠΎΠ²ΠΈΠΊ) 2. BB Width β€” ΡˆΠΈΡ€ΠΈΠ½Π° полос (ΡƒΠΆΠ΅ = консолидация) 3. Choppiness Index β€” 0-100, >61.8 = Ρ‡ΠΎΠΏ/ΠΏΠΈΠ»Π° 4. Price Position β€” Π³Π΄Π΅ Ρ†Π΅Π½Π° Π²Π½ΡƒΡ‚Ρ€ΠΈ range (0.5 = сСрСдина = ΠΈΠ΄Π΅Π°Π») 5. ATR Ratio β€” Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ATR / ATR Π·Π° 48h (ΠΏΠ°Π΄Π°Π΅Ρ‚ = Π·Π°Ρ‚ΡƒΡ…Π°Π½ΠΈΠ΅) 6. MA Cross Count β€” сколько Ρ€Π°Π· Ρ†Π΅Π½Π° пСрСсСкла SMA Π·Π° N свСчСй (ΠΌΠ½ΠΎΠ³ΠΎ = ΠΏΠΈΠ»Π°) ЦСль: Π½Π°ΠΉΡ‚ΠΈ ΠΊΠΎΠΌΠ±ΠΈΠ½Π°Ρ†ΠΈΡŽ ΠΏΡ€ΠΈ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΉ Π’Π‘Π• ΠΌΠΎΠ½Π΅Ρ‚Ρ‹ Π² плюсС, Π½Π΅ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΌΠ΅ΠΌΡ‹. """ import requests import pandas as pd import numpy as np import time import json from datetime import datetime from pathlib import Path # ============================================================ # CONFIG # ============================================================ SYMBOLS = ["DOGEUSDT", "ETHUSDT", "1000PEPEUSDT", "XRPUSDT", "SOLUSDT", "AVAXUSDT", "DOTUSDT", "UNIUSDT", "TRXUSDT"] INTERVAL = "1m" DAYS_BACK = 30 DEPOSIT = 50.0 LEVERAGE = 10 ORDER_SIZE_USD = 5.0 FEE_PCT = 0.02 / 100 GRID_COUNT = 10 LOOKBACK_H = 48 ADX_EXIT = 30 BREAKOUT_CONFIRM = 5 def fetch_klines(symbol, interval, days_back): url = "https://fapi.binance.com/fapi/v1/klines" end_ts = int(time.time() * 1000) start_ts = int((time.time() - days_back * 86400) * 1000) all_candles = [] current_start = start_ts while current_start < end_ts: params = {"symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500} try: resp = requests.get(url, params=params, timeout=10) data = resp.json() if not isinstance(data, list) or len(data) == 0: break all_candles.extend(data) current_start = data[-1][0] + 1 time.sleep(0.1) except: time.sleep(1) continue df = pd.DataFrame(all_candles, 
columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = df[col].astype(float) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True) return df def add_indicators(df): """All indicators computed on 1m data with 1h-equivalent windows""" p = 60 # 1h = 60 candles of 1m # === BB Width (1h equiv) === df['bb_mid'] = df['close'].rolling(p).mean() df['bb_std'] = df['close'].rolling(p).std() df['bb_upper'] = df['bb_mid'] + 2.0 * df['bb_std'] df['bb_lower'] = df['bb_mid'] - 2.0 * df['bb_std'] df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100 # === ADX (smoothed for 1h) === high, low, close = df['high'], df['low'], df['close'] plus_dm = high.diff() minus_dm = -low.diff() plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0) minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0) tr1 = high - low tr2 = (high - close.shift(1)).abs() tr3 = (low - close.shift(1)).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) ap = 56 # ADX period for 1m (14 * 4) atr = tr.ewm(alpha=1/ap, min_periods=ap).mean() plus_di = 100 * (plus_dm.ewm(alpha=1/ap, min_periods=ap).mean() / atr) minus_di = 100 * (minus_dm.ewm(alpha=1/ap, min_periods=ap).mean() / atr) dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10) df['adx'] = dx.ewm(alpha=1/ap, min_periods=ap).mean() # === Choppiness Index (14h = 840 candles of 1m) === ci_period = 14 * 60 # 14h atr14 = tr.rolling(1).sum() # single candle TR sum_atr = atr14.rolling(ci_period).sum() highest = high.rolling(ci_period).max() lowest = low.rolling(ci_period).min() ci_range = highest - lowest df['chop'] = 100 * np.log10(sum_atr / (ci_range + 1e-10)) / np.log10(ci_period) # === Price Position in range (0 = bottom, 1 = top, 0.5 = 
middle) === lb = LOOKBACK_H * 60 df['range_high'] = df['high'].rolling(lb).max() df['range_low'] = df['low'].rolling(lb).min() range_size = df['range_high'] - df['range_low'] df['price_pos'] = (df['close'] - df['range_low']) / (range_size + 1e-10) df['range_pct'] = (range_size / df['close']) * 100 # === ATR Ratio (current 2h ATR / 48h ATR) β€” falling = consolidation === atr_short = tr.rolling(120).mean() # 2h atr_long = tr.rolling(lb).mean() # 48h df['atr_ratio'] = atr_short / (atr_long + 1e-10) # === MA Cross Count β€” how many times price crossed SMA in last 4h === sma = df['close'].rolling(p).mean() cross = ((df['close'] > sma) != (df['close'].shift(1) > sma)).astype(int) df['ma_crosses'] = cross.rolling(240).sum() # 4h window # === RSI (14h equiv) === delta = df['close'].diff() gain = delta.where(delta > 0, 0.0).ewm(alpha=1/ap, min_periods=ap).mean() loss = (-delta.where(delta < 0, 0.0)).ewm(alpha=1/ap, min_periods=ap).mean() rs = gain / (loss + 1e-10) df['rsi'] = 100 - (100 / (1 + rs)) return df def run_grid_with_params(df, entry_params, breakout_pct=1.0): """ Grid engine with configurable entry conditions. entry_params dict with thresholds for each indicator. 
""" adx_max = entry_params.get('adx_max', 25) bb_max = entry_params.get('bb_max', 3.0) chop_min = entry_params.get('chop_min', 0) # higher = more choppy price_pos_min = entry_params.get('pp_min', 0.0) # 0.3 = not at bottom price_pos_max = entry_params.get('pp_max', 1.0) # 0.7 = not at top atr_ratio_max = entry_params.get('atr_ratio_max', 999) # <0.8 = cooling down ma_crosses_min = entry_params.get('ma_crosses_min', 0) # >5 = choppy results = { 'sessions': [], 'total_pnl': 0, 'total_rts': 0, 'total_fees': 0, 'max_drawdown': 0, } warmup = LOOKBACK_H * 60 + 200 i = warmup cooldown_until = 0 equity = DEPOSIT while i < len(df): if i < cooldown_until: i += 1 continue row = df.iloc[i] # Check all entry conditions checks = [ not pd.isna(row.get('adx', np.nan)), row.get('adx', 99) < adx_max, row.get('bb_width', 99) < bb_max, row.get('chop', 0) > chop_min, row.get('price_pos', -1) >= price_pos_min, row.get('price_pos', 2) <= price_pos_max, row.get('atr_ratio', 99) < atr_ratio_max, row.get('ma_crosses', 0) >= ma_crosses_min, ] if not all(checks): i += 1 continue # === ENTRY: ranging confirmed === price = row['close'] grid_upper = row['range_high'] grid_lower = row['range_low'] grid_range = grid_upper - grid_lower if grid_range <= 0 or grid_range / price * 100 < 0.3: i += 1 continue step = grid_range / (GRID_COUNT + 1) grid_levels = [grid_lower + step * (k + 1) for k in range(GRID_COUNT)] buy_orders = {} sell_orders = {} for idx, lvl in enumerate(grid_levels): if lvl < price: buy_orders[idx] = lvl else: sell_orders[idx] = lvl net_position = 0.0 avg_entry = 0.0 session_pnl = 0.0 session_fees = 0.0 session_rts = 0 session_start = i start_equity = equity min_equity = equity outside_count = 0 filled_buys = set() filled_sells = set() session_active = True close_reason = 'end_of_data' j = i + 1 while j < len(df) and session_active: candle_high = df['high'].iloc[j] candle_low = df['low'].iloc[j] price = df['close'].iloc[j] curr_adx = df['adx'].iloc[j] if not 
pd.isna(df['adx'].iloc[j]) else 0 # Fill buys for idx, lvl in list(buy_orders.items()): if candle_low <= lvl and idx not in filled_buys: qty = (ORDER_SIZE_USD * LEVERAGE) / lvl session_fees += ORDER_SIZE_USD * LEVERAGE * FEE_PCT if net_position >= 0: total_cost = abs(net_position) * avg_entry + qty * lvl net_position += qty avg_entry = total_cost / net_position if net_position > 0 else 0 else: if qty >= abs(net_position): close_pnl = abs(net_position) * (avg_entry - lvl) session_pnl += close_pnl remaining = qty - abs(net_position) net_position = remaining avg_entry = lvl if remaining > 0 else 0 session_rts += 1 else: session_pnl += qty * (avg_entry - lvl) net_position += qty filled_buys.add(idx) next_sell = idx + 1 if next_sell < GRID_COUNT and next_sell not in sell_orders: sell_orders[next_sell] = grid_levels[next_sell] filled_sells.discard(next_sell) # Fill sells for idx, lvl in list(sell_orders.items()): if candle_high >= lvl and idx not in filled_sells: qty = (ORDER_SIZE_USD * LEVERAGE) / lvl session_fees += ORDER_SIZE_USD * LEVERAGE * FEE_PCT if net_position <= 0: total_cost = abs(net_position) * avg_entry + qty * lvl net_position -= qty avg_entry = total_cost / abs(net_position) if net_position != 0 else 0 else: if qty >= net_position: close_pnl = net_position * (lvl - avg_entry) session_pnl += close_pnl remaining = qty - net_position net_position = -remaining avg_entry = lvl if remaining > 0 else 0 session_rts += 1 else: session_pnl += qty * (lvl - avg_entry) net_position -= qty filled_sells.add(idx) next_buy = idx - 1 if next_buy >= 0 and next_buy not in buy_orders: buy_orders[next_buy] = grid_levels[next_buy] filled_buys.discard(next_buy) # Unrealized if net_position > 0: unrealized = net_position * (price - avg_entry) elif net_position < 0: unrealized = abs(net_position) * (avg_entry - price) else: unrealized = 0 current_equity = start_equity + session_pnl + unrealized - session_fees min_equity = min(min_equity, current_equity) # Exit: ADX if curr_adx > 
ADX_EXIT: close_reason = 'adx_breakout' session_active = False # Exit: price breakout if price > grid_upper * (1 + breakout_pct / 100) or \ price < grid_lower * (1 - breakout_pct / 100): outside_count += 1 if outside_count >= BREAKOUT_CONFIRM: close_reason = 'price_breakout' session_active = False else: outside_count = 0 # Exit: liquidation protect if current_equity < DEPOSIT * 0.2: close_reason = 'liquidation' session_active = False j += 1 # Close position if net_position != 0: final_price = df['close'].iloc[min(j, len(df) - 1)] if net_position > 0: session_pnl += net_position * (final_price - avg_entry) else: session_pnl += abs(net_position) * (avg_entry - final_price) session_fees += abs(net_position) * final_price * FEE_PCT net_session = session_pnl - session_fees equity += net_session drawdown = start_equity - min_equity results['max_drawdown'] = max(results['max_drawdown'], drawdown) if j - session_start > 1: # skip empty sessions results['sessions'].append({ 'pnl': round(net_session, 4), 'rts': session_rts, 'duration_h': round((j - session_start) / 60, 1), 'close_reason': close_reason, 'dd': round(drawdown, 2), }) results['total_pnl'] += net_session results['total_rts'] += session_rts results['total_fees'] += session_fees cooldown_until = j + 120 i = j + 1 return results # ============================================================ # MAIN β€” Test parameter combinations # ============================================================ if __name__ == "__main__": # Parameter sets to test PARAM_SETS = { "A_baseline": { 'adx_max': 25, 'bb_max': 3.0, 'chop_min': 0, 'pp_min': 0.0, 'pp_max': 1.0, 'atr_ratio_max': 999, 'ma_crosses_min': 0, }, "B_strict_adx": { 'adx_max': 20, 'bb_max': 3.0, 'chop_min': 0, 'pp_min': 0.0, 'pp_max': 1.0, 'atr_ratio_max': 999, 'ma_crosses_min': 0, }, "C_chop_filter": { 'adx_max': 25, 'bb_max': 3.0, 'chop_min': 50, 'pp_min': 0.0, 'pp_max': 1.0, 'atr_ratio_max': 999, 'ma_crosses_min': 0, }, "D_mid_price": { 'adx_max': 25, 'bb_max': 3.0, 
'chop_min': 0, 'pp_min': 0.25, 'pp_max': 0.75, 'atr_ratio_max': 999, 'ma_crosses_min': 0, }, "E_atr_cooling": { 'adx_max': 25, 'bb_max': 3.0, 'chop_min': 0, 'pp_min': 0.0, 'pp_max': 1.0, 'atr_ratio_max': 0.8, 'ma_crosses_min': 0, }, "F_chop+mid": { 'adx_max': 25, 'bb_max': 3.0, 'chop_min': 50, 'pp_min': 0.25, 'pp_max': 0.75, 'atr_ratio_max': 999, 'ma_crosses_min': 0, }, "G_full_combo": { 'adx_max': 20, 'bb_max': 2.5, 'chop_min': 50, 'pp_min': 0.3, 'pp_max': 0.7, 'atr_ratio_max': 0.85, 'ma_crosses_min': 3, }, "H_choppy_saw": { 'adx_max': 22, 'bb_max': 3.0, 'chop_min': 55, 'pp_min': 0.3, 'pp_max': 0.7, 'atr_ratio_max': 999, 'ma_crosses_min': 5, }, } # Fetch all data first print("=" * 80) print(" RANGE FINDER β€” Parameter Optimization") print("=" * 80) all_data = {} for symbol in SYMBOLS: print(f"[fetch] {symbol}...") df = fetch_klines(symbol, INTERVAL, DAYS_BACK) df = add_indicators(df) all_data[symbol] = df print(f" β†’ {len(df)} candles, chop range: {df['chop'].dropna().min():.1f}-{df['chop'].dropna().max():.1f}, " f"ma_crosses: {df['ma_crosses'].dropna().min():.0f}-{df['ma_crosses'].dropna().max():.0f}") # Run all combinations results_table = {} for pname, params in PARAM_SETS.items(): print(f"\n{'='*80}") print(f" Testing: {pname}") print(f" Params: ADX<{params['adx_max']} BB<{params['bb_max']}% Chop>{params['chop_min']} " f"PP:{params['pp_min']}-{params['pp_max']} ATR_r<{params['atr_ratio_max']} MA_x>={params['ma_crosses_min']}") print(f"{'='*80}") row = {} for symbol in SYMBOLS: df = all_data[symbol] result = run_grid_with_params(df, params, breakout_pct=1.0) sessions = result['sessions'] wins = len([s for s in sessions if s['pnl'] > 0]) total = len(sessions) wr = 100 * wins / max(total, 1) roi = result['total_pnl'] / DEPOSIT * 100 row[symbol] = { 'pnl': round(result['total_pnl'], 2), 'sessions': total, 'rts': result['total_rts'], 'wr': round(wr, 0), 'max_dd': round(result['max_drawdown'], 2), 'roi': round(roi, 1), } emoji = '🟒' if result['total_pnl'] > 0 else 
'πŸ”΄' short = symbol.replace('USDT', '').replace('1000', '') print(f" {emoji} {short:<6} {total:>3} sess {result['total_rts']:>3} RTs ${result['total_pnl']:>7.2f} ({roi:>+5.1f}%) WR:{wr:>3.0f}% DD:${result['max_drawdown']:>5.2f}") results_table[pname] = row # === FINAL SUMMARY === print(f"\n\n{'='*120}") print(f" FINAL SUMMARY β€” Avg ROI by parameter set") print(f"{'='*120}") header = f" {'Params':<16}" for sym in SYMBOLS: short = sym.replace('USDT', '').replace('1000', '') header += f" {short:>7}" header += f" {'AVG':>8} {'#Plus':>6}" print(header) print(f" {'─'*110}") best_name = None best_avg = -999 for pname, row in results_table.items(): line = f" {pname:<16}" rois = [] plus_count = 0 for sym in SYMBOLS: r = row[sym] roi = r['roi'] rois.append(roi) if roi > 0: plus_count += 1 line += f" {roi:>+6.1f}%" avg = np.mean(rois) line += f" {avg:>+7.1f}% {plus_count:>4}/{len(SYMBOLS)}" print(line) if avg > best_avg: best_avg = avg best_name = pname print(f" {'─'*110}") print(f" πŸ† Best: {best_name} (avg ROI: {best_avg:+.1f}%)") print(f"{'='*120}") # Save out = {'params': PARAM_SETS, 'results': {}} for pname, row in results_table.items(): out['results'][pname] = row with open(Path(__file__).parent / 'results_range_finder.json', 'w') as f: json.dump(out, f, indent=2) print(f"\nπŸ’Ύ Saved to results_range_finder.json")