"""
Gerchik Levels — Level Detection Engine.
Finds support/resistance levels from 1H candles using swing point clustering.
Ranks levels by strength (touches, mirror, round number, recency).
"""
import logging
import math
from dataclasses import dataclass, field
from typing import Optional
from gerchik_config import (
GERCHIK_LEVEL_MIN_TOUCHES,
GERCHIK_LEVEL_TOLERANCE_PCT,
GERCHIK_LEVEL_CANDLES,
GERCHIK_MAX_LEVELS,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@dataclass
class Level:
    """One detected price level together with the flags used to rank it."""

    price: float
    touches: int = 0
    is_support: bool = False
    is_resistance: bool = False
    # Level has acted as both support and resistance.
    is_mirror: bool = False
    # Price sits on a psychological "round" value ($10, $100, $0.05, ...).
    is_round: bool = False
    # Some candle pierced the level and then came back.
    has_false_breakout: bool = False
    # Candle index of the latest touch; feeds the recency score.
    last_touch_idx: int = 0
    # Computed strength score.
    strength: float = 0.0

    @property
    def level_type(self) -> str:
        """Return "mirror", "support", "resistance" or "unknown".

        "mirror" wins whenever the mirror flag is set or both the support
        and resistance flags are set; otherwise support is checked before
        resistance.
        """
        acts_both_ways = self.is_support and self.is_resistance
        if self.is_mirror or acts_both_ways:
            return "mirror"
        if self.is_support:
            return "support"
        return "resistance" if self.is_resistance else "unknown"
def find_swing_points(highs: list[float], lows: list[float], left: int = 3, right: int = 3) -> tuple[list, list]:
    """
    Locate local extremes in the high and low series.

    Candle i is a swing high when highs[i] is strictly greater than every
    other high in the window [i-left, i+right]; it is a swing low when
    lows[i] is strictly smaller than every other low in that window. A tie
    with any neighbour disqualifies the candle. Candles closer than `left`
    to the start or `right` to the end are never considered.

    Returns: (swing_highs, swing_lows) — lists of (index, price)
    """
    peaks = []
    troughs = []
    for center in range(left, len(highs) - right):
        # Every index in the window except the candidate itself.
        neighbours = [k for k in range(center - left, center + right + 1) if k != center]

        pivot_high = highs[center]
        if not any(highs[k] >= pivot_high for k in neighbours):
            peaks.append((center, pivot_high))

        pivot_low = lows[center]
        if not any(lows[k] <= pivot_low for k in neighbours):
            troughs.append((center, pivot_low))
    return peaks, troughs
def cluster_prices(points: list[tuple[int, float]], tolerance_pct: float) -> list[dict]:
    """
    Cluster nearby price points into levels.

    Points are sorted by price and swept once. A point joins the current
    cluster when its distance from the cluster's running mean, expressed as
    a percentage of that mean, is within tolerance_pct; otherwise it starts
    a new cluster.

    Args:
        points: list of (index, price) tuples
        tolerance_pct: max % distance to merge into same cluster
    Returns:
        list of {price, touches, indices}, ordered by ascending price, where
        price is the mean of the member prices, touches is the member count
        and indices are the members' indices in price-sorted order
    """
    if not points:
        return []

    def _mean_price(members):
        return sum(member[1] for member in members) / len(members)

    def _summarize(members):
        return {
            "price": _mean_price(members),
            "touches": len(members),
            "indices": [member[0] for member in members],
        }

    def _within_tolerance(price, mean):
        # A zero mean has no meaningful percentage distance: only an exactly
        # equal price belongs to the cluster. This also avoids dividing by 0.
        if mean == 0:
            return price == mean
        # abs(mean) keeps the ratio non-negative for a negative mean; for
        # positive prices this is the same expression as before.
        return abs(price - mean) / abs(mean) * 100 <= tolerance_pct

    ordered = sorted(points, key=lambda p: p[1])
    clusters = []
    current = [ordered[0]]
    for point in ordered[1:]:
        if _within_tolerance(point[1], _mean_price(current)):
            current.append(point)
        else:
            clusters.append(_summarize(current))
            current = [point]
    # The sweep always leaves one open cluster behind.
    clusters.append(_summarize(current))
    return clusters
def is_round_number(price: float) -> bool:
    """
    Tell whether price lies close to a psychological 'round' value.

    The grid depends on the price magnitude:
      - price >= 100:  within 0.5 of a multiple of 10
      - price >= 10:   within 0.05 of a multiple of 1
      - price >= 1:    within 0.005 of a multiple of 0.1
      - price >= 0.01: within 0.0005 of a multiple of 0.01
    Anything below 0.01 is never considered round.
    """
    # (minimum price, grid step, max remainder just above a grid line,
    #  min remainder just below the next grid line)
    bands = (
        (100, 10, 0.5, 9.5),
        (10, 1, 0.05, 0.95),
        (1, 0.1, 0.005, 0.095),
        (0.01, 0.01, 0.0005, 0.0095),
    )
    for floor, step, just_above, just_below in bands:
        if price >= floor:
            # Only the first matching band is consulted.
            remainder = price % step
            return remainder < just_above or remainder > just_below
    return False
def detect_false_breakouts(
    level_price: float,
    highs: list[float],
    lows: list[float],
    closes: list[float],
    tolerance_pct: float,
) -> bool:
    """
    Report whether any candle pierced the level but closed back on the
    other side.

    A candle counts when its high exceeds the level by more than the
    tolerance while its close is below the level, or when its low undercuts
    the level by more than the tolerance while its close is above the level.
    The tolerance is tolerance_pct percent of level_price.
    """
    tol = level_price * tolerance_pct / 100
    upper_bound = level_price + tol
    lower_bound = level_price - tol
    return any(
        # Spike above the level that failed to hold ...
        (highs[k] > upper_bound and closes[k] < level_price)
        # ... or a dip below the level that was bought back.
        or (lows[k] < lower_bound and closes[k] > level_price)
        for k in range(len(closes))
    )
def calculate_level_strength(level: Level, total_candles: int) -> float:
    """
    Calculate level strength score (0-100).

    Factors:
        - Touches: 10 points each, capped at 50
        - Mirror level: +20
        - Round number: +10
        - False breakout survived: +15
        - Recency: 0-15, growing as the last touch gets closer to the end
          of the candle window (more recent = stronger)

    Args:
        level: the level to score; reads touches, is_mirror, is_round,
            has_false_breakout and last_touch_idx
        total_candles: number of candles in the analysed window; the
            recency term is skipped when it is not positive
    Returns:
        The summed score, capped at 100.
    """
    score = 0.0
    # Touches: 3=30, 4=40, 5+=50 (capped)
    score += min(level.touches * 10, 50)
    # Mirror level: +20
    if level.is_mirror:
        score += 20
    # Round number: +10
    if level.is_round:
        score += 10
    # False breakout: +15
    if level.has_false_breakout:
        score += 15
    # Recency: 0-15. Assumes candles are ordered oldest-first, so a larger
    # last_touch_idx means a more recent touch and must earn the larger
    # bonus. Clamped so an out-of-range index cannot push the term outside
    # its 0-15 band.
    if total_candles > 0:
        recency = min(max(level.last_touch_idx / total_candles, 0.0), 1.0)
        score += recency * 15
    return min(score, 100.0)
def find_levels(
    opens: list[float],
    highs: list[float],
    lows: list[float],
    closes: list[float],
    current_price: float,
    tolerance_pct: float = GERCHIK_LEVEL_TOLERANCE_PCT,
    min_touches: int = GERCHIK_LEVEL_MIN_TOUCHES,
    max_levels: int = GERCHIK_MAX_LEVELS,
) -> list[Level]:
    """
    Detect support/resistance levels from OHLC candles.

    Pipeline: swing points (3 candles each side) -> separate clustering of
    swing highs and swing lows -> merge of nearby support and resistance
    clusters into mirror levels -> metadata and strength per level -> sort
    by distance from the current price.

    Args:
        opens, highs, lows, closes: 1H OHLC candle data (opens is accepted
            but not read by this function)
        current_price: current mark price
        tolerance_pct: clustering tolerance %
        min_touches: minimum touches a cluster needs to qualify as a level
        max_levels: max levels to return
    Returns:
        List of Level objects sorted by distance from current price
        (closest first), at most max_levels long. Empty when fewer than 50
        closes are supplied.
    """
    candle_count = len(closes)
    if candle_count < 50:
        return []

    # 1. Swing points, then 2. independent clustering of highs and lows.
    peaks, troughs = find_swing_points(highs, lows, left=3, right=3)
    resistance_groups = cluster_prices(peaks, tolerance_pct)
    support_groups = cluster_prices(troughs, tolerance_pct)

    # 3. Candidate levels keyed by price rounded to 8 decimals. Supports go
    #    in first; resistances either merge into a nearby candidate or are
    #    added as new entries.
    candidates = {}
    for group in [g for g in support_groups if g["touches"] >= min_touches]:
        candidates[round(group["price"], 8)] = {
            "price": group["price"],
            "touches": group["touches"],
            "indices": group["indices"],
            "is_support": True,
            "is_resistance": False,
        }

    for group in [g for g in resistance_groups if g["touches"] >= min_touches]:
        for known in candidates.values():
            gap_pct = abs(group["price"] - known["price"]) / known["price"] * 100
            if gap_pct <= tolerance_pct:
                # Close enough to an existing candidate: fold this cluster
                # into it (first match only).
                known["is_resistance"] = True
                known["touches"] += group["touches"]
                known["indices"].extend(group["indices"])
                known["price"] = (known["price"] + group["price"]) / 2
                break
        else:
            # No candidate within tolerance: a standalone resistance.
            candidates[round(group["price"], 8)] = {
                "price": group["price"],
                "touches": group["touches"],
                "indices": group["indices"],
                "is_support": False,
                "is_resistance": True,
            }

    # 4. Turn each candidate into a scored Level.
    levels = []
    for record in candidates.values():
        touch_indices = record["indices"]
        level = Level(
            price=record["price"],
            touches=record["touches"],
            is_support=record["is_support"],
            is_resistance=record["is_resistance"],
            is_mirror=record["is_support"] and record["is_resistance"],
            is_round=is_round_number(record["price"]),
            has_false_breakout=detect_false_breakouts(
                record["price"], highs, lows, closes, tolerance_pct
            ),
            last_touch_idx=max(touch_indices) if touch_indices else 0,
        )
        level.strength = calculate_level_strength(level, candle_count)
        levels.append(level)

    # 5. Closest to the current price first, 6. keep only the top N.
    by_distance = sorted(levels, key=lambda lv: abs(lv.price - current_price))
    return by_distance[:max_levels]
def get_nearest_levels(levels: list[Level], current_price: float, count: int = 2) -> dict:
    """
    Pick the levels closest to the current price on each side.

    Levels strictly below the price are treated as support, levels strictly
    above as resistance; a level exactly at the price appears in neither.

    Returns:
        {
            "support": [Level, ...],     # Below current price, nearest first
            "resistance": [Level, ...],  # Above current price, nearest first
        }
        Each list holds at most `count` entries.
    """
    below = sorted(
        (lv for lv in levels if lv.price < current_price),
        key=lambda lv: lv.price,
        reverse=True,
    )
    above = sorted(
        (lv for lv in levels if lv.price > current_price),
        key=lambda lv: lv.price,
    )
    return {"support": below[:count], "resistance": above[:count]}