← Back
"""
Binance market data fetcher.
Gets real-time price, volume, and kline data for signal analysis.
Includes WaveTrend Oscillator calculation.
"""

import logging
import math
from binance.client import Client

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Public client (no API key needed for market data).
# Created once at import time and shared by every function in this module.
# NOTE(review): confirm whether Client() performs network I/O on construction,
# since that would make importing this module depend on connectivity.
client = Client()


def get_ticker_data(symbol: str) -> dict | None:
    """Fetch the current ticker for ``symbol`` and flatten it into a dict.

    The result holds the symbol plus float-converted price, 24h change
    percent, 24h quote volume and 24h high/low.  If the request or the
    conversion fails for any reason, a warning is logged and ``None`` is
    returned.
    """
    # Output key -> field name in the ticker payload (order is preserved).
    numeric_fields = {
        "price": "lastPrice",
        "change_24h": "priceChangePercent",
        "volume_24h": "quoteVolume",
        "high_24h": "highPrice",
        "low_24h": "lowPrice",
    }
    try:
        raw = client.get_ticker(symbol=symbol)
        snapshot = {"symbol": symbol}
        for out_key, src_key in numeric_fields.items():
            snapshot[out_key] = float(raw[src_key])
        return snapshot
    except Exception as e:
        logger.warning(f"Failed to get ticker for {symbol}: {e}")
        return None


def get_klines(symbol: str, interval: str = "15m", limit: int = 50) -> list:
    """Fetch spot candles for ``symbol`` as a list of dicts.

    Each candle dict carries ``open_time``, float ``open``/``high``/``low``/
    ``close``/``volume`` and ``close_time``, taken from positions 0-6 of the
    raw kline row.  On any failure a warning is logged and an empty list is
    returned.
    """
    # (output key, index in the raw kline row) for the float-valued fields.
    float_columns = (("open", 1), ("high", 2), ("low", 3), ("close", 4), ("volume", 5))
    try:
        raw_rows = client.get_klines(symbol=symbol, interval=interval, limit=limit)
        candles = []
        for row in raw_rows:
            candle = {"open_time": row[0]}
            for key, position in float_columns:
                candle[key] = float(row[position])
            candle["close_time"] = row[6]
            candles.append(candle)
        return candles
    except Exception as e:
        logger.warning(f"Failed to get klines for {symbol}: {e}")
        return []


def ema(values: list[float], period: int) -> list[float]:
    """Return the Exponential Moving Average series of ``values``.

    The series is seeded with the first input value and has the same length
    as ``values``.  An empty input or a non-positive ``period`` yields an
    empty list.
    """
    if not values or period <= 0:
        return []

    alpha = 2.0 / (period + 1)  # smoothing factor
    previous = values[0]
    smoothed = [previous]

    for sample in values[1:]:
        previous = (sample - previous) * alpha + previous
        smoothed.append(previous)

    return smoothed


def calculate_wavetrend(klines: list[dict], channel_len: int = 10, avg_len: int = 21) -> dict:
    """
    Calculate the WaveTrend Oscillator [WT] for a candle series.

    Based on the TradingView indicator by LazyBear.

    Parameters:
        klines: list of candle dicts with "high", "low" and "close" keys;
            the last element is treated as the current candle
        channel_len: Channel Length (default 10)
        avg_len: Average Length (default 21)

    Returns a dict with:
        wt1: WaveTrend line, rounded to 2 decimals
        wt2: Signal line (4-period SMA of wt1), rounded to 2 decimals
        signal: "strong_buy", "buy", "weak_buy", "approaching_buy",
            "strong_sell", "sell", "weak_sell", "approaching_sell",
            or "neutral"
        zone: "overbought_strong", "overbought", "oversold_strong",
            "oversold", or "neutral"
        cross: "bullish_cross", "bearish_cross", or None

    A neutral result (wt1 == wt2 == 0, no cross) is returned when either
    length is not positive or when fewer than channel_len + avg_len + 10
    candles are supplied.
    """
    neutral = {"wt1": 0, "wt2": 0, "signal": "neutral", "zone": "neutral", "cross": None}

    # A non-positive length makes ema() return [], which would otherwise
    # surface as an IndexError further down.
    if channel_len <= 0 or avg_len <= 0:
        return neutral
    if len(klines) < channel_len + avg_len + 10:
        return neutral

    # Step 1: Calculate HLC3 (typical price)
    hlc3 = [(k["high"] + k["low"] + k["close"]) / 3.0 for k in klines]

    # Step 2: EMA of HLC3 (channel_len)
    esa = ema(hlc3, channel_len)

    # Step 3: EMA of abs(HLC3 - ESA) (channel_len) — this is the "d" value
    d = ema([abs(price - mean) for price, mean in zip(hlc3, esa)], channel_len)

    # Step 4: CI = (HLC3 - ESA) / (0.015 * d); a zero "d" (flat prices) falls
    # back to a small fixed denominator to avoid dividing by zero.
    ci = [
        (price - mean) / (0.015 * dev if dev != 0 else 0.001)
        for price, mean, dev in zip(hlc3, esa, d)
    ]

    # Step 5: WT1 = EMA of CI (avg_len)
    wt1_series = ema(ci, avg_len)

    # Step 6: WT2 = SMA of WT1 (4 periods); the first points, which lack a
    # full window, just copy WT1.
    sma_period = 4
    wt2_series = []
    for i, value in enumerate(wt1_series):
        if i < sma_period - 1:
            wt2_series.append(value)
        else:
            wt2_series.append(sum(wt1_series[i - sma_period + 1:i + 1]) / sma_period)

    # Unrounded current and previous values (used for cross detection)
    wt1_raw, wt2_raw = wt1_series[-1], wt2_series[-1]
    wt1_prev, wt2_prev = wt1_series[-2], wt2_series[-2]

    # Rounded values are what gets reported and compared against the levels
    wt1 = round(wt1_raw, 2)
    wt2 = round(wt2_raw, 2)

    # Levels from LazyBear's original indicator
    ob1 = 60   # Overbought Level 1 (strong)
    ob2 = 53   # Overbought Level 2 (weak)
    os1 = -60  # Oversold Level 1 (strong)
    os2 = -53  # Oversold Level 2 (weak)

    # Zone detection (two-tier)
    if wt1 > ob1:
        zone = "overbought_strong"
    elif wt1 > ob2:
        zone = "overbought"
    elif wt1 < os1:
        zone = "oversold_strong"
    elif wt1 < os2:
        zone = "oversold"
    else:
        zone = "neutral"

    # Cross detection — THE key signal.
    # Both sides of the comparison use unrounded values: mixing rounded
    # current values with unrounded previous ones can hide a cross whose gap
    # is below the rounding step, and the next candle no longer sees it
    # because its "previous" values are already on the far side.
    cross = None
    if wt1_prev <= wt2_prev and wt1_raw > wt2_raw:
        cross = "bullish_cross"  # WT1 crosses above WT2 → BUY
    elif wt1_prev >= wt2_prev and wt1_raw < wt2_raw:
        cross = "bearish_cross"  # WT1 crosses below WT2 → SELL

    # Signal classification based on cross + zone
    # Best signals: cross IN the zone (confirmed reversal)
    if cross == "bullish_cross" and wt1 < os1:
        signal = "strong_buy"       # Cross up in deep oversold → best buy
    elif cross == "bullish_cross" and wt1 < os2:
        signal = "buy"              # Cross up in oversold zone
    elif cross == "bearish_cross" and wt1 > ob1:
        signal = "strong_sell"      # Cross down in deep overbought → best sell
    elif cross == "bearish_cross" and wt1 > ob2:
        signal = "sell"             # Cross down in overbought zone
    elif cross == "bullish_cross":
        signal = "weak_buy"         # Cross up but not in zone
    elif cross == "bearish_cross":
        signal = "weak_sell"        # Cross down but not in zone
    elif "oversold" in zone:
        signal = "approaching_buy"  # In oversold, waiting for cross
    elif "overbought" in zone:
        signal = "approaching_sell" # In overbought, waiting for cross
    else:
        signal = "neutral"

    return {
        "wt1": wt1,
        "wt2": wt2,
        "signal": signal,
        "zone": zone,
        "cross": cross,
    }


def _last_candle_move_pct(klines: list[dict]) -> float:
    """Return the % change between the last two closes, rounded to 2 decimals."""
    prev_close = klines[-2]["close"]
    if prev_close == 0:
        # No meaningful percentage against a zero base; report no move
        # instead of raising ZeroDivisionError.
        return 0
    return round(((klines[-1]["close"] - prev_close) / prev_close) * 100, 2)


def _recent_trend(klines: list[dict], lookback: int = 5) -> str:
    """Classify the last ``lookback`` candles as "up", "down" or "flat" by close."""
    closes = [k["close"] for k in klines[-lookback:]]
    if closes[-1] > closes[0]:
        return "up"
    if closes[-1] < closes[0]:
        return "down"
    return "flat"


def analyze_momentum(symbol: str, market_type: str = "spot") -> dict:
    """Quick momentum analysis on 15m and 1H + WaveTrend.

    Parameters:
        symbol: trading pair symbol passed through to the ticker/kline calls
        market_type: "futures" selects the futures ticker and klines; any
            other value uses the spot ones

    Returns a dict that always contains symbol, market_type, has_data,
    trend_15m, trend_1h, volume_spike, price, move_15m, move_1h and
    wavetrend.  Two keys are optional:
        volume_ratio: only set when the average 15m volume is positive
        wavetrend_1h: only set when at least 45 1H candles are available

    If the current price cannot be fetched, the defaults are returned with
    has_data left False.
    """
    result = {
        "symbol": symbol,
        "market_type": market_type,
        "has_data": False,
        "trend_15m": "unknown",
        "trend_1h": "unknown",
        "volume_spike": False,
        "price": 0,
        "move_15m": 0,
        "move_1h": 0,
        "wavetrend": None,
    }

    candle_limit = 60        # candles requested per timeframe
    min_wavetrend_bars = 45  # minimum candles before WaveTrend is computed

    # Pick the right klines function based on market type
    is_futures = market_type == "futures"
    klines_fn = get_futures_klines if is_futures else get_klines

    # Get current price; without it the analysis is abandoned
    if is_futures:
        try:
            ft = client.futures_symbol_ticker(symbol=symbol)
            result["price"] = float(ft["price"])
            result["has_data"] = True
        except Exception as e:
            logger.warning(f"Failed to get futures ticker for {symbol}: {e}")
            return result
    else:
        ticker = get_ticker_data(symbol)
        if not ticker:
            return result
        result["price"] = ticker["price"]
        result["has_data"] = True

    # 15m analysis
    klines_15m = klines_fn(symbol, "15m", candle_limit)
    if len(klines_15m) >= 2:
        result["move_15m"] = _last_candle_move_pct(klines_15m)

        # Volume spike: current candle vs the average of up to 19 candles
        # preceding it (the current candle itself is excluded).
        baseline = klines_15m[-20:-1]
        avg_vol = sum(k["volume"] for k in baseline) / len(baseline)
        if avg_vol > 0:
            vol_ratio = klines_15m[-1]["volume"] / avg_vol
            result["volume_spike"] = vol_ratio > 2.0
            result["volume_ratio"] = round(vol_ratio, 1)

        # Simple trend (last 5 candles)
        result["trend_15m"] = _recent_trend(klines_15m)

        # WaveTrend on 15m
        if len(klines_15m) >= min_wavetrend_bars:
            result["wavetrend"] = calculate_wavetrend(klines_15m)

    # 1H analysis
    klines_1h = klines_fn(symbol, "1h", candle_limit)
    if len(klines_1h) >= 2:
        result["move_1h"] = _last_candle_move_pct(klines_1h)
        result["trend_1h"] = _recent_trend(klines_1h)

        # WaveTrend on 1H (more reliable)
        if len(klines_1h) >= min_wavetrend_bars:
            result["wavetrend_1h"] = calculate_wavetrend(klines_1h)

    return result


def check_symbol_exists(symbol: str) -> bool:
    """Check if a trading pair exists on Binance spot.

    Returns True when the spot ticker request for ``symbol`` succeeds.  Any
    failure of that request is treated as "does not exist" and yields False;
    the reason is logged at debug level.
    """
    try:
        client.get_ticker(symbol=symbol)
    except Exception as e:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still propagate.
        logger.debug(f"Spot symbol check failed for {symbol}: {e}")
        return False
    return True


def check_futures_symbol_exists(symbol: str) -> bool:
    """Check if a trading pair exists on Binance Futures (USD-M).

    Returns True when the futures ticker request for ``symbol`` succeeds.
    Any failure of that request is treated as "does not exist" and yields
    False; the reason is logged at debug level.
    """
    try:
        client.futures_symbol_ticker(symbol=symbol)
    except Exception as e:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still propagate.
        logger.debug(f"Futures symbol check failed for {symbol}: {e}")
        return False
    return True


def get_futures_klines(symbol: str, interval: str = "15m", limit: int = 50) -> list:
    """Fetch futures candles for ``symbol`` as a list of dicts.

    Each candle dict carries ``open_time``, float ``open``/``high``/``low``/
    ``close``/``volume`` and ``close_time``, taken from positions 0-6 of the
    raw kline row.  On any failure a warning is logged and an empty list is
    returned.
    """

    def as_candle(row) -> dict:
        # Translate one positional kline row into a keyed candle.
        return dict(
            open_time=row[0],
            open=float(row[1]),
            high=float(row[2]),
            low=float(row[3]),
            close=float(row[4]),
            volume=float(row[5]),
            close_time=row[6],
        )

    try:
        raw_rows = client.futures_klines(symbol=symbol, interval=interval, limit=limit)
        # Materialize inside the try so conversion errors are handled too.
        return list(map(as_candle, raw_rows))
    except Exception as e:
        logger.warning(f"Failed to get futures klines for {symbol}: {e}")
        return []


def find_trading_pair(ticker: str) -> tuple[str, str] | None:
    """
    Resolve a bare ticker to its USDT trading pair.

    Returns (symbol, "futures") when the USD-M futures pair exists,
    otherwise None.  Only futures are checked; spot lookup is disabled
    for now.
    """
    symbol = f"{ticker}USDT"

    # TODO: enable spot later for higher TFs (fall back to
    # check_symbol_exists(symbol) and return (symbol, "spot")).
    return (symbol, "futures") if check_futures_symbol_exists(symbol) else None

📜 Git History

c6f6bd5 · chore: initial commit — version control setup · 5 weeks ago
Show last diff
Loading...