"""
Hybrid Grid Backtest v4 — Screener on 15m/1h, Grid on 1m
==========================================================
The correct approach:
- SCREENER runs on a higher TF (15m or 1h) to detect range/breakout
- GRID SIMULATION runs on 1m data to capture all micro round-trips
This gives realistic RT counts since grids operate on tick/1m level.

Usage:
    python backtest_hybrid_v4.py
"""
import ccxt
import pandas as pd
import numpy as np
import json
import time
from datetime import datetime, timedelta, timezone
# ============================================================
# COINS
# ============================================================
# Symbols that main() passes to fetch_klines() for every screener timeframe.
COINS = [
"ENA/USDT", "PENGU/USDT", "NEAR/USDT", "WLD/USDT", "UNI/USDT",
"VIRTUAL/USDT", "SOL/USDT", "AVAX/USDT", "1000PEPE/USDT",
"DOT/USDT", "ICP/USDT", "ETH/USDT", "TON/USDT",
"LINK/USDT", "DOGE/USDT", "XRP/USDT", "AAVE/USDT",
"FIL/USDT", "SUI/USDT", "OP/USDT", "ARB/USDT",
]
# Higher timeframes the screener (find_sessions) is evaluated on.
SCREENER_TFS = ["15m", "1h"]
GRID_TF = "1m" # timeframe of the bars the grid is simulated on (always 1m)
LOOKBACK_DAYS = 14 # history window in days (1m data is heavy)
# ============================================================
# GRID CONFIG (v3 settings)
# ============================================================
# get_spacing() clips the ATR-derived spacing (percent of price) to this range.
SPACING_MIN_PCT = 0.3
SPACING_MAX_PCT = 0.5
# Spacing = ATR as a percent of price, multiplied by this factor (before clipping).
ATR_SPACING_MULT = 0.5
ATR_PERIOD = 14
# Upper bound on the number of directional level fills processed per 1m bar.
GRID_LEVELS = 8
# Notional per grid level is POSITION_SIZE * LEVERAGE.
LEVERAGE = 5
POSITION_SIZE = 3.0
# Fee rates as fractions of notional: maker for grid fills,
# taker for partial / unstuck / stop / session-end closes.
MAKER_FEE = 0.0002
TAKER_FEE = 0.0004
# Inventory management
INVENTORY_WARN_LEVELS = 3 # partial closes start once this many levels are held
INVENTORY_MAX_LEVELS = 5 # no further levels are opened beyond this many
UNSTUCK_THRESHOLD = 4 # inventory size at which the near-EMA unstuck close is considered
UNSTUCK_CLOSE_PCT = 0.25 # fraction of held levels closed by one unstuck (at least 1 level)
# Trailing center
EMA_CENTER_PERIOD = 20
CENTER_UPDATE_MINS = 5 # update center every 5 min (5 x 1m bars)
# ============================================================
# SCREENER CONFIG
# ============================================================
BB_PERIOD = 20
BB_STD = 2.0
# Entry requires the Bollinger width (percent of the middle band) inside this range.
BB_WIDTH_MIN = 0.3
BB_WIDTH_MAX = 1.5
ADX_MAX_ENTRY = 25 # entry requires ADX below this value
ADX_PERIOD = 14
CHOP_PERIOD = 14
CHOP_MIN_ENTRY = 55 # entry requires the Choppiness Index above this value
CHOP_MAX_EXIT = 40 # Choppiness Index below this counts as trending for the breakout exit
BREAKOUT_BB_MULT = 1.5 # breakout needs BB width > BB_WIDTH_MAX * BREAKOUT_BB_MULT
BREAKOUT_ADX = 28 # ADX above this counts as trending for the breakout exit
# Session
# Session length limits in minutes, keyed by screener timeframe.
MIN_SESSION_MINS = {"15m": 150, "1h": 600} # min 2.5h for 15m, 10h for 1h
MAX_SESSION_MINS = {"15m": 2880, "1h": 4320} # max 2d for 15m, 3d for 1h
# Risk
# A session is force-closed once realized + unrealized PnL falls to -MAX_LOSS_PER_SESSION.
MAX_LOSS_PER_SESSION = 5.0
# NOTE(review): DEPOSIT is not referenced by any code visible in this file — confirm it is still needed.
DEPOSIT = 50.0
# ============================================================
# INDICATORS
# ============================================================
def fetch_klines(exchange, symbol, timeframe, days):
    """Download the most recent *days* days of OHLCV candles for *symbol*.

    Pages through ``exchange.fetch_ohlcv`` in chunks of 1000 candles, moving
    the ``since`` cursor one millisecond past the last candle received.

    Args:
        exchange: Object exposing a ccxt-style
            ``fetch_ohlcv(symbol, timeframe, since=..., limit=...)`` method
            that returns rows of ``[ts_ms, open, high, low, close, volume]``.
        symbol: Market symbol, e.g. ``"ETH/USDT"``.
        timeframe: Candle timeframe label, e.g. ``"1m"``.
        days: Size of the history window, counted back from now (UTC).

    Returns:
        A DataFrame with columns ``ts, open, high, low, close, volume`` where
        ``ts`` is converted from epoch milliseconds to datetimes, or ``None``
        when a request fails or no candles are returned.
    """
    start = datetime.now(timezone.utc) - timedelta(days=days)
    # Epoch milliseconds computed directly; no ISO-string round-trip needed.
    since = int(start.timestamp() * 1000)
    limit = 1000
    all_klines = []
    while True:
        try:
            klines = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
        except Exception as e:
            # Best-effort by design: any exchange/network failure skips the symbol.
            print(f" Error {symbol} {timeframe}: {e}")
            return None
        if not klines:
            break
        # Keep only candles at or after the cursor. This drops overlapping rows
        # and, if the exchange ignores `since` and repeats a page, leaves
        # nothing new so the loop ends instead of spinning forever.
        fresh = [k for k in klines if k[0] >= since]
        if not fresh:
            break
        all_klines.extend(fresh)
        since = fresh[-1][0] + 1
        if len(klines) < limit:
            break  # short page: history exhausted
        time.sleep(0.12)  # pause between pages
    if not all_klines:
        return None
    df = pd.DataFrame(all_klines, columns=['ts', 'open', 'high', 'low', 'close', 'volume'])
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')
    return df
def calc_atr(df, period=ATR_PERIOD):
    """Return the Average True Range of *df* as a Series.

    The true range of a bar is the largest of high-low, |high - previous
    close| and |low - previous close|. It is smoothed with an exponentially
    weighted mean using ``alpha = 1 / period``; the first ``period - 1``
    values are NaN.
    """
    prev_close = df['close'].shift(1)
    candidates = pd.DataFrame({
        'bar_range': df['high'] - df['low'],
        'high_gap': (df['high'] - prev_close).abs(),
        'low_gap': (df['low'] - prev_close).abs(),
    })
    # Row-wise max skips NaN, so the first bar falls back to high - low.
    true_range = candidates.max(axis=1)
    smoothed = true_range.ewm(alpha=1 / period, min_periods=period)
    return smoothed.mean()
def calc_bb_width(close, period=BB_PERIOD, std=BB_STD):
    """Return the Bollinger Band width as a percentage of the middle band.

    The bands are the rolling mean of *close* plus/minus *std* rolling
    standard deviations over *period* bars.
    """
    window = close.rolling(period)
    middle = window.mean()
    deviation = window.std()
    upper = middle + std * deviation
    lower = middle - std * deviation
    return (upper - lower) / middle * 100
def calc_adx(df, period=ADX_PERIOD):
    """Return the Average Directional Index (ADX) of *df* as a Series.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.
        period: Smoothing length; every smoothing step is an exponentially
            weighted mean with ``alpha = 1 / period``.

    Returns:
        Series of ADX values (NaN until enough bars have accumulated).
    """
    high, low, close = df['high'], df['low'], df['close']
    up_move = high.diff()
    down_move = -low.diff()
    # Both masks are built from the RAW moves. Deriving -DM from the already
    # zeroed +DM kept -DM on an exact tie (up == down > 0), where the
    # directional-movement definition sets both to zero.
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    prev_close = close.shift(1)
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    atr = tr.ewm(alpha=1 / period, min_periods=period).mean()

    plus_di = 100 * (plus_dm.ewm(alpha=1 / period, min_periods=period).mean() / atr)
    minus_di = 100 * (minus_dm.ewm(alpha=1 / period, min_periods=period).mean() / atr)
    # The small epsilon avoids 0/0 when both directional indicators are zero.
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)
    return dx.ewm(alpha=1 / period, min_periods=period).mean()
def calc_chop(df, period=CHOP_PERIOD):
    """Return the Choppiness Index of *df* as a Series.

    Computed as ``100 * log10(sum(TR, period) / (highest high - lowest low))
    / log10(period)``. A zero high-low span is replaced by NaN so the
    division cannot hit zero.
    """
    prev_close = df['close'].shift(1)
    true_range = pd.DataFrame({
        'bar_range': df['high'] - df['low'],
        'high_gap': (df['high'] - prev_close).abs(),
        'low_gap': (df['low'] - prev_close).abs(),
    }).max(axis=1)
    tr_total = true_range.rolling(period).sum()
    highest = df['high'].rolling(period).max()
    lowest = df['low'].rolling(period).min()
    span = (highest - lowest).replace(0, np.nan)
    return 100 * np.log10(tr_total / span) / np.log10(period)
def calc_ema(series, period):
    """Return the recursive (``adjust=False``) EMA of *series* with span *period*."""
    smoother = series.ewm(span=period, adjust=False)
    return smoother.mean()
def get_spacing(atr_val, price):
    """Return the grid spacing in percent of price.

    The spacing is the ATR as a percentage of *price*, scaled by
    ATR_SPACING_MULT and clipped to [SPACING_MIN_PCT, SPACING_MAX_PCT].
    A non-positive *price* yields SPACING_MIN_PCT.
    """
    if price <= 0:
        return SPACING_MIN_PCT
    atr_pct = (atr_val / price) * 100
    return np.clip(atr_pct * ATR_SPACING_MULT, SPACING_MIN_PCT, SPACING_MAX_PCT)
# ============================================================
# SCREENER — finds session windows on higher TF
# ============================================================
def _timeframe_minutes(timeframe):
    """Convert a timeframe label such as "15m" or "1h" into minutes."""
    minutes_per_unit = {"m": 1, "h": 60, "d": 1440}
    return int(timeframe[:-1]) * minutes_per_unit[timeframe[-1]]


def find_sessions(df_htf, screener_tf):
    """Find grid session windows (entry/exit timestamps) on higher-TF data.

    A session opens on the first bar where the Bollinger width lies within
    [BB_WIDTH_MIN, BB_WIDTH_MAX], ADX is below ADX_MAX_ENTRY and the
    Choppiness Index is above CHOP_MIN_ENTRY. It closes either on a breakout
    (allowed only after the minimum session length: BB width above
    BB_WIDTH_MAX * BREAKOUT_BB_MULT together with ADX above BREAKOUT_ADX or
    CHOP below CHOP_MAX_EXIT) or once the maximum session length is reached.

    Args:
        df_htf: DataFrame with ``ts``, ``high``, ``low`` and ``close`` columns.
        screener_tf: Timeframe label of *df_htf*; must be a key of
            MIN_SESSION_MINS / MAX_SESSION_MINS.

    Returns:
        List of dicts with keys ``entry_ts``, ``exit_ts``, ``bars_htf``,
        ``reason`` ("breakout" or "max_time"), ``bw_entry``, ``adx_entry``
        and ``chop_entry``. A session still open when the data ends is not
        emitted.

    Raises:
        KeyError: If *screener_tf* has no configured session limits.
    """
    # Bar limits come from the session config so that editing
    # MIN_SESSION_MINS / MAX_SESSION_MINS actually takes effect.
    min_minutes = MIN_SESSION_MINS[screener_tf]
    max_minutes = MAX_SESSION_MINS[screener_tf]
    bar_minutes = _timeframe_minutes(screener_tf)
    min_bars_htf = min_minutes // bar_minutes
    max_bars_htf = max_minutes // bar_minutes
    cooldown_bars = 3  # bars skipped after an exit before a new entry is considered

    # Plain arrays: per-bar reads are much cheaper than Series.iloc.
    bw = calc_bb_width(df_htf['close']).values
    adx = calc_adx(df_htf).values
    chop = calc_chop(df_htf).values
    ts = df_htf['ts']

    sessions = []
    session_start_i = None  # None while no session is open
    n_bars = len(df_htf)
    i = max(BB_PERIOD, ADX_PERIOD, CHOP_PERIOD) + 5  # skip indicator warm-up
    while i < n_bars:
        b, a, c = bw[i], adx[i], chop[i]
        if pd.isna(b) or pd.isna(a) or pd.isna(c):
            i += 1
            continue

        if session_start_i is None:
            bb_ok = BB_WIDTH_MIN <= b <= BB_WIDTH_MAX
            if bb_ok and a < ADX_MAX_ENTRY and c > CHOP_MIN_ENTRY:
                session_start_i = i
            i += 1
            continue

        bars_in = i - session_start_i
        bb_break = b > BB_WIDTH_MAX * BREAKOUT_BB_MULT
        trending = a > BREAKOUT_ADX or c < CHOP_MAX_EXIT
        if bars_in >= min_bars_htf and bb_break and trending:
            reason = "breakout"
        elif bars_in >= max_bars_htf:
            reason = "max_time"
        else:
            i += 1
            continue

        sessions.append({
            "entry_ts": ts.iloc[session_start_i],
            "exit_ts": ts.iloc[i],
            "bars_htf": bars_in,
            "reason": reason,
            "bw_entry": round(bw[session_start_i], 3),
            "adx_entry": round(adx[session_start_i], 1),
            "chop_entry": round(chop[session_start_i], 1),
        })
        session_start_i = None
        i += cooldown_bars
    return sessions
# ============================================================
# GRID SIMULATION on 1m data
# ============================================================
def _close_pnl_per_level(inv_levels, inv_cost, price, notional):
    """Return the pre-fee PnL of closing ONE inventory level at *price*."""
    if inv_levels > 0:  # long inventory
        return notional * (price - inv_cost) / inv_cost
    return notional * (inv_cost - price) / inv_cost  # short inventory


def _session_result(bars, realized, inventory_pnl, rts, peak_inv, partial,
                    unstuck, fees, avg_spacing, stopped):
    """Assemble the per-session result dict with consistent rounding."""
    return {
        "bars_1m": bars,
        "realized": round(realized, 4),
        "inventory_pnl": round(inventory_pnl, 4),
        "total_pnl": round(realized + inventory_pnl, 4),
        "rts": rts,
        "peak_inv": peak_inv,
        "partial": partial,
        "unstuck": unstuck,
        "fees": round(fees, 4),
        "avg_spacing": round(avg_spacing, 3),
        "stopped": stopped,
    }


def simulate_grid_on_1m(df_1m, entry_ts, exit_ts):
    """Run the grid simulation on 1m bars between entry_ts and exit_ts.

    Per bar the simulation:
      1. derives the grid spacing from ATR (see get_spacing);
      2. estimates intra-bar round-trips from the bar's high/low range;
      3. turns the net close-to-close move into level fills that open or
         close inventory (at most GRID_LEVELS fills per bar, inventory capped
         at INVENTORY_MAX_LEVELS);
      4. applies partial closes, near-EMA unstucking and the hard stop.

    Args:
        df_1m: DataFrame with ``ts``, ``high``, ``low`` and ``close`` columns.
        entry_ts: First timestamp of the session (inclusive).
        exit_ts: Last timestamp of the session (inclusive).

    Returns:
        ``None`` if the window holds fewer than 10 bars, otherwise a dict with
        keys ``bars_1m``, ``realized``, ``inventory_pnl``, ``total_pnl``,
        ``rts``, ``peak_inv``, ``partial``, ``unstuck``, ``fees``,
        ``avg_spacing`` (mean spacing percent over the simulated bars) and
        ``stopped`` (True when the hard stop ended the session early).
    """
    mask = (df_1m['ts'] >= entry_ts) & (df_1m['ts'] <= exit_ts)
    df = df_1m[mask].reset_index(drop=True)
    if len(df) < 10:
        return None

    closes = df['close'].values
    highs = df['high'].values
    lows = df['low'].values
    n = len(closes)

    # Pre-computed indicators as arrays: per-bar reads avoid slow Series.iloc.
    atr_arr = calc_atr(df, ATR_PERIOD).values
    ema_arr = calc_ema(df['close'], EMA_CENTER_PERIOD).values

    notional = POSITION_SIZE * LEVERAGE
    maker_fee = notional * MAKER_FEE  # fee of one grid fill

    center = closes[0]   # trailing center; used as fallback when the EMA is NaN
    inv_levels = 0       # signed inventory: > 0 long levels, < 0 short levels
    inv_cost = 0.0       # average entry price of the current inventory
    realized = 0.0
    rts = 0
    peak_inv = 0
    partial_cls = 0
    unstuck_cls = 0
    fees_total = 0.0
    spacing_sum = 0.0    # accumulators for the reported average spacing
    spacing_bars = 0

    for i in range(1, n):
        prev_c = closes[i - 1]
        curr_c = closes[i]

        # Trailing center every CENTER_UPDATE_MINS bars.
        if i % CENTER_UPDATE_MINS == 0 and not pd.isna(ema_arr[i]):
            center = ema_arr[i]

        # ATR spacing; during ATR warm-up fall back to the last close change.
        atr_v = atr_arr[i]
        if pd.isna(atr_v):
            atr_v = abs(curr_c - prev_c)
        sp_pct = get_spacing(atr_v, curr_c)
        spacing = curr_c * sp_pct / 100
        if not spacing > 0:
            continue  # also skips NaN spacing, which would crash int() below
        spacing_sum += sp_pct
        spacing_bars += 1

        # Estimate crossings from the full bar range, not just close-to-close.
        # Assumed price path: prev_close -> bar extremes -> curr_close.
        low_lvl = int(lows[i] / spacing)
        high_lvl = int(highs[i] / spacing)
        prev_lvl = int(prev_c / spacing)
        curr_lvl = int(curr_c / spacing)
        total_traversed = high_lvl - low_lvl
        net_move = curr_lvl - prev_lvl
        abs_net = abs(net_move)

        # Levels traversed beyond the net move are treated as back-and-forth:
        # round-trips ~= (total_traversed - |net|) / 2, with no inventory change.
        internal_rts = max(0, total_traversed - abs_net) // 2
        if internal_rts > 0:
            rt_gross = notional * sp_pct / 100
            rt_fee = maker_fee * 2
            realized += internal_rts * (rt_gross - rt_fee)
            fees_total += internal_rts * rt_fee
            rts += internal_rts

        # Net directional move changes inventory. direction = +1 means price
        # went up (sell orders fill), -1 means it went down (buy orders fill).
        direction = 1 if net_move > 0 else -1 if net_move < 0 else 0
        for lc in range(min(abs_net, GRID_LEVELS)):
            fill_p = prev_c + direction * spacing * (lc + 0.5)
            if fill_p <= 0:
                continue
            # Inventory cap: do not add beyond INVENTORY_MAX_LEVELS against the move.
            if inv_levels * direction <= -INVENTORY_MAX_LEVELS:
                break
            # The fee is booked only for orders that actually fill; it used to
            # be counted before the cap/price checks above.
            fees_total += maker_fee
            if inv_levels * direction > 0:
                # Fill closes one level of existing inventory -> round-trip.
                realized += notional * direction * (fill_p - inv_cost) / inv_cost - maker_fee
                rts += 1
            else:
                # Fill opens or adds a level; update the average entry price.
                held = abs(inv_levels)
                if held == 0:
                    inv_cost = fill_p
                else:
                    inv_cost = (held * inv_cost + fill_p) / (held + 1)
                realized -= maker_fee
            inv_levels -= direction
        peak_inv = max(peak_inv, abs(inv_levels))

        # Partial close at warn level (taker close at the bar close).
        if abs(inv_levels) >= INVENTORY_WARN_LEVELS and inv_cost > 0:
            excess = abs(inv_levels) - INVENTORY_WARN_LEVELS + 1
            to_close = max(1, excess // 2)
            pnl_per = _close_pnl_per_level(inv_levels, inv_cost, curr_c, notional)
            cl_fee = to_close * notional * TAKER_FEE
            realized += to_close * pnl_per - cl_fee
            fees_total += cl_fee
            inv_levels += -to_close if inv_levels > 0 else to_close
            partial_cls += to_close

        # Unstucking: shed part of a large inventory while price is near the EMA.
        if abs(inv_levels) >= UNSTUCK_THRESHOLD and inv_cost > 0:
            ema_v = ema_arr[i] if not pd.isna(ema_arr[i]) else center
            dist = abs(curr_c - ema_v) / ema_v * 100
            if dist < sp_pct * 2:
                to_unstuck = max(1, int(abs(inv_levels) * UNSTUCK_CLOSE_PCT))
                pnl_per = _close_pnl_per_level(inv_levels, inv_cost, curr_c, notional)
                u_fee = to_unstuck * notional * TAKER_FEE
                realized += to_unstuck * pnl_per - u_fee
                fees_total += u_fee
                inv_levels += -to_unstuck if inv_levels > 0 else to_unstuck
                unstuck_cls += to_unstuck

        # Hard stop: close everything once realized + unrealized hits the loss limit.
        if inv_levels != 0 and inv_cost > 0:
            unreal = abs(inv_levels) * _close_pnl_per_level(inv_levels, inv_cost, curr_c, notional)
            if realized + unreal <= -MAX_LOSS_PER_SESSION:
                cl_fee = abs(inv_levels) * notional * TAKER_FEE
                total = realized + unreal - cl_fee
                # The whole result is reported as realized; bars_1m stays the
                # full window length, as before.
                return _session_result(
                    n, total, 0, rts, peak_inv, partial_cls, unstuck_cls,
                    fees_total + cl_fee, spacing_sum / spacing_bars, True,
                )

    # Session end: close remaining inventory at the final close (taker fee).
    inv_pnl = 0.0
    if inv_levels != 0 and inv_cost > 0:
        final = closes[-1]
        inv_pnl = abs(inv_levels) * _close_pnl_per_level(inv_levels, inv_cost, final, notional)
        cl_fee = abs(inv_levels) * notional * TAKER_FEE
        inv_pnl -= cl_fee
        fees_total += cl_fee

    avg_spacing = spacing_sum / spacing_bars if spacing_bars else 0.0
    return _session_result(
        n, realized, inv_pnl, rts, peak_inv, partial_cls, unstuck_cls,
        fees_total, avg_spacing, False,
    )
# ============================================================
# MAIN
# ============================================================
def _sum_results(rows):
    """Aggregate per-coin result rows into totals plus the session win rate (%)."""
    sessions = sum(c['sessions'] for c in rows)
    wins = sum(c['wins'] for c in rows)
    return {
        "total_pnl": sum(c['total_pnl'] for c in rows),
        "realized_pnl": sum(c['realized_pnl'] for c in rows),
        "inventory_pnl": sum(c['inventory_pnl'] for c in rows),
        "fees": sum(c['total_fees'] for c in rows),
        "sessions": sessions,
        "wins": wins,
        "rts": sum(c['total_rts'] for c in rows),
        "win_rate": wins / sessions * 100 if sessions > 0 else 0,
    }


def main():
    """Run the hybrid backtest, print the report and save a JSON summary.

    For every screener timeframe and coin: fetch higher-TF candles, find
    session windows, simulate the grid on 1m candles for each window and
    aggregate the results. Per-session detail is printed in aggregate only;
    the saved JSON omits the ``detail`` lists.
    """
    import os  # local import: only needed to create the output directory

    print("=" * 70)
    print("HYBRID GRID BACKTEST v4")
    print("Screener: 15m / 1h | Grid simulation: 1m")
    print("=" * 70)
    print(f"Period: {LOOKBACK_DAYS}d | Spacing: ATR {SPACING_MIN_PCT}-{SPACING_MAX_PCT}%")
    print(f"Leverage: {LEVERAGE}x | ${POSITION_SIZE}/level (notional ${POSITION_SIZE*LEVERAGE})")
    print(f"Inv mgmt: warn@{INVENTORY_WARN_LEVELS} cap@{INVENTORY_MAX_LEVELS} unstuck@{UNSTUCK_THRESHOLD}")
    print(f"Screener: BB+ADX+CHOP | Center: EMA({EMA_CENTER_PERIOD}) trailing")
    print("=" * 70)

    exchange = ccxt.binanceusdm({
        'enableRateLimit': True,
        'options': {'defaultType': 'future'},
    })

    all_results = {}
    # 1m history is the heaviest download and is identical for every screener
    # timeframe, so it is fetched once per symbol and reused. Failed fetches
    # are not cached and are retried on the next timeframe.
    grid_cache = {}

    for stf in SCREENER_TFS:
        print(f"\n{'='*60}")
        print(f" SCREENER TF: {stf} | GRID TF: 1m")
        print(f"{'='*60}")
        stf_results = []
        for symbol in COINS:
            print(f"\n {symbol}...", end=" ", flush=True)

            # Fetch screener TF
            df_htf = fetch_klines(exchange, symbol, stf, LOOKBACK_DAYS)
            if df_htf is None or len(df_htf) < 50:
                print("SKIP (no HTF data)")
                continue

            # Find sessions
            sessions = find_sessions(df_htf, stf)
            if not sessions:
                print("0 sessions")
                continue
            print(f"{len(sessions)} sessions found, fetching 1m...", end=" ", flush=True)

            # Fetch 1m data (or reuse the copy downloaded for an earlier timeframe)
            df_1m = grid_cache.get(symbol)
            if df_1m is None:
                df_1m = fetch_klines(exchange, symbol, GRID_TF, LOOKBACK_DAYS)
                if df_1m is not None:
                    grid_cache[symbol] = df_1m
            if df_1m is None or len(df_1m) < 100:
                print("SKIP (no 1m data)")
                continue

            # Simulate grid for each session
            coin_sessions = []
            for sess in sessions:
                result = simulate_grid_on_1m(df_1m, sess['entry_ts'], sess['exit_ts'])
                if result is None:
                    continue
                result.update({
                    "entry_ts": str(sess['entry_ts']),
                    "exit_ts": str(sess['exit_ts']),
                    "reason": sess['reason'],
                    "bw_entry": sess['bw_entry'],
                    "adx_entry": sess['adx_entry'],
                    "chop_entry": sess['chop_entry'],
                })
                coin_sessions.append(result)
            if not coin_sessions:
                print("0 valid sims")
                continue

            total = sum(s['total_pnl'] for s in coin_sessions)
            real = sum(s['realized'] for s in coin_sessions)
            inv = sum(s['inventory_pnl'] for s in coin_sessions)
            wins = sum(1 for s in coin_sessions if s['total_pnl'] > 0)
            wr = wins / len(coin_sessions) * 100
            total_rts = sum(s['rts'] for s in coin_sessions)
            total_fees = sum(s['fees'] for s in coin_sessions)
            avg_1m_bars = np.mean([s['bars_1m'] for s in coin_sessions])
            stops = sum(1 for s in coin_sessions if s['stopped'])
            print(f"PnL ${total:+.2f} (real ${real:+.2f} inv ${inv:+.2f}) | "
                  f"WR {wr:.0f}% ({wins}/{len(coin_sessions)}) | "
                  f"{total_rts} RTs | fees ${total_fees:.2f} | "
                  f"avg {avg_1m_bars:.0f} 1m bars | stops:{stops}")
            stf_results.append({
                "symbol": symbol,
                "sessions": len(coin_sessions),
                "total_pnl": round(total, 2),
                "realized_pnl": round(real, 2),
                "inventory_pnl": round(inv, 2),
                "win_rate": round(wr, 1),
                "wins": wins,
                "total_rts": total_rts,
                "total_fees": round(total_fees, 2),
                "avg_bars_1m": round(avg_1m_bars, 0),
                "stopped_sessions": stops,
                "detail": coin_sessions,
            })
        all_results[stf] = stf_results

    # ============================================================
    # SUMMARY
    # ============================================================
    print("\n\n" + "=" * 70)
    print("SUMMARY — HYBRID v4 (screener HTF + grid 1m)")
    print("=" * 70)
    for stf in SCREENER_TFS:
        data = all_results.get(stf, [])
        if not data:
            print(f"\n{stf}: No data")
            continue
        agg = _sum_results(data)
        tp = agg["total_pnl"]
        sess = agg["sessions"]
        coins_up = sum(1 for c in data if c['total_pnl'] > 0)
        print(f"\n📊 Screener {stf} + Grid 1m:")
        print(f" Total PnL: ${tp:+.2f} (realized ${agg['realized_pnl']:+.2f}, inventory ${agg['inventory_pnl']:+.2f})")
        print(f" Fees paid: ${agg['fees']:.2f}")
        print(f" Sessions: {sess} (wins {agg['wins']}, WR {agg['win_rate']:.1f}%)")
        print(f" Round-trips: {agg['rts']}")
        print(f" Profitable coins: {coins_up}/{len(data)}")
        if sess > 0:
            print(f" Avg PnL/session: ${tp/sess:+.3f}")
        print(f" Avg PnL/day: ${tp/LOOKBACK_DAYS:+.2f}")

        srt = sorted(data, key=lambda c: c['total_pnl'], reverse=True)
        print("\n 🟢 Top 5:")
        for c in srt[:5]:
            print(f" {c['symbol']:15s} ${c['total_pnl']:+8.2f} "
                  f"(real ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) "
                  f"WR {c['win_rate']:.0f}% {c['total_rts']} RTs "
                  f"{c['sessions']} sess fees ${c['total_fees']:.2f}")
        if len(srt) > 5:
            print("\n 🔴 Bottom 5:")
            for c in srt[-5:]:
                print(f" {c['symbol']:15s} ${c['total_pnl']:+8.2f} "
                      f"(real ${c['realized_pnl']:+.2f} inv ${c['inventory_pnl']:+.2f}) "
                      f"WR {c['win_rate']:.0f}% {c['sessions']} sess")

    # Comparison table
    print("\n\n" + "=" * 70)
    print("SCREENER TF COMPARISON (both use 1m for grid)")
    print("=" * 70)
    hdr = f" {'Scr':4s} | {'Total':>9s} | {'Real':>9s} | {'Inv':>9s} | {'Fees':>7s} | {'Sess':>5s} | {'WR':>5s} | {'RTs':>6s} | {'$/sess':>7s} | {'$/day':>7s}"
    print(hdr)
    print(" " + "-" * 90)
    for stf in SCREENER_TFS:
        agg = _sum_results(all_results.get(stf, []))
        tp = agg["total_pnl"]
        rp = agg["realized_pnl"]
        ip = agg["inventory_pnl"]
        fe = agg["fees"]
        ss = agg["sessions"]
        rt = agg["rts"]
        wr = agg["win_rate"]
        av = tp / ss if ss > 0 else 0
        dy = tp / LOOKBACK_DAYS
        print(f" {stf:4s} | ${tp:+8.2f} | ${rp:+8.2f} | ${ip:+8.2f} | ${fe:6.2f} | {ss:5d} | {wr:4.0f}% | {rt:6d} | ${av:+6.2f} | ${dy:+6.2f}")

    print("\n\nPrevious results for reference:")
    print(" v2 (15m screener+grid): $-2542, 0% WR")
    print(" v3 (15m screener+grid): $-689, 2.5% WR")
    print(" v4 (hybrid) results above ↑↑↑")

    # Save (per-session detail is dropped to keep the file compact)
    out = "/home/app/trading-bot/grid-bot/backtests/results_hybrid_v4.json"
    compact = {}
    for stf in SCREENER_TFS:
        compact[stf] = [
            {k: v for k, v in c.items() if k != 'detail'}
            for c in all_results.get(stf, [])
        ]
    # Create the target directory first so a long run is not lost to a
    # missing folder.
    os.makedirs(os.path.dirname(out), exist_ok=True)
    with open(out, 'w') as f:
        json.dump(compact, f, indent=2)
    print(f"\n💾 Saved: {out}")


if __name__ == "__main__":
    main()