# ← Back
"""
Gerchik Levels — Level Detection Engine.

Finds support/resistance levels from 1H candles using swing point clustering.
Ranks levels by strength (touches, mirror, round number, recency).
"""
import logging
import math
from dataclasses import dataclass, field
from typing import Optional

from gerchik_config import (
    GERCHIK_LEVEL_MIN_TOUCHES,
    GERCHIK_LEVEL_TOLERANCE_PCT,
    GERCHIK_LEVEL_CANDLES,
    GERCHIK_MAX_LEVELS,
)

logger = logging.getLogger(__name__)


@dataclass
class Level:
    """A price level with metadata."""

    price: float
    touches: int = 0
    is_support: bool = False
    is_resistance: bool = False
    is_mirror: bool = False  # Was support, now resistance (or vice versa)
    is_round: bool = False  # Round number ($10, $100, $0.05, etc.)
    has_false_breakout: bool = False  # Price poked through then returned
    last_touch_idx: int = 0  # Index of last touch (for recency)
    strength: float = 0.0  # Calculated strength score

    @property
    def level_type(self) -> str:
        """Return "mirror", "support", "resistance" or "unknown".

        A level flagged as both support and resistance is reported as
        "mirror" even when ``is_mirror`` itself was not set.
        """
        if self.is_mirror or (self.is_support and self.is_resistance):
            return "mirror"
        if self.is_support:
            return "support"
        if self.is_resistance:
            return "resistance"
        return "unknown"


def find_swing_points(highs: list[float], lows: list[float],
                      left: int = 3, right: int = 3) -> tuple[list, list]:
    """
    Find swing highs and swing lows.

    A swing high: high[i] is strictly the highest in range [i-left, i+right]
    A swing low:  low[i] is strictly the lowest in range [i-left, i+right]

    Ties disqualify a candle, so flat tops/bottoms produce no swing point.
    The first ``left`` and last ``right`` candles are never reported because
    their window is incomplete. The candle count is taken from ``highs``;
    ``lows`` is assumed to have the same length.

    Returns:
        (swing_highs, swing_lows) — lists of (index, price)
    """
    n = len(highs)
    swing_highs = []
    swing_lows = []

    for i in range(left, n - right):
        neighbours = [j for j in range(i - left, i + right + 1) if j != i]

        # Swing high: no neighbour reaches or exceeds this high.
        if not any(highs[j] >= highs[i] for j in neighbours):
            swing_highs.append((i, highs[i]))

        # Swing low: no neighbour reaches or undercuts this low.
        if not any(lows[j] <= lows[i] for j in neighbours):
            swing_lows.append((i, lows[i]))

    return swing_highs, swing_lows


def _summarize_cluster(members: list[tuple[int, float]]) -> dict:
    """Collapse a non-empty group of (index, price) points into a cluster dict."""
    return {
        "price": sum(p[1] for p in members) / len(members),
        "touches": len(members),
        "indices": [p[0] for p in members],
    }


def cluster_prices(points: list[tuple[int, float]], tolerance_pct: float) -> list[dict]:
    """
    Cluster nearby price points into levels.

    Points are walked in ascending price order; a point joins the current
    cluster while it lies within ``tolerance_pct`` percent of that cluster's
    running average price, otherwise it starts a new cluster.

    Args:
        points: list of (index, price) tuples
        tolerance_pct: max % distance to merge into same cluster

    Returns:
        list of {price, touches, indices}, in ascending price order, where
        price is the cluster's mean price and touches its point count.

    Raises:
        ZeroDivisionError: if a cluster's average price is exactly 0
            (the distance is measured relative to that average).
    """
    if not points:
        return []

    # Sort by price
    sorted_points = sorted(points, key=lambda p: p[1])

    clusters = []
    current_cluster = [sorted_points[0]]

    for point in sorted_points[1:]:
        price = point[1]
        cluster_avg = sum(p[1] for p in current_cluster) / len(current_cluster)

        if abs(price - cluster_avg) / cluster_avg * 100 <= tolerance_pct:
            current_cluster.append(point)
        else:
            # Too far from the running average: close this cluster, open a new one.
            clusters.append(_summarize_cluster(current_cluster))
            current_cluster = [point]

    # Don't forget last cluster
    clusters.append(_summarize_cluster(current_cluster))

    return clusters


def is_round_number(price: float) -> bool:
    """Check if price is a 'round' number (psychological level).

    The grid step depends on the price magnitude (10 for prices >= 100,
    1 for >= 10, 0.1 for >= 1, 0.01 for >= 0.01). A price counts as round
    when it sits within 5% of a step of a grid line, on either side.
    Prices below 0.01 are never considered round.
    """
    if price >= 100:
        step, near_below, near_above = 10, 0.5, 9.5
    elif price >= 10:
        step, near_below, near_above = 1, 0.05, 0.95
    elif price >= 1:
        step, near_below, near_above = 0.1, 0.005, 0.095
    elif price >= 0.01:
        step, near_below, near_above = 0.01, 0.0005, 0.0095
    else:
        return False

    # The remainder is just above 0 right after a grid line and just below
    # ``step`` right before the next one; both count as "near".
    remainder = price % step
    return remainder < near_below or remainder > near_above


def detect_false_breakouts(
    level_price: float,
    highs: list[float],
    lows: list[float],
    closes: list[float],
    tolerance_pct: float,
) -> bool:
    """Check if any candle poked through the level but closed back.

    A candle qualifies when its wick goes beyond the level by more than
    ``tolerance_pct`` percent of the level price while its close ends up on
    the original side of the level. Candles are read in lockstep from the
    three series; if their lengths differ, the extra entries are ignored.
    """
    tol = level_price * tolerance_pct / 100

    for high, low, close in zip(highs, lows, closes):
        # Price went above level but closed below (false breakout of resistance)
        if high > level_price + tol and close < level_price:
            return True
        # Price went below level but closed above (false breakout of support)
        if low < level_price - tol and close > level_price:
            return True

    return False


def calculate_level_strength(level: Level, total_candles: int) -> float:
    """
    Calculate level strength score (0-100).

    Factors:
    - Touches (base score)
    - Mirror level (big bonus)
    - Round number (small bonus)
    - False breakout survived (medium bonus)
    - Recency (more recent = stronger)

    Args:
        level: level whose touches, flags and last_touch_idx are scored
        total_candles: number of candles in the analysed window; used to
            normalise last_touch_idx. No recency bonus is given when it is
            not positive.

    Returns:
        Score capped at 100.
    """
    score = 0.0

    # Touches: 3=30, 4=40, 5+=50 (capped)
    score += min(level.touches * 10, 50)

    # Mirror level: +20
    if level.is_mirror:
        score += 20

    # Round number: +10
    if level.is_round:
        score += 10

    # False breakout: +15
    if level.has_false_breakout:
        score += 15

    # Recency: 0-15 based on how recent the last touch was.
    # last_touch_idx is the largest touch index (see find_levels), so a
    # larger index is a later candle and must earn the larger bonus.
    # Assumes candles are ordered oldest-first.
    if total_candles > 0:
        recency = level.last_touch_idx / total_candles
        recency = min(max(recency, 0.0), 1.0)
        score += recency * 15

    return min(score, 100)


def find_levels(
    opens: list[float],
    highs: list[float],
    lows: list[float],
    closes: list[float],
    current_price: float,
    tolerance_pct: float = GERCHIK_LEVEL_TOLERANCE_PCT,
    min_touches: int = GERCHIK_LEVEL_MIN_TOUCHES,
    max_levels: int = GERCHIK_MAX_LEVELS,
) -> list[Level]:
    """
    Main level detection function.

    Args:
        opens, highs, lows, closes: 1H OHLC candle data
            (opens is accepted for interface symmetry but not read here)
        current_price: current mark price
        tolerance_pct: clustering tolerance %
        min_touches: minimum touches to qualify as level
            (applied to swing-high and swing-low clusters separately,
            before they are merged into mirror levels)
        max_levels: max levels to return

    Returns:
        List of Level objects sorted by distance from current price.
        Empty when fewer than 50 candles are supplied.
    """
    n = len(closes)
    if n < 50:
        return []

    # 1. Find swing points
    swing_highs, swing_lows = find_swing_points(highs, lows, left=3, right=3)

    # 2. Cluster swing highs and lows separately
    high_clusters = cluster_prices(swing_highs, tolerance_pct)
    low_clusters = cluster_prices(swing_lows, tolerance_pct)

    # 3. Merge all clusters and identify support/resistance/mirror.
    # Keyed by rounded price; insertion order decides which existing level
    # a later cluster is matched against first.
    all_clusters = {}

    for c in low_clusters:
        if c["touches"] < min_touches:
            continue
        all_clusters[round(c["price"], 8)] = {
            "price": c["price"],
            "touches": c["touches"],
            "indices": c["indices"],
            "is_support": True,
            "is_resistance": False,
        }

    for c in high_clusters:
        if c["touches"] < min_touches:
            continue

        # Check if there's already a level nearby
        matched = False
        for existing in all_clusters.values():
            distance_pct = abs(c["price"] - existing["price"]) / existing["price"] * 100
            if distance_pct <= tolerance_pct:
                # Merge — this is a mirror level!
                existing["is_resistance"] = True
                existing["touches"] += c["touches"]
                existing["indices"].extend(c["indices"])
                existing["price"] = (existing["price"] + c["price"]) / 2
                matched = True
                break

        if not matched:
            all_clusters[round(c["price"], 8)] = {
                "price": c["price"],
                "touches": c["touches"],
                "indices": c["indices"],
                "is_support": False,
                "is_resistance": True,
            }

    # 4. Build Level objects with metadata
    levels = []
    for data in all_clusters.values():
        level = Level(
            price=data["price"],
            touches=data["touches"],
            is_support=data["is_support"],
            is_resistance=data["is_resistance"],
            is_mirror=data["is_support"] and data["is_resistance"],
            is_round=is_round_number(data["price"]),
            has_false_breakout=detect_false_breakouts(
                data["price"], highs, lows, closes, tolerance_pct
            ),
            last_touch_idx=max(data["indices"]) if data["indices"] else 0,
        )
        level.strength = calculate_level_strength(level, n)
        levels.append(level)

    # 5. Sort by distance from current price (closest first)
    levels.sort(key=lambda lvl: abs(lvl.price - current_price))

    # 6. Return top N
    return levels[:max_levels]


def get_nearest_levels(levels: list[Level], current_price: float, count: int = 2) -> dict:
    """
    Get nearest support and resistance relative to current price.

    Levels are split purely by position: anything below the current price
    is treated as support and anything above as resistance, regardless of
    the level's own flags. A level exactly at the current price is in
    neither list.

    Returns:
        {
            "support": [Level, ...],      # Below current price, closest first
            "resistance": [Level, ...],   # Above current price, closest first
        }
    """
    supports = [lvl for lvl in levels if lvl.price < current_price]
    resistances = [lvl for lvl in levels if lvl.price > current_price]

    # Sort supports descending (closest first), resistances ascending
    supports.sort(key=lambda lvl: lvl.price, reverse=True)
    resistances.sort(key=lambda lvl: lvl.price)

    return {
        "support": supports[:count],
        "resistance": resistances[:count],
    }