"""
Gerchik Models — Pattern Recognition for 4 entry models.
Model A: Bounce (отбой) — only with trend
Model B: Simple false breakout — 1 candle pokes through, closes back
Model C: Complex false breakout — 2-3 candles beyond level, then return
Model D: Breakout — impulse break with volume
All patterns detected on 15m candles against 1H levels.
"""
import logging
from dataclasses import dataclass
from typing import Optional
from gerchik_levels import Level
from gerchik_config import GERCHIK_LEVEL_TOLERANCE_PCT, GERCHIK_ALLOWED_MODELS
logger = logging.getLogger(__name__)
@dataclass
class GerchikSignal:
"""A detected Gerchik entry signal."""
symbol: str
model: str # "A", "B", "C", "D"
side: str # "BUY" or "SELL"
level: Level # The level we're trading from
entry_price: float # Suggested entry (limit)
sl_price: float # Stop loss
sl_distance_pct: float # SL distance in %
tp1_price: float # TP1 (3x SL)
tp2_price: float # TP2 (4x SL)
tp3_price: float # TP3 (5x SL)
trend: str # "UP" or "DOWN" (from EMA50 1H)
volume_ratio: float # Current volume vs avg
candle_pattern: str # Description of what was detected
def calc_ema(closes: list[float], period: int) -> float:
"""Calculate EMA of last `period` closes. Returns latest value."""
if len(closes) < period:
return closes[-1] if closes else 0
multiplier = 2 / (period + 1)
ema = sum(closes[:period]) / period
for price in closes[period:]:
ema = (price - ema) * multiplier + ema
return ema
def get_trend_1h(closes_1h: list[float], period: int = 50) -> str:
"""Determine trend from EMA50 on 1H."""
if len(closes_1h) < period:
return "NEUTRAL"
ema = calc_ema(closes_1h, period)
current = closes_1h[-1]
if current > ema * 1.002: # 0.2% buffer
return "UP"
elif current < ema * 0.998:
return "DOWN"
return "NEUTRAL"
def get_volume_ratio(volumes: list[float], period: int = 20) -> float:
"""Current candle volume vs avg of last N."""
if len(volumes) < period + 1:
return 1.0
avg = sum(volumes[-period - 1:-1]) / period
return volumes[-1] / avg if avg > 0 else 1.0
def detect_model_a(
level: Level,
highs: list[float],
lows: list[float],
closes: list[float],
opens: list[float],
current_price: float,
trend: str,
) -> Optional[dict]:
"""
Model A — Bounce (Отбой от уровня).
Rules:
- 3+ candles near the level (within tolerance)
- None of them breaks through
- 4th candle starts moving away from level
- ONLY with trend (EMA50 direction)
For support level (BUY): candles bounce off from above, trend=UP
For resistance level (SELL): candles bounce off from below, trend=DOWN
"""
if len(closes) < 5:
return None
tol = level.price * GERCHIK_LEVEL_TOLERANCE_PCT / 100
# Check if this is a support bounce (BUY)
if level.price < current_price and trend == "UP":
# Last 3-5 candles should have lows near the level but not breaking it
near_candles = 0
for i in range(-4, -1):
if i + len(lows) < 0:
continue
low = lows[i]
close = closes[i]
if abs(low - level.price) <= tol * 2 and close > level.price:
near_candles += 1
if near_candles >= 2:
# Current candle should be moving up (close > open)
if closes[-1] > opens[-1] and closes[-1] > level.price + tol:
return {
"side": "BUY",
"pattern": f"{near_candles} свечей у поддержки, отбой вверх",
}
# Check if this is a resistance bounce (SELL)
if level.price > current_price and trend == "DOWN":
near_candles = 0
for i in range(-4, -1):
if i + len(highs) < 0:
continue
high = highs[i]
close = closes[i]
if abs(high - level.price) <= tol * 2 and close < level.price:
near_candles += 1
if near_candles >= 2:
if closes[-1] < opens[-1] and closes[-1] < level.price - tol:
return {
"side": "SELL",
"pattern": f"{near_candles} свечей у сопротивления, отбой вниз",
}
return None
def detect_model_b(
level: Level,
highs: list[float],
lows: list[float],
closes: list[float],
opens: list[float],
current_price: float,
) -> Optional[dict]:
"""
Model B — Simple False Breakout (Простой ложный пробой).
Rules:
- Previous candle broke through the level with its wick
- But closed back on the correct side
- Current candle confirms by moving away
"""
if len(closes) < 3:
return None
tol = level.price * GERCHIK_LEVEL_TOLERANCE_PCT / 100
# False breakout of support (wick went below, closed above) → BUY
if level.price < current_price:
prev_low = lows[-2]
prev_close = closes[-2]
# Wick below level, close above
if prev_low < level.price - tol and prev_close > level.price:
# Current candle is bullish and above level
if closes[-1] > opens[-1] and closes[-1] > level.price:
return {
"side": "BUY",
"pattern": f"ложный пробой поддержки (хвост ${prev_low:.4f}), закрытие выше",
}
# False breakout of resistance (wick went above, closed below) → SELL
if level.price > current_price:
prev_high = highs[-2]
prev_close = closes[-2]
if prev_high > level.price + tol and prev_close < level.price:
if closes[-1] < opens[-1] and closes[-1] < level.price:
return {
"side": "SELL",
"pattern": f"ложный пробой сопротивления (хвост ${prev_high:.4f}), закрытие ниже",
}
return None
def detect_model_c(
level: Level,
highs: list[float],
lows: list[float],
closes: list[float],
opens: list[float],
current_price: float,
) -> Optional[dict]:
"""
Model C — Complex False Breakout (Сложный ложный пробой).
Rules:
- 2-3 candles closed BEYOND the level
- Then price returned back through the level
- Stronger signal than Model B (trap confirmed)
"""
if len(closes) < 5:
return None
tol = level.price * GERCHIK_LEVEL_TOLERANCE_PCT / 100
# Complex false breakout of support → BUY
if level.price < current_price:
# Count candles that closed below level in recent history
candles_below = 0
for i in range(-5, -1):
if i + len(closes) < 0:
continue
if closes[i] < level.price - tol:
candles_below += 1
# 2-3 candles were below, now we're back above
if 2 <= candles_below <= 3 and closes[-1] > level.price and closes[-1] > opens[-1]:
return {
"side": "BUY",
"pattern": f"сложный ЛП: {candles_below} свечи ниже уровня, возврат",
}
# Complex false breakout of resistance → SELL
if level.price > current_price:
candles_above = 0
for i in range(-5, -1):
if i + len(closes) < 0:
continue
if closes[i] > level.price + tol:
candles_above += 1
if 2 <= candles_above <= 3 and closes[-1] < level.price and closes[-1] < opens[-1]:
return {
"side": "SELL",
"pattern": f"сложный ЛП: {candles_above} свечи выше уровня, возврат",
}
return None
def detect_model_d(
level: Level,
highs: list[float],
lows: list[float],
closes: list[float],
opens: list[float],
volumes: list[float],
current_price: float,
trend: str,
) -> Optional[dict]:
"""
Model D — Breakout (Пробой уровня).
Rules:
- Candle closes BEYOND the level (not just wick)
- Volume > 2x average (strong impulse)
- Only with trend
- Potential: 1-3 ATR movement
"""
if len(closes) < 3:
return None
vol_ratio = get_volume_ratio(volumes)
# Need strong volume
if vol_ratio < 2.0:
return None
tol = level.price * GERCHIK_LEVEL_TOLERANCE_PCT / 100
# Breakout of resistance (close above) → BUY, trend must be UP
if level.price < current_price and trend == "UP":
# Previous candles were below the level
was_below = any(closes[i] < level.price for i in range(-4, -1) if i + len(closes) >= 0)
# Current candle closed above with body
body_above = closes[-1] > level.price + tol and opens[-1] < closes[-1]
if was_below and body_above:
return {
"side": "BUY",
"pattern": f"пробой сопротивления, vol={vol_ratio:.1f}x",
"vol_ratio": vol_ratio,
}
# Breakout of support (close below) → SELL, trend must be DOWN
if level.price > current_price and trend == "DOWN":
was_above = any(closes[i] > level.price for i in range(-4, -1) if i + len(closes) >= 0)
body_below = closes[-1] < level.price - tol and opens[-1] > closes[-1]
if was_above and body_below:
return {
"side": "SELL",
"pattern": f"пробой поддержки, vol={vol_ratio:.1f}x",
"vol_ratio": vol_ratio,
}
return None
def detect_all_models(
symbol: str,
level: Level,
opens_15m: list[float],
highs_15m: list[float],
lows_15m: list[float],
closes_15m: list[float],
volumes_15m: list[float],
closes_1h: list[float],
current_price: float,
sl_buffer_pct: float,
tp1_rr: float,
tp2_rr: float,
tp3_rr: float,
max_sl_pct: float,
) -> Optional[GerchikSignal]:
"""
Try all 4 models against a single level.
Returns GerchikSignal if any model triggers, None otherwise.
Priority: C > B > A > D (false breakouts are stronger signals)
"""
trend = get_trend_1h(closes_1h)
vol_ratio = get_volume_ratio(volumes_15m)
# Try models in priority order (filtered by GERCHIK_ALLOWED_MODELS)
result = None
model = None
# Model C — Complex false breakout (strongest)
if "C" in GERCHIK_ALLOWED_MODELS and not result:
r = detect_model_c(level, highs_15m, lows_15m, closes_15m, opens_15m, current_price)
if r:
result = r
model = "C"
# Model B — Simple false breakout
if "B" in GERCHIK_ALLOWED_MODELS and not result:
r = detect_model_b(level, highs_15m, lows_15m, closes_15m, opens_15m, current_price)
if r:
result = r
model = "B"
# Model A — Bounce (only with trend)
if "A" in GERCHIK_ALLOWED_MODELS and not result:
r = detect_model_a(level, highs_15m, lows_15m, closes_15m, opens_15m, current_price, trend)
if r:
result = r
model = "A"
# Model D — Breakout (only with trend + volume)
if "D" in GERCHIK_ALLOWED_MODELS and not result:
r = detect_model_d(level, highs_15m, lows_15m, closes_15m, opens_15m, volumes_15m, current_price, trend)
if r:
result = r
model = "D"
if not result:
return None
side = result["side"]
# Calculate SL: just beyond the level + buffer
if side == "BUY":
# SL below support level
sl_price = level.price * (1 - sl_buffer_pct / 100)
sl_distance_pct = abs(current_price - sl_price) / current_price * 100
# Check if SL is too far
if sl_distance_pct > max_sl_pct:
logger.debug(f"{symbol}: SL too far ({sl_distance_pct:.2f}% > {max_sl_pct}%), skip")
return None
# TPs based on SL distance from level
sl_distance_abs = current_price - sl_price
tp1_price = current_price + sl_distance_abs * tp1_rr
tp2_price = current_price + sl_distance_abs * tp2_rr
tp3_price = current_price + sl_distance_abs * tp3_rr
entry_price = current_price # Will be refined to limit near level
else: # SELL
# SL above resistance level
sl_price = level.price * (1 + sl_buffer_pct / 100)
sl_distance_pct = abs(sl_price - current_price) / current_price * 100
if sl_distance_pct > max_sl_pct:
logger.debug(f"{symbol}: SL too far ({sl_distance_pct:.2f}% > {max_sl_pct}%), skip")
return None
sl_distance_abs = sl_price - current_price
tp1_price = current_price - sl_distance_abs * tp1_rr
tp2_price = current_price - sl_distance_abs * tp2_rr
tp3_price = current_price - sl_distance_abs * tp3_rr
entry_price = current_price
return GerchikSignal(
symbol=symbol,
model=model,
side=side,
level=level,
entry_price=entry_price,
sl_price=sl_price,
sl_distance_pct=round(sl_distance_pct, 3),
tp1_price=tp1_price,
tp2_price=tp2_price,
tp3_price=tp3_price,
trend=trend,
volume_ratio=round(vol_ratio, 2),
candle_pattern=result["pattern"],
)