#!/usr/bin/env python3
"""
S7_Combined Backtest — Single Symbol Mode
==========================================
Runs one coin at a time and appends its result to a JSON file.

Usage:
    python backtest_s7_single.py OPUSDT
"""
import sys
import json
import time
import os
import numpy as np
import pandas as pd
from datetime import datetime

# ============================================================
# CONFIG
# ============================================================
TIMEFRAMES = ["5m", "15m"]
DAYS_BACK = 90
COMMISSION = 0.0004  # per side; charged twice per round trip in run_backtest
SLIPPAGE = 0.0001    # applied against the trader on entry only
WT_CHANNEL_LEN = 10
WT_AVG_LEN = 21
WT_MA_LEN = 4
RESULTS_FILE = "/home/app/trading-bot/backtests/results_s7.json"

SLTP_CONFIGS = [
    {"name": "SL1.5/TP3", "sl": 0.015, "tp": 0.03},
    {"name": "SL2/TP3", "sl": 0.02, "tp": 0.03},
    {"name": "SL2/TP4", "sl": 0.02, "tp": 0.04},
    {"name": "SL1.5/TP4", "sl": 0.015, "tp": 0.04},
    {"name": "SL2.5/TP5", "sl": 0.025, "tp": 0.05},
]

BLACKLIST = ["SOLUSDT", "BNBUSDT", "XRPUSDT", "ETHUSDT", "BTCUSDT",
             "DOGEUSDT", "ADAUSDT", "WLDUSDT", "ENAUSDT"]


# ============================================================
# DATA
# ============================================================
def get_binance_klines(symbol, interval, days_back, max_retries=5):
    """Download candles for ``symbol`` from the Binance futures klines endpoint.

    Pages forward from ``now - days_back`` in requests of up to 1500 candles.

    Args:
        symbol: Trading pair, e.g. ``"OPUSDT"``.
        interval: Candle interval; ``"5m"``, ``"15m"`` and ``"1h"`` have a
            known step, anything else falls back to the 5-minute step.
        days_back: How many days of history to request.
        max_retries: Consecutive failed requests tolerated before the
            download is abandoned (whatever was fetched so far is kept).

    Returns:
        A DataFrame sorted by ``timestamp`` with float OHLCV columns, or
        ``None`` when nothing could be downloaded.
    """
    # Local import: the rest of the module stays importable without requests.
    import requests

    url = "https://fapi.binance.com/fapi/v1/klines"
    end_time = int(time.time() * 1000)
    interval_ms = {"5m": 300000, "15m": 900000, "1h": 3600000}
    step_ms = interval_ms.get(interval, 300000)
    start_time = end_time - days_back * 24 * 3600 * 1000

    all_klines = []
    current_start = start_time
    failures = 0
    while current_start < end_time:
        params = {"symbol": symbol, "interval": interval,
                  "startTime": current_start, "limit": 1500}
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # Retry the same page, but never forever: a dead network used to
            # spin this loop indefinitely.
            failures += 1
            print(f" Error: {symbol} {interval}: {e}")
            if failures >= max_retries:
                break
            time.sleep(1)
            continue
        failures = 0

        if not data or not isinstance(data, list):
            if data:
                # A non-list payload is an API error object; surface it
                # instead of silently ending the download.
                print(f" Error: {symbol} {interval}: {data}")
            break

        all_klines.extend(data)
        # Next page starts one candle after the last open time received.
        current_start = data[-1][0] + step_ms
        time.sleep(0.1)

    if not all_klines:
        return None

    df = pd.DataFrame(all_klines, columns=[
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "trades",
        "taker_buy_base", "taker_buy_quote", "ignore",
    ])
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
        df[col] = df[col].astype(float)
    df = (df.drop_duplicates(subset=["timestamp"])
            .sort_values("timestamp")
            .reset_index(drop=True))
    return df


# ============================================================
# INDICATORS
# ============================================================
def calc_ema(series, period):
    """Return the recursive (``adjust=False``) EMA of ``series`` with span ``period``."""
    return series.ewm(span=period, adjust=False).mean()


def calc_sma(series, period):
    """Return the simple moving average; the first ``period - 1`` values are NaN."""
    return series.rolling(window=period).mean()


def calc_wavetrend(df):
    """Compute the WaveTrend oscillator lines from ``high``/``low``/``close``.

    Returns:
        ``(wt1, wt2)`` where ``wt1`` is the EMA of the channel index and
        ``wt2`` is the ``WT_MA_LEN``-bar SMA of ``wt1``.
    """
    hlc3 = (df["high"] + df["low"] + df["close"]) / 3
    esa = calc_ema(hlc3, WT_CHANNEL_LEN)
    d = calc_ema((hlc3 - esa).abs(), WT_CHANNEL_LEN)
    ci = (hlc3 - esa) / (0.015 * d)
    wt1 = calc_ema(ci, WT_AVG_LEN)
    wt2 = calc_sma(wt1, WT_MA_LEN)
    return wt1, wt2


def calc_volume_ratio(df, period=20):
    """Return each bar's volume divided by its ``period``-bar rolling mean volume."""
    avg_vol = df["volume"].rolling(window=period).mean()
    return df["volume"] / avg_vol


def detect_divergence(df, wt1, lookback=20):
    """Flag price/WaveTrend divergences over a trailing window.

    For every bar from ``lookback`` onward the window is the current bar plus
    the ``lookback`` bars before it:

    * ``+1`` when the close is within 0.1% of the window's lowest close while
      ``wt1`` is more than 5 points above its value at that lowest close;
    * ``-1`` when the close is within 0.1% of the window's highest close while
      ``wt1`` is more than 5 points below its value at that highest close.

    If both conditions hold on one bar the ``-1`` flag wins (it is written
    last). ``wt1`` is read positionally, so it must be aligned with ``df``.

    Returns:
        An integer Series on ``df.index`` holding ``1``, ``-1`` or ``0``.
    """
    # Positional numpy windows replace per-bar pandas slicing; the close
    # column is assumed to contain no NaN (argmin/argmax do not skip them).
    closes = df["close"].to_numpy()
    wt = wt1.to_numpy()
    flags = np.zeros(len(df), dtype=np.int64)

    for i in range(lookback, len(df)):
        win_close = closes[i - lookback:i + 1]
        win_wt = wt[i - lookback:i + 1]
        curr_close = win_close[-1]
        curr_wt = win_wt[-1]

        low_pos = int(np.argmin(win_close))
        if curr_close <= win_close[low_pos] * 1.001:
            if curr_wt > win_wt[low_pos] + 5:
                flags[i] = 1

        high_pos = int(np.argmax(win_close))
        if curr_close >= win_close[high_pos] * 0.999:
            if curr_wt < win_wt[high_pos] - 5:
                flags[i] = -1

    return pd.Series(flags, index=df.index)


# ============================================================
# STRATEGIES
# ============================================================
def check_wt_cross(wt1, wt2, i):
    """Return 1 if ``wt1`` crossed above ``wt2`` at bar ``i``, -1 if below, else 0."""
    cross_up = wt1.iloc[i] > wt2.iloc[i] and wt1.iloc[i-1] <= wt2.iloc[i-1]
    cross_down = wt1.iloc[i] < wt2.iloc[i] and wt1.iloc[i-1] >= wt2.iloc[i-1]
    return 1 if cross_up else (-1 if cross_down else 0)


def check_wt_zone(wt1, wt2, i, oversold=-53, overbought=53):
    """Return 1 if either line is below ``oversold`` at bar ``i``, -1 if above ``overbought``, else 0."""
    if wt1.iloc[i] < oversold or wt2.iloc[i] < oversold:
        return 1
    elif wt1.iloc[i] > overbought or wt2.iloc[i] > overbought:
        return -1
    return 0


def _recent_divergence(div_series, i, direction, window=5):
    """Return True if ``direction`` was flagged on bar ``i`` or the ``window`` bars before it."""
    return bool((div_series.iloc[max(0, i - window):i + 1] == direction).any())


def _trend_filtered_signals(df, wt1, wt2, ema200, confirm=None):
    """Shared core of the S3/S7 strategies.

    A bar signals when the WaveTrend cross and the oversold/overbought zone
    point the same way, the close is on the matching side of ``ema200``
    (above for longs, below for shorts) and ``confirm(i, direction)`` — if
    given — is truthy.

    Returns:
        An integer Series on ``df.index`` holding ``1``, ``-1`` or ``0``.
    """
    signals = pd.Series(0, index=df.index)
    close = df["close"]
    for i in range(1, len(df)):
        direction = check_wt_cross(wt1, wt2, i)
        if direction == 0 or check_wt_zone(wt1, wt2, i) != direction:
            continue
        if direction == 1:
            with_trend = close.iloc[i] > ema200.iloc[i]
        else:
            with_trend = close.iloc[i] < ema200.iloc[i]
        if with_trend and (confirm is None or confirm(i, direction)):
            signals.iloc[i] = direction
    return signals


def s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """Cross + zone + EMA200 filter, confirmed by recent divergence AND a volume spike."""
    def confirm(i, direction):
        return (_recent_divergence(div_series, i, direction)
                and vol_ratio.iloc[i] > vol_mult)
    return _trend_filtered_signals(df, wt1, wt2, ema200, confirm)


def s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series):
    """Cross + zone + EMA200 filter, confirmed by recent divergence.

    ``vol_ratio`` is accepted only to keep the S7 signatures uniform.
    """
    def confirm(i, direction):
        return _recent_divergence(div_series, i, direction)
    return _trend_filtered_signals(df, wt1, wt2, ema200, confirm)


def s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """Cross + zone + EMA200 filter, confirmed by a volume spike.

    ``div_series`` is accepted only to keep the S7 signatures uniform.
    """
    def confirm(i, direction):
        return vol_ratio.iloc[i] > vol_mult
    return _trend_filtered_signals(df, wt1, wt2, ema200, confirm)


def s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """Cross + zone + EMA200 filter, confirmed by recent divergence OR a volume spike."""
    def confirm(i, direction):
        return (_recent_divergence(div_series, i, direction)
                or vol_ratio.iloc[i] > vol_mult)
    return _trend_filtered_signals(df, wt1, wt2, ema200, confirm)


# Baselines
def s3_ema_filter(df, wt1, wt2, ema200):
    """Baseline: cross + zone + EMA200 filter with no extra confirmation."""
    return _trend_filtered_signals(df, wt1, wt2, ema200)


def s2_divergence(df, wt1, wt2, div_series):
    """Baseline: any WaveTrend cross backed by a recent divergence in the same direction."""
    signals = pd.Series(0, index=df.index)
    for i in range(1, len(df)):
        direction = check_wt_cross(wt1, wt2, i)
        if direction != 0 and _recent_divergence(div_series, i, direction):
            signals.iloc[i] = direction
    return signals


# ============================================================
# BACKTESTER
# ============================================================
def run_backtest(df, signals, sl_pct=0.02, tp_pct=0.03):
    """Simulate fixed-percentage SL/TP trades, one position at a time.

    A non-zero signal while flat opens a position at that bar's close,
    worsened by ``SLIPPAGE``. From the next bar on, the bar's low/high are
    checked against the stop and the target; when both are touched in the
    same bar the stop is taken. Signals arriving while a position is open are
    ignored, and a position still open on the last bar is not recorded.

    Returns:
        A list of dicts with ``pnl_pct`` (fraction, net of two commissions),
        ``result`` (``"SL"``/``"TP"``), ``bars`` held and ``dir``
        (``"LONG"``/``"SHORT"``).
    """
    closes = df["close"].to_numpy()
    highs = df["high"].to_numpy()
    lows = df["low"].to_numpy()
    sig = signals.to_numpy()

    trades = []
    in_position = False
    entry_price = 0
    direction = 0
    entry_idx = 0

    for i in range(len(df)):
        if not in_position:
            if sig[i] != 0:
                direction = int(sig[i])
                entry_price = closes[i] * (1 + SLIPPAGE * direction)
                entry_idx = i
                in_position = True
            continue

        high = highs[i]
        low = lows[i]
        if direction == 1:
            hit_sl = low <= entry_price * (1 - sl_pct)
            hit_tp = high >= entry_price * (1 + tp_pct)
            side = "LONG"
        elif direction == -1:
            hit_sl = high >= entry_price * (1 + sl_pct)
            hit_tp = low <= entry_price * (1 - tp_pct)
            side = "SHORT"
        else:
            continue

        # Stop is checked first: the pessimistic choice when a single bar
        # spans both levels.
        if hit_sl:
            trades.append({"pnl_pct": -sl_pct - COMMISSION*2, "result": "SL",
                           "bars": i - entry_idx, "dir": side})
            in_position = False
        elif hit_tp:
            trades.append({"pnl_pct": tp_pct - COMMISSION*2, "result": "TP",
                           "bars": i - entry_idx, "dir": side})
            in_position = False

    return trades


def calc_stats(trades):
    """Summarise a list of trades produced by ``run_backtest``.

    Returns:
        A dict of plain Python numbers: ``trades``, ``wins``, ``losses``,
        ``win_rate`` (%), ``total_pnl`` (%, simple sum), ``profit_factor``,
        ``max_dd`` (%, <= 0), ``avg_bars``, ``longs``, ``shorts``.
        ``max_dd`` is measured against the running equity peak, where equity
        starts at 0 before the first trade.
    """
    if not trades:
        return {"trades": 0, "wins": 0, "losses": 0, "win_rate": 0,
                "total_pnl": 0, "profit_factor": 0, "max_dd": 0,
                "avg_bars": 0, "longs": 0, "shorts": 0}

    df_t = pd.DataFrame(trades)
    pnl = df_t["pnl_pct"]
    is_win = pnl > 0
    total = len(df_t)
    win_count = int(is_win.sum())

    gross_profit = float(pnl[is_win].sum())
    # 0.0001 stands in for "no losses" so the ratio stays finite; it also
    # covers losing trades that sum to exactly zero.
    gross_loss = abs(float(pnl[~is_win].sum())) or 0.0001

    cumul = pnl.cumsum()
    # Floor the peak at the starting equity (0); otherwise a run that opens
    # with losses measures drawdown from the first loss instead of from 0.
    peak = cumul.cummax().clip(lower=0)
    max_dd = float((cumul - peak).min())

    return {
        "trades": total,
        "wins": win_count,
        "losses": total - win_count,
        "win_rate": round(win_count / total * 100, 1),
        "total_pnl": round(float(pnl.sum()) * 100, 2),
        "profit_factor": round(gross_profit / gross_loss, 2),
        "max_dd": round(max_dd * 100, 2),
        "avg_bars": round(float(df_t["bars"].mean()), 1),
        "longs": int((df_t["dir"] == "LONG").sum()),
        "shorts": int((df_t["dir"] == "SHORT").sum()),
    }


# ============================================================
# MAIN
# ============================================================
def _evaluate_sltp_grid(df, signals, min_trades=5):
    """Backtest ``signals`` under every entry of ``SLTP_CONFIGS``.

    Returns:
        ``(stats_by_name, best_name, best_pnl)``; ``best_name`` and
        ``best_pnl`` are ``None`` when no config reached ``min_trades``.
    """
    sltp_results = {}
    best_sltp = None
    best_pnl = None
    for config in SLTP_CONFIGS:
        trades = run_backtest(df, signals, sl_pct=config["sl"], tp_pct=config["tp"])
        stats = calc_stats(trades)
        sltp_results[config["name"]] = stats
        if stats["trades"] >= min_trades and (best_pnl is None or stats["total_pnl"] > best_pnl):
            best_pnl = stats["total_pnl"]
            best_sltp = config["name"]
    return sltp_results, best_sltp, best_pnl


def _save_results(results, path):
    """Write ``results`` as JSON to ``path`` without ever leaving a half-written file."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    # Write next to the target and swap it in: the file accumulates results
    # for every symbol, so a crash mid-write must not truncate it.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)
    os.replace(tmp_path, path)


def main():
    """Backtest the symbol given on the command line and merge it into RESULTS_FILE."""
    if len(sys.argv) < 2:
        print("Usage: python backtest_s7_single.py OPUSDT")
        sys.exit(1)

    symbol = sys.argv[1].upper()
    if not symbol.endswith("USDT"):
        symbol += "USDT"

    if symbol in BLACKLIST:
        print(f"❌ {symbol} в чёрном списке!")
        sys.exit(1)

    print(f"\n{'='*60}")
    print(f" S7 COMBINED BACKTEST: {symbol}")
    print(f" {DAYS_BACK} дней | TF: {', '.join(TIMEFRAMES)}")
    print(f"{'='*60}")

    # Load existing results (before downloading, so a broken file fails fast)
    results = {}
    if os.path.exists(RESULTS_FILE):
        with open(RESULTS_FILE, "r", encoding="utf-8") as f:
            results = json.load(f)

    # Download data
    print(f"\n📥 Скачиваю {symbol}...")
    data = {}
    for tf in TIMEFRAMES:
        print(f" {tf}...", end=" ", flush=True)
        df = get_binance_klines(symbol, tf, DAYS_BACK)
        if df is not None:
            data[tf] = df
            print(f"✅ {len(df)} свечей")
        else:
            print("❌")

    if not data:
        print("Нет данных!")
        sys.exit(1)

    symbol_results = {}
    for tf in TIMEFRAMES:
        if tf not in data:
            continue
        df = data[tf]
        # Too little history for the 200-period EMA to be meaningful.
        if len(df) < 250:
            continue

        print(f"\n📊 {tf}:")
        wt1, wt2 = calc_wavetrend(df)
        ema200 = calc_ema(df["close"], 200)
        vol_ratio = calc_volume_ratio(df)
        div_series = detect_divergence(df, wt1)

        # Strategies to test: display name -> signal builder. A dispatch
        # table cannot fall through and reuse a previous strategy's signals.
        strategies = {
            "S3_EMA": lambda: s3_ema_filter(df, wt1, wt2, ema200),
            "S2_Div": lambda: s2_divergence(df, wt1, wt2, div_series),
            "S7_ALL": lambda: s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series),
            "S7_EMA_DIV": lambda: s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series),
            "S7_EMA_VOL": lambda: s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series),
            "S7_ANY2": lambda: s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series),
        }

        tf_results = {}
        for strat_name, build_signals in strategies.items():
            signals = build_signals()

            # Test all SL/TP configs
            sltp_results, best_sltp, best_pnl = _evaluate_sltp_grid(df, signals)

            # Default SL2/TP3 for display
            default_stats = sltp_results.get("SL2/TP3", calc_stats([]))
            best_stats = sltp_results.get(best_sltp, default_stats) if best_sltp else default_stats

            tf_results[strat_name] = {
                "default_SL2_TP3": sltp_results.get("SL2/TP3", {}),
                "best_sltp": best_sltp,
                "best_stats": best_stats,
                "all_sltp": sltp_results,
            }

            if best_sltp is None:
                # No config reached the minimum trade count; say so rather
                # than printing a placeholder number as if it were a PnL.
                best_label = "n/a"
            else:
                marker = " 🏆" if best_pnl > 0 else ""
                best_label = f"{best_sltp} → {best_pnl:+.2f}%{marker}"

            print(f" {strat_name:15s} | {default_stats['trades']:3d}t | WR {default_stats['win_rate']:5.1f}% | "
                  f"PnL {default_stats['total_pnl']:+7.2f}% | PF {default_stats['profit_factor']:.2f} | "
                  f"Best: {best_label}")

        symbol_results[tf] = tf_results

    # Save
    results[symbol] = {
        "tested_at": datetime.now().isoformat(),
        "days_back": DAYS_BACK,
        "results": symbol_results,
    }
    _save_results(results, RESULTS_FILE)

    print(f"\n💾 Сохранено в {RESULTS_FILE}")
    print(f"✅ {symbol} готов! ({len(results)} монет в файле)")


if __name__ == "__main__":
    main()