"""
Binance market data fetcher.
Gets real-time price, volume, and kline data for signal analysis.
Includes WaveTrend Oscillator calculation.
"""
import logging
import math
from binance.client import Client
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Public client (no API key needed for market data).
# Used by the fetch helpers defined below.
client = Client()
def get_ticker_data(symbol: str) -> dict | None:
    """Fetch the current ticker for ``symbol`` and return its fields as floats.

    Returns:
        A dict with ``symbol``, ``price``, ``change_24h``, ``volume_24h``,
        ``high_24h`` and ``low_24h``, or ``None`` when the request or the
        parsing fails (a warning is logged in that case).
    """
    # (output key, key in the ticker payload), in output order.
    field_map = (
        ("price", "lastPrice"),
        ("change_24h", "priceChangePercent"),
        ("volume_24h", "quoteVolume"),
        ("high_24h", "highPrice"),
        ("low_24h", "lowPrice"),
    )
    try:
        raw = client.get_ticker(symbol=symbol)
        snapshot = {"symbol": symbol}
        for out_key, src_key in field_map:
            snapshot[out_key] = float(raw[src_key])
        return snapshot
    except Exception as e:
        logger.warning(f"Failed to get ticker for {symbol}: {e}")
        return None
def get_klines(symbol: str, interval: str = "15m", limit: int = 50) -> list:
    """Fetch candlesticks for ``symbol`` and convert each row into a dict.

    Each candle dict has ``open_time``, ``open``, ``high``, ``low``,
    ``close``, ``volume`` and ``close_time``; the price and volume fields
    are converted to ``float``. On any failure a warning is logged and an
    empty list is returned.
    """
    try:
        rows = client.get_klines(symbol=symbol, interval=interval, limit=limit)
        candles = []
        for row in rows:
            candle = {
                "open_time": row[0],
                "open": float(row[1]),
                "high": float(row[2]),
                "low": float(row[3]),
                "close": float(row[4]),
                "volume": float(row[5]),
                "close_time": row[6],
            }
            candles.append(candle)
        return candles
    except Exception as e:
        logger.warning(f"Failed to get klines for {symbol}: {e}")
        return []
def ema(values: list[float], period: int) -> list[float]:
    """Calculate the Exponential Moving Average of ``values``.

    The series is seeded with the first input value, so the result always
    has the same length as ``values``.

    Args:
        values: Input series, oldest value first.
        period: Smoothing period; the weight of each new value is
            ``2 / (period + 1)``.

    Returns:
        The EMA series, or an empty list when ``values`` is empty or
        ``period`` is not positive.
    """
    if not values or period <= 0:
        return []
    multiplier = 2.0 / (period + 1)
    previous = values[0]
    result = [previous]
    for value in values[1:]:
        # Move the running average towards the new value by `multiplier`.
        previous = (value - previous) * multiplier + previous
        result.append(previous)
    return result
def calculate_wavetrend(klines: list[dict], channel_len: int = 10, avg_len: int = 21) -> dict:
    """
    Calculate WaveTrend Oscillator [WT].
    Based on the famous TradingView indicator by LazyBear.

    Parameters:
        klines: list of OHLCV candles (dicts with "high", "low", "close"),
            oldest first
        channel_len: Channel Length (default 10)
        avg_len: Average Length (default 21)

    Returns a dict with:
        wt1: WaveTrend line, rounded to 2 decimals
        wt2: Signal line (4-period SMA of wt1), rounded to 2 decimals
        signal: "strong_buy", "buy", "weak_buy", "approaching_buy",
            "strong_sell", "sell", "weak_sell", "approaching_sell",
            or "neutral"
        zone: "overbought_strong", "overbought", "oversold_strong",
            "oversold", or "neutral"
        cross: "bullish_cross", "bearish_cross", or None

    When fewer than ``channel_len + avg_len + 10`` candles are supplied, an
    all-neutral result with ``wt1`` and ``wt2`` set to 0 is returned.
    """
    if len(klines) < channel_len + avg_len + 10:
        return {"wt1": 0, "wt2": 0, "signal": "neutral", "zone": "neutral", "cross": None}

    # Step 1: HLC3 (typical price)
    hlc3 = [(k["high"] + k["low"] + k["close"]) / 3.0 for k in klines]

    # Step 2: ESA = EMA of HLC3 (channel_len)
    esa = ema(hlc3, channel_len)

    # Step 3: d = EMA of abs(HLC3 - ESA) (channel_len)
    deviation = [price - mean for price, mean in zip(hlc3, esa)]
    d = ema([abs(dev) for dev in deviation], channel_len)

    # Step 4: CI = (HLC3 - ESA) / (0.015 * d); a fixed 0.001 denominator is
    # used where d is exactly zero to avoid dividing by zero.
    ci = [
        dev / (0.015 * spread if spread != 0 else 0.001)
        for dev, spread in zip(deviation, d)
    ]

    # Step 5: WT1 = EMA of CI (avg_len)
    wt1_series = ema(ci, avg_len)

    # Step 6: WT2 = SMA of WT1 (4 periods); until a full window is
    # available the WT1 value itself is used.
    sma_period = 4
    wt2_series = []
    for i, value in enumerate(wt1_series):
        if i < sma_period - 1:
            wt2_series.append(value)
        else:
            window = wt1_series[i - sma_period + 1:i + 1]
            wt2_series.append(sum(window) / sma_period)

    # Unrounded current and previous values, used for cross detection.
    wt1_raw, wt2_raw = wt1_series[-1], wt2_series[-1]
    wt1_prev, wt2_prev = wt1_series[-2], wt2_series[-2]

    # Reported values; zones are derived from these so that the returned
    # wt1 and zone always agree with each other.
    wt1 = round(wt1_raw, 2)
    wt2 = round(wt2_raw, 2)

    # Levels from LazyBear's original indicator
    ob1 = 60   # Overbought Level 1 (strong)
    ob2 = 53   # Overbought Level 2 (weak)
    os1 = -60  # Oversold Level 1 (strong)
    os2 = -53  # Oversold Level 2 (weak)

    # Zone detection (two-tier)
    if wt1 > ob1:
        zone = "overbought_strong"
    elif wt1 > ob2:
        zone = "overbought"
    elif wt1 < os1:
        zone = "oversold_strong"
    elif wt1 < os2:
        zone = "oversold"
    else:
        zone = "neutral"

    # Cross detection — THE key signal.
    # Both sides are compared unrounded: mixing rounded current values with
    # raw previous ones can hide a cross whose gap is below 0.005, and the
    # next candle can no longer detect it either.
    cross = None
    if wt1_prev <= wt2_prev and wt1_raw > wt2_raw:
        cross = "bullish_cross"  # WT1 crosses above WT2 → BUY
    elif wt1_prev >= wt2_prev and wt1_raw < wt2_raw:
        cross = "bearish_cross"  # WT1 crosses below WT2 → SELL

    # Signal classification based on cross + zone.
    # Best signals: cross IN the zone (confirmed reversal).
    if cross == "bullish_cross":
        if wt1 < os1:
            signal = "strong_buy"  # Cross up in deep oversold → best buy
        elif wt1 < os2:
            signal = "buy"  # Cross up in oversold zone
        else:
            signal = "weak_buy"  # Cross up but not in zone
    elif cross == "bearish_cross":
        if wt1 > ob1:
            signal = "strong_sell"  # Cross down in deep overbought → best sell
        elif wt1 > ob2:
            signal = "sell"  # Cross down in overbought zone
        else:
            signal = "weak_sell"  # Cross down but not in zone
    elif "oversold" in zone:
        signal = "approaching_buy"  # In oversold, waiting for cross
    elif "overbought" in zone:
        signal = "approaching_sell"  # In overbought, waiting for cross
    else:
        signal = "neutral"

    return {
        "wt1": wt1,
        "wt2": wt2,
        "signal": signal,
        "zone": zone,
        "cross": cross,
    }
def _percent_move(klines: list[dict]) -> float:
    """Return the % change between the last two closes, rounded to 2 decimals."""
    prev_close = klines[-2]["close"]
    if prev_close == 0:
        # A zero reference close would divide by zero; report no move instead.
        return 0.0
    return round(((klines[-1]["close"] - prev_close) / prev_close) * 100, 2)


def _trend_direction(klines: list[dict], lookback: int = 5) -> str:
    """Classify the last ``lookback`` candles as "up", "down" or "flat"."""
    closes = [k["close"] for k in klines[-lookback:]]
    if closes[-1] > closes[0]:
        return "up"
    if closes[-1] < closes[0]:
        return "down"
    return "flat"


def analyze_momentum(symbol: str, market_type: str = "spot") -> dict:
    """Quick momentum analysis on 15m and 1H + WaveTrend.

    Args:
        symbol: Trading pair symbol passed to the ticker and kline fetchers.
        market_type: "futures" uses the futures ticker and klines; any other
            value uses the spot ones.

    Returns:
        A dict that always contains ``symbol``, ``market_type``,
        ``has_data``, ``trend_15m``, ``trend_1h``, ``volume_spike``,
        ``price``, ``move_15m``, ``move_1h`` and ``wavetrend``.
        ``volume_ratio`` is added only when the average volume of the
        preceding 15m candles is positive, and ``wavetrend_1h`` only when at
        least 45 hourly candles were fetched. If the current price cannot be
        fetched, the defaults are returned with ``has_data`` left ``False``.
    """
    result = {
        "symbol": symbol,
        "market_type": market_type,
        "has_data": False,
        "trend_15m": "unknown",
        "trend_1h": "unknown",
        "volume_spike": False,
        "price": 0,
        "move_15m": 0,
        "move_1h": 0,
        "wavetrend": None,
    }
    is_futures = market_type == "futures"

    # Pick the right klines function based on market type
    klines_fn = get_futures_klines if is_futures else get_klines

    # Get current price; without it there is nothing to analyse.
    if is_futures:
        try:
            ft = client.futures_symbol_ticker(symbol=symbol)
            result["price"] = float(ft["price"])
            result["has_data"] = True
        except Exception as e:
            logger.warning(f"Failed to get futures ticker for {symbol}: {e}")
            return result
    else:
        ticker = get_ticker_data(symbol)
        if not ticker:
            return result
        result["price"] = ticker["price"]
        result["has_data"] = True

    # Minimum number of candles required before WaveTrend is computed.
    min_wavetrend_candles = 45

    # 15m analysis
    klines_15m = klines_fn(symbol, "15m", 60)
    if len(klines_15m) >= 2:
        result["move_15m"] = _percent_move(klines_15m)

        # Volume spike: current candle vs the average of the (up to 19)
        # candles preceding it.
        baseline = klines_15m[-20:][:-1]
        avg_vol = sum(k["volume"] for k in baseline) / len(baseline)
        if avg_vol > 0:
            vol_ratio = klines_15m[-1]["volume"] / avg_vol
            result["volume_spike"] = vol_ratio > 2.0
            result["volume_ratio"] = round(vol_ratio, 1)

        # Simple trend (last 5 candles)
        result["trend_15m"] = _trend_direction(klines_15m)

        # WaveTrend on 15m
        if len(klines_15m) >= min_wavetrend_candles:
            result["wavetrend"] = calculate_wavetrend(klines_15m)

    # 1H analysis
    klines_1h = klines_fn(symbol, "1h", 60)
    if len(klines_1h) >= 2:
        result["move_1h"] = _percent_move(klines_1h)
        result["trend_1h"] = _trend_direction(klines_1h)

        # WaveTrend on 1H (more reliable)
        if len(klines_1h) >= min_wavetrend_candles:
            result["wavetrend_1h"] = calculate_wavetrend(klines_1h)

    return result
def check_symbol_exists(symbol: str) -> bool:
    """Check if a trading pair exists on Binance spot.

    The check is a ticker request: any exception it raises is treated as
    "symbol does not exist" and logged at debug level.

    Returns:
        True when the ticker request succeeds, False otherwise.
    """
    try:
        client.get_ticker(symbol=symbol)
    except Exception as e:
        # `Exception` rather than a bare `except:` so that KeyboardInterrupt
        # and SystemExit are not swallowed.
        logger.debug(f"Spot symbol check failed for {symbol}: {e}")
        return False
    return True
def check_futures_symbol_exists(symbol: str) -> bool:
    """Check if a trading pair exists on Binance Futures (USD-M).

    The check is a futures ticker request: any exception it raises is
    treated as "symbol does not exist" and logged at debug level.

    Returns:
        True when the ticker request succeeds, False otherwise.
    """
    try:
        client.futures_symbol_ticker(symbol=symbol)
    except Exception as e:
        # `Exception` rather than a bare `except:` so that KeyboardInterrupt
        # and SystemExit are not swallowed.
        logger.debug(f"Futures symbol check failed for {symbol}: {e}")
        return False
    return True
def get_futures_klines(symbol: str, interval: str = "15m", limit: int = 50) -> list:
    """Fetch futures candlesticks for ``symbol`` as a list of dicts.

    Every candle carries ``open_time``, ``open``, ``high``, ``low``,
    ``close``, ``volume`` and ``close_time``, with prices and volume
    converted to ``float``. If anything goes wrong a warning is logged and
    an empty list is returned.
    """
    def _as_candle(raw):
        # Positional kline row -> named fields.
        return {
            "open_time": raw[0],
            "open": float(raw[1]),
            "high": float(raw[2]),
            "low": float(raw[3]),
            "close": float(raw[4]),
            "volume": float(raw[5]),
            "close_time": raw[6],
        }

    try:
        raw_rows = client.futures_klines(symbol=symbol, interval=interval, limit=limit)
        return list(map(_as_candle, raw_rows))
    except Exception as e:
        logger.warning(f"Failed to get futures klines for {symbol}: {e}")
        return []
def find_trading_pair(ticker: str) -> tuple[str, str] | None:
    """
    Resolve a ticker to its USDT trading pair.

    Only the futures (USD-M) market is checked for now; spot is disabled.

    Returns:
        ``(symbol, "futures")`` when the futures pair exists, else ``None``.
    """
    pair = f"{ticker}USDT"
    if not check_futures_symbol_exists(pair):
        # TODO: enable spot later for higher TFs — fall back to
        # check_symbol_exists(pair) and return (pair, "spot").
        return None
    return (pair, "futures")