← Назад#!/usr/bin/env python3
"""
S7_Combined Backtest — EMA + Divergence + Volume
=================================================
Гибрид трёх лучших стратегий из бэктеста:
- S3: WT Cross в зоне + EMA200 тренд фильтр
- S2: Дивергенция цены vs WT
- S4: Volume > N×avg подтверждение
Варианты:
- S7_ALL: все 3 фильтра обязательны (strict)
- S7_EMA_DIV: EMA + Divergence (без Volume)
- S7_EMA_VOL: EMA + Volume (без Divergence)
- S7_ANY2: EMA обязательна + любой из (Div OR Vol)
"""
import json
import time
import numpy as np
import pandas as pd
from datetime import datetime
from tabulate import tabulate
# ============================================================
# CONFIG
# ============================================================
SYMBOLS = ["OPUSDT", "FETUSDT", "AVAXUSDT", "NEARUSDT", "APTUSDT"]
TIMEFRAMES = ["5m", "15m"]
DAYS_BACK = 90
COMMISSION = 0.0004
SLIPPAGE = 0.0001
WT_CHANNEL_LEN = 10
WT_AVG_LEN = 21
WT_MA_LEN = 4
SLTP_CONFIGS = [
{"name": "SL1.5/TP3", "sl": 0.015, "tp": 0.03},
{"name": "SL2/TP3", "sl": 0.02, "tp": 0.03},
{"name": "SL2/TP4", "sl": 0.02, "tp": 0.04},
{"name": "SL1.5/TP4", "sl": 0.015, "tp": 0.04},
{"name": "SL2.5/TP5", "sl": 0.025, "tp": 0.05},
]
# ============================================================
# DATA
# ============================================================
def get_binance_klines(symbol, interval, days_back):
import requests
url = "https://fapi.binance.com/fapi/v1/klines"
end_time = int(time.time() * 1000)
interval_ms = {
"5m": 300000, "15m": 900000, "1h": 3600000,
}
ms_back = days_back * 24 * 3600 * 1000
start_time = end_time - ms_back
all_klines = []
current_start = start_time
while current_start < end_time:
params = {"symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500}
try:
resp = requests.get(url, params=params, timeout=10)
data = resp.json()
if not data or not isinstance(data, list):
break
all_klines.extend(data)
current_start = data[-1][0] + interval_ms.get(interval, 300000)
time.sleep(0.1)
except Exception as e:
print(f" Error: {symbol} {interval}: {e}")
time.sleep(1)
continue
if not all_klines:
return None
df = pd.DataFrame(all_klines, columns=[
"timestamp", "open", "high", "low", "close", "volume",
"close_time", "quote_volume", "trades", "taker_buy_base",
"taker_buy_quote", "ignore"
])
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
df[col] = df[col].astype(float)
df = df.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True)
return df
# ============================================================
# INDICATORS
# ============================================================
def calc_ema(series, period):
return series.ewm(span=period, adjust=False).mean()
def calc_sma(series, period):
return series.rolling(window=period).mean()
def calc_wavetrend(df):
hlc3 = (df["high"] + df["low"] + df["close"]) / 3
esa = calc_ema(hlc3, WT_CHANNEL_LEN)
d = calc_ema((hlc3 - esa).abs(), WT_CHANNEL_LEN)
ci = (hlc3 - esa) / (0.015 * d)
wt1 = calc_ema(ci, WT_AVG_LEN)
wt2 = calc_sma(wt1, WT_MA_LEN)
return wt1, wt2
def calc_volume_ratio(df, period=20):
avg_vol = df["volume"].rolling(window=period).mean()
return df["volume"] / avg_vol
def detect_divergence(df, wt1, lookback=20):
div = pd.Series(0, index=df.index)
for i in range(lookback, len(df)):
window_close = df["close"].iloc[i-lookback:i+1]
window_wt = wt1.iloc[i-lookback:i+1]
# Bullish divergence: price new low, WT higher low
curr_low = window_close.iloc[-1]
prev_low = window_close.min()
prev_low_idx = window_close.idxmin()
if curr_low <= prev_low * 1.001:
wt_curr = window_wt.iloc[-1]
wt_prev = window_wt.loc[prev_low_idx] if prev_low_idx in window_wt.index else window_wt.min()
if wt_curr > wt_prev + 5:
div.iloc[i] = 1
# Bearish divergence: price new high, WT lower high
curr_high = window_close.iloc[-1]
prev_high = window_close.max()
prev_high_idx = window_close.idxmax()
if curr_high >= prev_high * 0.999:
wt_curr = window_wt.iloc[-1]
wt_prev = window_wt.loc[prev_high_idx] if prev_high_idx in window_wt.index else window_wt.max()
if wt_curr < wt_prev - 5:
div.iloc[i] = -1
return div
# ============================================================
# COMBINED STRATEGIES
# ============================================================
def check_wt_cross(wt1, wt2, i):
"""Returns 1 for bullish cross, -1 for bearish cross, 0 for none."""
cross_up = wt1.iloc[i] > wt2.iloc[i] and wt1.iloc[i-1] <= wt2.iloc[i-1]
cross_down = wt1.iloc[i] < wt2.iloc[i] and wt1.iloc[i-1] >= wt2.iloc[i-1]
if cross_up:
return 1
elif cross_down:
return -1
return 0
def check_wt_zone(wt1, wt2, i, oversold=-53, overbought=53):
"""Returns 1 if oversold, -1 if overbought, 0 if neutral."""
if wt1.iloc[i] < oversold or wt2.iloc[i] < oversold:
return 1
elif wt1.iloc[i] > overbought or wt2.iloc[i] > overbought:
return -1
return 0
def s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
"""S7_ALL: WT cross в зоне + EMA200 + Divergence + Volume (все обязательны)."""
signals = pd.Series(0, index=df.index)
for i in range(1, len(df)):
cross = check_wt_cross(wt1, wt2, i)
zone = check_wt_zone(wt1, wt2, i)
if cross == 0:
continue
recent_div = div_series.iloc[max(0, i-5):i+1]
has_vol = vol_ratio.iloc[i] > vol_mult
if cross == 1 and zone == 1:
if df["close"].iloc[i] > ema200.iloc[i] and (recent_div == 1).any() and has_vol:
signals.iloc[i] = 1
elif cross == -1 and zone == -1:
if df["close"].iloc[i] < ema200.iloc[i] and (recent_div == -1).any() and has_vol:
signals.iloc[i] = -1
return signals
def s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series):
"""S7_EMA_DIV: WT cross в зоне + EMA200 + Divergence (без Volume)."""
signals = pd.Series(0, index=df.index)
for i in range(1, len(df)):
cross = check_wt_cross(wt1, wt2, i)
zone = check_wt_zone(wt1, wt2, i)
if cross == 0:
continue
recent_div = div_series.iloc[max(0, i-5):i+1]
if cross == 1 and zone == 1:
if df["close"].iloc[i] > ema200.iloc[i] and (recent_div == 1).any():
signals.iloc[i] = 1
elif cross == -1 and zone == -1:
if df["close"].iloc[i] < ema200.iloc[i] and (recent_div == -1).any():
signals.iloc[i] = -1
return signals
def s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
"""S7_EMA_VOL: WT cross в зоне + EMA200 + Volume (без Divergence)."""
signals = pd.Series(0, index=df.index)
for i in range(1, len(df)):
cross = check_wt_cross(wt1, wt2, i)
zone = check_wt_zone(wt1, wt2, i)
if cross == 0:
continue
has_vol = vol_ratio.iloc[i] > vol_mult
if cross == 1 and zone == 1:
if df["close"].iloc[i] > ema200.iloc[i] and has_vol:
signals.iloc[i] = 1
elif cross == -1 and zone == -1:
if df["close"].iloc[i] < ema200.iloc[i] and has_vol:
signals.iloc[i] = -1
return signals
def s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
"""S7_ANY2: WT cross в зоне + EMA200 (обяз) + (Divergence OR Volume)."""
signals = pd.Series(0, index=df.index)
for i in range(1, len(df)):
cross = check_wt_cross(wt1, wt2, i)
zone = check_wt_zone(wt1, wt2, i)
if cross == 0:
continue
recent_div = div_series.iloc[max(0, i-5):i+1]
has_vol = vol_ratio.iloc[i] > vol_mult
if cross == 1 and zone == 1:
if df["close"].iloc[i] > ema200.iloc[i] and ((recent_div == 1).any() or has_vol):
signals.iloc[i] = 1
elif cross == -1 and zone == -1:
if df["close"].iloc[i] < ema200.iloc[i] and ((recent_div == -1).any() or has_vol):
signals.iloc[i] = -1
return signals
# Бонус: оригинальные стратегии для сравнения
def s3_ema_filter(df, wt1, wt2, ema200, oversold=-53, overbought=53):
signals = pd.Series(0, index=df.index)
for i in range(1, len(df)):
cross = check_wt_cross(wt1, wt2, i)
zone = check_wt_zone(wt1, wt2, i)
if cross == 1 and zone == 1 and df["close"].iloc[i] > ema200.iloc[i]:
signals.iloc[i] = 1
elif cross == -1 and zone == -1 and df["close"].iloc[i] < ema200.iloc[i]:
signals.iloc[i] = -1
return signals
def s2_divergence(df, wt1, wt2, div_series):
signals = pd.Series(0, index=df.index)
for i in range(1, len(df)):
cross = check_wt_cross(wt1, wt2, i)
if cross == 0:
continue
recent_div = div_series.iloc[max(0, i-5):i+1]
if cross == 1 and (recent_div == 1).any():
signals.iloc[i] = 1
elif cross == -1 and (recent_div == -1).any():
signals.iloc[i] = -1
return signals
# ============================================================
# BACKTESTER
# ============================================================
def run_backtest(df, signals, sl_pct=0.02, tp_pct=0.03):
trades = []
in_position = False
entry_price = 0
direction = 0
entry_idx = 0
for i in range(len(df)):
if not in_position:
if signals.iloc[i] != 0:
direction = signals.iloc[i]
entry_price = df["close"].iloc[i] * (1 + SLIPPAGE * direction)
entry_idx = i
in_position = True
else:
high = df["high"].iloc[i]
low = df["low"].iloc[i]
if direction == 1:
sl_price = entry_price * (1 - sl_pct)
tp_price = entry_price * (1 + tp_pct)
if low <= sl_price:
trades.append({"pnl_pct": -sl_pct - COMMISSION*2, "result": "SL",
"bars": i - entry_idx, "dir": "LONG",
"entry_time": str(df["timestamp"].iloc[entry_idx]),
"exit_time": str(df["timestamp"].iloc[i])})
in_position = False
elif high >= tp_price:
trades.append({"pnl_pct": tp_pct - COMMISSION*2, "result": "TP",
"bars": i - entry_idx, "dir": "LONG",
"entry_time": str(df["timestamp"].iloc[entry_idx]),
"exit_time": str(df["timestamp"].iloc[i])})
in_position = False
elif direction == -1:
sl_price = entry_price * (1 + sl_pct)
tp_price = entry_price * (1 - tp_pct)
if high >= sl_price:
trades.append({"pnl_pct": -sl_pct - COMMISSION*2, "result": "SL",
"bars": i - entry_idx, "dir": "SHORT",
"entry_time": str(df["timestamp"].iloc[entry_idx]),
"exit_time": str(df["timestamp"].iloc[i])})
in_position = False
elif low <= tp_price:
trades.append({"pnl_pct": tp_pct - COMMISSION*2, "result": "TP",
"bars": i - entry_idx, "dir": "SHORT",
"entry_time": str(df["timestamp"].iloc[entry_idx]),
"exit_time": str(df["timestamp"].iloc[i])})
in_position = False
return trades
def calc_stats(trades):
if not trades:
return {"trades": 0, "wins": 0, "losses": 0, "win_rate": 0,
"total_pnl": 0, "profit_factor": 0, "max_dd": 0, "avg_bars": 0}
df_t = pd.DataFrame(trades)
wins = df_t[df_t["pnl_pct"] > 0]
losses = df_t[df_t["pnl_pct"] <= 0]
gross_profit = wins["pnl_pct"].sum() if len(wins) > 0 else 0
gross_loss = abs(losses["pnl_pct"].sum()) if len(losses) > 0 else 0.0001
cumul = df_t["pnl_pct"].cumsum()
max_dd = (cumul - cumul.cummax()).min()
return {
"trades": len(trades),
"wins": len(wins),
"losses": len(losses),
"win_rate": round(len(wins) / len(trades) * 100, 1),
"total_pnl": round(df_t["pnl_pct"].sum() * 100, 2),
"profit_factor": round(gross_profit / gross_loss, 2),
"max_dd": round(max_dd * 100, 2),
"avg_bars": round(df_t["bars"].mean(), 1),
"longs": len(df_t[df_t["dir"] == "LONG"]),
"shorts": len(df_t[df_t["dir"] == "SHORT"]),
}
# ============================================================
# MAIN
# ============================================================
def main():
print("=" * 75)
print(" S7_COMBINED BACKTEST — EMA + Divergence + Volume")
print(f" Пары: {', '.join(SYMBOLS)}")
print(f" Период: {DAYS_BACK} дней | TF: {', '.join(TIMEFRAMES)}")
print("=" * 75)
# Download data
print("\n📥 Скачиваю данные с Binance...")
data = {}
for sym in SYMBOLS:
data[sym] = {}
for tf in TIMEFRAMES:
print(f" {sym} {tf}...", end=" ", flush=True)
df = get_binance_klines(sym, tf, DAYS_BACK)
if df is not None:
data[sym][tf] = df
print(f"✅ {len(df)} свечей")
else:
print("❌ FAIL")
# Run strategies
print("\n📊 Запускаю стратегии...")
strategies = {
"S3_EMA (baseline)": "s3",
"S2_Div (baseline)": "s2",
"S7_ALL (EMA+Div+Vol)": "s7_all",
"S7_EMA_DIV": "s7_ema_div",
"S7_EMA_VOL": "s7_ema_vol",
"S7_ANY2 (EMA + Div|Vol)": "s7_any2",
}
all_results = []
for tf in TIMEFRAMES:
print(f"\n{'─'*75}")
print(f" ТАЙМФРЕЙМ: {tf}")
print(f"{'─'*75}")
for strat_name, strat_key in strategies.items():
agg_trades = []
per_symbol = {}
for sym in SYMBOLS:
if tf not in data.get(sym, {}):
continue
df = data[sym][tf]
if len(df) < 250:
continue
wt1, wt2 = calc_wavetrend(df)
ema200 = calc_ema(df["close"], 200)
vol_ratio = calc_volume_ratio(df)
div_series = detect_divergence(df, wt1)
if strat_key == "s3":
signals = s3_ema_filter(df, wt1, wt2, ema200)
elif strat_key == "s2":
signals = s2_divergence(df, wt1, wt2, div_series)
elif strat_key == "s7_all":
signals = s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series)
elif strat_key == "s7_ema_div":
signals = s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series)
elif strat_key == "s7_ema_vol":
signals = s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series)
elif strat_key == "s7_any2":
signals = s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series)
trades = run_backtest(df, signals, sl_pct=0.02, tp_pct=0.03)
agg_trades.extend(trades)
if trades:
sym_stats = calc_stats(trades)
per_symbol[sym] = sym_stats
stats = calc_stats(agg_trades)
stats["strategy"] = strat_name
stats["timeframe"] = tf
stats["per_symbol"] = per_symbol
all_results.append(stats)
sym_detail = " | ".join([f"{s}:{d['trades']}t/{d['win_rate']}%" for s, d in per_symbol.items()])
print(f" {strat_name:28s} | {stats['trades']:3d}t | WR {stats['win_rate']:5.1f}% | "
f"PnL {stats['total_pnl']:+7.2f}% | PF {stats['profit_factor']:.2f} | "
f"DD {stats['max_dd']:.2f}%")
if sym_detail:
print(f" └─ {sym_detail}")
# SL/TP optimization for best strategy
print(f"\n{'='*75}")
print(" SL/TP ОПТИМИЗАЦИЯ лучших комбо стратегий")
print(f"{'='*75}")
# Test all combined strategies with all SL/TP configs
combo_strats = ["s7_all", "s7_ema_div", "s7_ema_vol", "s7_any2"]
combo_names = ["S7_ALL", "S7_EMA_DIV", "S7_EMA_VOL", "S7_ANY2"]
best_tf = "5m" # бэктест показал 5m как лучший
sltp_table = []
for strat_key, strat_name in zip(combo_strats, combo_names):
for config in SLTP_CONFIGS:
agg_trades = []
for sym in SYMBOLS:
if best_tf not in data.get(sym, {}):
continue
df = data[sym][best_tf]
if len(df) < 250:
continue
wt1, wt2 = calc_wavetrend(df)
ema200 = calc_ema(df["close"], 200)
vol_ratio = calc_volume_ratio(df)
div_series = detect_divergence(df, wt1)
if strat_key == "s7_all":
signals = s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series)
elif strat_key == "s7_ema_div":
signals = s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series)
elif strat_key == "s7_ema_vol":
signals = s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series)
elif strat_key == "s7_any2":
signals = s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series)
trades = run_backtest(df, signals, sl_pct=config["sl"], tp_pct=config["tp"])
agg_trades.extend(trades)
stats = calc_stats(agg_trades)
sltp_table.append([
strat_name, config["name"], stats["trades"],
f"{stats['win_rate']:.1f}%", f"{stats['total_pnl']:+.2f}%",
f"{stats['profit_factor']:.2f}", f"{stats['max_dd']:.2f}%",
f"{stats['avg_bars']:.0f}", f"{stats['longs']}L/{stats['shorts']}S"
])
headers = ["Strategy", "SL/TP", "Trades", "WR", "PnL", "PF", "MaxDD", "AvgBars", "L/S"]
print(tabulate(sltp_table, headers=headers, tablefmt="grid"))
# Save results
save_data = []
for r in all_results:
entry = {k: v for k, v in r.items() if k != "per_symbol"}
entry["per_symbol"] = {sym: stats for sym, stats in r.get("per_symbol", {}).items()}
save_data.append(entry)
with open("/home/app/wt-backtest/results_combined.json", "w") as f:
json.dump(save_data, f, indent=2, default=str)
print(f"\n💾 Результаты сохранены в /home/app/wt-backtest/results_combined.json")
print("Done! ✅")
if __name__ == "__main__":
main()