← Назад
#!/usr/bin/env python3
"""
S7_Combined Backtest — EMA + Divergence + Volume
=================================================
Hybrid of the three best strategies from the earlier backtest:
  - S3: WaveTrend cross inside a zone + EMA200 trend filter
  - S2: divergence between price and WaveTrend
  - S4: volume > N x average as confirmation

Variants:
  - S7_ALL:     all three filters are mandatory (strict)
  - S7_EMA_DIV: EMA + divergence (no volume)
  - S7_EMA_VOL: EMA + volume (no divergence)
  - S7_ANY2:    EMA is mandatory + either of (divergence OR volume)
"""
import json
import os
import time
from datetime import datetime

import numpy as np
import pandas as pd

# ============================================================
# CONFIG
# ============================================================
SYMBOLS = ["OPUSDT", "FETUSDT", "AVAXUSDT", "NEARUSDT", "APTUSDT"]
TIMEFRAMES = ["5m", "15m"]
DAYS_BACK = 90
COMMISSION = 0.0004
SLIPPAGE = 0.0001

WT_CHANNEL_LEN = 10
WT_AVG_LEN = 21
WT_MA_LEN = 4

SLTP_CONFIGS = [
    {"name": "SL1.5/TP3", "sl": 0.015, "tp": 0.03},
    {"name": "SL2/TP3", "sl": 0.02, "tp": 0.03},
    {"name": "SL2/TP4", "sl": 0.02, "tp": 0.04},
    {"name": "SL1.5/TP4", "sl": 0.015, "tp": 0.04},
    {"name": "SL2.5/TP5", "sl": 0.025, "tp": 0.05},
]

# Datasets shorter than this are skipped (EMA200 needs a warm-up period).
MIN_BARS = 250
# A divergence stays "recent" for this many bars before the cross bar.
DIV_RECENT_BARS = 5
# Timeframe used for the SL/TP sweep; the backtest showed 5m to be the best.
BEST_TF = "5m"
RESULTS_PATH = "/home/app/wt-backtest/results_combined.json"


# ============================================================
# DATA
# ============================================================
def get_binance_klines(symbol, interval, days_back, max_retries=5):
    """Download futures klines from Binance and return them as a DataFrame.

    Requests are paged forward from ``now - days_back`` in chunks of up to
    1500 candles until the present is reached or the API returns an empty
    or non-list payload.

    Args:
        symbol: Trading pair, e.g. ``"OPUSDT"``.
        interval: Kline interval; ``"5m"``, ``"15m"`` and ``"1h"`` have a
            known step, any other value falls back to the 5m step.
        days_back: How many days of history to request.
        max_retries: Consecutive failed requests tolerated before giving
            up. Without this bound a persistent network failure would
            retry forever.

    Returns:
        A DataFrame sorted by ``timestamp`` with float OHLCV columns, or
        ``None`` when nothing could be downloaded. If the retry budget runs
        out mid-download, the candles collected so far are returned.
    """
    import requests  # imported lazily so the module loads without it

    url = "https://fapi.binance.com/fapi/v1/klines"
    interval_ms = {
        "5m": 300000,
        "15m": 900000,
        "1h": 3600000,
    }
    step_ms = interval_ms.get(interval, 300000)
    end_time = int(time.time() * 1000)
    current_start = end_time - days_back * 24 * 3600 * 1000

    all_klines = []
    failures = 0
    while current_start < end_time:
        params = {
            "symbol": symbol,
            "interval": interval,
            "startTime": current_start,
            "limit": 1500,
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            failures += 1
            print(f" Error: {symbol} {interval}: {e}")
            if failures >= max_retries:
                break
            time.sleep(1)
            continue
        failures = 0
        if not data or not isinstance(data, list):
            break
        all_klines.extend(data)
        # Continue from the candle after the last one received.
        current_start = data[-1][0] + step_ms
        time.sleep(0.1)

    if not all_klines:
        return None

    df = pd.DataFrame(all_klines, columns=[
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "trades",
        "taker_buy_base", "taker_buy_quote", "ignore",
    ])
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
        df[col] = df[col].astype(float)
    df = (
        df.drop_duplicates(subset=["timestamp"])
        .sort_values("timestamp")
        .reset_index(drop=True)
    )
    return df


# ============================================================
# INDICATORS
# ============================================================
def calc_ema(series, period):
    """Return the exponential moving average of *series* (span=*period*)."""
    return series.ewm(span=period, adjust=False).mean()


def calc_sma(series, period):
    """Return the simple moving average of *series* over *period* bars."""
    return series.rolling(window=period).mean()


def calc_wavetrend(df):
    """Compute the WaveTrend oscillator lines for an OHLC DataFrame.

    Returns:
        ``(wt1, wt2)``: ``wt1`` is the EMA of the channel index and ``wt2``
        is the SMA of ``wt1``. The first value is NaN because the channel
        width ``d`` is 0 on the first bar.
    """
    hlc3 = (df["high"] + df["low"] + df["close"]) / 3
    esa = calc_ema(hlc3, WT_CHANNEL_LEN)
    d = calc_ema((hlc3 - esa).abs(), WT_CHANNEL_LEN)
    ci = (hlc3 - esa) / (0.015 * d)
    wt1 = calc_ema(ci, WT_AVG_LEN)
    wt2 = calc_sma(wt1, WT_MA_LEN)
    return wt1, wt2


def calc_volume_ratio(df, period=20):
    """Return volume divided by its rolling *period*-bar mean."""
    avg_vol = df["volume"].rolling(window=period).mean()
    return df["volume"] / avg_vol


def detect_divergence(df, wt1, lookback=20):
    """Flag bars where price and WaveTrend diverge.

    For every bar the close is compared with the lowest/highest close of
    the *preceding* ``lookback`` bars. The current bar is deliberately
    excluded from that reference window: if it were included, a genuine
    new extreme would be compared with itself and could never qualify.

    Args:
        df: DataFrame with a ``close`` column.
        wt1: WaveTrend line aligned with ``df``.
        lookback: Number of preceding bars searched for the reference
            extreme.

    Returns:
        Integer Series indexed like ``df``: ``1`` bullish (close at or
        within 0.1% of the prior low while WT is more than 5 points
        higher), ``-1`` bearish (mirror case), ``0`` otherwise. When both
        fire on the same bar the bearish flag wins.
    """
    close = df["close"].to_numpy(dtype=float)
    wt = np.asarray(wt1, dtype=float)
    flags = np.zeros(len(close), dtype=np.int64)
    if lookback < 1:
        return pd.Series(flags, index=df.index)

    for i in range(lookback, len(close)):
        prev_close = close[i - lookback:i]
        prev_wt = wt[i - lookback:i]
        curr_close = close[i]
        curr_wt = wt[i]

        # Bullish divergence: price makes a new low, WT makes a higher low.
        low_pos = int(np.argmin(prev_close))
        if (curr_close <= prev_close[low_pos] * 1.001
                and curr_wt > prev_wt[low_pos] + 5):
            flags[i] = 1

        # Bearish divergence: price makes a new high, WT makes a lower high.
        high_pos = int(np.argmax(prev_close))
        if (curr_close >= prev_close[high_pos] * 0.999
                and curr_wt < prev_wt[high_pos] - 5):
            flags[i] = -1

    return pd.Series(flags, index=df.index)


# ============================================================
# COMBINED STRATEGIES
# ============================================================
def check_wt_cross(wt1, wt2, i):
    """Returns 1 for bullish cross, -1 for bearish cross, 0 for none."""
    now_1, now_2 = wt1.iloc[i], wt2.iloc[i]
    before_1, before_2 = wt1.iloc[i - 1], wt2.iloc[i - 1]
    if now_1 > now_2 and before_1 <= before_2:
        return 1
    if now_1 < now_2 and before_1 >= before_2:
        return -1
    return 0


def check_wt_zone(wt1, wt2, i, oversold=-53, overbought=53):
    """Returns 1 if oversold, -1 if overbought, 0 if neutral."""
    a, b = wt1.iloc[i], wt2.iloc[i]
    if a < oversold or b < oversold:
        return 1
    if a > overbought or b > overbought:
        return -1
    return 0


def _cross_masks(wt1, wt2):
    """Boolean arrays (cross_up, cross_down); bar 0 has no previous bar."""
    a = np.asarray(wt1, dtype=float)
    b = np.asarray(wt2, dtype=float)
    up = np.zeros(len(a), dtype=bool)
    down = np.zeros(len(a), dtype=bool)
    if len(a) > 1:
        # Comparisons against NaN are False, same as the scalar check.
        with np.errstate(invalid="ignore"):
            up[1:] = (a[1:] > b[1:]) & (a[:-1] <= b[:-1])
            down[1:] = (a[1:] < b[1:]) & (a[:-1] >= b[:-1])
    return up, down


def _zone_masks(wt1, wt2, oversold=-53, overbought=53):
    """Boolean arrays (oversold, overbought); oversold takes precedence."""
    a = np.asarray(wt1, dtype=float)
    b = np.asarray(wt2, dtype=float)
    with np.errstate(invalid="ignore"):
        in_oversold = (a < oversold) | (b < oversold)
        in_overbought = ~in_oversold & ((a > overbought) | (b > overbought))
    return in_oversold, in_overbought


def _recent_div_masks(div_series, bars=DIV_RECENT_BARS):
    """Boolean arrays (bull, bear): divergence on this bar or the last *bars*."""
    div = np.asarray(div_series)

    def _seen_recently(flag):
        rolled = pd.Series(flag.astype(float)).rolling(
            bars + 1, min_periods=1).max()
        return rolled.to_numpy() > 0

    return _seen_recently(div == 1), _seen_recently(div == -1)


def _volume_mask(vol_ratio, vol_mult):
    """Boolean array: relative volume strictly above *vol_mult*."""
    with np.errstate(invalid="ignore"):
        return np.asarray(vol_ratio, dtype=float) > vol_mult


def _trend_entries(df, wt1, wt2, ema200, oversold=-53, overbought=53):
    """Base (long, short) masks: WT cross inside its zone, on the EMA200 side."""
    up, down = _cross_masks(wt1, wt2)
    in_oversold, in_overbought = _zone_masks(wt1, wt2, oversold, overbought)
    close = df["close"].to_numpy(dtype=float)
    ema = np.asarray(ema200, dtype=float)
    with np.errstate(invalid="ignore"):
        long_mask = up & in_oversold & (close > ema)
        short_mask = down & in_overbought & (close < ema)
    return long_mask, short_mask


def _to_signals(df, long_mask, short_mask):
    """Turn exclusive long/short masks into a +1/-1/0 Series indexed like df."""
    values = long_mask.astype(np.int64) - short_mask.astype(np.int64)
    return pd.Series(values, index=df.index)


def s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """S7_ALL: WT cross in zone + EMA200 + divergence + volume (all required)."""
    long_mask, short_mask = _trend_entries(df, wt1, wt2, ema200)
    bull, bear = _recent_div_masks(div_series)
    has_vol = _volume_mask(vol_ratio, vol_mult)
    return _to_signals(df, long_mask & bull & has_vol,
                       short_mask & bear & has_vol)


def s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series):
    """S7_EMA_DIV: WT cross in zone + EMA200 + divergence (no volume).

    ``vol_ratio`` is accepted only to keep the S7 signatures parallel.
    """
    long_mask, short_mask = _trend_entries(df, wt1, wt2, ema200)
    bull, bear = _recent_div_masks(div_series)
    return _to_signals(df, long_mask & bull, short_mask & bear)


def s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """S7_EMA_VOL: WT cross in zone + EMA200 + volume (no divergence).

    ``div_series`` is accepted only to keep the S7 signatures parallel.
    """
    long_mask, short_mask = _trend_entries(df, wt1, wt2, ema200)
    has_vol = _volume_mask(vol_ratio, vol_mult)
    return _to_signals(df, long_mask & has_vol, short_mask & has_vol)


def s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """S7_ANY2: WT cross in zone + EMA200 (required) + (divergence OR volume)."""
    long_mask, short_mask = _trend_entries(df, wt1, wt2, ema200)
    bull, bear = _recent_div_masks(div_series)
    has_vol = _volume_mask(vol_ratio, vol_mult)
    return _to_signals(df, long_mask & (bull | has_vol),
                       short_mask & (bear | has_vol))


# Bonus: the original strategies, kept for comparison.
def s3_ema_filter(df, wt1, wt2, ema200, oversold=-53, overbought=53):
    """S3 baseline: WT cross in zone, filtered by the EMA200 trend side.

    ``oversold``/``overbought`` set the zone thresholds (they used to be
    accepted but ignored).
    """
    long_mask, short_mask = _trend_entries(
        df, wt1, wt2, ema200, oversold, overbought)
    return _to_signals(df, long_mask, short_mask)


def s2_divergence(df, wt1, wt2, div_series):
    """S2 baseline: WT cross confirmed by a recent divergence (no zone, no EMA)."""
    up, down = _cross_masks(wt1, wt2)
    bull, bear = _recent_div_masks(div_series)
    return _to_signals(df, up & bull, down & bear)


# Strategy key -> callable that builds signals from a precomputed indicator set.
_STRATEGY_BUILDERS = {
    "s3": lambda ind: s3_ema_filter(
        ind["df"], ind["wt1"], ind["wt2"], ind["ema200"]),
    "s2": lambda ind: s2_divergence(
        ind["df"], ind["wt1"], ind["wt2"], ind["div_series"]),
    "s7_all": lambda ind: s7_all_three(
        ind["df"], ind["wt1"], ind["wt2"], ind["ema200"],
        ind["vol_ratio"], ind["div_series"]),
    "s7_ema_div": lambda ind: s7_ema_div(
        ind["df"], ind["wt1"], ind["wt2"], ind["ema200"],
        ind["vol_ratio"], ind["div_series"]),
    "s7_ema_vol": lambda ind: s7_ema_vol(
        ind["df"], ind["wt1"], ind["wt2"], ind["ema200"],
        ind["vol_ratio"], ind["div_series"]),
    "s7_any2": lambda ind: s7_any2(
        ind["df"], ind["wt1"], ind["wt2"], ind["ema200"],
        ind["vol_ratio"], ind["div_series"]),
}


# ============================================================
# BACKTESTER
# ============================================================
def run_backtest(df, signals, sl_pct=0.02, tp_pct=0.03):
    """Simulate fixed-percentage SL/TP trades, one position at a time.

    A non-zero signal opens a position at that bar's close, shifted by
    ``SLIPPAGE`` against the trader. From the next bar on, the bar's
    high/low are checked against the stop and target; when both are inside
    the same bar the stop is assumed to have been hit first. Signals that
    arrive while a position is open are ignored, and a position still open
    on the last bar produces no trade record.

    Args:
        df: DataFrame with ``timestamp``, ``high``, ``low``, ``close``.
        signals: Series aligned with ``df``; ``1`` long, ``-1`` short.
        sl_pct: Stop-loss distance as a fraction of the entry price.
        tp_pct: Take-profit distance as a fraction of the entry price.

    Returns:
        List of trade dicts (``pnl_pct``, ``result``, ``bars``, ``dir``,
        ``entry_time``, ``exit_time``). ``pnl_pct`` is ``-sl_pct`` or
        ``+tp_pct`` minus a round-trip commission.
    """
    highs = df["high"].to_numpy()
    lows = df["low"].to_numpy()
    closes = df["close"].to_numpy()
    timestamps = df["timestamp"]
    sig = np.asarray(signals)

    trades = []
    in_position = False
    entry_price = 0
    direction = 0
    entry_idx = 0

    for i in range(len(df)):
        if not in_position:
            if sig[i] != 0:
                direction = sig[i]
                entry_price = closes[i] * (1 + SLIPPAGE * direction)
                entry_idx = i
                in_position = True
            continue

        if direction == 1:
            side = "LONG"
            stop_hit = lows[i] <= entry_price * (1 - sl_pct)
            target_hit = highs[i] >= entry_price * (1 + tp_pct)
        elif direction == -1:
            side = "SHORT"
            stop_hit = highs[i] >= entry_price * (1 + sl_pct)
            target_hit = lows[i] <= entry_price * (1 - tp_pct)
        else:
            continue

        # The stop is checked first: conservative when both levels are hit.
        if stop_hit:
            pnl, outcome = -sl_pct - COMMISSION * 2, "SL"
        elif target_hit:
            pnl, outcome = tp_pct - COMMISSION * 2, "TP"
        else:
            continue

        trades.append({
            "pnl_pct": pnl,
            "result": outcome,
            "bars": i - entry_idx,
            "dir": side,
            "entry_time": str(timestamps.iloc[entry_idx]),
            "exit_time": str(timestamps.iloc[i]),
        })
        in_position = False

    return trades


def calc_stats(trades):
    """Aggregate a list of trade dicts into summary statistics.

    Returns:
        Dict with ``trades``, ``wins``, ``losses``, ``win_rate`` (%),
        ``total_pnl`` (%), ``profit_factor``, ``max_dd`` (%, <= 0),
        ``avg_bars``, ``longs`` and ``shorts``. The same keys are present
        for an empty trade list, so callers can format any result.
        ``max_dd`` is measured against the running equity peak, starting
        from the initial equity of 0, so an opening losing streak counts.
    """
    if not trades:
        return {
            "trades": 0, "wins": 0, "losses": 0, "win_rate": 0,
            "total_pnl": 0, "profit_factor": 0, "max_dd": 0, "avg_bars": 0,
            "longs": 0, "shorts": 0,
        }

    df_t = pd.DataFrame(trades)
    pnl = df_t["pnl_pct"]
    n_wins = int((pnl > 0).sum())
    n_losses = int((pnl <= 0).sum())

    gross_profit = float(pnl[pnl > 0].sum())
    # A tiny floor keeps the profit factor finite when nothing was lost.
    gross_loss = abs(float(pnl[pnl <= 0].sum())) or 0.0001

    equity = pnl.cumsum()
    peak = equity.cummax().clip(lower=0)
    max_dd = float((equity - peak).min())

    return {
        "trades": len(trades),
        "wins": n_wins,
        "losses": n_losses,
        "win_rate": round(n_wins / len(trades) * 100, 1),
        "total_pnl": round(float(pnl.sum()) * 100, 2),
        "profit_factor": round(gross_profit / gross_loss, 2),
        "max_dd": round(max_dd * 100, 2),
        "avg_bars": round(float(df_t["bars"].mean()), 1),
        "longs": int((df_t["dir"] == "LONG").sum()),
        "shorts": int((df_t["dir"] == "SHORT").sum()),
    }


# ============================================================
# MAIN
# ============================================================
def _compute_indicators(df):
    """Compute every indicator the strategies need for one dataset."""
    wt1, wt2 = calc_wavetrend(df)
    return {
        "df": df,
        "wt1": wt1,
        "wt2": wt2,
        "ema200": calc_ema(df["close"], 200),
        "vol_ratio": calc_volume_ratio(df),
        "div_series": detect_divergence(df, wt1),
    }


def _signals_for(strat_key, sym, tf, data, ind_cache, sig_cache):
    """Return (df, signals) for one strategy/symbol/timeframe, or None.

    Indicators and signals do not depend on the SL/TP settings, so both
    are cached and computed at most once per dataset / strategy.
    """
    df = data.get(sym, {}).get(tf)
    if df is None or len(df) < MIN_BARS:
        return None
    ind_key = (sym, tf)
    if ind_key not in ind_cache:
        ind_cache[ind_key] = _compute_indicators(df)
    sig_key = (strat_key, sym, tf)
    if sig_key not in sig_cache:
        sig_cache[sig_key] = _STRATEGY_BUILDERS[strat_key](ind_cache[ind_key])
    return df, sig_cache[sig_key]


def main():
    """Download data, run every strategy, sweep SL/TP and save the results."""
    # Imported first so a missing dependency fails before the long download.
    from tabulate import tabulate

    print("=" * 75)
    print(" S7_COMBINED BACKTEST — EMA + Divergence + Volume")
    print(f" Пары: {', '.join(SYMBOLS)}")
    print(f" Период: {DAYS_BACK} дней | TF: {', '.join(TIMEFRAMES)}")
    print("=" * 75)

    # Download data
    print("\n📥 Скачиваю данные с Binance...")
    data = {}
    for sym in SYMBOLS:
        data[sym] = {}
        for tf in TIMEFRAMES:
            print(f" {sym} {tf}...", end=" ", flush=True)
            df = get_binance_klines(sym, tf, DAYS_BACK)
            if df is not None:
                data[sym][tf] = df
                print(f"✅ {len(df)} свечей")
            else:
                print("❌ FAIL")

    # Run strategies
    print("\n📊 Запускаю стратегии...")
    strategies = {
        "S3_EMA (baseline)": "s3",
        "S2_Div (baseline)": "s2",
        "S7_ALL (EMA+Div+Vol)": "s7_all",
        "S7_EMA_DIV": "s7_ema_div",
        "S7_EMA_VOL": "s7_ema_vol",
        "S7_ANY2 (EMA + Div|Vol)": "s7_any2",
    }

    ind_cache = {}
    sig_cache = {}
    all_results = []

    for tf in TIMEFRAMES:
        print(f"\n{'─'*75}")
        print(f" ТАЙМФРЕЙМ: {tf}")
        print(f"{'─'*75}")

        for strat_name, strat_key in strategies.items():
            agg_trades = []
            per_symbol = {}

            for sym in SYMBOLS:
                prepared = _signals_for(
                    strat_key, sym, tf, data, ind_cache, sig_cache)
                if prepared is None:
                    continue
                df, signals = prepared
                trades = run_backtest(df, signals, sl_pct=0.02, tp_pct=0.03)
                agg_trades.extend(trades)
                if trades:
                    per_symbol[sym] = calc_stats(trades)

            stats = calc_stats(agg_trades)
            stats["strategy"] = strat_name
            stats["timeframe"] = tf
            stats["per_symbol"] = per_symbol
            all_results.append(stats)

            sym_detail = " | ".join(
                f"{s}:{d['trades']}t/{d['win_rate']}%"
                for s, d in per_symbol.items()
            )
            print(f" {strat_name:28s} | {stats['trades']:3d}t | WR {stats['win_rate']:5.1f}% | "
                  f"PnL {stats['total_pnl']:+7.2f}% | PF {stats['profit_factor']:.2f} | "
                  f"DD {stats['max_dd']:.2f}%")
            if sym_detail:
                print(f" └─ {sym_detail}")

    # SL/TP optimization for the combined strategies
    print(f"\n{'='*75}")
    print(" SL/TP ОПТИМИЗАЦИЯ лучших комбо стратегий")
    print(f"{'='*75}")

    # Test all combined strategies with all SL/TP configs
    combo_strats = ["s7_all", "s7_ema_div", "s7_ema_vol", "s7_any2"]
    combo_names = ["S7_ALL", "S7_EMA_DIV", "S7_EMA_VOL", "S7_ANY2"]

    sltp_table = []
    for strat_key, strat_name in zip(combo_strats, combo_names):
        for config in SLTP_CONFIGS:
            agg_trades = []
            for sym in SYMBOLS:
                prepared = _signals_for(
                    strat_key, sym, BEST_TF, data, ind_cache, sig_cache)
                if prepared is None:
                    continue
                df, signals = prepared
                agg_trades.extend(run_backtest(
                    df, signals, sl_pct=config["sl"], tp_pct=config["tp"]))

            stats = calc_stats(agg_trades)
            sltp_table.append([
                strat_name,
                config["name"],
                stats["trades"],
                f"{stats['win_rate']:.1f}%",
                f"{stats['total_pnl']:+.2f}%",
                f"{stats['profit_factor']:.2f}",
                f"{stats['max_dd']:.2f}%",
                f"{stats['avg_bars']:.0f}",
                f"{stats['longs']}L/{stats['shorts']}S",
            ])

    headers = ["Strategy", "SL/TP", "Trades", "WR", "PnL", "PF", "MaxDD",
               "AvgBars", "L/S"]
    print(tabulate(sltp_table, headers=headers, tablefmt="grid"))

    # Save results
    save_data = [dict(r, per_symbol=dict(r.get("per_symbol", {})))
                 for r in all_results]
    os.makedirs(os.path.dirname(RESULTS_PATH), exist_ok=True)
    with open(RESULTS_PATH, "w", encoding="utf-8") as f:
        json.dump(save_data, f, indent=2, default=str)
    print(f"\n💾 Результаты сохранены в {RESULTS_PATH}")
    print("Done! ✅")


if __name__ == "__main__":
    main()