#!/usr/bin/env python3
"""
S7_Combined Backtest — EMA + Divergence + Volume
=================================================
Гибрид трёх лучших стратегий из бэктеста:
- S3: WT Cross в зоне + EMA200 тренд фильтр
- S2: Дивергенция цены vs WT
- S4: Volume > N×avg подтверждение

Варианты:
- S7_ALL: все 3 фильтра обязательны (strict)
- S7_EMA_DIV: EMA + Divergence (без Volume)
- S7_EMA_VOL: EMA + Volume (без Divergence)
- S7_ANY2: EMA обязательна + любой из (Div OR Vol)
"""

import json
import time
import numpy as np
import pandas as pd
from datetime import datetime
from tabulate import tabulate

# ============================================================
# CONFIG
# ============================================================

# Futures symbols and candle intervals evaluated by main().
SYMBOLS = ["OPUSDT", "FETUSDT", "AVAXUSDT", "NEARUSDT", "APTUSDT"]
TIMEFRAMES = ["5m", "15m"]
# Length of the downloaded history window, in days.
DAYS_BACK = 90
# Per-side fee as a fraction; run_backtest charges it twice per closed trade.
COMMISSION = 0.0004
# Fractional price penalty applied to the entry fill in run_backtest.
SLIPPAGE = 0.0001

# WaveTrend parameters consumed by calc_wavetrend:
# EMA span for the baseline and its mean absolute deviation.
WT_CHANNEL_LEN = 10
# EMA span that smooths the channel index into wt1.
WT_AVG_LEN = 21
# SMA window that turns wt1 into the wt2 signal line.
WT_MA_LEN = 4

# Stop-loss / take-profit pairs tried in the optimisation pass of main().
# "sl" and "tp" are fractions of the entry price (0.015 == 1.5%).
SLTP_CONFIGS = [
    {"name": "SL1.5/TP3", "sl": 0.015, "tp": 0.03},
    {"name": "SL2/TP3",   "sl": 0.02,  "tp": 0.03},
    {"name": "SL2/TP4",   "sl": 0.02,  "tp": 0.04},
    {"name": "SL1.5/TP4", "sl": 0.015, "tp": 0.04},
    {"name": "SL2.5/TP5", "sl": 0.025, "tp": 0.05},
]

# ============================================================
# DATA
# ============================================================

def get_binance_klines(symbol, interval, days_back, max_retries=5):
    """Download Binance futures candles covering the last ``days_back`` days.

    Pages through ``/fapi/v1/klines`` in requests of up to 1500 candles,
    starting ``days_back`` days before the current time and moving forward
    until the start cursor reaches the time at which the call began.

    Args:
        symbol: Trading pair, e.g. ``"OPUSDT"``.
        interval: Candle interval string, e.g. ``"5m"``.
        days_back: Number of days of history to request.
        max_retries: Maximum number of consecutive failed requests before
            the download is abandoned. Previously a persistent error was
            retried forever.

    Returns:
        A DataFrame sorted by ``timestamp`` with duplicate timestamps removed
        and the price/volume columns converted to float, or ``None`` when no
        candle could be downloaded. After an aborted download the frame may
        cover only part of the requested window.
    """
    # Kept function-local, as in the original, so importing this module does
    # not require ``requests``.
    import requests

    url = "https://fapi.binance.com/fapi/v1/klines"
    interval_ms = {
        "5m": 300000, "15m": 900000, "1h": 3600000,
    }
    # For intervals missing from the table, step 1 ms past the last open time.
    # The old fixed 5-minute fallback skipped candles on shorter intervals.
    # NOTE(review): relies on startTime filtering by candle open time,
    # inclusive — confirm against the Binance API docs.
    step_ms = interval_ms.get(interval, 1)

    end_time = int(time.time() * 1000)
    start_time = end_time - days_back * 24 * 3600 * 1000
    all_klines = []
    current_start = start_time
    failures = 0

    while current_start < end_time:
        params = {"symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500}
        try:
            resp = requests.get(url, params=params, timeout=10)
            # Surface HTTP errors (rate limits, bans) as retryable failures
            # instead of silently ending the download with partial data.
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            failures += 1
            print(f"  Error: {symbol} {interval}: {e}")
            if failures >= max_retries:
                print(f"  Giving up on {symbol} {interval} after {failures} failed attempts")
                break
            time.sleep(1)
            continue

        failures = 0
        if not isinstance(data, list):
            # A non-list body is not candle data; report it and stop paging.
            print(f"  Error: {symbol} {interval}: unexpected response {data!r}")
            break
        if not data:
            break

        all_klines.extend(data)
        next_start = data[-1][0] + step_ms
        if next_start <= current_start:
            # No forward progress; stop rather than loop on the same page.
            break
        current_start = next_start
        time.sleep(0.1)

    if not all_klines:
        return None

    df = pd.DataFrame(all_klines, columns=[
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "trades", "taker_buy_base",
        "taker_buy_quote", "ignore"
    ])
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
        df[col] = df[col].astype(float)
    df = df.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True)
    return df

# ============================================================
# INDICATORS
# ============================================================

def calc_ema(series, period):
    """Return the recursive (``adjust=False``) EMA of ``series`` with span ``period``."""
    smoother = series.ewm(span=period, adjust=False)
    return smoother.mean()

def calc_sma(series, period):
    """Return the simple moving average of ``series`` over ``period`` bars.

    The first ``period - 1`` values are NaN because the window is incomplete.
    """
    windowed = series.rolling(window=period)
    return windowed.mean()

def calc_wavetrend(df):
    """Compute the WaveTrend oscillator lines for a candle DataFrame.

    Reads the ``high``, ``low`` and ``close`` columns and returns
    ``(wt1, wt2)``: ``wt1`` is the EMA-smoothed channel index and ``wt2`` is
    the simple moving average of ``wt1``.
    """
    typical_price = (df["high"] + df["low"] + df["close"]) / 3
    baseline = calc_ema(typical_price, WT_CHANNEL_LEN)
    deviation = typical_price - baseline
    mean_abs_deviation = calc_ema(deviation.abs(), WT_CHANNEL_LEN)
    # Normalise the deviation by its smoothed magnitude (0.015 scaling factor).
    channel_index = deviation / (0.015 * mean_abs_deviation)
    wt1 = calc_ema(channel_index, WT_AVG_LEN)
    return wt1, calc_sma(wt1, WT_MA_LEN)

def calc_volume_ratio(df, period=20):
    """Return each bar's volume divided by its rolling ``period``-bar mean.

    The mean window includes the current bar; the first ``period - 1`` values
    are NaN.
    """
    volume = df["volume"]
    return volume / volume.rolling(window=period).mean()

def detect_divergence(df, wt1, lookback=20):
    """Flag bars where price and the WaveTrend line diverge.

    For every bar from index ``lookback`` onward, the current close is
    compared with the lowest and highest close of the ``lookback`` bars that
    precede it:

    * bullish (``1``): close is at or below the prior low (0.1% tolerance)
      while WT is more than 5 points above its value at that prior low;
    * bearish (``-1``): close is at or above the prior high (0.1% tolerance)
      while WT is more than 5 points below its value at that prior high.

    The reference extreme is taken from the bars *before* the current one.
    Previously the window included the current bar, so a genuine new low or
    high was compared against itself and could never produce a signal.

    Args:
        df: Candle DataFrame with a ``close`` column.
        wt1: WaveTrend series aligned positionally with ``df``.
        lookback: Number of preceding bars searched for the reference extreme.

    Returns:
        Integer Series on ``df.index`` holding ``1``, ``-1`` or ``0``. When
        both conditions hold on one bar the bearish flag wins, as before.
    """
    close = df["close"].to_numpy(dtype=float)
    wt = wt1.to_numpy(dtype=float)
    flags = np.zeros(len(close), dtype=np.int64)

    if lookback < 1:
        # No preceding bars to compare against, so nothing can diverge.
        return pd.Series(flags, index=df.index)

    for i in range(lookback, len(close)):
        start = i - lookback
        prior = close[start:i]
        curr_close = close[i]
        curr_wt = wt[i]

        # Bullish divergence: price at a new low, WT making a higher low.
        # Comparisons involving NaN are False, so such bars never signal.
        low_pos = start + int(np.argmin(prior))
        if curr_close <= close[low_pos] * 1.001 and curr_wt > wt[low_pos] + 5:
            flags[i] = 1

        # Bearish divergence: price at a new high, WT making a lower high.
        high_pos = start + int(np.argmax(prior))
        if curr_close >= close[high_pos] * 0.999 and curr_wt < wt[high_pos] - 5:
            flags[i] = -1

    return pd.Series(flags, index=df.index)

# ============================================================
# COMBINED STRATEGIES
# ============================================================

def check_wt_cross(wt1, wt2, i):
    """Classify the wt1/wt2 relationship change between bars ``i - 1`` and ``i``.

    Returns 1 when wt1 moves from at-or-below wt2 to above it, -1 when it
    moves from at-or-above wt2 to below it, and 0 otherwise.
    """
    fast_now, slow_now = wt1.iloc[i], wt2.iloc[i]
    fast_prev, slow_prev = wt1.iloc[i - 1], wt2.iloc[i - 1]

    if fast_now > slow_now and fast_prev <= slow_prev:
        return 1
    if fast_now < slow_now and fast_prev >= slow_prev:
        return -1
    return 0

def check_wt_zone(wt1, wt2, i, oversold=-53, overbought=53):
    """Report which extreme zone the WaveTrend lines occupy at bar ``i``.

    Returns 1 when either line is below ``oversold``, -1 when either line is
    above ``overbought``, and 0 otherwise. The oversold test takes priority.
    """
    lines = (wt1, wt2)
    if any(line.iloc[i] < oversold for line in lines):
        return 1
    if any(line.iloc[i] > overbought for line in lines):
        return -1
    return 0

def s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """S7_ALL: WT cross inside its zone + EMA200 + divergence + volume (all required).

    A long (1) needs a bullish cross while oversold, close above EMA200, a
    bullish divergence flag within the last six bars (current included) and
    a volume ratio above ``vol_mult``. Shorts (-1) mirror every condition.
    """
    out = pd.Series(0, index=df.index)
    for bar in range(1, len(df)):
        side = check_wt_cross(wt1, wt2, bar)
        area = check_wt_zone(wt1, wt2, bar)
        if side == 0:
            continue

        div_window = div_series.iloc[max(0, bar - 5):bar + 1]
        volume_ok = vol_ratio.iloc[bar] > vol_mult
        if area != side:
            # The cross did not happen in the matching extreme zone.
            continue

        price = df["close"].iloc[bar]
        trend = ema200.iloc[bar]
        with_trend = price > trend if side == 1 else price < trend
        if with_trend and (div_window == side).any() and volume_ok:
            out.iloc[bar] = side
    return out

def s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series):
    """S7_EMA_DIV: WT cross inside its zone + EMA200 + divergence (no volume filter).

    ``vol_ratio`` is accepted for signature parity with the other S7
    variants but is not used.
    """
    out = pd.Series(0, index=df.index)
    for bar in range(1, len(df)):
        side = check_wt_cross(wt1, wt2, bar)
        area = check_wt_zone(wt1, wt2, bar)
        if side == 0:
            continue

        div_window = div_series.iloc[max(0, bar - 5):bar + 1]
        if area != side:
            # The cross did not happen in the matching extreme zone.
            continue

        price = df["close"].iloc[bar]
        trend = ema200.iloc[bar]
        with_trend = price > trend if side == 1 else price < trend
        if with_trend and (div_window == side).any():
            out.iloc[bar] = side
    return out

def s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """S7_EMA_VOL: WT cross inside its zone + EMA200 + volume (no divergence filter).

    ``div_series`` is accepted for signature parity with the other S7
    variants but is not used.
    """
    out = pd.Series(0, index=df.index)
    for bar in range(1, len(df)):
        side = check_wt_cross(wt1, wt2, bar)
        area = check_wt_zone(wt1, wt2, bar)
        if side == 0:
            continue

        volume_ok = vol_ratio.iloc[bar] > vol_mult
        if area != side:
            # The cross did not happen in the matching extreme zone.
            continue

        price = df["close"].iloc[bar]
        trend = ema200.iloc[bar]
        with_trend = price > trend if side == 1 else price < trend
        if with_trend and volume_ok:
            out.iloc[bar] = side
    return out

def s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    """S7_ANY2: WT cross inside its zone + EMA200 (required) + (divergence OR volume).

    The EMA200 trend filter is mandatory; at least one of the two
    confirmations (a recent matching divergence, or a volume ratio above
    ``vol_mult``) must also hold.
    """
    out = pd.Series(0, index=df.index)
    for bar in range(1, len(df)):
        side = check_wt_cross(wt1, wt2, bar)
        area = check_wt_zone(wt1, wt2, bar)
        if side == 0:
            continue

        div_window = div_series.iloc[max(0, bar - 5):bar + 1]
        volume_ok = vol_ratio.iloc[bar] > vol_mult
        if area != side:
            # The cross did not happen in the matching extreme zone.
            continue

        price = df["close"].iloc[bar]
        trend = ema200.iloc[bar]
        with_trend = price > trend if side == 1 else price < trend
        if with_trend and ((div_window == side).any() or volume_ok):
            out.iloc[bar] = side
    return out

# Bonus: the original strategies, kept for comparison
def s3_ema_filter(df, wt1, wt2, ema200, oversold=-53, overbought=53):
    """S3 baseline: WT cross inside its extreme zone, filtered by the EMA200 trend.

    Args:
        df: Candle DataFrame with a ``close`` column.
        wt1, wt2: WaveTrend lines aligned with ``df``.
        ema200: Trend EMA aligned with ``df``.
        oversold, overbought: Zone thresholds forwarded to ``check_wt_zone``.
            Previously these were accepted but ignored, so the zone check
            always used its own defaults.

    Returns:
        Integer Series on ``df.index``: 1 for a bullish cross while oversold
        with close above EMA200, -1 for a bearish cross while overbought with
        close below EMA200, 0 otherwise.
    """
    signals = pd.Series(0, index=df.index)
    for i in range(1, len(df)):
        cross = check_wt_cross(wt1, wt2, i)
        zone = check_wt_zone(wt1, wt2, i, oversold=oversold, overbought=overbought)
        if cross == 1 and zone == 1 and df["close"].iloc[i] > ema200.iloc[i]:
            signals.iloc[i] = 1
        elif cross == -1 and zone == -1 and df["close"].iloc[i] < ema200.iloc[i]:
            signals.iloc[i] = -1
    return signals

def s2_divergence(df, wt1, wt2, div_series):
    """S2 baseline: WT cross confirmed by a recent divergence in the same direction.

    A cross at bar ``i`` becomes a signal when ``div_series`` holds a
    matching flag anywhere in the last six bars (current bar included).
    No zone or trend filter is applied.
    """
    out = pd.Series(0, index=df.index)
    for bar in range(1, len(df)):
        side = check_wt_cross(wt1, wt2, bar)
        if side == 0:
            continue
        div_window = div_series.iloc[max(0, bar - 5):bar + 1]
        if (div_window == side).any():
            out.iloc[bar] = side
    return out

# ============================================================
# BACKTESTER
# ============================================================

def run_backtest(df, signals, sl_pct=0.02, tp_pct=0.03):
    """Simulate one-position-at-a-time trading with fixed stop-loss / take-profit.

    While flat, a non-zero signal opens a position at that bar's close,
    adjusted by ``SLIPPAGE`` against the trade. On every later bar the stop
    is tested before the target, so a bar touching both counts as a loss.
    Signals arriving while a position is open are ignored, and a position
    still open at the end of the data is not recorded.

    Returns a list of trade dicts with ``pnl_pct`` (fixed ``-sl_pct`` or
    ``+tp_pct`` minus two commissions), ``result``, ``bars``, ``dir``,
    ``entry_time`` and ``exit_time``.
    """
    closed = []
    holding = False
    fill = 0
    side = 0
    opened_at = 0

    for bar in range(len(df)):
        if not holding:
            if signals.iloc[bar] != 0:
                side = signals.iloc[bar]
                fill = df["close"].iloc[bar] * (1 + SLIPPAGE * side)
                opened_at = bar
                holding = True
            continue

        bar_high = df["high"].iloc[bar]
        bar_low = df["low"].iloc[bar]

        if side == 1:
            label = "LONG"
            stopped = bar_low <= fill * (1 - sl_pct)
            target_hit = bar_high >= fill * (1 + tp_pct)
        elif side == -1:
            label = "SHORT"
            stopped = bar_high >= fill * (1 + sl_pct)
            target_hit = bar_low <= fill * (1 - tp_pct)
        else:
            # Any other direction value has no exit rule; keep holding.
            continue

        if stopped:
            pnl, outcome = -sl_pct - COMMISSION*2, "SL"
        elif target_hit:
            pnl, outcome = tp_pct - COMMISSION*2, "TP"
        else:
            continue

        closed.append({"pnl_pct": pnl, "result": outcome,
                       "bars": bar - opened_at, "dir": label,
                       "entry_time": str(df["timestamp"].iloc[opened_at]),
                       "exit_time": str(df["timestamp"].iloc[bar])})
        holding = False
    return closed

def calc_stats(trades):
    """Aggregate a list of trade dicts into summary statistics.

    Args:
        trades: Trade dicts carrying at least ``pnl_pct``, ``bars`` and
            ``dir`` (``"LONG"`` / ``"SHORT"``).

    Returns:
        Dict with ``trades``, ``wins``, ``losses``, ``win_rate`` (%),
        ``total_pnl`` (%), ``profit_factor``, ``max_dd`` (%, zero or
        negative), ``avg_bars``, ``longs`` and ``shorts``. The same keys are
        present, all zero, when ``trades`` is empty; previously ``longs`` and
        ``shorts`` were missing in that case, which broke callers that read
        them.
    """
    if not trades:
        return {"trades": 0, "wins": 0, "losses": 0, "win_rate": 0,
                "total_pnl": 0, "profit_factor": 0, "max_dd": 0, "avg_bars": 0,
                "longs": 0, "shorts": 0}

    df_t = pd.DataFrame(trades)
    wins = df_t[df_t["pnl_pct"] > 0]
    losses = df_t[df_t["pnl_pct"] <= 0]
    gross_profit = wins["pnl_pct"].sum()
    gross_loss = abs(losses["pnl_pct"].sum())
    if gross_loss == 0:
        # No losing PnL to divide by: use a tiny denominator so the profit
        # factor stays finite (same placeholder the no-loss case always used).
        gross_loss = 0.0001

    # Drawdown is measured on the running sum of per-trade PnL against its
    # running peak.
    cumul = df_t["pnl_pct"].cumsum()
    max_dd = (cumul - cumul.cummax()).min()

    return {
        "trades": len(trades),
        "wins": len(wins),
        "losses": len(losses),
        "win_rate": round(len(wins) / len(trades) * 100, 1),
        "total_pnl": round(df_t["pnl_pct"].sum() * 100, 2),
        "profit_factor": round(gross_profit / gross_loss, 2),
        "max_dd": round(max_dd * 100, 2),
        "avg_bars": round(df_t["bars"].mean(), 1),
        "longs": len(df_t[df_t["dir"] == "LONG"]),
        "shorts": len(df_t[df_t["dir"] == "SHORT"]),
    }

# ============================================================
# MAIN
# ============================================================

def main():
    """Run the S7 combined backtest end to end.

    Steps:
      1. Download candles for every symbol/timeframe in the config.
      2. Backtest each strategy per timeframe with SL 2% / TP 3% and print
         aggregate and per-symbol statistics.
      3. Re-run the four S7 variants on the 5m timeframe for every entry in
         ``SLTP_CONFIGS`` and print the comparison table.
      4. Save the step-2 results as JSON.

    Indicators and signals depend only on the candles and the strategy, so
    they are computed once per (symbol, timeframe) / (strategy, symbol,
    timeframe) and reused, instead of being rebuilt for every strategy and
    every SL/TP configuration.
    """
    import os  # local import: only needed to prepare the output directory

    print("=" * 75)
    print("  S7_COMBINED BACKTEST — EMA + Divergence + Volume")
    print(f"  Пары: {', '.join(SYMBOLS)}")
    print(f"  Период: {DAYS_BACK} дней | TF: {', '.join(TIMEFRAMES)}")
    print("=" * 75)

    # Download data
    print("\n📥 Скачиваю данные с Binance...")
    data = {}
    for sym in SYMBOLS:
        data[sym] = {}
        for tf in TIMEFRAMES:
            print(f"  {sym} {tf}...", end=" ", flush=True)
            df = get_binance_klines(sym, tf, DAYS_BACK)
            if df is not None:
                data[sym][tf] = df
                print(f"✅ {len(df)} свечей")
            else:
                print("❌ FAIL")

    # Strategy key -> callable building the signal Series from the candles
    # and the shared indicator bundle.
    signal_builders = {
        "s3": lambda df, ind: s3_ema_filter(
            df, ind["wt1"], ind["wt2"], ind["ema200"]),
        "s2": lambda df, ind: s2_divergence(
            df, ind["wt1"], ind["wt2"], ind["div"]),
        "s7_all": lambda df, ind: s7_all_three(
            df, ind["wt1"], ind["wt2"], ind["ema200"], ind["vol_ratio"], ind["div"]),
        "s7_ema_div": lambda df, ind: s7_ema_div(
            df, ind["wt1"], ind["wt2"], ind["ema200"], ind["vol_ratio"], ind["div"]),
        "s7_ema_vol": lambda df, ind: s7_ema_vol(
            df, ind["wt1"], ind["wt2"], ind["ema200"], ind["vol_ratio"], ind["div"]),
        "s7_any2": lambda df, ind: s7_any2(
            df, ind["wt1"], ind["wt2"], ind["ema200"], ind["vol_ratio"], ind["div"]),
    }
    indicator_cache = {}
    signal_cache = {}

    def prepare(strat_key, sym, tf):
        """Return ``(df, signals)`` for one combination, or None if data is missing or too short."""
        df = data.get(sym, {}).get(tf)
        if df is None or len(df) < 250:
            return None
        cache_key = (strat_key, sym, tf)
        if cache_key not in signal_cache:
            ind_key = (sym, tf)
            if ind_key not in indicator_cache:
                wt1, wt2 = calc_wavetrend(df)
                indicator_cache[ind_key] = {
                    "wt1": wt1,
                    "wt2": wt2,
                    "ema200": calc_ema(df["close"], 200),
                    "vol_ratio": calc_volume_ratio(df),
                    "div": detect_divergence(df, wt1),
                }
            signal_cache[cache_key] = signal_builders[strat_key](df, indicator_cache[ind_key])
        return df, signal_cache[cache_key]

    # Run strategies
    print("\n📊 Запускаю стратегии...")

    strategies = {
        "S3_EMA (baseline)": "s3",
        "S2_Div (baseline)": "s2",
        "S7_ALL (EMA+Div+Vol)": "s7_all",
        "S7_EMA_DIV": "s7_ema_div",
        "S7_EMA_VOL": "s7_ema_vol",
        "S7_ANY2 (EMA + Div|Vol)": "s7_any2",
    }

    all_results = []

    for tf in TIMEFRAMES:
        print(f"\n{'─'*75}")
        print(f"  ТАЙМФРЕЙМ: {tf}")
        print(f"{'─'*75}")

        for strat_name, strat_key in strategies.items():
            agg_trades = []
            per_symbol = {}

            for sym in SYMBOLS:
                prepared = prepare(strat_key, sym, tf)
                if prepared is None:
                    continue
                df, signals = prepared

                trades = run_backtest(df, signals, sl_pct=0.02, tp_pct=0.03)
                agg_trades.extend(trades)
                if trades:
                    per_symbol[sym] = calc_stats(trades)

            stats = calc_stats(agg_trades)
            stats["strategy"] = strat_name
            stats["timeframe"] = tf
            stats["per_symbol"] = per_symbol
            all_results.append(stats)

            sym_detail = " | ".join([f"{s}:{d['trades']}t/{d['win_rate']}%" for s, d in per_symbol.items()])
            print(f"  {strat_name:28s} | {stats['trades']:3d}t | WR {stats['win_rate']:5.1f}% | "
                  f"PnL {stats['total_pnl']:+7.2f}% | PF {stats['profit_factor']:.2f} | "
                  f"DD {stats['max_dd']:.2f}%")
            if sym_detail:
                print(f"    └─ {sym_detail}")

    # SL/TP optimization for the combined strategies
    print(f"\n{'='*75}")
    print("  SL/TP ОПТИМИЗАЦИЯ лучших комбо стратегий")
    print(f"{'='*75}")

    # Test all combined strategies with all SL/TP configs
    combo_strats = ["s7_all", "s7_ema_div", "s7_ema_vol", "s7_any2"]
    combo_names = ["S7_ALL", "S7_EMA_DIV", "S7_EMA_VOL", "S7_ANY2"]

    best_tf = "5m"  # the backtest showed 5m to be the best timeframe

    sltp_table = []
    for strat_key, strat_name in zip(combo_strats, combo_names):
        for config in SLTP_CONFIGS:
            agg_trades = []
            for sym in SYMBOLS:
                prepared = prepare(strat_key, sym, best_tf)
                if prepared is None:
                    continue
                df, signals = prepared

                trades = run_backtest(df, signals, sl_pct=config["sl"], tp_pct=config["tp"])
                agg_trades.extend(trades)

            stats = calc_stats(agg_trades)
            # A strict variant can produce zero trades; read the long/short
            # counters defensively so an empty result cannot raise KeyError.
            sltp_table.append([
                strat_name, config["name"], stats["trades"],
                f"{stats['win_rate']:.1f}%", f"{stats['total_pnl']:+.2f}%",
                f"{stats['profit_factor']:.2f}", f"{stats['max_dd']:.2f}%",
                f"{stats['avg_bars']:.0f}", f"{stats.get('longs', 0)}L/{stats.get('shorts', 0)}S"
            ])

    headers = ["Strategy", "SL/TP", "Trades", "WR", "PnL", "PF", "MaxDD", "AvgBars", "L/S"]
    print(tabulate(sltp_table, headers=headers, tablefmt="grid"))

    # Save results
    save_data = []
    for r in all_results:
        entry = {k: v for k, v in r.items() if k != "per_symbol"}
        entry["per_symbol"] = dict(r.get("per_symbol", {}))
        save_data.append(entry)

    out_path = "/home/app/wt-backtest/results_combined.json"
    # Create the target directory if needed so a finished run is not lost to
    # a missing folder.
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    with open(out_path, "w") as f:
        json.dump(save_data, f, indent=2, default=str)

    print(f"\n💾 Результаты сохранены в {out_path}")
    print("Done! ✅")

# Run the backtest only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

# Git history: c6f6bd5 chore: initial commit — version control setup