← Back
"""
Gerchik Levels — Level Detection Engine.

Finds support/resistance levels from 1H candles using swing point clustering.
Ranks levels by strength (touches, mirror, round number, recency).
"""

import logging
import math
from dataclasses import dataclass, field
from typing import Optional

from gerchik_config import (
    GERCHIK_LEVEL_MIN_TOUCHES,
    GERCHIK_LEVEL_TOLERANCE_PCT,
    GERCHIK_LEVEL_CANDLES,
    GERCHIK_MAX_LEVELS,
)

logger = logging.getLogger(__name__)


@dataclass
class Level:
    """A price level with metadata."""
    price: float
    touches: int = 0
    is_support: bool = False
    is_resistance: bool = False
    is_mirror: bool = False         # Was support, now resistance (or vice versa)
    is_round: bool = False          # Round number ($10, $100, $0.05, etc.)
    has_false_breakout: bool = False  # Price poked through then returned
    last_touch_idx: int = 0         # Index of last touch (for recency)
    strength: float = 0.0          # Calculated strength score

    @property
    def level_type(self) -> str:
        if self.is_mirror:
            return "mirror"
        if self.is_support and self.is_resistance:
            return "mirror"
        if self.is_support:
            return "support"
        if self.is_resistance:
            return "resistance"
        return "unknown"


def find_swing_points(highs: list[float], lows: list[float], left: int = 3, right: int = 3) -> tuple[list, list]:
    """
    Find swing highs and swing lows.

    A swing high: high[i] is highest in range [i-left, i+right]
    A swing low: low[i] is lowest in range [i-left, i+right]

    Returns: (swing_highs, swing_lows) — lists of (index, price)
    """
    n = len(highs)
    swing_highs = []
    swing_lows = []

    for i in range(left, n - right):
        # Check swing high
        is_high = True
        for j in range(i - left, i + right + 1):
            if j != i and highs[j] >= highs[i]:
                is_high = False
                break
        if is_high:
            swing_highs.append((i, highs[i]))

        # Check swing low
        is_low = True
        for j in range(i - left, i + right + 1):
            if j != i and lows[j] <= lows[i]:
                is_low = False
                break
        if is_low:
            swing_lows.append((i, lows[i]))

    return swing_highs, swing_lows


def cluster_prices(points: list[tuple[int, float]], tolerance_pct: float) -> list[dict]:
    """
    Cluster nearby price points into levels.

    Args:
        points: list of (index, price) tuples
        tolerance_pct: max % distance to merge into same cluster

    Returns:
        list of {price, touches, indices}
    """
    if not points:
        return []

    # Sort by price
    sorted_points = sorted(points, key=lambda p: p[1])
    clusters = []
    current_cluster = [sorted_points[0]]

    for i in range(1, len(sorted_points)):
        idx, price = sorted_points[i]
        cluster_avg = sum(p[1] for p in current_cluster) / len(current_cluster)

        if abs(price - cluster_avg) / cluster_avg * 100 <= tolerance_pct:
            current_cluster.append(sorted_points[i])
        else:
            # Save current cluster
            if len(current_cluster) >= 1:
                avg_price = sum(p[1] for p in current_cluster) / len(current_cluster)
                clusters.append({
                    "price": avg_price,
                    "touches": len(current_cluster),
                    "indices": [p[0] for p in current_cluster],
                })
            current_cluster = [sorted_points[i]]

    # Don't forget last cluster
    if current_cluster:
        avg_price = sum(p[1] for p in current_cluster) / len(current_cluster)
        clusters.append({
            "price": avg_price,
            "touches": len(current_cluster),
            "indices": [p[0] for p in current_cluster],
        })

    return clusters


def is_round_number(price: float) -> bool:
    """Check if price is a 'round' number (psychological level)."""
    if price >= 100:
        return price % 10 < 0.5 or price % 10 > 9.5
    elif price >= 10:
        return price % 1 < 0.05 or price % 1 > 0.95
    elif price >= 1:
        return price % 0.1 < 0.005 or price % 0.1 > 0.095
    elif price >= 0.01:
        return price % 0.01 < 0.0005 or price % 0.01 > 0.0095
    else:
        return False


def detect_false_breakouts(
    level_price: float,
    highs: list[float],
    lows: list[float],
    closes: list[float],
    tolerance_pct: float,
) -> bool:
    """Check if any candle poked through the level but closed back."""
    tol = level_price * tolerance_pct / 100

    for i in range(len(closes)):
        # Price went above level but closed below (false breakout of resistance)
        if highs[i] > level_price + tol and closes[i] < level_price:
            return True
        # Price went below level but closed above (false breakout of support)
        if lows[i] < level_price - tol and closes[i] > level_price:
            return True

    return False


def calculate_level_strength(level: Level, total_candles: int) -> float:
    """
    Calculate level strength score (0-100).

    Factors:
    - Touches (base score)
    - Mirror level (big bonus)
    - Round number (small bonus)
    - False breakout survived (medium bonus)
    - Recency (more recent = stronger)
    """
    score = 0.0

    # Touches: 3=30, 4=40, 5+=50 (capped)
    score += min(level.touches * 10, 50)

    # Mirror level: +20
    if level.is_mirror:
        score += 20

    # Round number: +10
    if level.is_round:
        score += 10

    # False breakout: +15
    if level.has_false_breakout:
        score += 15

    # Recency: 0-15 based on how recent the last touch was
    if total_candles > 0:
        recency = 1 - (level.last_touch_idx / total_candles)
        score += recency * 15

    return min(score, 100)


def find_levels(
    opens: list[float],
    highs: list[float],
    lows: list[float],
    closes: list[float],
    current_price: float,
    tolerance_pct: float = GERCHIK_LEVEL_TOLERANCE_PCT,
    min_touches: int = GERCHIK_LEVEL_MIN_TOUCHES,
    max_levels: int = GERCHIK_MAX_LEVELS,
) -> list[Level]:
    """
    Main level detection function.

    Args:
        opens, highs, lows, closes: 1H OHLC candle data
        current_price: current mark price
        tolerance_pct: clustering tolerance %
        min_touches: minimum touches to qualify as level
        max_levels: max levels to return

    Returns:
        List of Level objects sorted by distance from current price
    """
    n = len(closes)
    if n < 50:
        return []

    # 1. Find swing points
    swing_highs, swing_lows = find_swing_points(highs, lows, left=3, right=3)

    # 2. Cluster swing highs and lows separately
    high_clusters = cluster_prices(swing_highs, tolerance_pct)
    low_clusters = cluster_prices(swing_lows, tolerance_pct)

    # 3. Merge all clusters and identify support/resistance/mirror
    all_clusters = {}

    for c in low_clusters:
        if c["touches"] >= min_touches:
            key = round(c["price"], 8)
            all_clusters[key] = {
                "price": c["price"],
                "touches": c["touches"],
                "indices": c["indices"],
                "is_support": True,
                "is_resistance": False,
            }

    for c in high_clusters:
        if c["touches"] >= min_touches:
            # Check if there's already a support level nearby
            matched = False
            for existing_key in list(all_clusters.keys()):
                existing = all_clusters[existing_key]
                if abs(c["price"] - existing["price"]) / existing["price"] * 100 <= tolerance_pct:
                    # Merge — this is a mirror level!
                    existing["is_resistance"] = True
                    existing["touches"] += c["touches"]
                    existing["indices"].extend(c["indices"])
                    existing["price"] = (existing["price"] + c["price"]) / 2
                    matched = True
                    break

            if not matched:
                key = round(c["price"], 8)
                all_clusters[key] = {
                    "price": c["price"],
                    "touches": c["touches"],
                    "indices": c["indices"],
                    "is_support": False,
                    "is_resistance": True,
                }

    # 4. Build Level objects with metadata
    levels = []
    for data in all_clusters.values():
        level = Level(
            price=data["price"],
            touches=data["touches"],
            is_support=data["is_support"],
            is_resistance=data["is_resistance"],
            is_mirror=data["is_support"] and data["is_resistance"],
            is_round=is_round_number(data["price"]),
            has_false_breakout=detect_false_breakouts(
                data["price"], highs, lows, closes, tolerance_pct
            ),
            last_touch_idx=max(data["indices"]) if data["indices"] else 0,
        )
        level.strength = calculate_level_strength(level, n)
        levels.append(level)

    # 5. Sort by distance from current price (closest first)
    levels.sort(key=lambda l: abs(l.price - current_price))

    # 6. Return top N
    return levels[:max_levels]


def get_nearest_levels(levels: list[Level], current_price: float, count: int = 2) -> dict:
    """
    Get nearest support and resistance relative to current price.

    Returns:
        {
            "support": [Level, ...],   # Below current price
            "resistance": [Level, ...], # Above current price
        }
    """
    supports = [l for l in levels if l.price < current_price]
    resistances = [l for l in levels if l.price > current_price]

    # Sort supports descending (closest first), resistances ascending
    supports.sort(key=lambda l: l.price, reverse=True)
    resistances.sort(key=lambda l: l.price)

    return {
        "support": supports[:count],
        "resistance": resistances[:count],
    }

📜 Git History

c6f6bd5chore: initial commit — version control setup5 weeks ago
Show last diff
Loading...