← Назад
""" Binance market data fetcher. Gets real-time price, volume, and kline data for signal analysis. Includes WaveTrend Oscillator calculation. """ import logging import math from binance.client import Client logger = logging.getLogger(__name__) # Public client (no API key needed for market data) client = Client() def get_ticker_data(symbol: str) -> dict | None: """Get current ticker data from Binance.""" try: ticker = client.get_ticker(symbol=symbol) return { "symbol": symbol, "price": float(ticker["lastPrice"]), "change_24h": float(ticker["priceChangePercent"]), "volume_24h": float(ticker["quoteVolume"]), "high_24h": float(ticker["highPrice"]), "low_24h": float(ticker["lowPrice"]), } except Exception as e: logger.warning(f"Failed to get ticker for {symbol}: {e}") return None def get_klines(symbol: str, interval: str = "15m", limit: int = 50) -> list: """Get kline/candlestick data.""" try: klines = client.get_klines(symbol=symbol, interval=interval, limit=limit) return [{ "open_time": k[0], "open": float(k[1]), "high": float(k[2]), "low": float(k[3]), "close": float(k[4]), "volume": float(k[5]), "close_time": k[6], } for k in klines] except Exception as e: logger.warning(f"Failed to get klines for {symbol}: {e}") return [] def ema(values: list[float], period: int) -> list[float]: """Calculate Exponential Moving Average.""" if not values or period <= 0: return [] result = [values[0]] multiplier = 2.0 / (period + 1) for i in range(1, len(values)): val = (values[i] - result[-1]) * multiplier + result[-1] result.append(val) return result def calculate_wavetrend(klines: list[dict], channel_len: int = 10, avg_len: int = 21) -> dict: """ Calculate WaveTrend Oscillator [WT]. Based on the famous TradingView indicator by LazyBear. Parameters: klines: list of OHLCV candles channel_len: Channel Length (default 10) avg_len: Average Length (default 21) Returns: wt1: WaveTrend line wt2: Signal line (SMA of wt1) signal: "buy", "sell", or "neutral" zone: "overbought", "oversold", or "neutral" cross: "bullish_cross", "bearish_cross", or None """ if len(klines) < channel_len + avg_len + 10: return {"wt1": 0, "wt2": 0, "signal": "neutral", "zone": "neutral", "cross": None} # Step 1: Calculate HLC3 (typical price) hlc3 = [(k["high"] + k["low"] + k["close"]) / 3.0 for k in klines] # Step 2: EMA of HLC3 (channel_len) esa = ema(hlc3, channel_len) # Step 3: EMA of abs(HLC3 - ESA) (channel_len) — this is the "d" value diff = [abs(hlc3[i] - esa[i]) for i in range(len(hlc3))] d = ema(diff, channel_len) # Step 4: CI = (HLC3 - ESA) / (0.015 * d) ci = [] for i in range(len(hlc3)): denominator = 0.015 * d[i] if d[i] != 0 else 0.001 ci.append((hlc3[i] - esa[i]) / denominator) # Step 5: WT1 = EMA of CI (avg_len) wt1_series = ema(ci, avg_len) # Step 6: WT2 = SMA of WT1 (4 periods) sma_period = 4 wt2_series = [] for i in range(len(wt1_series)): if i < sma_period - 1: wt2_series.append(wt1_series[i]) else: avg = sum(wt1_series[i - sma_period + 1:i + 1]) / sma_period wt2_series.append(avg) # Current values wt1 = round(wt1_series[-1], 2) wt2 = round(wt2_series[-1], 2) # Previous values (for cross detection) wt1_prev = wt1_series[-2] wt2_prev = wt2_series[-2] # Levels from LazyBear's original indicator ob1 = 60 # Overbought Level 1 (strong) ob2 = 53 # Overbought Level 2 (weak) os1 = -60 # Oversold Level 1 (strong) os2 = -53 # Oversold Level 2 (weak) # Zone detection (two-tier) if wt1 > ob1: zone = "overbought_strong" elif wt1 > ob2: zone = "overbought" elif wt1 < os1: zone = "oversold_strong" elif wt1 < os2: zone = "oversold" else: zone = "neutral" # Cross detection — THE key signal cross = None if wt1_prev <= wt2_prev and wt1 > wt2: cross = "bullish_cross" # WT1 crosses above WT2 → BUY elif wt1_prev >= wt2_prev and wt1 < wt2: cross = "bearish_cross" # WT1 crosses below WT2 → SELL # Signal classification based on cross + zone # Best signals: cross IN the zone (confirmed reversal) if cross == "bullish_cross" and wt1 < os1: signal = "strong_buy" # Cross up in deep oversold → best buy elif cross == "bullish_cross" and wt1 < os2: signal = "buy" # Cross up in oversold zone elif cross == "bearish_cross" and wt1 > ob1: signal = "strong_sell" # Cross down in deep overbought → best sell elif cross == "bearish_cross" and wt1 > ob2: signal = "sell" # Cross down in overbought zone elif cross == "bullish_cross": signal = "weak_buy" # Cross up but not in zone elif cross == "bearish_cross": signal = "weak_sell" # Cross down but not in zone elif "oversold" in zone: signal = "approaching_buy" # In oversold, waiting for cross elif "overbought" in zone: signal = "approaching_sell" # In overbought, waiting for cross else: signal = "neutral" return { "wt1": wt1, "wt2": wt2, "signal": signal, "zone": zone, "cross": cross, } def analyze_momentum(symbol: str, market_type: str = "spot") -> dict: """Quick momentum analysis on 15m and 1H + WaveTrend.""" result = { "symbol": symbol, "market_type": market_type, "has_data": False, "trend_15m": "unknown", "trend_1h": "unknown", "volume_spike": False, "price": 0, "move_15m": 0, "move_1h": 0, "wavetrend": None, } # Pick the right klines function based on market type klines_fn = get_futures_klines if market_type == "futures" else get_klines # Get current price if market_type == "futures": try: ft = client.futures_symbol_ticker(symbol=symbol) result["price"] = float(ft["price"]) result["has_data"] = True except Exception as e: logger.warning(f"Failed to get futures ticker for {symbol}: {e}") return result else: ticker = get_ticker_data(symbol) if not ticker: return result result["price"] = ticker["price"] result["has_data"] = True # 15m analysis klines_15m = klines_fn(symbol, "15m", 60) if len(klines_15m) >= 2: current = klines_15m[-1] prev = klines_15m[-2] # Recent move move = ((current["close"] - prev["close"]) / prev["close"]) * 100 result["move_15m"] = round(move, 2) # Volume spike (current vs average of last 20) recent = klines_15m[-20:] avg_vol = sum(k["volume"] for k in recent[:-1]) / len(recent[:-1]) if avg_vol > 0: vol_ratio = current["volume"] / avg_vol result["volume_spike"] = vol_ratio > 2.0 result["volume_ratio"] = round(vol_ratio, 1) # Simple trend (last 5 candles) closes = [k["close"] for k in klines_15m[-5:]] if closes[-1] > closes[0]: result["trend_15m"] = "up" elif closes[-1] < closes[0]: result["trend_15m"] = "down" else: result["trend_15m"] = "flat" # WaveTrend on 15m if len(klines_15m) >= 45: result["wavetrend"] = calculate_wavetrend(klines_15m) # 1H analysis klines_1h = klines_fn(symbol, "1h", 60) if len(klines_1h) >= 2: current = klines_1h[-1] prev = klines_1h[-2] move = ((current["close"] - prev["close"]) / prev["close"]) * 100 result["move_1h"] = round(move, 2) closes = [k["close"] for k in klines_1h[-5:]] if closes[-1] > closes[0]: result["trend_1h"] = "up" elif closes[-1] < closes[0]: result["trend_1h"] = "down" else: result["trend_1h"] = "flat" # WaveTrend on 1H (more reliable) if len(klines_1h) >= 45: result["wavetrend_1h"] = calculate_wavetrend(klines_1h) return result def check_symbol_exists(symbol: str) -> bool: """Check if a trading pair exists on Binance spot.""" try: client.get_ticker(symbol=symbol) return True except: return False def check_futures_symbol_exists(symbol: str) -> bool: """Check if a trading pair exists on Binance Futures (USD-M).""" try: client.futures_symbol_ticker(symbol=symbol) return True except: return False def get_futures_klines(symbol: str, interval: str = "15m", limit: int = 50) -> list: """Get futures kline/candlestick data.""" try: klines = client.futures_klines(symbol=symbol, interval=interval, limit=limit) return [{ "open_time": k[0], "open": float(k[1]), "high": float(k[2]), "low": float(k[3]), "close": float(k[4]), "volume": float(k[5]), "close_time": k[6], } for k in klines] except Exception as e: logger.warning(f"Failed to get futures klines for {symbol}: {e}") return [] def find_trading_pair(ticker: str) -> tuple[str, str] | None: """ Find the correct trading pair for a ticker. Returns (symbol, market_type) or None. Futures only — spot disabled for now. """ symbol = f"{ticker}USDT" # Futures only (USD-M) if check_futures_symbol_exists(symbol): return (symbol, "futures") # TODO: enable spot later for higher TFs # if check_symbol_exists(symbol): # return (symbol, "spot") return None