← Назад
""" Signal History — stores all processed signals to JSON file. Tracks: ticker, timestamp, price, WaveTrend, classification. Outcome tracker checks price after 15m/1h/4h and calculates win rate. """ import json import os import logging from datetime import datetime, timezone, timedelta logger = logging.getLogger(__name__) HISTORY_DIR = os.path.dirname(os.path.abspath(__file__)) HISTORY_FILE = os.path.join(HISTORY_DIR, "signal_history.json") VANCOUVER_TZ = timezone(timedelta(hours=-7)) # Win/loss threshold WIN_THRESHOLD_PCT = 1.0 # >1% in signal direction = win LOSS_THRESHOLD_PCT = -1.0 # >1% against signal direction = loss def now_van() -> datetime: return datetime.now(VANCOUVER_TZ) def load_history() -> list[dict]: """Load signal history from file.""" if not os.path.exists(HISTORY_FILE): return [] try: with open(HISTORY_FILE, "r") as f: return json.load(f) except Exception as e: logger.error(f"Failed to load history: {e}") return [] def save_history(history: list[dict]): """Save signal history to file.""" try: with open(HISTORY_FILE, "w") as f: json.dump(history, f, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"Failed to save history: {e}") def record_signal(signal: dict, analysis: dict, classification: dict) -> dict: """Record a signal to history.""" now = now_van() wt_15m = analysis.get("wavetrend") or {} wt_1h = analysis.get("wavetrend_1h") or {} # Determine expected direction from WT signal wt_sig = wt_1h.get("signal", wt_15m.get("signal", "neutral")) if wt_sig in ("strong_buy", "buy", "weak_buy", "approaching_buy"): expected_direction = "long" elif wt_sig in ("strong_sell", "sell", "weak_sell", "approaching_sell"): expected_direction = "short" else: expected_direction = "neutral" entry = { "id": f"{signal['ticker']}_{now.strftime('%Y%m%d_%H%M%S')}", "timestamp": now.isoformat(), "timestamp_utc": datetime.now(timezone.utc).isoformat(), "ticker": signal["ticker"], "pair": analysis.get("symbol", signal.get("pair", "")), "market": signal.get("market", "unknown"), # Price at signal time "price_at_signal": analysis.get("price", 0), # Movement at signal time "move_15m": analysis.get("move_15m", 0), "move_1h": analysis.get("move_1h", 0), "trend_15m": analysis.get("trend_15m", "unknown"), "trend_1h": analysis.get("trend_1h", "unknown"), "volume_ratio": analysis.get("volume_ratio", 0), "volume_spike": analysis.get("volume_spike", False), # WaveTrend 15m "wt_15m_wt1": wt_15m.get("wt1", 0), "wt_15m_wt2": wt_15m.get("wt2", 0), "wt_15m_signal": wt_15m.get("signal", "n/a"), "wt_15m_zone": wt_15m.get("zone", "n/a"), "wt_15m_cross": wt_15m.get("cross"), # WaveTrend 1H "wt_1h_wt1": wt_1h.get("wt1", 0), "wt_1h_wt2": wt_1h.get("wt2", 0), "wt_1h_signal": wt_1h.get("signal", "n/a"), "wt_1h_zone": wt_1h.get("zone", "n/a"), "wt_1h_cross": wt_1h.get("cross"), # Classification "priority": classification.get("priority", "LOW"), "strategy": classification.get("strategy", "skip"), "expected_direction": expected_direction, # Outcome (filled by outcome tracker) "price_after_15m": None, "price_after_1h": None, "price_after_4h": None, "outcome_15m_pct": None, "outcome_1h_pct": None, "outcome_4h_pct": None, "result_15m": None, # "win", "loss", "neutral" "result_1h": None, "result_4h": None, "checked_15m": False, "checked_1h": False, "checked_4h": False, } history = load_history() history.append(entry) save_history(history) logger.info(f"Signal recorded: {entry['id']} (direction: {expected_direction})") return entry def update_outcome(entry: dict, timeframe: str, current_price: float) -> str | None: """ Update outcome for a signal entry. Returns "win", "loss", or "neutral". """ price_at = entry.get("price_at_signal", 0) if price_at <= 0: return None pct_change = ((current_price - price_at) / price_at) * 100 direction = entry.get("expected_direction", "neutral") # Flip for shorts if direction == "short": pct_change = -pct_change # Determine result if pct_change >= WIN_THRESHOLD_PCT: result = "win" elif pct_change <= LOSS_THRESHOLD_PCT: result = "loss" else: result = "neutral" # Store actual pct change (unsigned for display) actual_pct = ((current_price - price_at) / price_at) * 100 entry[f"price_after_{timeframe}"] = current_price entry[f"outcome_{timeframe}_pct"] = round(actual_pct, 2) entry[f"result_{timeframe}"] = result entry[f"checked_{timeframe}"] = True return result def get_pending_checks() -> list[dict]: """Get signals that need outcome checking.""" history = load_history() now = datetime.now(timezone.utc) pending = [] for entry in history: ts_str = entry.get("timestamp_utc", entry.get("timestamp", "")) if not ts_str: continue try: ts = datetime.fromisoformat(ts_str) if ts.tzinfo is None: ts = ts.replace(tzinfo=VANCOUVER_TZ) age_minutes = (now - ts.astimezone(timezone.utc)).total_seconds() / 60 except Exception: continue needs_check = False # Check 15m (after 15 min) if not entry.get("checked_15m") and age_minutes >= 15: needs_check = True # Check 1h (after 60 min) if not entry.get("checked_1h") and age_minutes >= 60: needs_check = True # Check 4h (after 240 min) if not entry.get("checked_4h") and age_minutes >= 240: needs_check = True if needs_check: pending.append({ "entry": entry, "age_minutes": age_minutes, }) return pending def _filter_by_period(history: list[dict], period: str) -> list[dict]: """Filter history by period: 'today', 'week', 'month', 'all'.""" if period == "all": return history now = now_van() if period == "today": start = now.replace(hour=0, minute=0, second=0, microsecond=0) elif period == "week": start = now - timedelta(days=7) elif period == "month": start = now - timedelta(days=30) else: return history filtered = [] for h in history: try: ts = datetime.fromisoformat(h["timestamp"]) if ts.tzinfo is None: ts = ts.replace(tzinfo=VANCOUVER_TZ) if ts >= start: filtered.append(h) except Exception: continue return filtered def _calc_win_rate(entries: list[dict], timeframe: str = "1h") -> dict: """Calculate win rate for a specific timeframe.""" key = f"result_{timeframe}" checked = [e for e in entries if e.get(f"checked_{timeframe}")] if not checked: return {"total": 0, "win": 0, "loss": 0, "neutral": 0, "rate": None} wins = sum(1 for e in checked if e[key] == "win") losses = sum(1 for e in checked if e[key] == "loss") neutrals = sum(1 for e in checked if e[key] == "neutral") total = wins + losses + neutrals rate = (wins / (wins + losses) * 100) if (wins + losses) > 0 else None return {"total": total, "win": wins, "loss": losses, "neutral": neutrals, "rate": rate} def _best_worst_tickers(entries: list[dict]) -> tuple[str, str]: """Find best and worst performing tickers.""" ticker_results = {} for e in entries: ticker = e["ticker"] pct = e.get("outcome_1h_pct") if pct is not None: if ticker not in ticker_results: ticker_results[ticker] = [] ticker_results[ticker].append(pct) if not ticker_results: return ("—", "—") avg = {t: sum(v)/len(v) for t, v in ticker_results.items()} best = max(avg, key=avg.get) worst = min(avg, key=avg.get) return (f"{best} ({avg[best]:+.1f}%)", f"{worst} ({avg[worst]:+.1f}%)") def format_stats_message(period: str = "all") -> str: """Format stats for Telegram.""" history = load_history() if not history: return "📊 Нет записанных сигналов" entries = _filter_by_period(history, period) if not entries: return f"📊 Нет сигналов за период: {period}" total = len(entries) # Priority counts high = sum(1 for h in entries if h["priority"] == "HIGH") medium = sum(1 for h in entries if h["priority"] == "MEDIUM") low = sum(1 for h in entries if h["priority"] == "LOW") # WT crosses bull_cross = sum(1 for h in entries if h.get("wt_1h_cross") == "bullish_cross") bear_cross = sum(1 for h in entries if h.get("wt_1h_cross") == "bearish_cross") no_cross = sum(1 for h in entries if h.get("wt_1h_cross") is None) # Win rates by timeframe wr_15m = _calc_win_rate(entries, "15m") wr_1h = _calc_win_rate(entries, "1h") wr_4h = _calc_win_rate(entries, "4h") # Period label period_labels = {"all": "All time", "today": "Сегодня", "week": "За неделю", "month": "За месяц"} period_label = period_labels.get(period, period) # Win rate section wr_section = "" for label, wr in [("15m", wr_15m), ("1H", wr_1h), ("4H", wr_4h)]: if wr["total"] > 0: rate_str = f"{wr['rate']:.0f}%" if wr["rate"] is not None else "—" wr_section += f" {label}: {rate_str} ({wr['win']}W/{wr['loss']}L/{wr['neutral']}N)\n" if not wr_section: wr_section = " Ещё нет данных (ждём 15m+)\n" # Best/worst best, worst = _best_worst_tickers(entries) # Unique tickers unique = len(set(h["ticker"] for h in entries)) # Direction stats longs = sum(1 for h in entries if h.get("expected_direction") == "long") shorts = sum(1 for h in entries if h.get("expected_direction") == "short") neutrals_dir = sum(1 for h in entries if h.get("expected_direction", "neutral") in ("neutral", None, "")) first_ts = entries[0].get("timestamp", "")[:16] last_ts = entries[-1].get("timestamp", "")[:16] msg = ( f"📊 Stats: {period_label}\n" f"━━━━━━━━━━━━━━━━━━━━\n" f"📈 Всего сигналов: {total}\n" f"🎯 HIGH: {high} | MED: {medium} | LOW: {low}\n" f"📍 Long: {longs} | Short: {shorts} | Neutral: {neutrals_dir}\n" f"\n" f"🌊 WT Crosses (1H):\n" f" ⬆️ Bullish: {bull_cross}\n" f" ⬇️ Bearish: {bear_cross}\n" f" ⚪ No cross: {no_cross}\n" f"\n" f"🏆 Win Rate:\n" f"{wr_section}" f"\n" f"🥇 Best: {best}\n" f"🥉 Worst: {worst}\n" f"\n" f"🪙 Тикеров: {unique}\n" f"📅 {first_ts[5:]} → {last_ts[5:]}\n" f"━━━━━━━━━━━━━━━━━━━━" ) return msg