← Back
"""
Digash Formation Parser — parses signals from "Формации - Digash" Telegram bot.

Supported formation types:
  1. Отскок/закол уровня {price}$ на таймфрейме {tf}
  2. Пробой уровней {price1}$ ,{price2}$ ... на таймфрейме {tf}
  3. Трендовый уровень на таймфрейме {tf}

Each message is forwarded from the Digash bot and contains:
  - 🟧 Бинанс фьючерсы - {SYMBOL}
  - {emoji} {formation_type} ... на таймфрейме {tf}
"""

import re
import logging

logger = logging.getLogger(__name__)


def parse_digash_message(text: str) -> dict | None:
    """
    Parse a Digash formation message.

    Returns dict with keys:
      - symbol: str (e.g. "HUMAUSDT")
      - ticker: str (e.g. "HUMA")
      - formation: str ("bounce", "breakout", "trendline")
      - levels: list[float] (price levels, empty for trendline)
      - timeframe: str ("15m", "1h", "4h", "1d")
      - raw_text: str
    Or None if not a valid Digash formation message.
    """
    if not text:
        return None

    # Must contain "Бинанс фьючерсы" — that's the marker
    if "Бинанс фьючерсы" not in text and "Бинанс Фьючерсы" not in text:
        return None

    lines = text.strip().split("\n")
    result = {
        "raw_text": text,
        "symbol": "",
        "ticker": "",
        "formation": "",
        "levels": [],
        "timeframe": "",
    }

    for line in lines:
        line = line.strip()

        # Extract symbol: "Бинанс фьючерсы - HUMAUSDT"
        sym_match = re.search(r"Бинанс\s+[фФ]ьючерсы\s*-\s*(\w+)", line)
        if sym_match:
            result["symbol"] = sym_match.group(1).upper()
            # Extract ticker (remove USDT suffix)
            sym = result["symbol"]
            if sym.endswith("USDT"):
                result["ticker"] = sym[:-4]
            else:
                result["ticker"] = sym

        # Extract formation type + levels + timeframe
        # Type 1: Отскок/закол уровня 0.019487$ на таймфрейме 15m
        bounce_match = re.search(
            r"[Оо]тскок/закол\s+уровня\s+([\d.]+)\$?\s+на\s+таймфрейме\s+(\w+)",
            line,
        )
        if bounce_match:
            result["formation"] = "bounce"
            result["levels"] = [float(bounce_match.group(1))]
            result["timeframe"] = normalize_timeframe(bounce_match.group(2))
            continue

        # Type 2: Пробой уровней 0.06795$ ,0.06415$ ,0.06415$ ,0.0635$ на таймфрейме 4h
        breakout_match = re.search(
            r"[Пп]робой\s+уровн\w*\s+([\d.$,\s]+)\s+на\s+таймфрейме\s+(\w+)",
            line,
        )
        if breakout_match:
            result["formation"] = "breakout"
            levels_str = breakout_match.group(1)
            # Parse multiple prices: "0.06795$ ,0.06415$ ,0.06415$ ,0.0635$"
            prices = re.findall(r"([\d.]+)\$?", levels_str)
            result["levels"] = [float(p) for p in prices if p]
            result["timeframe"] = normalize_timeframe(breakout_match.group(2))
            continue

        # Type 3: Трендовый уровень на таймфрейме 1d
        trend_match = re.search(
            r"[Тт]рендовый\s+уровень\s+на\s+таймфрейме\s+(\w+)",
            line,
        )
        if trend_match:
            result["formation"] = "trendline"
            result["levels"] = []
            result["timeframe"] = normalize_timeframe(trend_match.group(1))
            continue

    # Validate: must have symbol and formation
    if not result["symbol"] or not result["formation"]:
        logger.debug(f"Failed to parse Digash message: {text[:100]}")
        return None

    logger.info(
        f"Digash parsed: {result['symbol']} | {result['formation']} | "
        f"levels={result['levels']} | tf={result['timeframe']}"
    )
    return result


def normalize_timeframe(tf: str) -> str:
    """Normalize timeframe string."""
    tf = tf.lower().strip()
    # Map common variants
    mapping = {
        "15m": "15m",
        "15min": "15m",
        "1h": "1h",
        "1hr": "1h",
        "4h": "4h",
        "4hr": "4h",
        "1d": "1d",
        "1day": "1d",
        "d": "1d",
    }
    return mapping.get(tf, tf)


def determine_direction(formation: str, levels: list[float], current_price: float) -> str | None:
    """
    Determine trade direction based on formation type and price vs level.

    Returns "BUY", "SELL", or None if can't determine.
    """
    if not levels or current_price <= 0:
        return None

    if formation == "bounce":
        # Отскок/закол — price bounced from level
        level = levels[0]
        margin = level * 0.002  # 0.2% margin for noise

        if current_price > level + margin:
            # Price above level = bounced up from support → LONG
            return "BUY"
        elif current_price < level - margin:
            # Price below level = bounced down from resistance → SHORT
            return "SELL"
        else:
            # Too close to level — can't determine yet
            return None

    elif formation == "breakout":
        # Пробой — price broke through levels
        # Use highest and lowest levels
        max_level = max(levels)
        min_level = min(levels)

        if current_price > max_level:
            # Broke above all levels → LONG
            return "BUY"
        elif current_price < min_level:
            # Broke below all levels → SHORT
            return "SELL"
        else:
            # Between levels — direction from nearest level
            mid = (max_level + min_level) / 2
            if current_price > mid:
                return "BUY"
            else:
                return "SELL"

    elif formation == "trendline":
        # Трендовый уровень — no price levels given
        # Can't determine direction from level alone
        return None

    return None


def calculate_sl_from_level(
    side: str,
    levels: list[float],
    formation: str,
    buffer_pct: float = 0.5,
) -> float | None:
    """
    Calculate stop loss based on formation level.

    For bounces: SL is behind the level (level ± buffer)
    For breakouts: SL is back through the broken level
    """
    if not levels:
        return None

    if formation == "bounce":
        level = levels[0]
        buffer = level * (buffer_pct / 100)

        if side == "BUY":
            # SL below the support level
            return level - buffer
        else:
            # SL above the resistance level
            return level + buffer

    elif formation == "breakout":
        if side == "BUY":
            # SL below the highest broken level (back into range)
            level = max(levels)
            buffer = level * (buffer_pct / 100)
            return level - buffer
        else:
            # SL above the lowest broken level
            level = min(levels)
            buffer = level * (buffer_pct / 100)
            return level + buffer

    return None

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...