β ΠΠ°Π·Π°Π΄"""
Range/Chop Finder β ΠΈΡΠ΅ΠΌ ΠΈΠ΄Π΅Π°Π»ΡΠ½ΡΠ΅ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΡ Π΄Π»Ρ ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½ΠΈΡ Π±ΠΎΠΊΠΎΠ²ΠΈΠΊΠ°
Π’Π΅ΡΡΠΈΡΡΠ΅ΠΌ Π½Π° 1h Π΄Π°Π½Π½ΡΡ
, ΡΠ°Π·Π½ΡΠ΅ ΠΊΠΎΠΌΠ±ΠΈΠ½Π°ΡΠΈΠΈ ΠΈΠ½Π΄ΠΈΠΊΠ°ΡΠΎΡΠΎΠ².
ΠΠ½Π΄ΠΈΠΊΠ°ΡΠΎΡΡ Π΄Π»Ρ ΠΎΠΏΡΠ΅Π΄Π΅Π»Π΅Π½ΠΈΡ Π±ΠΎΠΊΠΎΠ²ΠΈΠΊΠ°:
1. ADX β ΡΠΈΠ»Π° ΡΡΠ΅Π½Π΄Π° (Π½ΠΈΠΆΠ΅ = Π±ΠΎΠ»Π΅Π΅ Π±ΠΎΠΊΠΎΠ²ΠΈΠΊ)
2. BB Width β ΡΠΈΡΠΈΠ½Π° ΠΏΠΎΠ»ΠΎΡ (ΡΠΆΠ΅ = ΠΊΠΎΠ½ΡΠΎΠ»ΠΈΠ΄Π°ΡΠΈΡ)
3. Choppiness Index β 0-100, >61.8 = ΡΠΎΠΏ/ΠΏΠΈΠ»Π°
4. Price Position β Π³Π΄Π΅ ΡΠ΅Π½Π° Π²Π½ΡΡΡΠΈ range (0.5 = ΡΠ΅ΡΠ΅Π΄ΠΈΠ½Π° = ΠΈΠ΄Π΅Π°Π»)
5. ATR Ratio β ΡΠ΅ΠΊΡΡΠΈΠΉ ATR / ATR Π·Π° 48h (ΠΏΠ°Π΄Π°Π΅Ρ = Π·Π°ΡΡΡ
Π°Π½ΠΈΠ΅)
6. MA Cross Count β ΡΠΊΠΎΠ»ΡΠΊΠΎ ΡΠ°Π· ΡΠ΅Π½Π° ΠΏΠ΅ΡΠ΅ΡΠ΅ΠΊΠ»Π° SMA Π·Π° N ΡΠ²Π΅ΡΠ΅ΠΉ (ΠΌΠ½ΠΎΠ³ΠΎ = ΠΏΠΈΠ»Π°)
Π¦Π΅Π»Ρ: Π½Π°ΠΉΡΠΈ ΠΊΠΎΠΌΠ±ΠΈΠ½Π°ΡΠΈΡ ΠΏΡΠΈ ΠΊΠΎΡΠΎΡΠΎΠΉ ΠΠ‘Π ΠΌΠΎΠ½Π΅ΡΡ Π² ΠΏΠ»ΡΡΠ΅, Π½Π΅ ΡΠΎΠ»ΡΠΊΠΎ ΠΌΠ΅ΠΌΡ.
"""
import requests
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime
from pathlib import Path
# ============================================================
# CONFIG
# ============================================================
# Binance USDT-M perpetual symbols to backtest (mix of majors and alts).
SYMBOLS = ["DOGEUSDT", "ETHUSDT", "1000PEPEUSDT", "XRPUSDT", "SOLUSDT", "AVAXUSDT", "DOTUSDT", "UNIUSDT", "TRXUSDT"]
INTERVAL = "1m"        # candle interval requested from Binance
DAYS_BACK = 30         # history depth of the backtest, in days
DEPOSIT = 50.0         # starting equity per symbol, USD
LEVERAGE = 10          # leverage multiplier applied to each grid order
ORDER_SIZE_USD = 5.0   # margin per grid order (notional = ORDER_SIZE_USD * LEVERAGE)
FEE_PCT = 0.02 / 100   # taker fee per fill (0.02%)
GRID_COUNT = 10        # number of grid levels placed inside the range
LOOKBACK_H = 48        # hours used to define the trading range (high/low)
ADX_EXIT = 30          # close a session when ADX rises above this (trend starting)
BREAKOUT_CONFIRM = 5   # consecutive closes outside the range to confirm a breakout
def fetch_klines(symbol, interval, days_back):
    """Download futures klines from Binance for the last `days_back` days.

    Pages through GET /fapi/v1/klines (max 1500 candles per request) until
    the present moment is reached.

    Args:
        symbol: futures symbol, e.g. "DOGEUSDT".
        interval: kline interval string, e.g. "1m".
        days_back: how many days of history to fetch.

    Returns:
        DataFrame with float open/high/low/close/volume columns and a
        datetime 'timestamp' column, deduplicated and sorted ascending.
    """
    url = "https://fapi.binance.com/fapi/v1/klines"
    end_ts = int(time.time() * 1000)
    start_ts = int((time.time() - days_back * 86400) * 1000)
    all_candles = []
    current_start = start_ts
    errors = 0
    while current_start < end_ts:
        params = {"symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500}
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
            # On error Binance returns a dict payload instead of a list.
            if not isinstance(data, list) or len(data) == 0:
                break
            all_candles.extend(data)
            current_start = data[-1][0] + 1  # next page starts just after the last open time
            time.sleep(0.1)  # stay well under the API rate limit
        except (requests.RequestException, ValueError):
            # Network hiccup or malformed JSON: back off and retry, but cap
            # the retries so a persistent outage cannot spin forever.  (The
            # original bare `except:` also swallowed KeyboardInterrupt.)
            errors += 1
            if errors >= 10:
                break
            time.sleep(1)
            continue
    df = pd.DataFrame(all_candles, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore'
    ])
    for col in ['open', 'high', 'low', 'close', 'volume']:
        df[col] = df[col].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    return df
def add_indicators(df, lookback_h=None):
    """Compute range/chop indicators on 1m candles, in place.

    All windows are sized in 1-minute bars to approximate the 1h-chart
    equivalents described in the module docstring.  Adds the columns:
    bb_mid/bb_std/bb_upper/bb_lower, bb_width, adx, chop, range_high,
    range_low, price_pos, range_pct, atr_ratio, ma_crosses, rsi.

    Args:
        df: DataFrame with float 'high', 'low', 'close' columns.
        lookback_h: range lookback in hours; defaults to the module-level
            LOOKBACK_H.  Exposed as a parameter so the window can be varied
            without editing the global config.

    Returns:
        The same DataFrame with the indicator columns added.
    """
    if lookback_h is None:
        lookback_h = LOOKBACK_H
    p = 60  # 1h = 60 candles of 1m
    # === BB Width (1h equiv): band width as a % of the middle band ===
    df['bb_mid'] = df['close'].rolling(p).mean()
    df['bb_std'] = df['close'].rolling(p).std()
    df['bb_upper'] = df['bb_mid'] + 2.0 * df['bb_std']
    df['bb_lower'] = df['bb_mid'] - 2.0 * df['bb_std']
    df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100
    # === ADX (Wilder-style EWM smoothing) ===
    high, low, close = df['high'], df['low'], df['close']
    plus_dm = high.diff()
    minus_dm = -low.diff()
    plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0.0)
    minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0.0)
    tr1 = high - low
    tr2 = (high - close.shift(1)).abs()
    tr3 = (low - close.shift(1)).abs()
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    ap = 56  # ADX period for 1m (14 * 4)
    atr = tr.ewm(alpha=1/ap, min_periods=ap).mean()
    plus_di = 100 * (plus_dm.ewm(alpha=1/ap, min_periods=ap).mean() / atr)
    minus_di = 100 * (minus_dm.ewm(alpha=1/ap, min_periods=ap).mean() / atr)
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    df['adx'] = dx.ewm(alpha=1/ap, min_periods=ap).mean()
    # === Choppiness Index (14h = 840 candles of 1m); >61.8 = choppy ===
    ci_period = 14 * 60  # 14h
    # Sum of single-candle true ranges over the window.  (The original
    # tr.rolling(1).sum() was a no-op wrapper around tr.)
    sum_atr = tr.rolling(ci_period).sum()
    highest = high.rolling(ci_period).max()
    lowest = low.rolling(ci_period).min()
    ci_range = highest - lowest
    df['chop'] = 100 * np.log10(sum_atr / (ci_range + 1e-10)) / np.log10(ci_period)
    # === Price Position in range (0 = bottom, 1 = top, 0.5 = middle) ===
    lb = lookback_h * 60
    df['range_high'] = df['high'].rolling(lb).max()
    df['range_low'] = df['low'].rolling(lb).min()
    range_size = df['range_high'] - df['range_low']
    df['price_pos'] = (df['close'] - df['range_low']) / (range_size + 1e-10)
    df['range_pct'] = (range_size / df['close']) * 100
    # === ATR Ratio (2h ATR / lookback ATR) — falling = consolidation ===
    atr_short = tr.rolling(120).mean()  # 2h
    atr_long = tr.rolling(lb).mean()    # lookback window (48h by default)
    df['atr_ratio'] = atr_short / (atr_long + 1e-10)
    # === MA Cross Count — how many times price crossed the 1h SMA in 4h ===
    sma = df['close'].rolling(p).mean()
    cross = ((df['close'] > sma) != (df['close'].shift(1) > sma)).astype(int)
    df['ma_crosses'] = cross.rolling(240).sum()  # 4h window
    # === RSI — smoothed with the same 56-bar EWM period as ADX (~1h, not
    # 14h as the original comment claimed) ===
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0.0).ewm(alpha=1/ap, min_periods=ap).mean()
    loss = (-delta.where(delta < 0, 0.0)).ewm(alpha=1/ap, min_periods=ap).mean()
    rs = gain / (loss + 1e-10)
    df['rsi'] = 100 - (100 / (1 + rs))
    return df
def run_grid_with_params(df, entry_params, breakout_pct=1.0):
    """
    Grid engine with configurable entry conditions.
    entry_params dict with thresholds for each indicator.

    Scans the candle series for entries where every indicator threshold
    passes, opens a grid session there, simulates fills candle by candle
    until an exit fires (ADX trend, confirmed price breakout, near-
    liquidation, or end of data), then enforces a 2h cooldown before the
    next session.  Returns a dict with per-session stats, total PnL,
    round-trips, fees and worst intra-session drawdown.
    """
    # Thresholds; each default is non-binding (filter effectively disabled).
    adx_max = entry_params.get('adx_max', 25)
    bb_max = entry_params.get('bb_max', 3.0)
    chop_min = entry_params.get('chop_min', 0)  # higher = more choppy
    price_pos_min = entry_params.get('pp_min', 0.0)  # 0.3 = not at bottom
    price_pos_max = entry_params.get('pp_max', 1.0)  # 0.7 = not at top
    atr_ratio_max = entry_params.get('atr_ratio_max', 999)  # <0.8 = cooling down
    ma_crosses_min = entry_params.get('ma_crosses_min', 0)  # >5 = choppy
    results = {
        'sessions': [],
        'total_pnl': 0,
        'total_rts': 0,     # completed round-trips (counted on full position flips)
        'total_fees': 0,
        'max_drawdown': 0,  # worst intra-session equity dip, USD
    }
    # Skip the indicator warm-up region (range lookback + smoothing settle time).
    warmup = LOOKBACK_H * 60 + 200
    i = warmup
    cooldown_until = 0  # candle index before which no new session may start
    equity = DEPOSIT
    while i < len(df):
        if i < cooldown_until:
            i += 1
            continue
        row = df.iloc[i]
        # Check all entry conditions.  NaN indicator values fail their
        # comparisons, so no session can start on unready indicators.
        checks = [
            not pd.isna(row.get('adx', np.nan)),
            row.get('adx', 99) < adx_max,
            row.get('bb_width', 99) < bb_max,
            row.get('chop', 0) > chop_min,
            row.get('price_pos', -1) >= price_pos_min,
            row.get('price_pos', 2) <= price_pos_max,
            row.get('atr_ratio', 99) < atr_ratio_max,
            row.get('ma_crosses', 0) >= ma_crosses_min,
        ]
        if not all(checks):
            i += 1
            continue
        # === ENTRY: ranging confirmed ===
        price = row['close']
        grid_upper = row['range_high']
        grid_lower = row['range_low']
        grid_range = grid_upper - grid_lower
        # Reject degenerate ranges (narrower than 0.3% of price).
        if grid_range <= 0 or grid_range / price * 100 < 0.3:
            i += 1
            continue
        # GRID_COUNT evenly spaced levels strictly inside the range.
        step = grid_range / (GRID_COUNT + 1)
        grid_levels = [grid_lower + step * (k + 1) for k in range(GRID_COUNT)]
        buy_orders = {}   # level index -> price; rests below current price
        sell_orders = {}  # level index -> price; rests above current price
        for idx, lvl in enumerate(grid_levels):
            if lvl < price:
                buy_orders[idx] = lvl
            else:
                sell_orders[idx] = lvl
        net_position = 0.0  # signed base quantity: + long / - short
        avg_entry = 0.0     # volume-weighted entry price of the open position
        session_pnl = 0.0   # realized PnL, fees excluded
        session_fees = 0.0
        session_rts = 0
        session_start = i
        start_equity = equity
        min_equity = equity
        outside_count = 0   # consecutive closes outside the breakout band
        filled_buys = set()
        filled_sells = set()
        session_active = True
        close_reason = 'end_of_data'
        j = i + 1
        while j < len(df) and session_active:
            candle_high = df['high'].iloc[j]
            candle_low = df['low'].iloc[j]
            price = df['close'].iloc[j]
            curr_adx = df['adx'].iloc[j] if not pd.isna(df['adx'].iloc[j]) else 0
            # Fill buys: a buy fills when the candle's low touches its level.
            for idx, lvl in list(buy_orders.items()):
                if candle_low <= lvl and idx not in filled_buys:
                    qty = (ORDER_SIZE_USD * LEVERAGE) / lvl
                    session_fees += ORDER_SIZE_USD * LEVERAGE * FEE_PCT
                    if net_position >= 0:
                        # Opening or adding to a long: recompute weighted avg entry.
                        total_cost = abs(net_position) * avg_entry + qty * lvl
                        net_position += qty
                        avg_entry = total_cost / net_position if net_position > 0 else 0
                    else:
                        if qty >= abs(net_position):
                            # Buy fully covers the short (and may flip long).
                            close_pnl = abs(net_position) * (avg_entry - lvl)
                            session_pnl += close_pnl
                            remaining = qty - abs(net_position)
                            net_position = remaining
                            avg_entry = lvl if remaining > 0 else 0
                            session_rts += 1  # round-trip counted on full close
                        else:
                            # Partial cover; avg_entry of the remainder unchanged.
                            session_pnl += qty * (avg_entry - lvl)
                            net_position += qty
                    filled_buys.add(idx)
                    # Re-arm the sell one level above the filled buy.
                    next_sell = idx + 1
                    if next_sell < GRID_COUNT and next_sell not in sell_orders:
                        sell_orders[next_sell] = grid_levels[next_sell]
                        filled_sells.discard(next_sell)
            # Fill sells: mirror image of the buy logic above.
            for idx, lvl in list(sell_orders.items()):
                if candle_high >= lvl and idx not in filled_sells:
                    qty = (ORDER_SIZE_USD * LEVERAGE) / lvl
                    session_fees += ORDER_SIZE_USD * LEVERAGE * FEE_PCT
                    if net_position <= 0:
                        # Opening or adding to a short.
                        total_cost = abs(net_position) * avg_entry + qty * lvl
                        net_position -= qty
                        avg_entry = total_cost / abs(net_position) if net_position != 0 else 0
                    else:
                        if qty >= net_position:
                            # Sell fully closes the long (and may flip short).
                            close_pnl = net_position * (lvl - avg_entry)
                            session_pnl += close_pnl
                            remaining = qty - net_position
                            net_position = -remaining
                            avg_entry = lvl if remaining > 0 else 0
                            session_rts += 1
                        else:
                            session_pnl += qty * (lvl - avg_entry)
                            net_position -= qty
                    filled_sells.add(idx)
                    # Re-arm the buy one level below the filled sell.
                    next_buy = idx - 1
                    if next_buy >= 0 and next_buy not in buy_orders:
                        buy_orders[next_buy] = grid_levels[next_buy]
                        filled_buys.discard(next_buy)
            # Unrealized PnL of the open position, marked at the close price.
            if net_position > 0:
                unrealized = net_position * (price - avg_entry)
            elif net_position < 0:
                unrealized = abs(net_position) * (avg_entry - price)
            else:
                unrealized = 0
            current_equity = start_equity + session_pnl + unrealized - session_fees
            min_equity = min(min_equity, current_equity)
            # Exit: ADX — a trend is starting, grid trading becomes dangerous.
            if curr_adx > ADX_EXIT:
                close_reason = 'adx_breakout'
                session_active = False
            # Exit: price breakout — needs BREAKOUT_CONFIRM consecutive closes
            # outside the band; a single wick back inside resets the counter.
            if price > grid_upper * (1 + breakout_pct / 100) or \
               price < grid_lower * (1 - breakout_pct / 100):
                outside_count += 1
                if outside_count >= BREAKOUT_CONFIRM:
                    close_reason = 'price_breakout'
                    session_active = False
            else:
                outside_count = 0
            # Exit: liquidation protect — stop before the deposit is wiped out.
            if current_equity < DEPOSIT * 0.2:
                close_reason = 'liquidation'
                session_active = False
            j += 1
        # Close any残 open position at the candle after the exit trigger
        # (or at the last available close when data ran out).
        if net_position != 0:
            final_price = df['close'].iloc[min(j, len(df) - 1)]
            if net_position > 0:
                session_pnl += net_position * (final_price - avg_entry)
            else:
                session_pnl += abs(net_position) * (avg_entry - final_price)
            session_fees += abs(net_position) * final_price * FEE_PCT
        net_session = session_pnl - session_fees
        equity += net_session
        drawdown = start_equity - min_equity
        results['max_drawdown'] = max(results['max_drawdown'], drawdown)
        if j - session_start > 1:  # skip empty sessions
            results['sessions'].append({
                'pnl': round(net_session, 4),
                'rts': session_rts,
                'duration_h': round((j - session_start) / 60, 1),
                'close_reason': close_reason,
                'dd': round(drawdown, 2),
            })
            results['total_pnl'] += net_session
            results['total_rts'] += session_rts
            results['total_fees'] += session_fees
        # 2h cooldown before another session may open.
        cooldown_until = j + 120
        i = j + 1
    return results
# ============================================================
# MAIN β Test parameter combinations
# ============================================================
if __name__ == "__main__":
    # Parameter sets to test: each set enables a different combination of
    # range-detection filters (see run_grid_with_params for the keys).
    PARAM_SETS = {
        "A_baseline": {
            'adx_max': 25, 'bb_max': 3.0,
            'chop_min': 0, 'pp_min': 0.0, 'pp_max': 1.0,
            'atr_ratio_max': 999, 'ma_crosses_min': 0,
        },
        "B_strict_adx": {
            'adx_max': 20, 'bb_max': 3.0,
            'chop_min': 0, 'pp_min': 0.0, 'pp_max': 1.0,
            'atr_ratio_max': 999, 'ma_crosses_min': 0,
        },
        "C_chop_filter": {
            'adx_max': 25, 'bb_max': 3.0,
            'chop_min': 50, 'pp_min': 0.0, 'pp_max': 1.0,
            'atr_ratio_max': 999, 'ma_crosses_min': 0,
        },
        "D_mid_price": {
            'adx_max': 25, 'bb_max': 3.0,
            'chop_min': 0, 'pp_min': 0.25, 'pp_max': 0.75,
            'atr_ratio_max': 999, 'ma_crosses_min': 0,
        },
        "E_atr_cooling": {
            'adx_max': 25, 'bb_max': 3.0,
            'chop_min': 0, 'pp_min': 0.0, 'pp_max': 1.0,
            'atr_ratio_max': 0.8, 'ma_crosses_min': 0,
        },
        "F_chop+mid": {
            'adx_max': 25, 'bb_max': 3.0,
            'chop_min': 50, 'pp_min': 0.25, 'pp_max': 0.75,
            'atr_ratio_max': 999, 'ma_crosses_min': 0,
        },
        "G_full_combo": {
            'adx_max': 20, 'bb_max': 2.5,
            'chop_min': 50, 'pp_min': 0.3, 'pp_max': 0.7,
            'atr_ratio_max': 0.85, 'ma_crosses_min': 3,
        },
        "H_choppy_saw": {
            'adx_max': 22, 'bb_max': 3.0,
            'chop_min': 55, 'pp_min': 0.3, 'pp_max': 0.7,
            'atr_ratio_max': 999, 'ma_crosses_min': 5,
        },
    }
    # Fetch all data first so every parameter set runs on identical candles.
    print("=" * 80)
    print(" RANGE FINDER β Parameter Optimization")
    print("=" * 80)
    all_data = {}
    for symbol in SYMBOLS:
        print(f"[fetch] {symbol}...")
        df = fetch_klines(symbol, INTERVAL, DAYS_BACK)
        df = add_indicators(df)
        all_data[symbol] = df
        print(f" β {len(df)} candles, chop range: {df['chop'].dropna().min():.1f}-{df['chop'].dropna().max():.1f}, "
              f"ma_crosses: {df['ma_crosses'].dropna().min():.0f}-{df['ma_crosses'].dropna().max():.0f}")
    # Run every parameter set against every symbol.
    results_table = {}
    for pname, params in PARAM_SETS.items():
        print(f"\n{'='*80}")
        print(f" Testing: {pname}")
        print(f" Params: ADX<{params['adx_max']} BB<{params['bb_max']}% Chop>{params['chop_min']} "
              f"PP:{params['pp_min']}-{params['pp_max']} ATR_r<{params['atr_ratio_max']} MA_x>={params['ma_crosses_min']}")
        print(f"{'='*80}")
        row = {}
        for symbol in SYMBOLS:
            df = all_data[symbol]
            result = run_grid_with_params(df, params, breakout_pct=1.0)
            sessions = result['sessions']
            wins = len([s for s in sessions if s['pnl'] > 0])
            total = len(sessions)
            # max(total, 1) avoids division by zero when no session opened.
            wr = 100 * wins / max(total, 1)
            roi = result['total_pnl'] / DEPOSIT * 100
            row[symbol] = {
                'pnl': round(result['total_pnl'], 2),
                'sessions': total,
                'rts': result['total_rts'],
                'wr': round(wr, 0),
                'max_dd': round(result['max_drawdown'], 2),
                'roi': round(roi, 1),
            }
            emoji = 'π’' if result['total_pnl'] > 0 else 'π΄'
            short = symbol.replace('USDT', '').replace('1000', '')
            print(f" {emoji} {short:<6} {total:>3} sess {result['total_rts']:>3} RTs ${result['total_pnl']:>7.2f} ({roi:>+5.1f}%) WR:{wr:>3.0f}% DD:${result['max_drawdown']:>5.2f}")
        results_table[pname] = row
    # === FINAL SUMMARY === one row per parameter set, one column per symbol.
    print(f"\n\n{'='*120}")
    print(f" FINAL SUMMARY β Avg ROI by parameter set")
    print(f"{'='*120}")
    header = f" {'Params':<16}"
    for sym in SYMBOLS:
        short = sym.replace('USDT', '').replace('1000', '')
        header += f" {short:>7}"
    header += f" {'AVG':>8} {'#Plus':>6}"
    print(header)
    print(f" {'β'*110}")
    best_name = None
    best_avg = -999
    for pname, row in results_table.items():
        line = f" {pname:<16}"
        rois = []
        plus_count = 0  # how many symbols finished in profit under this set
        for sym in SYMBOLS:
            r = row[sym]
            roi = r['roi']
            rois.append(roi)
            if roi > 0:
                plus_count += 1
            line += f" {roi:>+6.1f}%"
        avg = np.mean(rois)
        line += f" {avg:>+7.1f}% {plus_count:>4}/{len(SYMBOLS)}"
        print(line)
        # Best set = highest average ROI across all symbols.
        if avg > best_avg:
            best_avg = avg
            best_name = pname
    print(f" {'β'*110}")
    print(f" π Best: {best_name} (avg ROI: {best_avg:+.1f}%)")
    print(f"{'='*120}")
    # Persist parameters and per-symbol results next to this script.
    out = {'params': PARAM_SETS, 'results': {}}
    for pname, row in results_table.items():
        out['results'][pname] = row
    with open(Path(__file__).parent / 'results_range_finder.json', 'w') as f:
        json.dump(out, f, indent=2)
    print(f"\nπΎ Saved to results_range_finder.json")