"""
Optimal Grid Backtest — Fixed 0.3% spacing + Sideways filter + NATR filter

Best parameters from the previous tests:
- Spacing: 0.3% fixed
- Sideways score >= 45 (15m screener)
- NATR(5m) 0.3-0.5% (sweet spot)

Filter combinations compared:
A) 0.3% + score>=45 (no NATR filter) — baseline filtered
B) 0.3% + score>=45 + NATR 0.25-0.55% — wide NATR band
C) 0.3% + score>=45 + NATR 0.30-0.50% — narrow NATR band
D) 0.3% + NATR 0.30-0.50% (no score) — NATR only
E) 0.3% + score>=45 + NATR 0.25-0.55% + range_pos 0.25-0.75 — full combo

Usage: python3 backtest_optimal_grid.py
"""
import requests
import pandas as pd
import numpy as np
import time
import json
from datetime import datetime
from pathlib import Path
# ============================================================
# CONFIG
# ============================================================
# Symbols that are backtested, one after another.
SYMBOLS = [
    "ETHUSDT",
    "DOGEUSDT",
    "PENGUUSDT",
    "ENAUSDT",
    "NEARUSDT",
    "WLDUSDT",
    "SOLUSDT",
    "ARBUSDT",
    "XRPUSDT",
    "LINKUSDT",
    "SUIUSDT",
    "OPUSDT",
    "ADAUSDT",
    "UNIUSDT",
    "AVAXUSDT",
]

DAYS_BACK = 14            # history window requested from the exchange, in days
LEVERAGE = 10             # notional per grid order = POSITION_SIZE_USD * LEVERAGE
POSITION_SIZE_USD = 3.0   # margin per grid order, USD
FEE_PCT = 0.02 / 100      # fee charged per fill, as a fraction of the notional
GRID_LEVELS = 8           # number of levels on each side of the mid price
GRID_SPACING_PCT = 0.3    # optimal from prev test
MAX_LOSS_PCT = 3.0        # session stop, % of GRID_LEVELS * 2 * POSITION_SIZE_USD
SESSION_CANDLES = 60      # session length in 1m candles
SESSION_COOLDOWN = 5      # 1m candles skipped between consecutive sessions

BB_PERIOD = 20            # Bollinger band window (calc_screener)
BB_STD = 2.0              # Bollinger band width in standard deviations
ADX_PERIOD = 14           # smoothing period for ATR / DI / ADX (calc_screener)
NATR_PERIOD = 14          # smoothing period for the 5m NATR (calc_natr_5m)
RANGE_LOOKBACK = 24       # bars in the rolling range / direction-change window

# Filter combos: a session is assigned to a combo when its sideways score is
# >= score_min and its NATR / range position lie inside the inclusive bounds.
COMBOS = [
    dict(name='A: score>=45',
         score_min=45, natr_min=0, natr_max=99, rp_min=0, rp_max=1),
    dict(name='B: score>=45 + NATR .25-.55',
         score_min=45, natr_min=0.25, natr_max=0.55, rp_min=0, rp_max=1),
    dict(name='C: score>=45 + NATR .30-.50',
         score_min=45, natr_min=0.30, natr_max=0.50, rp_min=0, rp_max=1),
    dict(name='D: NATR .30-.50 only',
         score_min=0, natr_min=0.30, natr_max=0.50, rp_min=0, rp_max=1),
    dict(name='E: full combo',
         score_min=45, natr_min=0.25, natr_max=0.55, rp_min=0.25, rp_max=0.75),
    dict(name='F: score>=55 + NATR .25-.55',
         score_min=55, natr_min=0.25, natr_max=0.55, rp_min=0, rp_max=1),
    dict(name='BASELINE: no filter',
         score_min=0, natr_min=0, natr_max=99, rp_min=0, rp_max=1),
]
def fetch_klines(symbol, interval, days_back, max_retries=5):
    """Download klines for ``symbol`` covering the last ``days_back`` days.

    Pages through the Binance futures ``/fapi/v1/klines`` endpoint, 1500
    candles per request, starting ``days_back`` days before now.

    Args:
        symbol: Exchange symbol, e.g. ``"ETHUSDT"``.
        interval: Kline interval string understood by the API, e.g. ``"1m"``.
        days_back: Size of the history window in days.
        max_retries: How many consecutive failed requests are retried before
            giving up. The previous implementation retried forever, which
            hung the whole backtest when the network was down.

    Returns:
        A DataFrame sorted by ``timestamp`` (datetime converted from epoch
        milliseconds) with float OHLCV columns. It may be empty or partial
        when the API returned nothing or the retries were exhausted.
    """
    url = "https://fapi.binance.com/fapi/v1/klines"
    now = time.time()
    end_ts = int(now * 1000)
    start_ts = int((now - days_back * 86400) * 1000)

    all_candles = []
    current_start = start_ts
    failures = 0
    while current_start < end_ts:
        params = {"symbol": symbol, "interval": interval,
                  "startTime": current_start, "limit": 1500}
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers a body that is not valid JSON.
            failures += 1
            if failures > max_retries:
                print(f"[fetch_klines] {symbol} {interval}: giving up after "
                      f"{max_retries} retries ({exc})")
                break
            time.sleep(1)
            continue
        failures = 0

        # An error payload (a dict) or an empty page ends the pagination.
        if not isinstance(data, list) or not data:
            break
        all_candles.extend(data)
        # Next page starts right after the last candle received.
        current_start = data[-1][0] + 1
        time.sleep(0.08)

    df = pd.DataFrame(all_candles, columns=[
        'timestamp', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_volume', 'trades', 'taker_buy_base',
        'taker_buy_quote', 'ignore'
    ])
    for col in ['open', 'high', 'low', 'close', 'volume', 'quote_volume']:
        df[col] = df[col].astype(float)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    return df
def calc_natr_5m(df_1m):
    """Build 5-minute bars from 1-minute candles and compute their NATR.

    NATR is the exponentially smoothed true range (``alpha = 1 / NATR_PERIOD``)
    expressed as a percentage of the bar close.

    Args:
        df_1m: DataFrame with ``timestamp``, ``open``, ``high``, ``low``,
            ``close`` and ``volume`` columns.

    Returns:
        DataFrame with ``timestamp`` and ``natr_5m`` columns; rows where the
        NATR is not yet available are dropped. ``timestamp`` is the label
        assigned by ``resample('5min')``.
    """
    ohlcv_rules = {
        'open': 'first', 'high': 'max', 'low': 'min',
        'close': 'last', 'volume': 'sum'
    }
    bars = df_1m.set_index('timestamp').resample('5min').agg(ohlcv_rules)
    bars = bars.dropna().reset_index()

    high, low, close = bars['high'], bars['low'], bars['close']
    prev_close = close.shift(1)
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    atr = true_range.ewm(alpha=1/NATR_PERIOD, min_periods=NATR_PERIOD).mean()

    bars['natr_5m'] = (atr / close) * 100
    return bars[['timestamp', 'natr_5m']].dropna()
def calc_screener(df, bb_period=None, bb_std=None, adx_period=None, range_lookback=None):
    """Add the sideways-screener indicator columns to ``df`` in place.

    Columns written: ``bb_mid``, ``bb_std``, ``bb_upper``, ``bb_lower``,
    ``bb_width`` (% of mid), ``adx``, ``natr`` (%), ``range_high``,
    ``range_low``, ``range_pos`` (0..1 position of close inside the rolling
    range), ``candle_dir``, ``dir_change`` and ``dir_changes``.

    Args:
        df: DataFrame with ``open``, ``high``, ``low`` and ``close`` columns.
        bb_period: Bollinger window; defaults to ``BB_PERIOD``.
        bb_std: Bollinger width in standard deviations; defaults to ``BB_STD``.
        adx_period: Smoothing period for ATR / DI / ADX; defaults to
            ``ADX_PERIOD``.
        range_lookback: Window for the rolling range and direction-change
            count; defaults to ``RANGE_LOOKBACK``.

    Returns:
        The same ``df`` object, mutated.
    """
    bb_period = BB_PERIOD if bb_period is None else bb_period
    bb_std = BB_STD if bb_std is None else bb_std
    adx_period = ADX_PERIOD if adx_period is None else adx_period
    range_lookback = RANGE_LOOKBACK if range_lookback is None else range_lookback

    def wilder(series):
        # Exponential smoothing with alpha = 1 / period.
        return series.ewm(alpha=1 / adx_period, min_periods=adx_period).mean()

    # --- Bollinger bands ---
    df['bb_mid'] = df['close'].rolling(bb_period).mean()
    df['bb_std'] = df['close'].rolling(bb_period).std()
    df['bb_upper'] = df['bb_mid'] + bb_std * df['bb_std']
    df['bb_lower'] = df['bb_mid'] - bb_std * df['bb_std']
    df['bb_width'] = ((df['bb_upper'] - df['bb_lower']) / df['bb_mid']) * 100

    # --- Directional movement / ADX ---
    h, l, c = df['high'], df['low'], df['close']
    up_move = h.diff()
    down_move = -l.diff()
    # Both sides are filtered against the RAW moves. Filtering -DM against the
    # already-zeroed +DM credited a tie (up_move == down_move > 0) to -DM,
    # whereas a tie should zero both.
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    prev_close = c.shift(1)
    tr = pd.concat([h - l, (h - prev_close).abs(), (l - prev_close).abs()], axis=1).max(axis=1)
    atr = wilder(tr)
    plus_di = 100 * (wilder(plus_dm) / atr)
    minus_di = 100 * (wilder(minus_dm) / atr)
    # 1e-10 avoids 0/0 when both DI values are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    df['adx'] = wilder(dx)
    df['natr'] = (atr / c) * 100

    # --- Rolling range and position of the close inside it ---
    df['range_high'] = df['high'].rolling(range_lookback).max()
    df['range_low'] = df['low'].rolling(range_lookback).min()
    rng = df['range_high'] - df['range_low']
    # A zero-width range yields NaN instead of a division by zero.
    df['range_pos'] = (df['close'] - df['range_low']) / rng.replace(0, np.nan)

    # --- Candle direction flips inside the lookback window ---
    df['candle_dir'] = np.where(df['close'] > df['open'], 1, -1)
    df['dir_change'] = (df['candle_dir'] != df['candle_dir'].shift(1)).astype(int)
    df['dir_changes'] = df['dir_change'].rolling(range_lookback).sum()
    return df
def sideways_score(row):
    """Score how range-bound a screener row looks (0 = not at all).

    The score is the sum of five components: ADX (up to 25), Bollinger width
    (up to 20), range position (up to 20), direction changes (up to 20) and
    NATR (up to 15).

    Args:
        row: Mapping-like object with a ``get`` method (e.g. a DataFrame row)
            providing ``adx``, ``bb_width``, ``range_pos``, ``dir_changes``
            and ``natr``.

    Returns:
        ``0`` when any input is missing/NaN, otherwise the total rounded to
        one decimal and floored at 0.
    """
    adx = row.get('adx', np.nan)
    bb_w = row.get('bb_width', np.nan)
    rp = row.get('range_pos', np.nan)
    dc = row.get('dir_changes', np.nan)
    natr = row.get('natr', np.nan)
    for value in (adx, bb_w, rp, dc, natr):
        if pd.isna(value):
            return 0

    total = 0

    # ADX: full marks up to 5, linear fade to 0 at 20, penalty above 30.
    if adx > 30:
        total -= 10
    elif adx > 20:
        # NOTE(review): the expression is never positive here, so max(0, ...)
        # always adds 0 — confirm whether a penalty was intended.
        total += max(0, -5 * (adx - 20) / 10)
    elif adx > 5:
        total += 25 * (1 - (adx - 5) / 15)
    else:
        total += 25

    # Bollinger width: triangle peaking at 2.5%, small bonus for 0.5-1.5%.
    if 1.5 <= bb_w <= 2.5:
        total += 20 * (bb_w - 1.5) / 1.0
    elif 2.5 < bb_w <= 4.0:
        total += 20 * (4.0 - bb_w) / 1.5
    elif 0.5 <= bb_w < 1.5:
        total += 5

    # Range position: triangle peaking at mid-range, small bonus just outside.
    if 0.3 <= rp <= 0.7:
        total += 20 * (1 - abs(rp - 0.5) / 0.2)
    elif 0.2 <= rp <= 0.8:
        total += 5

    # Direction changes: linear from 8 flips, capped at 20 points.
    max_ch = RANGE_LOOKBACK * 0.7
    if dc >= 8:
        total += min(20, 20 * (dc - 8) / (max_ch - 8))

    # NATR: triangle peaking at 0.3%, small bonus for 0.10-0.15%.
    if 0.15 <= natr <= 0.3:
        total += 15 * (natr - 0.15) / 0.15
    elif 0.3 < natr <= 0.6:
        total += 15 * (0.6 - natr) / 0.3
    elif 0.1 <= natr < 0.15:
        total += 3

    return max(0, round(total, 1))
def _open_pnl(side, entry_px, mark_px):
    """Return the USD PnL of one grid position valued at ``mark_px``."""
    qty = (POSITION_SIZE_USD * LEVERAGE) / entry_px
    if side == 'long':
        return qty * (mark_px - entry_px)
    return qty * (entry_px - mark_px)


def run_grid_session(df_1m, start_idx):
    """Simulate one grid session on 1-minute candles.

    The grid is centred on the close of candle ``start_idx`` with
    ``GRID_LEVELS`` buy levels below and sell levels above, spaced
    ``GRID_SPACING_PCT`` percent apart. Candles after ``start_idx`` are walked
    for at most ``SESSION_CANDLES``; a level fills when the candle's low/high
    touches it. When the buy and sell level with the same index are both
    filled they are booked as a round trip and re-armed. The session is
    force-closed at the candle close once realized + unrealized PnL minus fees
    drops below the max-loss threshold; otherwise leftovers are closed at the
    close of the final candle.

    Args:
        df_1m: DataFrame with ``close``, ``low`` and ``high`` columns.
        start_idx: Positional index of the candle the grid is centred on.

    Returns:
        ``None`` when fewer than 6 candles remain from ``start_idx`` or when
        nothing filled; otherwise a dict with ``pnl`` (net of fees),
        ``trades``, ``round_trips``, ``fees`` and ``close_reason``
        (``'timeout'`` or ``'max_loss'``).
    """
    n_rows = len(df_1m)
    if start_idx + 5 >= n_rows:
        return None

    closes, lows, highs = df_1m['close'], df_1m['low'], df_1m['high']
    end_idx = min(start_idx + SESSION_CANDLES, n_rows - 1)
    mid_price = closes.iloc[start_idx]

    steps = range(1, GRID_LEVELS + 1)
    buy_levels = [mid_price * (1 - k * GRID_SPACING_PCT / 100) for k in steps]
    sell_levels = [mid_price * (1 + k * GRID_SPACING_PCT / 100) for k in steps]
    buy_fills = [False for _ in steps]
    sell_fills = [False for _ in steps]

    fill_fee = POSITION_SIZE_USD * LEVERAGE * FEE_PCT
    max_capital = GRID_LEVELS * 2 * POSITION_SIZE_USD
    max_loss = max_capital * MAX_LOSS_PCT / 100

    positions = []
    pnl = 0.0
    fees = 0.0
    trades = 0
    round_trips = 0
    close_reason = 'timeout'

    for j in range(start_idx + 1, end_idx):
        price = closes.iloc[j]
        lo = lows.iloc[j]
        hi = highs.iloc[j]

        # New fills: buys first, then sells (order fixes the position list).
        for lvl, level_px in enumerate(buy_levels):
            if lo <= level_px and not buy_fills[lvl]:
                buy_fills[lvl] = True
                positions.append(('long', level_px))
                fees += fill_fee
                trades += 1
        for lvl, level_px in enumerate(sell_levels):
            if hi >= level_px and not sell_fills[lvl]:
                sell_fills[lvl] = True
                positions.append(('short', level_px))
                fees += fill_fee
                trades += 1

        # Matched buy/sell pairs are realized and the level is re-armed.
        for lvl in range(GRID_LEVELS):
            if not (buy_fills[lvl] and sell_fills[lvl]):
                continue
            buy_px = buy_levels[lvl]
            sell_px = sell_levels[lvl]
            spread = sell_px - buy_px
            qty = (POSITION_SIZE_USD * LEVERAGE) / buy_px
            pnl += qty * spread
            fees += fill_fee * 2
            buy_fills[lvl] = False
            sell_fills[lvl] = False
            round_trips += 1
            positions = [
                p for p in positions
                if not ((p[0] == 'long' and abs(p[1] - buy_px) < 1e-10)
                        or (p[0] == 'short' and abs(p[1] - sell_px) < 1e-10))
            ]

        # Mark the remaining positions at the candle close.
        unrealized = 0
        for side, entry_px in positions:
            unrealized += _open_pnl(side, entry_px, price)

        if positions and pnl + unrealized - fees < -max_loss:
            for side, entry_px in positions:
                pnl += _open_pnl(side, entry_px, price)
                fees += fill_fee
            positions = []
            close_reason = 'max_loss'
            break

    if positions:
        # Timeout: flatten whatever is left at the final candle's close.
        price = closes.iloc[min(end_idx, n_rows - 1)]
        for side, entry_px in positions:
            pnl += _open_pnl(side, entry_px, price)
            fees += fill_fee

    net = pnl - fees
    if trades == 0:
        return None
    return {
        'pnl': round(net, 4),
        'trades': trades,
        'round_trips': round_trips,
        'fees': round(fees, 4),
        'close_reason': close_reason,
    }
# ============================================================
# MAIN
# ============================================================
def _last_closed_bar(close_times, decision_ts):
    """Return the position of the newest bar closed by ``decision_ts`` (-1 if none).

    ``close_times`` must be a sorted ``DatetimeIndex`` of bar close times.
    """
    return int(close_times.searchsorted(decision_ts, side='right')) - 1


def _scan_symbol(sym, combo_results):
    """Backtest every session window of ``sym`` and file results per combo.

    Each session result is appended to ``combo_results[name]`` for every
    combo in ``COMBOS`` whose filters it passes.

    Returns:
        The number of windows that produced a session result, or ``None``
        when the symbol was skipped for lack of data.
    """
    df_1m = fetch_klines(sym, '1m', DAYS_BACK)
    if len(df_1m) < 1000:
        return None
    df_natr5 = calc_natr_5m(df_1m)
    df_15m = fetch_klines(sym, '15m', DAYS_BACK)
    # Without higher-timeframe data no filter can be evaluated.
    if df_natr5.empty or df_15m.empty:
        return None
    df_15m = calc_screener(df_15m)
    df_15m['sw_score'] = df_15m.apply(sideways_score, axis=1)

    # Bars are keyed by their OPEN time (resample's left label for the 5m bars;
    # kline open time for the 15m bars — see the Binance kline docs). Selecting
    # "open time <= ts" would read the final values of a bar that is still
    # forming, i.e. look-ahead. Index by close time so that only completed bars
    # can be used.
    natr_close = pd.DatetimeIndex(df_natr5['timestamp']) + pd.Timedelta(minutes=5)
    natr_values = df_natr5['natr_5m'].to_numpy()
    scr_close = pd.DatetimeIndex(df_15m['timestamp']) + pd.Timedelta(minutes=15)
    scr_scores = df_15m['sw_score'].to_numpy()
    scr_range_pos = df_15m['range_pos'].to_numpy()

    step = SESSION_CANDLES + SESSION_COOLDOWN
    win_count = 0
    # The first 100 one-minute candles are skipped (indicator warm-up).
    for i in range(100, len(df_1m) - SESSION_CANDLES, step):
        ts = df_1m['timestamp'].iloc[i]
        # The grid is centred on the CLOSE of candle i, one minute after ts.
        decision_ts = ts + pd.Timedelta(minutes=1)

        natr_pos = _last_closed_bar(natr_close, decision_ts)
        scr_pos = _last_closed_bar(scr_close, decision_ts)
        if natr_pos < 0 or scr_pos < 0:
            continue
        current_natr = natr_values[natr_pos]
        if pd.isna(current_natr):
            continue
        score = scr_scores[scr_pos]
        range_pos = scr_range_pos[scr_pos]
        if pd.isna(range_pos):
            range_pos = 0.5

        # Run grid once (same for all combos — same spacing)
        result = run_grid_session(df_1m, i)
        if not result:
            continue
        result['symbol'] = sym
        result['natr_5m'] = round(float(current_natr), 3)
        result['sw_score'] = round(float(score), 1)
        result['range_pos'] = round(float(range_pos), 2)
        result['ts'] = str(ts)
        # Assign to matching combos
        for c in COMBOS:
            if (score >= c['score_min'] and
                    c['natr_min'] <= current_natr <= c['natr_max'] and
                    c['rp_min'] <= range_pos <= c['rp_max']):
                combo_results[c['name']].append(result)
        win_count += 1
    return win_count


def _print_comparison(combo_results):
    """Print the per-combo summary table and return the best combo name.

    "Best" is the highest average PnL per session among the filtered combos
    (the unfiltered baseline is excluded); ``None`` when no combo has sessions.
    """
    print("\n" + "=" * 75)
    print(" 🏆 РЕЗУЛЬТАТЫ — СРАВНЕНИЕ КОМБИНАЦИЙ ФИЛЬТРОВ")
    print("=" * 75)
    print(f"\n {'Combo':<32} {'Sess':>5} {'WR':>6} {'PnL':>10} {'Avg':>9} {'$/day':>7} {'RTs':>5} {'ML':>4}")
    print(f" {'─'*78}")
    best_name = None
    best_avg = float('-inf')
    for c in COMBOS:
        name = c['name']
        sess = combo_results[name]
        if not sess:
            print(f" {name:<32} {'—':>5}")
            continue
        total_pnl = sum(s['pnl'] for s in sess)
        wins = sum(1 for s in sess if s['pnl'] > 0)
        wr = 100 * wins / len(sess)
        avg = total_pnl / len(sess)
        rts = sum(s['round_trips'] for s in sess)
        ml = sum(1 for s in sess if s['close_reason'] == 'max_loss')
        per_day = total_pnl / DAYS_BACK
        emoji = '🟢' if total_pnl > 0 else '🔴'
        print(f" {emoji} {name:<30} {len(sess):>5} {wr:>5.0f}% ${total_pnl:>8.2f} ${avg:>7.4f} ${per_day:>5.2f} {rts:>5} {ml:>4}")
        if avg > best_avg and name != 'BASELINE: no filter':
            best_avg = avg
            best_name = name
    return best_name


def _print_best_details(best_name, sess):
    """Print the detailed breakdown (stats, per symbol, per hour) of one combo."""
    print(f"\n{'='*75}")
    print(f" 🥇 ЛУЧШИЙ: {best_name}")
    print(f"{'='*75}")
    total_pnl = sum(s['pnl'] for s in sess)
    wins = [s for s in sess if s['pnl'] > 0]
    losses = [s for s in sess if s['pnl'] <= 0]
    print(f" Sessions: {len(sess)} | WR: {100*len(wins)/len(sess):.0f}%")
    print(f" Total PnL: ${total_pnl:.4f} | Per day: ${total_pnl/DAYS_BACK:.2f}")
    print(f" Per week: ${total_pnl/DAYS_BACK*7:.2f} | Per month (est): ${total_pnl/DAYS_BACK*30:.2f}")
    if wins:
        print(f" Avg win: ${sum(s['pnl'] for s in wins)/len(wins):.4f}")
    if losses:
        print(f" Avg loss: ${sum(s['pnl'] for s in losses)/len(losses):.4f}")

    # Profit factor: with no losses it is unbounded, not "wins / 1".
    gross_wins = sum(s['pnl'] for s in wins)
    gross_losses = abs(sum(s['pnl'] for s in losses))
    if gross_losses > 0:
        pf = gross_wins / gross_losses
    else:
        pf = float('inf') if gross_wins > 0 else 0.0
    print(f" Profit factor: {pf:.2f}")

    # Equity curve stats. Sessions are collected symbol by symbol, so they are
    # ordered by start time first; otherwise the drawdown is meaningless.
    chronological = sorted(sess, key=lambda s: s['ts'])
    eq = np.cumsum([0.0] + [s['pnl'] for s in chronological])
    peak = np.maximum.accumulate(eq)
    max_dd = (eq - peak).min()
    print(f" Max drawdown: ${max_dd:.4f}")
    print(f" Final equity: ${eq[-1]:.4f}")

    # Per-symbol
    print(f"\n Per-symbol:")
    print(f" {'Symbol':<12} {'Sess':>5} {'WR':>6} {'PnL':>10} {'RTs':>5}")
    print(f" {'─'*38}")
    by_symbol = {}
    for s in sess:
        by_symbol.setdefault(s['symbol'], []).append(s)
    for sym in sorted(by_symbol):
        sub = by_symbol[sym]
        tp = sum(s['pnl'] for s in sub)
        w = sum(1 for s in sub if s['pnl'] > 0)
        wr = 100 * w / len(sub)
        rt = sum(s['round_trips'] for s in sub)
        e = '🟢' if tp > 0 else '🔴'
        print(f" {e} {sym:<10} {len(sub):>5} {wr:>5.0f}% ${tp:>8.2f} {rt:>5}")

    # Time-of-day analysis
    print(f"\n By hour (UTC):")
    hour_data = {}
    for s in sess:
        hour_data.setdefault(pd.Timestamp(s['ts']).hour, []).append(s['pnl'])
    print(f" {'Hour':>4} {'Sess':>5} {'WR':>6} {'PnL':>10}")
    print(f" {'─'*27}")
    for h in sorted(hour_data):
        vals = hour_data[h]
        tp = sum(vals)
        w = sum(1 for v in vals if v > 0)
        wr = 100 * w / len(vals)
        e = '🟢' if tp > 0 else '⚪'
        print(f" {e} {h:>3}h {len(vals):>5} {wr:>5.0f}% ${tp:>8.3f}")


def _save_results(combo_results, best_name):
    """Write the run configuration and per-combo summary next to this script."""
    output = {
        'config': {
            'symbols': SYMBOLS, 'days_back': DAYS_BACK,
            'grid_levels': GRID_LEVELS, 'spacing': GRID_SPACING_PCT,
            'position_size': POSITION_SIZE_USD, 'leverage': LEVERAGE,
        },
        'combos': {name: {
            'sessions': len(sess),
            'total_pnl': round(float(sum(s['pnl'] for s in sess)), 4) if sess else 0,
            'win_rate': round(100 * sum(1 for s in sess if s['pnl'] > 0) / max(len(sess), 1), 1),
        } for name, sess in combo_results.items()},
        'best_combo': best_name,
        'tested_at': datetime.now().isoformat(),
    }
    out_path = Path(__file__).parent / 'results_optimal_grid.json'
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=2)
    print(f"\n💾 {out_path}")


def main():
    """Run the backtest for every symbol, print the report and save it."""
    print("=" * 75)
    print(" OPTIMAL GRID — 0.3% spacing + filters")
    # Banner values come from the config so they cannot drift out of sync.
    print(f" {len(SYMBOLS)} coins | {DAYS_BACK}d | {LEVERAGE}x | "
          f"{GRID_LEVELS} lvl × ${POSITION_SIZE_USD:g} × {GRID_SPACING_PCT}%")
    print("=" * 75)

    # Per-combo results
    combo_results = {c['name']: [] for c in COMBOS}
    for sym in SYMBOLS:
        print(f"\n 📊 {sym}...", end=" ", flush=True)
        win_count = _scan_symbol(sym, combo_results)
        print("skip" if win_count is None else f"{win_count} windows")

    # ============================================================
    # REPORT
    # ============================================================
    best_name = _print_comparison(combo_results)
    # Detailed analysis of best combo
    if best_name:
        _print_best_details(best_name, combo_results[best_name])
    # Save
    _save_results(combo_results, best_name)


if __name__ == "__main__":
    main()