← Назад"""
Digash Formation Parser — parses signals from "Формации - Digash" Telegram bot.
Supported formation types:
1. Отскок/закол уровня {price}$ на таймфрейме {tf}
2. Пробой уровней {price1}$ ,{price2}$ ... на таймфрейме {tf}
3. Трендовый уровень на таймфрейме {tf}
Each message is forwarded from the Digash bot and contains:
- 🟧 Бинанс фьючерсы - {SYMBOL}
- {emoji} {formation_type} ... на таймфрейме {tf}
"""
import re
import logging
logger = logging.getLogger(__name__)
def parse_digash_message(text: str) -> dict | None:
"""
Parse a Digash formation message.
Returns dict with keys:
- symbol: str (e.g. "HUMAUSDT")
- ticker: str (e.g. "HUMA")
- formation: str ("bounce", "breakout", "trendline")
- levels: list[float] (price levels, empty for trendline)
- timeframe: str ("15m", "1h", "4h", "1d")
- raw_text: str
Or None if not a valid Digash formation message.
"""
if not text:
return None
# Must contain "Бинанс фьючерсы" — that's the marker
if "Бинанс фьючерсы" not in text and "Бинанс Фьючерсы" not in text:
return None
lines = text.strip().split("\n")
result = {
"raw_text": text,
"symbol": "",
"ticker": "",
"formation": "",
"levels": [],
"timeframe": "",
}
for line in lines:
line = line.strip()
# Extract symbol: "Бинанс фьючерсы - HUMAUSDT"
sym_match = re.search(r"Бинанс\s+[фФ]ьючерсы\s*-\s*(\w+)", line)
if sym_match:
result["symbol"] = sym_match.group(1).upper()
# Extract ticker (remove USDT suffix)
sym = result["symbol"]
if sym.endswith("USDT"):
result["ticker"] = sym[:-4]
else:
result["ticker"] = sym
# Extract formation type + levels + timeframe
# Type 1: Отскок/закол уровня 0.019487$ на таймфрейме 15m
bounce_match = re.search(
r"[Оо]тскок/закол\s+уровня\s+([\d.]+)\$?\s+на\s+таймфрейме\s+(\w+)",
line,
)
if bounce_match:
result["formation"] = "bounce"
result["levels"] = [float(bounce_match.group(1))]
result["timeframe"] = normalize_timeframe(bounce_match.group(2))
continue
# Type 2: Пробой уровней 0.06795$ ,0.06415$ ,0.06415$ ,0.0635$ на таймфрейме 4h
breakout_match = re.search(
r"[Пп]робой\s+уровн\w*\s+([\d.$,\s]+)\s+на\s+таймфрейме\s+(\w+)",
line,
)
if breakout_match:
result["formation"] = "breakout"
levels_str = breakout_match.group(1)
# Parse multiple prices: "0.06795$ ,0.06415$ ,0.06415$ ,0.0635$"
prices = re.findall(r"([\d.]+)\$?", levels_str)
result["levels"] = [float(p) for p in prices if p]
result["timeframe"] = normalize_timeframe(breakout_match.group(2))
continue
# Type 3: Трендовый уровень на таймфрейме 1d
trend_match = re.search(
r"[Тт]рендовый\s+уровень\s+на\s+таймфрейме\s+(\w+)",
line,
)
if trend_match:
result["formation"] = "trendline"
result["levels"] = []
result["timeframe"] = normalize_timeframe(trend_match.group(1))
continue
# Validate: must have symbol and formation
if not result["symbol"] or not result["formation"]:
logger.debug(f"Failed to parse Digash message: {text[:100]}")
return None
logger.info(
f"Digash parsed: {result['symbol']} | {result['formation']} | "
f"levels={result['levels']} | tf={result['timeframe']}"
)
return result
def normalize_timeframe(tf: str) -> str:
"""Normalize timeframe string."""
tf = tf.lower().strip()
# Map common variants
mapping = {
"15m": "15m",
"15min": "15m",
"1h": "1h",
"1hr": "1h",
"4h": "4h",
"4hr": "4h",
"1d": "1d",
"1day": "1d",
"d": "1d",
}
return mapping.get(tf, tf)
def determine_direction(formation: str, levels: list[float], current_price: float) -> str | None:
"""
Determine trade direction based on formation type and price vs level.
Returns "BUY", "SELL", or None if can't determine.
"""
if not levels or current_price <= 0:
return None
if formation == "bounce":
# Отскок/закол — price bounced from level
level = levels[0]
margin = level * 0.002 # 0.2% margin for noise
if current_price > level + margin:
# Price above level = bounced up from support → LONG
return "BUY"
elif current_price < level - margin:
# Price below level = bounced down from resistance → SHORT
return "SELL"
else:
# Too close to level — can't determine yet
return None
elif formation == "breakout":
# Пробой — price broke through levels
# Use highest and lowest levels
max_level = max(levels)
min_level = min(levels)
if current_price > max_level:
# Broke above all levels → LONG
return "BUY"
elif current_price < min_level:
# Broke below all levels → SHORT
return "SELL"
else:
# Between levels — direction from nearest level
mid = (max_level + min_level) / 2
if current_price > mid:
return "BUY"
else:
return "SELL"
elif formation == "trendline":
# Трендовый уровень — no price levels given
# Can't determine direction from level alone
return None
return None
def calculate_sl_from_level(
side: str,
levels: list[float],
formation: str,
buffer_pct: float = 0.5,
) -> float | None:
"""
Calculate stop loss based on formation level.
For bounces: SL is behind the level (level ± buffer)
For breakouts: SL is back through the broken level
"""
if not levels:
return None
if formation == "bounce":
level = levels[0]
buffer = level * (buffer_pct / 100)
if side == "BUY":
# SL below the support level
return level - buffer
else:
# SL above the resistance level
return level + buffer
elif formation == "breakout":
if side == "BUY":
# SL below the highest broken level (back into range)
level = max(levels)
buffer = level * (buffer_pct / 100)
return level - buffer
else:
# SL above the lowest broken level
level = min(levels)
buffer = level * (buffer_pct / 100)
return level + buffer
return None