← Назад
""" Digash Formation Parser — parses signals from "Формации - Digash" Telegram bot. Supported formation types: 1. Отскок/закол уровня {price}$ на таймфрейме {tf} 2. Пробой уровней {price1}$ ,{price2}$ ... на таймфрейме {tf} 3. Трендовый уровень на таймфрейме {tf} Each message is forwarded from the Digash bot and contains: - 🟧 Бинанс фьючерсы - {SYMBOL} - {emoji} {formation_type} ... на таймфрейме {tf} """ import re import logging logger = logging.getLogger(__name__) def parse_digash_message(text: str) -> dict | None: """ Parse a Digash formation message. Returns dict with keys: - symbol: str (e.g. "HUMAUSDT") - ticker: str (e.g. "HUMA") - formation: str ("bounce", "breakout", "trendline") - levels: list[float] (price levels, empty for trendline) - timeframe: str ("15m", "1h", "4h", "1d") - raw_text: str Or None if not a valid Digash formation message. """ if not text: return None # Must contain "Бинанс фьючерсы" — that's the marker if "Бинанс фьючерсы" not in text and "Бинанс Фьючерсы" not in text: return None lines = text.strip().split("\n") result = { "raw_text": text, "symbol": "", "ticker": "", "formation": "", "levels": [], "timeframe": "", } for line in lines: line = line.strip() # Extract symbol: "Бинанс фьючерсы - HUMAUSDT" sym_match = re.search(r"Бинанс\s+[фФ]ьючерсы\s*-\s*(\w+)", line) if sym_match: result["symbol"] = sym_match.group(1).upper() # Extract ticker (remove USDT suffix) sym = result["symbol"] if sym.endswith("USDT"): result["ticker"] = sym[:-4] else: result["ticker"] = sym # Extract formation type + levels + timeframe # Type 1: Отскок/закол уровня 0.019487$ на таймфрейме 15m bounce_match = re.search( r"[Оо]тскок/закол\s+уровня\s+([\d.]+)\$?\s+на\s+таймфрейме\s+(\w+)", line, ) if bounce_match: result["formation"] = "bounce" result["levels"] = [float(bounce_match.group(1))] result["timeframe"] = normalize_timeframe(bounce_match.group(2)) continue # Type 2: Пробой уровней 0.06795$ ,0.06415$ ,0.06415$ ,0.0635$ на таймфрейме 4h breakout_match = re.search( r"[Пп]робой\s+уровн\w*\s+([\d.$,\s]+)\s+на\s+таймфрейме\s+(\w+)", line, ) if breakout_match: result["formation"] = "breakout" levels_str = breakout_match.group(1) # Parse multiple prices: "0.06795$ ,0.06415$ ,0.06415$ ,0.0635$" prices = re.findall(r"([\d.]+)\$?", levels_str) result["levels"] = [float(p) for p in prices if p] result["timeframe"] = normalize_timeframe(breakout_match.group(2)) continue # Type 3: Трендовый уровень на таймфрейме 1d trend_match = re.search( r"[Тт]рендовый\s+уровень\s+на\s+таймфрейме\s+(\w+)", line, ) if trend_match: result["formation"] = "trendline" result["levels"] = [] result["timeframe"] = normalize_timeframe(trend_match.group(1)) continue # Validate: must have symbol and formation if not result["symbol"] or not result["formation"]: logger.debug(f"Failed to parse Digash message: {text[:100]}") return None logger.info( f"Digash parsed: {result['symbol']} | {result['formation']} | " f"levels={result['levels']} | tf={result['timeframe']}" ) return result def normalize_timeframe(tf: str) -> str: """Normalize timeframe string.""" tf = tf.lower().strip() # Map common variants mapping = { "15m": "15m", "15min": "15m", "1h": "1h", "1hr": "1h", "4h": "4h", "4hr": "4h", "1d": "1d", "1day": "1d", "d": "1d", } return mapping.get(tf, tf) def determine_direction(formation: str, levels: list[float], current_price: float) -> str | None: """ Determine trade direction based on formation type and price vs level. Returns "BUY", "SELL", or None if can't determine. """ if not levels or current_price <= 0: return None if formation == "bounce": # Отскок/закол — price bounced from level level = levels[0] margin = level * 0.002 # 0.2% margin for noise if current_price > level + margin: # Price above level = bounced up from support → LONG return "BUY" elif current_price < level - margin: # Price below level = bounced down from resistance → SHORT return "SELL" else: # Too close to level — can't determine yet return None elif formation == "breakout": # Пробой — price broke through levels # Use highest and lowest levels max_level = max(levels) min_level = min(levels) if current_price > max_level: # Broke above all levels → LONG return "BUY" elif current_price < min_level: # Broke below all levels → SHORT return "SELL" else: # Between levels — direction from nearest level mid = (max_level + min_level) / 2 if current_price > mid: return "BUY" else: return "SELL" elif formation == "trendline": # Трендовый уровень — no price levels given # Can't determine direction from level alone return None return None def calculate_sl_from_level( side: str, levels: list[float], formation: str, buffer_pct: float = 0.5, ) -> float | None: """ Calculate stop loss based on formation level. For bounces: SL is behind the level (level ± buffer) For breakouts: SL is back through the broken level """ if not levels: return None if formation == "bounce": level = levels[0] buffer = level * (buffer_pct / 100) if side == "BUY": # SL below the support level return level - buffer else: # SL above the resistance level return level + buffer elif formation == "breakout": if side == "BUY": # SL below the highest broken level (back into range) level = max(levels) buffer = level * (buffer_pct / 100) return level - buffer else: # SL above the lowest broken level level = min(levels) buffer = level * (buffer_pct / 100) return level + buffer return None