← Back
"""
Quick Take Scalper — Mean Reversion Scanner

Scans top futures pairs every 60s on 5m timeframe.
Entry: ALL 4 indicators must align (BB + RSI + Volume + EMA trend).
Exit: +1% TP, -0.75% SL, or 30min time stop.

Runs as a separate async loop alongside the main signal listener.
"""

import asyncio
import logging
import os
import time
import math
from datetime import datetime, timezone, timedelta
from binance.client import Client

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Timezone used to stamp signals. This is a fixed UTC-7 offset.
# NOTE(review): a fixed offset does not follow daylight-saving changes —
# confirm that is intended for "Vancouver" time.
VANCOUVER_TZ = timezone(timedelta(hours=-7))

# Public client for market data (constructed without API credentials)
_client = Client()


# ═══════════════════════════════════════════
#  INDICATOR CALCULATIONS
# ═══════════════════════════════════════════

def calc_ema(values: list[float], period: int) -> list[float]:
    """
    Exponential Moving Average series.

    The output has the same length as ``values`` and is seeded with the first
    input value; each later point moves toward the new value by the factor
    ``2 / (period + 1)``. Empty input or a non-positive ``period`` gives ``[]``.
    """
    if not (values and period > 0):
        return []

    alpha = 2.0 / (period + 1)
    smoothed = [values[0]]
    for price in values[1:]:
        prev = smoothed[-1]
        smoothed.append((price - prev) * alpha + prev)
    return smoothed


def calc_sma(values: list[float], period: int) -> float | None:
    """Simple Moving Average of last N values."""
    if len(values) < period:
        return None
    return sum(values[-period:]) / period


def calc_rsi(closes: list[float], period: int = 7) -> float | None:
    """
    RSI calculation (Wilder smoothing).
    Returns RSI value 0-100 or None if not enough data.
    """
    if len(closes) < period + 1:
        return None

    gains = []
    losses = []
    for i in range(1, len(closes)):
        diff = closes[i] - closes[i - 1]
        gains.append(max(diff, 0))
        losses.append(max(-diff, 0))

    if len(gains) < period:
        return None

    # Initial average
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period

    # Wilder smoothing for remaining
    for i in range(period, len(gains)):
        avg_gain = (avg_gain * (period - 1) + gains[i]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i]) / period

    if avg_loss == 0:
        return 100.0

    rs = avg_gain / avg_loss
    return 100.0 - (100.0 / (1.0 + rs))


def calc_bollinger_bands(closes: list[float], period: int = 20, std_mult: float = 2.0) -> dict | None:
    """
    Bollinger Bands: middle = SMA(period), upper/lower = middle ± std_mult * StdDev.
    Returns dict with upper, middle, lower, bandwidth_pct.
    """
    if len(closes) < period:
        return None

    window = closes[-period:]
    middle = sum(window) / period
    variance = sum((c - middle) ** 2 for c in window) / period
    std_dev = math.sqrt(variance)

    upper = middle + std_mult * std_dev
    lower = middle - std_mult * std_dev

    # Bandwidth as % of middle
    bw_pct = ((upper - lower) / middle) * 100 if middle > 0 else 0

    return {
        "upper": upper,
        "middle": middle,
        "lower": lower,
        "std_dev": std_dev,
        "bandwidth_pct": bw_pct,
    }


def calc_volume_ratio(volumes: list[float], lookback: int = 20) -> float:
    """
    Latest candle volume relative to the average of the previous N candles.

    The last element of ``volumes`` is compared with the mean of the
    ``lookback`` elements immediately before it.

    Returns 0.0 when ``lookback`` is not positive, when there are fewer than
    ``lookback + 1`` volumes, or when the baseline average is not positive.
    """
    # A zero lookback would divide by zero; a negative one has no meaning.
    if lookback <= 0 or len(volumes) < lookback + 1:
        return 0.0

    # Baseline excludes the latest candle itself.
    baseline = sum(volumes[-(lookback + 1):-1]) / lookback
    if baseline <= 0:
        return 0.0
    return volumes[-1] / baseline


# ═══════════════════════════════════════════
#  MARKET SCANNER
# ═══════════════════════════════════════════

def get_top_futures_by_volume(limit: int = 50) -> list[str]:
    """
    Get top N USD-M futures pairs by 24h quote volume.

    Keeps only symbols ending in "USDT" with a positive quote volume, drops
    the special and skip-listed symbols below, and returns at most ``limit``
    symbols ordered by quote volume, highest first.

    Returns an empty list when ``limit`` is not positive, or when the ticker
    request or its parsing fails (the failure is logged).
    """
    # A non-positive limit can never yield symbols, so skip the API call.
    if limit <= 0:
        return []

    # Special symbols that are never scanned.
    exclude = {"USDCUSDT", "BTCDOMUSDT", "DEFIUSDT", "BTCSTUSDT"}

    # Symbols skipped by name (e.g. BTC, ETH — too low vol% for 1% scalp).
    skip_symbols = {
        "BTCUSDT", "ETHUSDT", "BNBUSDT", "XRPUSDT", "SOLUSDT",
        "ADAUSDT", "DOTUSDT", "AVAXUSDT", "DOGEUSDT", "LTCUSDT",
        "TRXUSDT", "XAUTUSDT", "XAUUSDT",
        "1000PEPEUSDT", "PEPEUSDT",
    }
    blocked = exclude | skip_symbols

    try:
        tickers = _client.futures_ticker()

        # Collect (quote_volume, symbol) so each volume is parsed only once.
        ranked = []
        for t in tickers:
            symbol = t["symbol"]
            if not symbol.endswith("USDT") or symbol in blocked:
                continue
            quote_volume = float(t["quoteVolume"])
            if quote_volume > 0:
                ranked.append((quote_volume, symbol))

        # Sort by quote volume descending (stable for equal volumes).
        ranked.sort(key=lambda pair: pair[0], reverse=True)

        return [symbol for _, symbol in ranked[:limit]]

    except Exception as e:
        # Best-effort: the scanner treats an empty list as "nothing to scan".
        logger.error(f"Failed to get top futures: {e}")
        return []


def fetch_5m_klines(symbol: str, limit: int = 120) -> list[dict]:
    """
    Download recent 5m klines for ``symbol``.

    Each kline becomes a dict with float ``open``/``high``/``low``/``close``/
    ``volume`` values plus the raw ``close_time`` field. Any failure is
    logged as a warning and an empty list is returned.
    """
    # (output key, position in the raw kline row) for the float fields.
    numeric_fields = (
        ("open", 1),
        ("high", 2),
        ("low", 3),
        ("close", 4),
        ("volume", 5),
    )
    try:
        raw_rows = _client.futures_klines(symbol=symbol, interval="5m", limit=limit)
        candles = []
        for row in raw_rows:
            candle = {name: float(row[idx]) for name, idx in numeric_fields}
            candle["close_time"] = row[6]
            candles.append(candle)
        return candles
    except Exception as e:
        logger.warning(f"Failed to get 5m klines for {symbol}: {e}")
        return []


def analyze_for_scalp(symbol: str, klines: list[dict]) -> dict | None:
    """
    Analyze a symbol for Quick Take entry signal.

    Uses the LAST CLOSED candle ([-2]) for volume and indicators,
    since the current candle ([-1]) is still forming.

    Returns signal dict if ALL 4 conditions are met, else None.
    """
    if len(klines) < 105:  # Need 100+ for EMA(100)
        return None

    # Use up to last closed candle for indicators (drop incomplete current)
    completed = klines[:-1]
    closes = [k["close"] for k in completed]
    volumes = [k["volume"] for k in completed]
    current_price = closes[-1]  # Last closed candle price

    # 1. RSI(7)
    rsi = calc_rsi(closes, period=7)
    if rsi is None:
        return None

    # 2. Bollinger Bands(20, 2)
    bb = calc_bollinger_bands(closes, period=20, std_mult=2.0)
    if bb is None:
        return None

    # 3. Volume ratio (last closed candle vs avg of previous 20)
    vol_ratio = calc_volume_ratio(volumes, lookback=20)

    # 4. EMA(100) — trend filter
    ema100 = calc_ema(closes, 100)
    if not ema100:
        return None
    ema100_val = ema100[-1]

    # Minimum BB bandwidth — skip low-volatility coins (1% TP unreachable)
    MIN_BB_BW_PCT = float(os.environ.get("SCALP_MIN_BB_BW_PCT", "3.0"))
    if bb["bandwidth_pct"] < MIN_BB_BW_PCT:
        return None

    # ═══ CHECK LONG CONDITIONS ═══
    long_signal = (
        rsi < 25                              # RSI oversold
        and current_price <= bb["lower"]      # At or below lower BB
        and vol_ratio >= 1.5                   # Volume spike
        and current_price > ema100_val         # Above EMA100 (uptrend)
    )

    # ═══ CHECK SHORT CONDITIONS ═══
    short_signal = (
        rsi > 75                              # RSI overbought
        and current_price >= bb["upper"]      # At or above upper BB
        and vol_ratio >= 1.5                   # Volume spike
        and current_price < ema100_val         # Below EMA100 (downtrend)
    )

    if not long_signal and not short_signal:
        return None

    side = "BUY" if long_signal else "SELL"

    return {
        "symbol": symbol,
        "side": side,
        "price": current_price,
        "rsi": round(rsi, 1),
        "bb_upper": bb["upper"],
        "bb_lower": bb["lower"],
        "bb_middle": bb["middle"],
        "bb_bandwidth_pct": round(bb["bandwidth_pct"], 2),
        "volume_ratio": round(vol_ratio, 1),
        "ema100": round(ema100_val, 6),
        "timestamp": datetime.now(VANCOUVER_TZ).isoformat(),
    }


# ═══════════════════════════════════════════
#  SCAN LOOP (called from bot.py)
# ═══════════════════════════════════════════

async def scan_market(on_signal_fn, skip_symbols: set | None = None):
    """
    Scan top futures for Quick Take entries.

    The symbol list and each symbol's 5m klines are fetched through
    synchronous client calls, so they are run in worker threads to keep the
    event loop free for other tasks while a request is in flight.

    Args:
        on_signal_fn: async callback(signal_dict) called when entry found
        skip_symbols: set of symbols to skip (already have position etc)
    """
    skip = skip_symbols or set()

    # Get top pairs. NOTE(review): the list is re-fetched on every call; any
    # caching is assumed to live in the caller.
    symbols = await asyncio.to_thread(get_top_futures_by_volume, limit=40)

    if not symbols:
        logger.warning("No symbols to scan")
        return

    found = 0
    scanned = 0

    for symbol in symbols:
        if symbol in skip:
            continue

        # Fetch 5m klines off the event loop (the client call is synchronous).
        klines = await asyncio.to_thread(fetch_5m_klines, symbol, limit=120)
        if not klines:
            continue

        scanned += 1

        # Analyze (pure computation on the fetched candles)
        signal = analyze_for_scalp(symbol, klines)
        if signal:
            found += 1
            logger.info(
                f"SCALP SIGNAL: {signal['side']} {symbol} | "
                f"RSI={signal['rsi']} BB={signal['bb_bandwidth_pct']}% "
                f"Vol={signal['volume_ratio']}x"
            )
            await on_signal_fn(signal)

        # Small delay to avoid API rate limits (1200 req/min)
        await asyncio.sleep(0.1)

    logger.info(f"Scalp scan: {scanned} checked, {found} signals found")

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...