← Back
"""
Gerchik Models — Pattern Recognition for 4 entry models.

Model A: Bounce (отбой) — only with trend
Model B: Simple false breakout — 1 candle pokes through, closes back
Model C: Complex false breakout — 2-3 candles beyond level, then return
Model D: Breakout — impulse break with volume

All patterns detected on 15m candles against 1H levels.
"""

import logging
from dataclasses import dataclass
from typing import Optional

from gerchik_levels import Level
from gerchik_config import GERCHIK_LEVEL_TOLERANCE_PCT, GERCHIK_ALLOWED_MODELS

logger = logging.getLogger(__name__)


@dataclass
class GerchikSignal:
    """A detected Gerchik entry signal.

    Built by ``detect_all_models`` when one of the entry models (A-D) fires
    against a level and the stop-loss distance does not exceed the allowed
    maximum.
    """
    symbol: str
    model: str              # "A", "B", "C" or "D" — which entry model fired
    side: str               # "BUY" or "SELL"
    level: Level            # The level we're trading from
    entry_price: float      # Suggested entry; detect_all_models sets it to current_price
    sl_price: float         # Stop loss: level price shifted away by the SL buffer
    sl_distance_pct: float  # Entry-to-SL distance in % of entry price (rounded to 3 dp)
    tp1_price: float        # TP1: entry +/- SL distance * tp1_rr (caller-supplied multiple)
    tp2_price: float        # TP2: entry +/- SL distance * tp2_rr
    tp3_price: float        # TP3: entry +/- SL distance * tp3_rr
    trend: str              # "UP", "DOWN" or "NEUTRAL" (from EMA50 1H; B/C do not filter on it)
    volume_ratio: float     # Current volume vs avg of preceding candles (rounded to 2 dp)
    candle_pattern: str     # Description of what was detected


def calc_ema(closes: list[float], period: int) -> float:
    """Return the latest EMA value for a series of closes.

    The EMA is seeded with the simple average of the first ``period`` closes
    and then rolled forward over every remaining close, so the whole series
    (not only the last ``period`` values) contributes to the result.

    Args:
        closes: Close prices, with the most recent value last.
        period: EMA period; the smoothing factor is ``2 / (period + 1)``.

    Returns:
        The EMA after the last close. With fewer than ``period`` closes the
        average cannot be seeded, so the last close is returned unchanged,
        or ``0.0`` when ``closes`` is empty.
    """
    if len(closes) < period:
        # 0.0 (not int 0) keeps the return type consistent with the annotation.
        return closes[-1] if closes else 0.0

    multiplier = 2 / (period + 1)
    # Seed with the SMA of the first `period` closes.
    ema = sum(closes[:period]) / period

    for price in closes[period:]:
        ema = (price - ema) * multiplier + ema

    return ema


def get_trend_1h(closes_1h: list[float], period: int = 50) -> str:
    """Classify the trend by comparing the last close with its EMA.

    Returns "UP" when the last close is more than 0.2% above the EMA,
    "DOWN" when it is more than 0.2% below it, and "NEUTRAL" otherwise,
    including when fewer than `period` closes are available.
    """
    if len(closes_1h) < period:
        return "NEUTRAL"

    ema_value = calc_ema(closes_1h, period)
    last_close = closes_1h[-1]

    # 0.2% dead zone around the EMA so tiny deviations do not count as a trend.
    upper_band = ema_value * 1.002
    lower_band = ema_value * 0.998

    if last_close > upper_band:
        return "UP"
    if last_close < lower_band:
        return "DOWN"
    return "NEUTRAL"


def get_volume_ratio(volumes: list[float], period: int = 20) -> float:
    """Ratio of the latest volume to the mean of the `period` values before it.

    Falls back to 1.0 when fewer than `period + 1` values are supplied or
    when the preceding average is not positive.
    """
    if len(volumes) <= period:
        return 1.0

    # The `period` candles immediately before the current (last) one.
    preceding = volumes[-(period + 1):-1]
    baseline = sum(preceding) / period

    if baseline > 0:
        return volumes[-1] / baseline
    return 1.0


def detect_model_a(
    level: Level,
    highs: list[float],
    lows: list[float],
    closes: list[float],
    opens: list[float],
    current_price: float,
    trend: str,
) -> Optional[dict]:
    """
    Model A — bounce off the level, taken only in the direction of the trend.

    The three candles before the current one (indices -4..-2) are inspected.
    A candle counts as a touch when its extreme (low for support, high for
    resistance) lies within 2x the level tolerance of the level AND it closed
    on the correct side of the level. At least two touches are required; the
    current candle must then have a body in the bounce direction and close
    more than one tolerance away from the level.

    Support (level below current_price) with trend "UP"     -> BUY
    Resistance (level above current_price) with trend "DOWN" -> SELL

    Returns a dict with "side" and "pattern" keys, or None when fewer than
    5 closes are supplied or no bounce is detected.
    """
    if len(closes) < 5:
        return None

    lvl = level.price
    tolerance = lvl * GERCHIK_LEVEL_TOLERANCE_PCT / 100
    touch_band = tolerance * 2
    lookback = (-4, -3, -2)  # the three candles preceding the current one

    # Support bounce (BUY): lows tag the level, closes stay above it.
    if lvl < current_price and trend == "UP":
        touches = sum(
            1
            for idx in lookback
            if idx + len(lows) >= 0
            and abs(lows[idx] - lvl) <= touch_band
            and closes[idx] > lvl
        )
        # Current candle must be bullish and already clear of the level.
        if touches >= 2 and closes[-1] > opens[-1] and closes[-1] > lvl + tolerance:
            return {
                "side": "BUY",
                "pattern": f"{touches} свечей у поддержки, отбой вверх",
            }

    # Resistance bounce (SELL): highs tag the level, closes stay below it.
    if lvl > current_price and trend == "DOWN":
        touches = sum(
            1
            for idx in lookback
            if idx + len(highs) >= 0
            and abs(highs[idx] - lvl) <= touch_band
            and closes[idx] < lvl
        )
        # Current candle must be bearish and already clear of the level.
        if touches >= 2 and closes[-1] < opens[-1] and closes[-1] < lvl - tolerance:
            return {
                "side": "SELL",
                "pattern": f"{touches} свечей у сопротивления, отбой вниз",
            }

    return None


def detect_model_b(
    level: Level,
    highs: list[float],
    lows: list[float],
    closes: list[float],
    opens: list[float],
    current_price: float,
) -> Optional[dict]:
    """
    Model B — simple false breakout.

    The previous candle's wick pierced the level by more than the level
    tolerance but the candle closed back on the original side; the current
    candle confirms with a body pointing away from the level and a close
    on that same side.

    Level below current_price (support pierced downwards)   -> BUY
    Level above current_price (resistance pierced upwards)  -> SELL

    Returns a dict with "side" and "pattern" keys, or None when fewer than
    3 closes are supplied or the pattern is absent.
    """
    if len(closes) < 3:
        return None

    lvl = level.price
    tolerance = lvl * GERCHIK_LEVEL_TOLERANCE_PCT / 100
    last_close = closes[-1]

    # Support: wick dipped below the level, close recovered above it.
    if lvl < current_price:
        wick_low = lows[-2]
        if (
            wick_low < lvl - tolerance
            and closes[-2] > lvl
            and last_close > opens[-1]
            and last_close > lvl
        ):
            return {
                "side": "BUY",
                "pattern": f"ложный пробой поддержки (хвост ${wick_low:.4f}), закрытие выше",
            }

    # Resistance: wick spiked above the level, close fell back below it.
    if lvl > current_price:
        wick_high = highs[-2]
        if (
            wick_high > lvl + tolerance
            and closes[-2] < lvl
            and last_close < opens[-1]
            and last_close < lvl
        ):
            return {
                "side": "SELL",
                "pattern": f"ложный пробой сопротивления (хвост ${wick_high:.4f}), закрытие ниже",
            }

    return None


def detect_model_c(
    level: Level,
    highs: list[float],
    lows: list[float],
    closes: list[float],
    opens: list[float],
    current_price: float,
) -> Optional[dict]:
    """
    Model C — complex false breakout.

    Among the four candles before the current one (indices -5..-2), two or
    three closed beyond the level by more than the level tolerance (they do
    not have to be consecutive). The current candle then closes back on the
    original side of the level with a body pointing away from it.

    Level below current_price (closes were below support)     -> BUY
    Level above current_price (closes were above resistance)  -> SELL

    Returns a dict with "side" and "pattern" keys, or None when fewer than
    5 closes are supplied or the pattern is absent.
    """
    if len(closes) < 5:
        return None

    lvl = level.price
    tolerance = lvl * GERCHIK_LEVEL_TOLERANCE_PCT / 100
    # With at least 5 closes this is exactly the candles at indices -5..-2.
    recent_closes = closes[-5:-1]

    # Support was broken on close, price has now reclaimed it.
    if lvl < current_price:
        floor = lvl - tolerance
        closed_below = sum(1 for close in recent_closes if close < floor)
        if 2 <= closed_below <= 3 and closes[-1] > lvl and closes[-1] > opens[-1]:
            return {
                "side": "BUY",
                "pattern": f"сложный ЛП: {closed_below} свечи ниже уровня, возврат",
            }

    # Resistance was broken on close, price has now dropped back under it.
    if lvl > current_price:
        ceiling = lvl + tolerance
        closed_above = sum(1 for close in recent_closes if close > ceiling)
        if 2 <= closed_above <= 3 and closes[-1] < lvl and closes[-1] < opens[-1]:
            return {
                "side": "SELL",
                "pattern": f"сложный ЛП: {closed_above} свечи выше уровня, возврат",
            }

    return None


def detect_model_d(
    level: Level,
    highs: list[float],
    lows: list[float],
    closes: list[float],
    opens: list[float],
    volumes: list[float],
    current_price: float,
    trend: str,
) -> Optional[dict]:
    """
    Model D — breakout of the level, taken only with the trend.

    Requirements:
    - the current candle's volume ratio (see get_volume_ratio) is at least 2.0
    - at least one of the up to three candles before the current one closed
      on the far side of the level
    - the current candle closes beyond the level by more than the level
      tolerance, with a body in the breakout direction

    Level below current_price with trend "UP"   -> BUY
    Level above current_price with trend "DOWN" -> SELL

    Returns a dict with "side", "pattern" and "vol_ratio" keys, or None when
    fewer than 3 closes are supplied or the conditions are not met.
    """
    if len(closes) < 3:
        return None

    volume_multiple = get_volume_ratio(volumes)
    if volume_multiple < 2.0:
        # No volume impulse, so not a breakout.
        return None

    lvl = level.price
    tolerance = lvl * GERCHIK_LEVEL_TOLERANCE_PCT / 100
    # Candles at indices -4..-2; the slice clamps when only 3 candles exist.
    prior_closes = closes[-4:-1]

    # Upside breakout: price came from below and closed above the level.
    if lvl < current_price and trend == "UP":
        came_from_below = any(close < lvl for close in prior_closes)
        bullish_close_beyond = closes[-1] > lvl + tolerance and opens[-1] < closes[-1]
        if came_from_below and bullish_close_beyond:
            return {
                "side": "BUY",
                "pattern": f"пробой сопротивления, vol={volume_multiple:.1f}x",
                "vol_ratio": volume_multiple,
            }

    # Downside breakout: price came from above and closed below the level.
    if lvl > current_price and trend == "DOWN":
        came_from_above = any(close > lvl for close in prior_closes)
        bearish_close_beyond = closes[-1] < lvl - tolerance and opens[-1] > closes[-1]
        if came_from_above and bearish_close_beyond:
            return {
                "side": "SELL",
                "pattern": f"пробой поддержки, vol={volume_multiple:.1f}x",
                "vol_ratio": volume_multiple,
            }

    return None


def detect_all_models(
    symbol: str,
    level: Level,
    opens_15m: list[float],
    highs_15m: list[float],
    lows_15m: list[float],
    closes_15m: list[float],
    volumes_15m: list[float],
    closes_1h: list[float],
    current_price: float,
    sl_buffer_pct: float,
    tp1_rr: float,
    tp2_rr: float,
    tp3_rr: float,
    max_sl_pct: float,
) -> Optional[GerchikSignal]:
    """
    Run the enabled entry models against one level and build a signal.

    Models are tried in the order C, B, A, D; the first one that is listed
    in GERCHIK_ALLOWED_MODELS and fires wins. The stop is placed at the
    level price shifted by `sl_buffer_pct` percent (below the level for BUY,
    above it for SELL). Take-profits are placed at `tp1_rr` / `tp2_rr` /
    `tp3_rr` multiples of the entry-to-stop distance, measured from
    `current_price`, which is also used as the entry price.

    Returns None when no model fires or when the stop distance, as a
    percentage of `current_price`, exceeds `max_sl_pct`.
    """
    trend = get_trend_1h(closes_1h)
    vol_ratio = get_volume_ratio(volumes_15m)

    # Detectors in priority order; each thunk hides the per-model signature.
    candles = (level, highs_15m, lows_15m, closes_15m, opens_15m)
    detectors = (
        ("C", lambda: detect_model_c(*candles, current_price)),
        ("B", lambda: detect_model_b(*candles, current_price)),
        ("A", lambda: detect_model_a(*candles, current_price, trend)),
        ("D", lambda: detect_model_d(*candles, volumes_15m, current_price, trend)),
    )

    result = None
    model = None
    for code, detect in detectors:
        if code not in GERCHIK_ALLOWED_MODELS:
            continue
        hit = detect()
        if hit:
            result, model = hit, code
            break

    if not result:
        return None

    side = result["side"]
    is_buy = side == "BUY"

    # Stop goes just beyond the level: below it for BUY, above it for SELL.
    if is_buy:
        sl_price = level.price * (1 - sl_buffer_pct / 100)
    else:
        sl_price = level.price * (1 + sl_buffer_pct / 100)

    sl_distance_pct = abs(current_price - sl_price) / current_price * 100
    if sl_distance_pct > max_sl_pct:
        logger.debug(f"{symbol}: SL too far ({sl_distance_pct:.2f}% > {max_sl_pct}%), skip")
        return None

    # Targets are R-multiples of the entry-to-stop distance.
    rr_multiples = (tp1_rr, tp2_rr, tp3_rr)
    if is_buy:
        risk = current_price - sl_price
        tp1_price, tp2_price, tp3_price = (current_price + risk * rr for rr in rr_multiples)
    else:
        risk = sl_price - current_price
        tp1_price, tp2_price, tp3_price = (current_price - risk * rr for rr in rr_multiples)

    return GerchikSignal(
        symbol=symbol,
        model=model,
        side=side,
        level=level,
        entry_price=current_price,
        sl_price=sl_price,
        sl_distance_pct=round(sl_distance_pct, 3),
        tp1_price=tp1_price,
        tp2_price=tp2_price,
        tp3_price=tp3_price,
        trend=trend,
        volume_ratio=round(vol_ratio, 2),
        candle_pattern=result["pattern"],
    )

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...