← Back
"""
Signal History — stores all processed signals to JSON file.
Tracks: ticker, timestamp, price, WaveTrend, classification.
Outcome tracker checks price after 15m/1h/4h and calculates win rate.
"""

import json
import os
import logging
from datetime import datetime, timezone, timedelta

logger = logging.getLogger(__name__)

HISTORY_DIR = os.path.dirname(os.path.abspath(__file__))
HISTORY_FILE = os.path.join(HISTORY_DIR, "signal_history.json")

VANCOUVER_TZ = timezone(timedelta(hours=-7))

# Win/loss threshold
WIN_THRESHOLD_PCT = 1.0  # >1% in signal direction = win
LOSS_THRESHOLD_PCT = -1.0  # >1% against signal direction = loss


def now_van() -> datetime:
    return datetime.now(VANCOUVER_TZ)


def load_history() -> list[dict]:
    """Load signal history from file."""
    if not os.path.exists(HISTORY_FILE):
        return []
    try:
        with open(HISTORY_FILE, "r") as f:
            return json.load(f)
    except Exception as e:
        logger.error(f"Failed to load history: {e}")
        return []


def save_history(history: list[dict]):
    """Save signal history to file."""
    try:
        with open(HISTORY_FILE, "w") as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
    except Exception as e:
        logger.error(f"Failed to save history: {e}")


def record_signal(signal: dict, analysis: dict, classification: dict) -> dict:
    """Record a signal to history."""
    now = now_van()

    wt_15m = analysis.get("wavetrend") or {}
    wt_1h = analysis.get("wavetrend_1h") or {}

    # Determine expected direction from WT signal
    wt_sig = wt_1h.get("signal", wt_15m.get("signal", "neutral"))
    if wt_sig in ("strong_buy", "buy", "weak_buy", "approaching_buy"):
        expected_direction = "long"
    elif wt_sig in ("strong_sell", "sell", "weak_sell", "approaching_sell"):
        expected_direction = "short"
    else:
        expected_direction = "neutral"

    entry = {
        "id": f"{signal['ticker']}_{now.strftime('%Y%m%d_%H%M%S')}",
        "timestamp": now.isoformat(),
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
        "ticker": signal["ticker"],
        "pair": analysis.get("symbol", signal.get("pair", "")),
        "market": signal.get("market", "unknown"),

        # Price at signal time
        "price_at_signal": analysis.get("price", 0),

        # Movement at signal time
        "move_15m": analysis.get("move_15m", 0),
        "move_1h": analysis.get("move_1h", 0),
        "trend_15m": analysis.get("trend_15m", "unknown"),
        "trend_1h": analysis.get("trend_1h", "unknown"),
        "volume_ratio": analysis.get("volume_ratio", 0),
        "volume_spike": analysis.get("volume_spike", False),

        # WaveTrend 15m
        "wt_15m_wt1": wt_15m.get("wt1", 0),
        "wt_15m_wt2": wt_15m.get("wt2", 0),
        "wt_15m_signal": wt_15m.get("signal", "n/a"),
        "wt_15m_zone": wt_15m.get("zone", "n/a"),
        "wt_15m_cross": wt_15m.get("cross"),

        # WaveTrend 1H
        "wt_1h_wt1": wt_1h.get("wt1", 0),
        "wt_1h_wt2": wt_1h.get("wt2", 0),
        "wt_1h_signal": wt_1h.get("signal", "n/a"),
        "wt_1h_zone": wt_1h.get("zone", "n/a"),
        "wt_1h_cross": wt_1h.get("cross"),

        # Classification
        "priority": classification.get("priority", "LOW"),
        "strategy": classification.get("strategy", "skip"),
        "expected_direction": expected_direction,

        # Outcome (filled by outcome tracker)
        "price_after_15m": None,
        "price_after_1h": None,
        "price_after_4h": None,
        "outcome_15m_pct": None,
        "outcome_1h_pct": None,
        "outcome_4h_pct": None,
        "result_15m": None,   # "win", "loss", "neutral"
        "result_1h": None,
        "result_4h": None,
        "checked_15m": False,
        "checked_1h": False,
        "checked_4h": False,
    }

    history = load_history()
    history.append(entry)
    save_history(history)

    logger.info(f"Signal recorded: {entry['id']} (direction: {expected_direction})")
    return entry


def update_outcome(entry: dict, timeframe: str, current_price: float) -> str | None:
    """
    Update outcome for a signal entry.
    Returns "win", "loss", or "neutral".
    """
    price_at = entry.get("price_at_signal", 0)
    if price_at <= 0:
        return None

    pct_change = ((current_price - price_at) / price_at) * 100
    direction = entry.get("expected_direction", "neutral")

    # Flip for shorts
    if direction == "short":
        pct_change = -pct_change

    # Determine result
    if pct_change >= WIN_THRESHOLD_PCT:
        result = "win"
    elif pct_change <= LOSS_THRESHOLD_PCT:
        result = "loss"
    else:
        result = "neutral"

    # Store actual pct change (unsigned for display)
    actual_pct = ((current_price - price_at) / price_at) * 100

    entry[f"price_after_{timeframe}"] = current_price
    entry[f"outcome_{timeframe}_pct"] = round(actual_pct, 2)
    entry[f"result_{timeframe}"] = result
    entry[f"checked_{timeframe}"] = True

    return result


def get_pending_checks() -> list[dict]:
    """Get signals that need outcome checking."""
    history = load_history()
    now = datetime.now(timezone.utc)
    pending = []

    for entry in history:
        ts_str = entry.get("timestamp_utc", entry.get("timestamp", ""))
        if not ts_str:
            continue

        try:
            ts = datetime.fromisoformat(ts_str)
            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=VANCOUVER_TZ)
            age_minutes = (now - ts.astimezone(timezone.utc)).total_seconds() / 60
        except Exception:
            continue

        needs_check = False

        # Check 15m (after 15 min)
        if not entry.get("checked_15m") and age_minutes >= 15:
            needs_check = True

        # Check 1h (after 60 min)
        if not entry.get("checked_1h") and age_minutes >= 60:
            needs_check = True

        # Check 4h (after 240 min)
        if not entry.get("checked_4h") and age_minutes >= 240:
            needs_check = True

        if needs_check:
            pending.append({
                "entry": entry,
                "age_minutes": age_minutes,
            })

    return pending


def _filter_by_period(history: list[dict], period: str) -> list[dict]:
    """Filter history by period: 'today', 'week', 'month', 'all'."""
    if period == "all":
        return history

    now = now_van()

    if period == "today":
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    elif period == "week":
        start = now - timedelta(days=7)
    elif period == "month":
        start = now - timedelta(days=30)
    else:
        return history

    filtered = []
    for h in history:
        try:
            ts = datetime.fromisoformat(h["timestamp"])
            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=VANCOUVER_TZ)
            if ts >= start:
                filtered.append(h)
        except Exception:
            continue

    return filtered


def _calc_win_rate(entries: list[dict], timeframe: str = "1h") -> dict:
    """Calculate win rate for a specific timeframe."""
    key = f"result_{timeframe}"
    checked = [e for e in entries if e.get(f"checked_{timeframe}")]

    if not checked:
        return {"total": 0, "win": 0, "loss": 0, "neutral": 0, "rate": None}

    wins = sum(1 for e in checked if e[key] == "win")
    losses = sum(1 for e in checked if e[key] == "loss")
    neutrals = sum(1 for e in checked if e[key] == "neutral")

    total = wins + losses + neutrals
    rate = (wins / (wins + losses) * 100) if (wins + losses) > 0 else None

    return {"total": total, "win": wins, "loss": losses, "neutral": neutrals, "rate": rate}


def _best_worst_tickers(entries: list[dict]) -> tuple[str, str]:
    """Find best and worst performing tickers."""
    ticker_results = {}
    for e in entries:
        ticker = e["ticker"]
        pct = e.get("outcome_1h_pct")
        if pct is not None:
            if ticker not in ticker_results:
                ticker_results[ticker] = []
            ticker_results[ticker].append(pct)

    if not ticker_results:
        return ("—", "—")

    avg = {t: sum(v)/len(v) for t, v in ticker_results.items()}
    best = max(avg, key=avg.get)
    worst = min(avg, key=avg.get)

    return (f"{best} ({avg[best]:+.1f}%)", f"{worst} ({avg[worst]:+.1f}%)")


def format_stats_message(period: str = "all") -> str:
    """Format stats for Telegram."""
    history = load_history()

    if not history:
        return "📊 Нет записанных сигналов"

    entries = _filter_by_period(history, period)
    if not entries:
        return f"📊 Нет сигналов за период: {period}"

    total = len(entries)

    # Priority counts
    high = sum(1 for h in entries if h["priority"] == "HIGH")
    medium = sum(1 for h in entries if h["priority"] == "MEDIUM")
    low = sum(1 for h in entries if h["priority"] == "LOW")

    # WT crosses
    bull_cross = sum(1 for h in entries if h.get("wt_1h_cross") == "bullish_cross")
    bear_cross = sum(1 for h in entries if h.get("wt_1h_cross") == "bearish_cross")
    no_cross = sum(1 for h in entries if h.get("wt_1h_cross") is None)

    # Win rates by timeframe
    wr_15m = _calc_win_rate(entries, "15m")
    wr_1h = _calc_win_rate(entries, "1h")
    wr_4h = _calc_win_rate(entries, "4h")

    # Period label
    period_labels = {"all": "All time", "today": "Сегодня", "week": "За неделю", "month": "За месяц"}
    period_label = period_labels.get(period, period)

    # Win rate section
    wr_section = ""
    for label, wr in [("15m", wr_15m), ("1H", wr_1h), ("4H", wr_4h)]:
        if wr["total"] > 0:
            rate_str = f"{wr['rate']:.0f}%" if wr["rate"] is not None else "—"
            wr_section += f"  {label}: {rate_str} ({wr['win']}W/{wr['loss']}L/{wr['neutral']}N)\n"

    if not wr_section:
        wr_section = "  Ещё нет данных (ждём 15m+)\n"

    # Best/worst
    best, worst = _best_worst_tickers(entries)

    # Unique tickers
    unique = len(set(h["ticker"] for h in entries))

    # Direction stats
    longs = sum(1 for h in entries if h.get("expected_direction") == "long")
    shorts = sum(1 for h in entries if h.get("expected_direction") == "short")
    neutrals_dir = sum(1 for h in entries if h.get("expected_direction", "neutral") in ("neutral", None, ""))

    first_ts = entries[0].get("timestamp", "")[:16]
    last_ts = entries[-1].get("timestamp", "")[:16]

    msg = (
        f"📊 Stats: {period_label}\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"📈 Всего сигналов: {total}\n"
        f"🎯 HIGH: {high} | MED: {medium} | LOW: {low}\n"
        f"📍 Long: {longs} | Short: {shorts} | Neutral: {neutrals_dir}\n"
        f"\n"
        f"🌊 WT Crosses (1H):\n"
        f"  ⬆️ Bullish: {bull_cross}\n"
        f"  ⬇️ Bearish: {bear_cross}\n"
        f"  ⚪ No cross: {no_cross}\n"
        f"\n"
        f"🏆 Win Rate:\n"
        f"{wr_section}"
        f"\n"
        f"🥇 Best: {best}\n"
        f"🥉 Worst: {worst}\n"
        f"\n"
        f"🪙 Тикеров: {unique}\n"
        f"📅 {first_ts[5:]} → {last_ts[5:]}\n"
        f"━━━━━━━━━━━━━━━━━━━━"
    )

    return msg

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...