← Назад
""" Gerchik Models — Pattern Recognition for 4 entry models. Model A: Bounce (отбой) — only with trend Model B: Simple false breakout — 1 candle pokes through, closes back Model C: Complex false breakout — 2-3 candles beyond level, then return Model D: Breakout — impulse break with volume All patterns detected on 15m candles against 1H levels. """ import logging from dataclasses import dataclass from typing import Optional from gerchik_levels import Level from gerchik_config import GERCHIK_LEVEL_TOLERANCE_PCT, GERCHIK_ALLOWED_MODELS logger = logging.getLogger(__name__) @dataclass class GerchikSignal: """A detected Gerchik entry signal.""" symbol: str model: str # "A", "B", "C", "D" side: str # "BUY" or "SELL" level: Level # The level we're trading from entry_price: float # Suggested entry (limit) sl_price: float # Stop loss sl_distance_pct: float # SL distance in % tp1_price: float # TP1 (3x SL) tp2_price: float # TP2 (4x SL) tp3_price: float # TP3 (5x SL) trend: str # "UP" or "DOWN" (from EMA50 1H) volume_ratio: float # Current volume vs avg candle_pattern: str # Description of what was detected def calc_ema(closes: list[float], period: int) -> float: """Calculate EMA of last `period` closes. Returns latest value.""" if len(closes) < period: return closes[-1] if closes else 0 multiplier = 2 / (period + 1) ema = sum(closes[:period]) / period for price in closes[period:]: ema = (price - ema) * multiplier + ema return ema def get_trend_1h(closes_1h: list[float], period: int = 50) -> str: """Determine trend from EMA50 on 1H.""" if len(closes_1h) < period: return "NEUTRAL" ema = calc_ema(closes_1h, period) current = closes_1h[-1] if current > ema * 1.002: # 0.2% buffer return "UP" elif current < ema * 0.998: return "DOWN" return "NEUTRAL" def get_volume_ratio(volumes: list[float], period: int = 20) -> float: """Current candle volume vs avg of last N.""" if len(volumes) < period + 1: return 1.0 avg = sum(volumes[-period - 1:-1]) / period return volumes[-1] / avg if avg > 0 else 1.0 def detect_model_a( level: Level, highs: list[float], lows: list[float], closes: list[float], opens: list[float], current_price: float, trend: str, ) -> Optional[dict]: """ Model A — Bounce (Отбой от уровня). Rules: - 3+ candles near the level (within tolerance) - None of them breaks through - 4th candle starts moving away from level - ONLY with trend (EMA50 direction) For support level (BUY): candles bounce off from above, trend=UP For resistance level (SELL): candles bounce off from below, trend=DOWN """ if len(closes) < 5: return None tol = level.price * GERCHIK_LEVEL_TOLERANCE_PCT / 100 # Check if this is a support bounce (BUY) if level.price < current_price and trend == "UP": # Last 3-5 candles should have lows near the level but not breaking it near_candles = 0 for i in range(-4, -1): if i + len(lows) < 0: continue low = lows[i] close = closes[i] if abs(low - level.price) <= tol * 2 and close > level.price: near_candles += 1 if near_candles >= 2: # Current candle should be moving up (close > open) if closes[-1] > opens[-1] and closes[-1] > level.price + tol: return { "side": "BUY", "pattern": f"{near_candles} свечей у поддержки, отбой вверх", } # Check if this is a resistance bounce (SELL) if level.price > current_price and trend == "DOWN": near_candles = 0 for i in range(-4, -1): if i + len(highs) < 0: continue high = highs[i] close = closes[i] if abs(high - level.price) <= tol * 2 and close < level.price: near_candles += 1 if near_candles >= 2: if closes[-1] < opens[-1] and closes[-1] < level.price - tol: return { "side": "SELL", "pattern": f"{near_candles} свечей у сопротивления, отбой вниз", } return None def detect_model_b( level: Level, highs: list[float], lows: list[float], closes: list[float], opens: list[float], current_price: float, ) -> Optional[dict]: """ Model B — Simple False Breakout (Простой ложный пробой). Rules: - Previous candle broke through the level with its wick - But closed back on the correct side - Current candle confirms by moving away """ if len(closes) < 3: return None tol = level.price * GERCHIK_LEVEL_TOLERANCE_PCT / 100 # False breakout of support (wick went below, closed above) → BUY if level.price < current_price: prev_low = lows[-2] prev_close = closes[-2] # Wick below level, close above if prev_low < level.price - tol and prev_close > level.price: # Current candle is bullish and above level if closes[-1] > opens[-1] and closes[-1] > level.price: return { "side": "BUY", "pattern": f"ложный пробой поддержки (хвост ${prev_low:.4f}), закрытие выше", } # False breakout of resistance (wick went above, closed below) → SELL if level.price > current_price: prev_high = highs[-2] prev_close = closes[-2] if prev_high > level.price + tol and prev_close < level.price: if closes[-1] < opens[-1] and closes[-1] < level.price: return { "side": "SELL", "pattern": f"ложный пробой сопротивления (хвост ${prev_high:.4f}), закрытие ниже", } return None def detect_model_c( level: Level, highs: list[float], lows: list[float], closes: list[float], opens: list[float], current_price: float, ) -> Optional[dict]: """ Model C — Complex False Breakout (Сложный ложный пробой). Rules: - 2-3 candles closed BEYOND the level - Then price returned back through the level - Stronger signal than Model B (trap confirmed) """ if len(closes) < 5: return None tol = level.price * GERCHIK_LEVEL_TOLERANCE_PCT / 100 # Complex false breakout of support → BUY if level.price < current_price: # Count candles that closed below level in recent history candles_below = 0 for i in range(-5, -1): if i + len(closes) < 0: continue if closes[i] < level.price - tol: candles_below += 1 # 2-3 candles were below, now we're back above if 2 <= candles_below <= 3 and closes[-1] > level.price and closes[-1] > opens[-1]: return { "side": "BUY", "pattern": f"сложный ЛП: {candles_below} свечи ниже уровня, возврат", } # Complex false breakout of resistance → SELL if level.price > current_price: candles_above = 0 for i in range(-5, -1): if i + len(closes) < 0: continue if closes[i] > level.price + tol: candles_above += 1 if 2 <= candles_above <= 3 and closes[-1] < level.price and closes[-1] < opens[-1]: return { "side": "SELL", "pattern": f"сложный ЛП: {candles_above} свечи выше уровня, возврат", } return None def detect_model_d( level: Level, highs: list[float], lows: list[float], closes: list[float], opens: list[float], volumes: list[float], current_price: float, trend: str, ) -> Optional[dict]: """ Model D — Breakout (Пробой уровня). Rules: - Candle closes BEYOND the level (not just wick) - Volume > 2x average (strong impulse) - Only with trend - Potential: 1-3 ATR movement """ if len(closes) < 3: return None vol_ratio = get_volume_ratio(volumes) # Need strong volume if vol_ratio < 2.0: return None tol = level.price * GERCHIK_LEVEL_TOLERANCE_PCT / 100 # Breakout of resistance (close above) → BUY, trend must be UP if level.price < current_price and trend == "UP": # Previous candles were below the level was_below = any(closes[i] < level.price for i in range(-4, -1) if i + len(closes) >= 0) # Current candle closed above with body body_above = closes[-1] > level.price + tol and opens[-1] < closes[-1] if was_below and body_above: return { "side": "BUY", "pattern": f"пробой сопротивления, vol={vol_ratio:.1f}x", "vol_ratio": vol_ratio, } # Breakout of support (close below) → SELL, trend must be DOWN if level.price > current_price and trend == "DOWN": was_above = any(closes[i] > level.price for i in range(-4, -1) if i + len(closes) >= 0) body_below = closes[-1] < level.price - tol and opens[-1] > closes[-1] if was_above and body_below: return { "side": "SELL", "pattern": f"пробой поддержки, vol={vol_ratio:.1f}x", "vol_ratio": vol_ratio, } return None def detect_all_models( symbol: str, level: Level, opens_15m: list[float], highs_15m: list[float], lows_15m: list[float], closes_15m: list[float], volumes_15m: list[float], closes_1h: list[float], current_price: float, sl_buffer_pct: float, tp1_rr: float, tp2_rr: float, tp3_rr: float, max_sl_pct: float, ) -> Optional[GerchikSignal]: """ Try all 4 models against a single level. Returns GerchikSignal if any model triggers, None otherwise. Priority: C > B > A > D (false breakouts are stronger signals) """ trend = get_trend_1h(closes_1h) vol_ratio = get_volume_ratio(volumes_15m) # Try models in priority order (filtered by GERCHIK_ALLOWED_MODELS) result = None model = None # Model C — Complex false breakout (strongest) if "C" in GERCHIK_ALLOWED_MODELS and not result: r = detect_model_c(level, highs_15m, lows_15m, closes_15m, opens_15m, current_price) if r: result = r model = "C" # Model B — Simple false breakout if "B" in GERCHIK_ALLOWED_MODELS and not result: r = detect_model_b(level, highs_15m, lows_15m, closes_15m, opens_15m, current_price) if r: result = r model = "B" # Model A — Bounce (only with trend) if "A" in GERCHIK_ALLOWED_MODELS and not result: r = detect_model_a(level, highs_15m, lows_15m, closes_15m, opens_15m, current_price, trend) if r: result = r model = "A" # Model D — Breakout (only with trend + volume) if "D" in GERCHIK_ALLOWED_MODELS and not result: r = detect_model_d(level, highs_15m, lows_15m, closes_15m, opens_15m, volumes_15m, current_price, trend) if r: result = r model = "D" if not result: return None side = result["side"] # Calculate SL: just beyond the level + buffer if side == "BUY": # SL below support level sl_price = level.price * (1 - sl_buffer_pct / 100) sl_distance_pct = abs(current_price - sl_price) / current_price * 100 # Check if SL is too far if sl_distance_pct > max_sl_pct: logger.debug(f"{symbol}: SL too far ({sl_distance_pct:.2f}% > {max_sl_pct}%), skip") return None # TPs based on SL distance from level sl_distance_abs = current_price - sl_price tp1_price = current_price + sl_distance_abs * tp1_rr tp2_price = current_price + sl_distance_abs * tp2_rr tp3_price = current_price + sl_distance_abs * tp3_rr entry_price = current_price # Will be refined to limit near level else: # SELL # SL above resistance level sl_price = level.price * (1 + sl_buffer_pct / 100) sl_distance_pct = abs(sl_price - current_price) / current_price * 100 if sl_distance_pct > max_sl_pct: logger.debug(f"{symbol}: SL too far ({sl_distance_pct:.2f}% > {max_sl_pct}%), skip") return None sl_distance_abs = sl_price - current_price tp1_price = current_price - sl_distance_abs * tp1_rr tp2_price = current_price - sl_distance_abs * tp2_rr tp3_price = current_price - sl_distance_abs * tp3_rr entry_price = current_price return GerchikSignal( symbol=symbol, model=model, side=side, level=level, entry_price=entry_price, sl_price=sl_price, sl_distance_pct=round(sl_distance_pct, 3), tp1_price=tp1_price, tp2_price=tp2_price, tp3_price=tp3_price, trend=trend, volume_ratio=round(vol_ratio, 2), candle_pattern=result["pattern"], )