"""
Signal History — stores all processed signals to JSON file.
Tracks: ticker, timestamp, price, WaveTrend, classification.
Outcome tracker checks price after 15m/1h/4h and calculates win rate.
"""
import json
import os
import logging
from datetime import datetime, timezone, timedelta
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# The history file is stored in the same directory as this source file.
HISTORY_DIR = os.path.dirname(os.path.abspath(__file__))
HISTORY_FILE = os.path.join(HISTORY_DIR, "signal_history.json")
# Fixed UTC-7 offset used for local timestamps.
# NOTE(review): a fixed offset does not follow daylight-saving changes, so it
# will be one hour off whenever local time is UTC-8 — confirm this is intended.
VANCOUVER_TZ = timezone(timedelta(hours=-7))
# Win/loss thresholds in percent. update_outcome() compares the
# direction-adjusted price change against them with inclusive bounds.
WIN_THRESHOLD_PCT = 1.0 # change >= +1% in the signal direction = win
LOSS_THRESHOLD_PCT = -1.0 # change <= -1% (against the signal direction) = loss
def now_van() -> datetime:
    """Return the current time as an aware datetime in ``VANCOUVER_TZ``."""
    return datetime.now(tz=VANCOUVER_TZ)
def load_history() -> list[dict]:
    """Load the signal history from ``HISTORY_FILE``.

    Returns:
        The list of recorded entries. An empty list is returned when the file
        does not exist, cannot be read or parsed, or does not hold a JSON
        array. Failures are logged and never raised.
    """
    try:
        # Explicit UTF-8: save_history() writes with ensure_ascii=False, so the
        # file may contain non-ASCII text that a locale-default codec would
        # fail to decode.
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            history = json.load(f)
    except FileNotFoundError:
        # No history file yet is the normal first-run state, not an error.
        return []
    except Exception as e:
        logger.error(f"Failed to load history: {e}")
        return []
    # Callers append to and iterate over the result, so anything other than a
    # list (e.g. a hand-edited file holding an object) is treated as unusable.
    if not isinstance(history, list):
        logger.error(
            f"Failed to load history: expected a JSON array, got {type(history).__name__}"
        )
        return []
    return history


def save_history(history: list[dict]) -> None:
    """Write the signal history to ``HISTORY_FILE``.

    The data is serialised to a sibling temporary file first and then swapped
    into place with ``os.replace``, so a failed dump (unserialisable value,
    encoding error, full disk) never truncates the existing history.

    Args:
        history: Full list of entries to persist; replaces the file contents.

    Errors are logged and never raised.
    """
    tmp_path = f"{HISTORY_FILE}.tmp"
    try:
        # Explicit UTF-8 because ensure_ascii=False emits raw non-ASCII text.
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, HISTORY_FILE)
    except Exception as e:
        logger.error(f"Failed to save history: {e}")
        # Best-effort cleanup of the partial temp file; the original history
        # file has not been touched at this point.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def _expected_direction(wt_signal) -> str:
    """Map a WaveTrend signal label to "long", "short" or "neutral"."""
    if wt_signal in ("strong_buy", "buy", "weak_buy", "approaching_buy"):
        return "long"
    if wt_signal in ("strong_sell", "sell", "weak_sell", "approaching_sell"):
        return "short"
    return "neutral"


def record_signal(signal: dict, analysis: dict, classification: dict) -> dict:
    """Build a history entry for a processed signal and append it to the history file.

    Args:
        signal: Incoming signal; must contain "ticker" (a missing key raises
            ``KeyError``). "pair" and "market" are read when present.
        analysis: Analysis result; price, movement, trend and volume fields
            are read with defaults, as are the "wavetrend" (15m) and
            "wavetrend_1h" sub-dicts.
        classification: Source of "priority" and "strategy", read with
            defaults "LOW" and "skip".

    Returns:
        The entry dict that was appended. Its outcome fields are initialised
        to ``None`` / ``False`` for the outcome tracker to fill in later.

    NOTE(review): load_history() returns an empty list when the file cannot
    be read or parsed, so after such a failure the save below writes a history
    containing only this entry — confirm that is acceptable.
    """
    # Take a single clock reading so that "id", "timestamp" and
    # "timestamp_utc" all describe the same instant.
    now = now_van()
    now_utc = now.astimezone(timezone.utc)

    wt_15m = analysis.get("wavetrend") or {}
    wt_1h = analysis.get("wavetrend_1h") or {}

    # The 1H WaveTrend signal takes precedence; the 15m signal is used only
    # when the 1H dict has no "signal" key at all.
    wt_sig = wt_1h.get("signal", wt_15m.get("signal", "neutral"))
    expected_direction = _expected_direction(wt_sig)

    ticker = signal["ticker"]
    entry = {
        "id": f"{ticker}_{now.strftime('%Y%m%d_%H%M%S')}",
        "timestamp": now.isoformat(),
        "timestamp_utc": now_utc.isoformat(),
        "ticker": ticker,
        "pair": analysis.get("symbol", signal.get("pair", "")),
        "market": signal.get("market", "unknown"),
        # Price at signal time
        "price_at_signal": analysis.get("price", 0),
        # Movement at signal time
        "move_15m": analysis.get("move_15m", 0),
        "move_1h": analysis.get("move_1h", 0),
        "trend_15m": analysis.get("trend_15m", "unknown"),
        "trend_1h": analysis.get("trend_1h", "unknown"),
        "volume_ratio": analysis.get("volume_ratio", 0),
        "volume_spike": analysis.get("volume_spike", False),
        # WaveTrend 15m
        "wt_15m_wt1": wt_15m.get("wt1", 0),
        "wt_15m_wt2": wt_15m.get("wt2", 0),
        "wt_15m_signal": wt_15m.get("signal", "n/a"),
        "wt_15m_zone": wt_15m.get("zone", "n/a"),
        "wt_15m_cross": wt_15m.get("cross"),
        # WaveTrend 1H
        "wt_1h_wt1": wt_1h.get("wt1", 0),
        "wt_1h_wt2": wt_1h.get("wt2", 0),
        "wt_1h_signal": wt_1h.get("signal", "n/a"),
        "wt_1h_zone": wt_1h.get("zone", "n/a"),
        "wt_1h_cross": wt_1h.get("cross"),
        # Classification
        "priority": classification.get("priority", "LOW"),
        "strategy": classification.get("strategy", "skip"),
        "expected_direction": expected_direction,
        # Outcome (filled by the outcome tracker)
        "price_after_15m": None,
        "price_after_1h": None,
        "price_after_4h": None,
        "outcome_15m_pct": None,
        "outcome_1h_pct": None,
        "outcome_4h_pct": None,
        "result_15m": None,  # "win", "loss", "neutral"
        "result_1h": None,
        "result_4h": None,
        "checked_15m": False,
        "checked_1h": False,
        "checked_4h": False,
    }

    # Read-modify-write of the whole history file.
    history = load_history()
    history.append(entry)
    save_history(history)
    logger.info(
        "Signal recorded: %s (direction: %s)", entry["id"], expected_direction
    )
    return entry
def update_outcome(entry: dict, timeframe: str, current_price: float) -> str | None:
"""
Update outcome for a signal entry.
Returns "win", "loss", or "neutral".
"""
price_at = entry.get("price_at_signal", 0)
if price_at <= 0:
return None
pct_change = ((current_price - price_at) / price_at) * 100
direction = entry.get("expected_direction", "neutral")
# Flip for shorts
if direction == "short":
pct_change = -pct_change
# Determine result
if pct_change >= WIN_THRESHOLD_PCT:
result = "win"
elif pct_change <= LOSS_THRESHOLD_PCT:
result = "loss"
else:
result = "neutral"
# Store actual pct change (unsigned for display)
actual_pct = ((current_price - price_at) / price_at) * 100
entry[f"price_after_{timeframe}"] = current_price
entry[f"outcome_{timeframe}_pct"] = round(actual_pct, 2)
entry[f"result_{timeframe}"] = result
entry[f"checked_{timeframe}"] = True
return result
def get_pending_checks() -> list[dict]:
    """Return the history entries that have at least one outcome check due.

    An entry is due when any of its 15m / 1h / 4h checks is still unflagged
    and the entry is at least that old. Entries without a timestamp, or with
    one that cannot be parsed, are skipped.

    Returns:
        A list of ``{"entry": <history entry>, "age_minutes": <float>}`` dicts.
    """
    # (checked-flag key, minimum age in minutes before that check is due)
    schedule = (("checked_15m", 15), ("checked_1h", 60), ("checked_4h", 240))

    records = load_history()
    utc_now = datetime.now(timezone.utc)
    due = []
    for record in records:
        raw_ts = record.get("timestamp_utc", record.get("timestamp", ""))
        if not raw_ts:
            continue
        try:
            stamp = datetime.fromisoformat(raw_ts)
            # Naive timestamps are interpreted as VANCOUVER_TZ local time.
            if stamp.tzinfo is None:
                stamp = stamp.replace(tzinfo=VANCOUVER_TZ)
            elapsed = utc_now - stamp.astimezone(timezone.utc)
            age_minutes = elapsed.total_seconds() / 60
        except Exception:
            continue

        is_due = any(
            not record.get(flag) and age_minutes >= min_age
            for flag, min_age in schedule
        )
        if is_due:
            due.append({"entry": record, "age_minutes": age_minutes})
    return due
def _filter_by_period(history: list[dict], period: str) -> list[dict]:
"""Filter history by period: 'today', 'week', 'month', 'all'."""
if period == "all":
return history
now = now_van()
if period == "today":
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
elif period == "week":
start = now - timedelta(days=7)
elif period == "month":
start = now - timedelta(days=30)
else:
return history
filtered = []
for h in history:
try:
ts = datetime.fromisoformat(h["timestamp"])
if ts.tzinfo is None:
ts = ts.replace(tzinfo=VANCOUVER_TZ)
if ts >= start:
filtered.append(h)
except Exception:
continue
return filtered
def _calc_win_rate(entries: list[dict], timeframe: str = "1h") -> dict:
"""Calculate win rate for a specific timeframe."""
key = f"result_{timeframe}"
checked = [e for e in entries if e.get(f"checked_{timeframe}")]
if not checked:
return {"total": 0, "win": 0, "loss": 0, "neutral": 0, "rate": None}
wins = sum(1 for e in checked if e[key] == "win")
losses = sum(1 for e in checked if e[key] == "loss")
neutrals = sum(1 for e in checked if e[key] == "neutral")
total = wins + losses + neutrals
rate = (wins / (wins + losses) * 100) if (wins + losses) > 0 else None
return {"total": total, "win": wins, "loss": losses, "neutral": neutrals, "rate": rate}
def _best_worst_tickers(entries: list[dict]) -> tuple[str, str]:
"""Find best and worst performing tickers."""
ticker_results = {}
for e in entries:
ticker = e["ticker"]
pct = e.get("outcome_1h_pct")
if pct is not None:
if ticker not in ticker_results:
ticker_results[ticker] = []
ticker_results[ticker].append(pct)
if not ticker_results:
return ("—", "—")
avg = {t: sum(v)/len(v) for t, v in ticker_results.items()}
best = max(avg, key=avg.get)
worst = min(avg, key=avg.get)
return (f"{best} ({avg[best]:+.1f}%)", f"{worst} ({avg[worst]:+.1f}%)")
def format_stats_message(period: str = "all") -> str:
    """Build the statistics summary text for the given period.

    The history is loaded from disk and narrowed with ``_filter_by_period``.
    The summary covers signal counts by priority and direction, 1H WaveTrend
    cross counts, win rates per timeframe, best/worst tickers, the number of
    distinct tickers, and the first/last timestamps of the selection. A short
    placeholder message is returned when there is nothing to report.
    """
    history = load_history()
    if not history:
        return "📊 Нет записанных сигналов"

    entries = _filter_by_period(history, period)
    if not entries:
        return f"📊 Нет сигналов за период: {period}"

    # Priority counts
    priorities = [item["priority"] for item in entries]
    high = priorities.count("HIGH")
    medium = priorities.count("MEDIUM")
    low = priorities.count("LOW")

    # WaveTrend 1H crosses
    crosses = [item.get("wt_1h_cross") for item in entries]
    bull_cross = crosses.count("bullish_cross")
    bear_cross = crosses.count("bearish_cross")
    no_cross = sum(1 for cross in crosses if cross is None)

    # One line per timeframe that has at least one scored result.
    rate_lines = []
    for label, timeframe in (("15m", "15m"), ("1H", "1h"), ("4H", "4h")):
        stats = _calc_win_rate(entries, timeframe)
        if stats["total"] > 0:
            if stats["rate"] is None:
                rate_text = "—"
            else:
                rate_text = f"{stats['rate']:.0f}%"
            rate_lines.append(
                f" {label}: {rate_text} ({stats['win']}W/{stats['loss']}L/{stats['neutral']}N)"
            )
    if not rate_lines:
        rate_lines.append(" Ещё нет данных (ждём 15m+)")

    best, worst = _best_worst_tickers(entries)
    unique = len({item["ticker"] for item in entries})

    # Direction counts; a missing, null or empty direction counts as neutral.
    directions = [item.get("expected_direction") for item in entries]
    longs = directions.count("long")
    shorts = directions.count("short")
    neutrals_dir = sum(1 for d in directions if d in ("neutral", None, ""))

    # Keep "MM-DDTHH:MM" from the ISO timestamps (drop year and seconds).
    first_stamp = entries[0].get("timestamp", "")[5:16]
    last_stamp = entries[-1].get("timestamp", "")[5:16]

    period_labels = {
        "all": "All time",
        "today": "Сегодня",
        "week": "За неделю",
        "month": "За месяц",
    }
    period_label = period_labels.get(period, period)

    rule = "━━━━━━━━━━━━━━━━━━━━"
    lines = [
        f"📊 Stats: {period_label}",
        rule,
        f"📈 Всего сигналов: {len(entries)}",
        f"🎯 HIGH: {high} | MED: {medium} | LOW: {low}",
        f"📍 Long: {longs} | Short: {shorts} | Neutral: {neutrals_dir}",
        "",
        "🌊 WT Crosses (1H):",
        f" ⬆️ Bullish: {bull_cross}",
        f" ⬇️ Bearish: {bear_cross}",
        f" ⚪ No cross: {no_cross}",
        "",
        "🏆 Win Rate:",
        *rate_lines,
        "",
        f"🥇 Best: {best}",
        f"🥉 Worst: {worst}",
        "",
        f"🪙 Тикеров: {unique}",
        f"📅 {first_stamp} → {last_stamp}",
        rule,
    ]
    return "\n".join(lines)