← Back
#!/usr/bin/env python3
"""
S7_Combined Backtest — Single Symbol Mode
==========================================
Прогоняет 1 монету за раз, результат дописывает в JSON файл.
Использование: python backtest_s7_single.py OPUSDT
"""

import sys
import json
import time
import os
import numpy as np
import pandas as pd
from datetime import datetime

# ============================================================
# CONFIG
# ============================================================

TIMEFRAMES = ["5m", "15m"]
DAYS_BACK = 90
COMMISSION = 0.0004
SLIPPAGE = 0.0001

WT_CHANNEL_LEN = 10
WT_AVG_LEN = 21
WT_MA_LEN = 4

RESULTS_FILE = "/home/app/trading-bot/backtests/results_s7.json"

SLTP_CONFIGS = [
    {"name": "SL1.5/TP3", "sl": 0.015, "tp": 0.03},
    {"name": "SL2/TP3",   "sl": 0.02,  "tp": 0.03},
    {"name": "SL2/TP4",   "sl": 0.02,  "tp": 0.04},
    {"name": "SL1.5/TP4", "sl": 0.015, "tp": 0.04},
    {"name": "SL2.5/TP5", "sl": 0.025, "tp": 0.05},
]

BLACKLIST = ["SOLUSDT", "BNBUSDT", "XRPUSDT", "ETHUSDT",
             "BTCUSDT", "DOGEUSDT", "ADAUSDT", "WLDUSDT", "ENAUSDT"]

# ============================================================
# DATA
# ============================================================

def get_binance_klines(symbol, interval, days_back):
    import requests
    url = "https://fapi.binance.com/fapi/v1/klines"
    end_time = int(time.time() * 1000)
    interval_ms = {"5m": 300000, "15m": 900000, "1h": 3600000}
    ms_back = days_back * 24 * 3600 * 1000
    start_time = end_time - ms_back
    all_klines = []
    current_start = start_time

    while current_start < end_time:
        params = {"symbol": symbol, "interval": interval, "startTime": current_start, "limit": 1500}
        try:
            resp = requests.get(url, params=params, timeout=10)
            data = resp.json()
            if not data or not isinstance(data, list):
                break
            all_klines.extend(data)
            current_start = data[-1][0] + interval_ms.get(interval, 300000)
            time.sleep(0.1)
        except Exception as e:
            print(f"  Error: {symbol} {interval}: {e}")
            time.sleep(1)
            continue

    if not all_klines:
        return None

    df = pd.DataFrame(all_klines, columns=[
        "timestamp", "open", "high", "low", "close", "volume",
        "close_time", "quote_volume", "trades", "taker_buy_base",
        "taker_buy_quote", "ignore"
    ])
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
        df[col] = df[col].astype(float)
    df = df.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True)
    return df

# ============================================================
# INDICATORS
# ============================================================

def calc_ema(series, period):
    return series.ewm(span=period, adjust=False).mean()

def calc_sma(series, period):
    return series.rolling(window=period).mean()

def calc_wavetrend(df):
    hlc3 = (df["high"] + df["low"] + df["close"]) / 3
    esa = calc_ema(hlc3, WT_CHANNEL_LEN)
    d = calc_ema((hlc3 - esa).abs(), WT_CHANNEL_LEN)
    ci = (hlc3 - esa) / (0.015 * d)
    wt1 = calc_ema(ci, WT_AVG_LEN)
    wt2 = calc_sma(wt1, WT_MA_LEN)
    return wt1, wt2

def calc_volume_ratio(df, period=20):
    avg_vol = df["volume"].rolling(window=period).mean()
    return df["volume"] / avg_vol

def detect_divergence(df, wt1, lookback=20):
    div = pd.Series(0, index=df.index)
    for i in range(lookback, len(df)):
        window_close = df["close"].iloc[i-lookback:i+1]
        window_wt = wt1.iloc[i-lookback:i+1]
        curr_low = window_close.iloc[-1]
        prev_low = window_close.min()
        prev_low_idx = window_close.idxmin()
        if curr_low <= prev_low * 1.001:
            wt_curr = window_wt.iloc[-1]
            wt_prev = window_wt.loc[prev_low_idx] if prev_low_idx in window_wt.index else window_wt.min()
            if wt_curr > wt_prev + 5:
                div.iloc[i] = 1
        curr_high = window_close.iloc[-1]
        prev_high = window_close.max()
        prev_high_idx = window_close.idxmax()
        if curr_high >= prev_high * 0.999:
            wt_curr = window_wt.iloc[-1]
            wt_prev = window_wt.loc[prev_high_idx] if prev_high_idx in window_wt.index else window_wt.max()
            if wt_curr < wt_prev - 5:
                div.iloc[i] = -1
    return div

# ============================================================
# STRATEGIES
# ============================================================

def check_wt_cross(wt1, wt2, i):
    cross_up = wt1.iloc[i] > wt2.iloc[i] and wt1.iloc[i-1] <= wt2.iloc[i-1]
    cross_down = wt1.iloc[i] < wt2.iloc[i] and wt1.iloc[i-1] >= wt2.iloc[i-1]
    return 1 if cross_up else (-1 if cross_down else 0)

def check_wt_zone(wt1, wt2, i, oversold=-53, overbought=53):
    if wt1.iloc[i] < oversold or wt2.iloc[i] < oversold:
        return 1
    elif wt1.iloc[i] > overbought or wt2.iloc[i] > overbought:
        return -1
    return 0

def s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    signals = pd.Series(0, index=df.index)
    for i in range(1, len(df)):
        cross = check_wt_cross(wt1, wt2, i)
        zone = check_wt_zone(wt1, wt2, i)
        if cross == 0: continue
        recent_div = div_series.iloc[max(0, i-5):i+1]
        has_vol = vol_ratio.iloc[i] > vol_mult
        if cross == 1 and zone == 1:
            if df["close"].iloc[i] > ema200.iloc[i] and (recent_div == 1).any() and has_vol:
                signals.iloc[i] = 1
        elif cross == -1 and zone == -1:
            if df["close"].iloc[i] < ema200.iloc[i] and (recent_div == -1).any() and has_vol:
                signals.iloc[i] = -1
    return signals

def s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series):
    signals = pd.Series(0, index=df.index)
    for i in range(1, len(df)):
        cross = check_wt_cross(wt1, wt2, i)
        zone = check_wt_zone(wt1, wt2, i)
        if cross == 0: continue
        recent_div = div_series.iloc[max(0, i-5):i+1]
        if cross == 1 and zone == 1:
            if df["close"].iloc[i] > ema200.iloc[i] and (recent_div == 1).any():
                signals.iloc[i] = 1
        elif cross == -1 and zone == -1:
            if df["close"].iloc[i] < ema200.iloc[i] and (recent_div == -1).any():
                signals.iloc[i] = -1
    return signals

def s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    signals = pd.Series(0, index=df.index)
    for i in range(1, len(df)):
        cross = check_wt_cross(wt1, wt2, i)
        zone = check_wt_zone(wt1, wt2, i)
        if cross == 0: continue
        has_vol = vol_ratio.iloc[i] > vol_mult
        if cross == 1 and zone == 1:
            if df["close"].iloc[i] > ema200.iloc[i] and has_vol:
                signals.iloc[i] = 1
        elif cross == -1 and zone == -1:
            if df["close"].iloc[i] < ema200.iloc[i] and has_vol:
                signals.iloc[i] = -1
    return signals

def s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series, vol_mult=1.5):
    signals = pd.Series(0, index=df.index)
    for i in range(1, len(df)):
        cross = check_wt_cross(wt1, wt2, i)
        zone = check_wt_zone(wt1, wt2, i)
        if cross == 0: continue
        recent_div = div_series.iloc[max(0, i-5):i+1]
        has_vol = vol_ratio.iloc[i] > vol_mult
        if cross == 1 and zone == 1:
            if df["close"].iloc[i] > ema200.iloc[i] and ((recent_div == 1).any() or has_vol):
                signals.iloc[i] = 1
        elif cross == -1 and zone == -1:
            if df["close"].iloc[i] < ema200.iloc[i] and ((recent_div == -1).any() or has_vol):
                signals.iloc[i] = -1
    return signals

# Baselines
def s3_ema_filter(df, wt1, wt2, ema200):
    signals = pd.Series(0, index=df.index)
    for i in range(1, len(df)):
        cross = check_wt_cross(wt1, wt2, i)
        zone = check_wt_zone(wt1, wt2, i)
        if cross == 1 and zone == 1 and df["close"].iloc[i] > ema200.iloc[i]:
            signals.iloc[i] = 1
        elif cross == -1 and zone == -1 and df["close"].iloc[i] < ema200.iloc[i]:
            signals.iloc[i] = -1
    return signals

def s2_divergence(df, wt1, wt2, div_series):
    signals = pd.Series(0, index=df.index)
    for i in range(1, len(df)):
        cross = check_wt_cross(wt1, wt2, i)
        if cross == 0: continue
        recent_div = div_series.iloc[max(0, i-5):i+1]
        if cross == 1 and (recent_div == 1).any():
            signals.iloc[i] = 1
        elif cross == -1 and (recent_div == -1).any():
            signals.iloc[i] = -1
    return signals

# ============================================================
# BACKTESTER
# ============================================================

def run_backtest(df, signals, sl_pct=0.02, tp_pct=0.03):
    trades = []
    in_position = False
    entry_price = 0
    direction = 0
    entry_idx = 0
    for i in range(len(df)):
        if not in_position:
            if signals.iloc[i] != 0:
                direction = signals.iloc[i]
                entry_price = df["close"].iloc[i] * (1 + SLIPPAGE * direction)
                entry_idx = i
                in_position = True
        else:
            high = df["high"].iloc[i]
            low = df["low"].iloc[i]
            if direction == 1:
                if low <= entry_price * (1 - sl_pct):
                    trades.append({"pnl_pct": -sl_pct - COMMISSION*2, "result": "SL", "bars": i - entry_idx, "dir": "LONG"})
                    in_position = False
                elif high >= entry_price * (1 + tp_pct):
                    trades.append({"pnl_pct": tp_pct - COMMISSION*2, "result": "TP", "bars": i - entry_idx, "dir": "LONG"})
                    in_position = False
            elif direction == -1:
                if high >= entry_price * (1 + sl_pct):
                    trades.append({"pnl_pct": -sl_pct - COMMISSION*2, "result": "SL", "bars": i - entry_idx, "dir": "SHORT"})
                    in_position = False
                elif low <= entry_price * (1 - tp_pct):
                    trades.append({"pnl_pct": tp_pct - COMMISSION*2, "result": "TP", "bars": i - entry_idx, "dir": "SHORT"})
                    in_position = False
    return trades

def calc_stats(trades):
    if not trades:
        return {"trades": 0, "wins": 0, "losses": 0, "win_rate": 0,
                "total_pnl": 0, "profit_factor": 0, "max_dd": 0, "avg_bars": 0,
                "longs": 0, "shorts": 0}
    df_t = pd.DataFrame(trades)
    wins = df_t[df_t["pnl_pct"] > 0]
    losses = df_t[df_t["pnl_pct"] <= 0]
    gross_profit = wins["pnl_pct"].sum() if len(wins) > 0 else 0
    gross_loss = abs(losses["pnl_pct"].sum()) if len(losses) > 0 else 0.0001
    cumul = df_t["pnl_pct"].cumsum()
    max_dd = (cumul - cumul.cummax()).min()
    return {
        "trades": len(trades),
        "wins": len(wins),
        "losses": len(losses),
        "win_rate": round(len(wins) / len(trades) * 100, 1),
        "total_pnl": round(df_t["pnl_pct"].sum() * 100, 2),
        "profit_factor": round(gross_profit / gross_loss, 2),
        "max_dd": round(max_dd * 100, 2),
        "avg_bars": round(df_t["bars"].mean(), 1),
        "longs": len(df_t[df_t["dir"] == "LONG"]),
        "shorts": len(df_t[df_t["dir"] == "SHORT"]),
    }

# ============================================================
# MAIN
# ============================================================

def main():
    if len(sys.argv) < 2:
        print("Usage: python backtest_s7_single.py OPUSDT")
        sys.exit(1)

    symbol = sys.argv[1].upper()
    if not symbol.endswith("USDT"):
        symbol += "USDT"

    if symbol in BLACKLIST:
        print(f"❌ {symbol} в чёрном списке!")
        sys.exit(1)

    print(f"\n{'='*60}")
    print(f"  S7 COMBINED BACKTEST: {symbol}")
    print(f"  {DAYS_BACK} дней | TF: {', '.join(TIMEFRAMES)}")
    print(f"{'='*60}")

    # Load existing results
    results = {}
    if os.path.exists(RESULTS_FILE):
        with open(RESULTS_FILE, "r") as f:
            results = json.load(f)

    # Download data
    print(f"\n📥 Скачиваю {symbol}...")
    data = {}
    for tf in TIMEFRAMES:
        print(f"  {tf}...", end=" ", flush=True)
        df = get_binance_klines(symbol, tf, DAYS_BACK)
        if df is not None:
            data[tf] = df
            print(f"✅ {len(df)} свечей")
        else:
            print("❌")

    if not data:
        print("Нет данных!")
        sys.exit(1)

    # Strategies to test
    strat_funcs = {
        "S3_EMA": "s3",
        "S2_Div": "s2",
        "S7_ALL": "s7_all",
        "S7_EMA_DIV": "s7_ema_div",
        "S7_EMA_VOL": "s7_ema_vol",
        "S7_ANY2": "s7_any2",
    }

    symbol_results = {}

    for tf in TIMEFRAMES:
        if tf not in data:
            continue
        df = data[tf]
        if len(df) < 250:
            continue

        print(f"\n📊 {tf}:")
        wt1, wt2 = calc_wavetrend(df)
        ema200 = calc_ema(df["close"], 200)
        vol_ratio = calc_volume_ratio(df)
        div_series = detect_divergence(df, wt1)

        tf_results = {}

        for strat_name, strat_key in strat_funcs.items():
            # Best default SL/TP
            if strat_key == "s3":
                signals = s3_ema_filter(df, wt1, wt2, ema200)
            elif strat_key == "s2":
                signals = s2_divergence(df, wt1, wt2, div_series)
            elif strat_key == "s7_all":
                signals = s7_all_three(df, wt1, wt2, ema200, vol_ratio, div_series)
            elif strat_key == "s7_ema_div":
                signals = s7_ema_div(df, wt1, wt2, ema200, vol_ratio, div_series)
            elif strat_key == "s7_ema_vol":
                signals = s7_ema_vol(df, wt1, wt2, ema200, vol_ratio, div_series)
            elif strat_key == "s7_any2":
                signals = s7_any2(df, wt1, wt2, ema200, vol_ratio, div_series)

            # Test all SL/TP configs
            best_sltp = None
            best_pnl = -999
            sltp_results = {}

            for config in SLTP_CONFIGS:
                trades = run_backtest(df, signals, sl_pct=config["sl"], tp_pct=config["tp"])
                stats = calc_stats(trades)
                sltp_results[config["name"]] = stats
                if stats["total_pnl"] > best_pnl and stats["trades"] >= 5:
                    best_pnl = stats["total_pnl"]
                    best_sltp = config["name"]

            # Default SL2/TP3 for display
            default_stats = sltp_results.get("SL2/TP3", calc_stats([]))
            best_stats = sltp_results.get(best_sltp, default_stats) if best_sltp else default_stats

            tf_results[strat_name] = {
                "default_SL2_TP3": sltp_results.get("SL2/TP3", {}),
                "best_sltp": best_sltp,
                "best_stats": best_stats,
                "all_sltp": sltp_results,
            }

            marker = " 🏆" if best_pnl > 0 else ""
            print(f"  {strat_name:15s} | {default_stats['trades']:3d}t | WR {default_stats['win_rate']:5.1f}% | "
                  f"PnL {default_stats['total_pnl']:+7.2f}% | PF {default_stats['profit_factor']:.2f} | "
                  f"Best: {best_sltp} → {best_pnl:+.2f}%{marker}")

        symbol_results[tf] = tf_results

    # Save
    results[symbol] = {
        "tested_at": datetime.now().isoformat(),
        "days_back": DAYS_BACK,
        "results": symbol_results,
    }

    with open(RESULTS_FILE, "w") as f:
        json.dump(results, f, indent=2, default=str)

    print(f"\n💾 Сохранено в {RESULTS_FILE}")
    print(f"✅ {symbol} готов! ({len(results)} монет в файле)")

if __name__ == "__main__":
    main()

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...