← Назад"""
Zatochki (Knife Catcher) — Trade Manager
==========================================
Signal → immediate entry (no watchlist — knife catching is time-sensitive).
TP1 partial (50% at 0.7%) + trailing (0.5% callback) on rest.
Dynamic SL from spike wick, capped at 1.2%.
TMM description: vol_spike, vwap_dist, rsi, sl_pct, natr_5m, vol_24h, oi_change
"""
import json
import time
import logging
import tempfile
import os
from datetime import datetime, timezone
from src.zatochki_config import (
TRADE_SIZE_USD, LEVERAGE, MAX_POSITIONS,
TP1_PCT, TP1_CLOSE_RATIO, TRAIL_CALLBACK_PCT,
SL_CAP_PCT, MAX_BARS_IN_TRADE, TAKER_FEE,
Z_POSITIONS_FILE, Z_TRADE_LOG_FILE,
TMM_TAG, TELEGRAM_PREFIX, STRATEGY_NAME,
)
logger = logging.getLogger("zatochki.manager")
class ZatochkiPosition:
"""One open Zatochki position."""
def __init__(self, symbol, side, entry_price, qty, sl_price,
symbol_info, signal_data, opened_at=None):
self.symbol = symbol
self.side = side # "LONG" or "SHORT"
self.entry_price = entry_price
self.qty = qty
self.sl_price = sl_price
self.symbol_info = symbol_info
self.signal_data = signal_data # full signal dict for TMM logging
self.opened_at = opened_at or datetime.now(timezone.utc).isoformat()
self.sl_order_placed = False
self.tp_order_placed = False
# TP1 partial + trailing
self.original_qty = qty
self.tp1_hit = False
self.trail_high = 0.0
self.entry_bar = int(time.time()) # for timeout check
def to_dict(self):
return {
"symbol": self.symbol,
"side": self.side,
"entry_price": self.entry_price,
"qty": self.qty,
"original_qty": self.original_qty,
"sl_price": self.sl_price,
"signal_data": self.signal_data,
"opened_at": self.opened_at,
"sl_order_placed": self.sl_order_placed,
"tp_order_placed": self.tp_order_placed,
"tp1_hit": self.tp1_hit,
"trail_high": self.trail_high,
"entry_bar": self.entry_bar,
}
@classmethod
def from_dict(cls, d, exchange=None):
pos = cls(
symbol=d["symbol"], side=d["side"],
entry_price=d["entry_price"], qty=d["qty"],
sl_price=d["sl_price"], symbol_info=None,
signal_data=d.get("signal_data", {}),
opened_at=d.get("opened_at"),
)
pos.original_qty = d.get("original_qty", d["qty"])
pos.sl_order_placed = d.get("sl_order_placed", False)
pos.tp_order_placed = d.get("tp_order_placed", False)
pos.tp1_hit = d.get("tp1_hit", False)
pos.trail_high = d.get("trail_high", 0)
pos.entry_bar = d.get("entry_bar", int(time.time()))
if exchange:
try:
pos.symbol_info = exchange.get_symbol_info(pos.symbol)
except Exception:
pass
return pos
class ZatochkiManager:
"""Trade manager for Zatochki strategy."""
def __init__(self, exchange, screener, notifier=None, tmm=None):
self.exchange = exchange
self.screener = screener
self.notifier = notifier
self.tmm = tmm
self.positions = {} # symbol → ZatochkiPosition
self._load_positions()
# Wire up screener callback
self.screener.get_open_positions = lambda: self.positions
# ============================================================
# PROCESS SIGNALS → OPEN POSITIONS
# ============================================================
def process_signals(self, signals):
"""Process signals from screener — immediate entry."""
if not signals:
return
if len(self.positions) >= MAX_POSITIONS:
return
for sig in signals:
symbol = sig["symbol"]
if symbol in self.positions:
continue
if len(self.positions) >= MAX_POSITIONS:
break
try:
self._open_position(sig)
time.sleep(0.2)
except Exception as e:
logger.error(f"Open failed {symbol}: {e}")
def _open_position(self, signal):
"""Open position based on signal."""
symbol = signal["symbol"]
side = signal["direction"]
# Safety check: no existing position on exchange
exchange_positions = self.exchange.get_positions()
for ep in exchange_positions:
if ep["symbol"] == symbol and float(ep.get("positionAmt", 0)) != 0:
logger.warning(f"Already have position on {symbol}, skipping")
return
sym_info = self._get_symbol_info(symbol)
if not sym_info:
return
actual_leverage = self.exchange.set_leverage(symbol) or LEVERAGE
self.exchange.set_margin_type(symbol)
# Calculate qty
mark_price = self.exchange.get_mark_price(symbol)
target_notional = TRADE_SIZE_USD * LEVERAGE
qty = target_notional / mark_price
qty = self.exchange.round_qty(sym_info, qty)
if sym_info.get("min_qty") and qty < sym_info["min_qty"]:
logger.warning(f"Qty {qty} below min for {symbol}")
return
# Market order
order_side = "BUY" if side == "LONG" else "SELL"
order, fill_price = self.exchange.open_market(symbol, order_side, qty)
if fill_price == 0:
logger.error(f"Fill price 0 for {symbol}!")
try:
close_side = "SELL" if order_side == "BUY" else "BUY"
self.exchange.close_position(symbol, close_side, qty)
except Exception:
pass
return
# SL from signal (dynamic + capped)
sl_price = signal["sl_price"]
# Recalculate SL based on fill price (not signal entry_price)
if side == "LONG":
sl_pct = (fill_price - sl_price) / fill_price
if sl_pct > SL_CAP_PCT:
sl_price = fill_price * (1 - SL_CAP_PCT)
tp1_price = fill_price * (1 + TP1_PCT)
else:
sl_pct = (sl_price - fill_price) / fill_price
if sl_pct > SL_CAP_PCT:
sl_price = fill_price * (1 + SL_CAP_PCT)
tp1_price = fill_price * (1 - TP1_PCT)
pos = ZatochkiPosition(
symbol=symbol, side=side,
entry_price=fill_price, qty=qty,
sl_price=sl_price, symbol_info=sym_info,
signal_data=signal,
)
# TP1 qty = 50%
tp1_qty = self.exchange.round_qty(sym_info, qty * TP1_CLOSE_RATIO)
# Place SL (full) + TP1 (partial)
self._place_sl_tp(pos, tp1_price, tp1_qty)
self.positions[symbol] = pos
self._save_positions()
# Telegram notification
sl_pct_actual = abs(fill_price - sl_price) / fill_price * 100
msg = (
f"{TELEGRAM_PREFIX} *{side} {symbol}*\n"
f"Entry: {fill_price}\n"
f"SL: {sl_price:.6f} (\\-{sl_pct_actual:.1f}%)\n"
f"TP1: {tp1_price:.6f} (\\+{TP1_PCT*100:.1f}%) \\→ 50% close\n"
f"Trail: {TRAIL_CALLBACK_PCT*100}% callback\n"
f"Vol spike: {signal['vol_spike_ratio']}x | "
f"VWAP: {signal['vwap_dist_pct']}%\n"
f"RSI: {signal['rsi']} | "
f"OI: {signal.get('oi_change_pct', 'N/A')}%\n"
f"Qty: {qty} (${target_notional:.0f}, {actual_leverage}x)"
)
logger.info(f"OPEN {side} {symbol} vol={signal['vol_spike_ratio']}x rsi={signal['rsi']}")
self._notify(msg)
# TMM: tag trade
if self.tmm:
try:
self._tmm_tag_trade(symbol, side, signal)
except Exception as te:
logger.warning(f"TMM tag error {symbol}: {te}")
def _place_sl_tp(self, pos, tp1_price, tp1_qty):
"""Place SL and TP1 orders on exchange."""
sym_info = pos.symbol_info or self._get_symbol_info(pos.symbol)
close_side = "SELL" if pos.side == "LONG" else "BUY"
# SL on full qty
try:
self.exchange.place_sl(pos.symbol, close_side, pos.qty, pos.sl_price, sym_info)
pos.sl_order_placed = True
except Exception as e:
logger.error(f"SL failed {pos.symbol}: {e}")
# TP1 on partial qty
if not pos.tp1_hit:
try:
self.exchange.place_tp(pos.symbol, close_side, tp1_qty, tp1_price, sym_info)
pos.tp_order_placed = True
except Exception as e:
logger.error(f"TP1 failed {pos.symbol}: {e}")
# ============================================================
# MONITOR POSITIONS
# ============================================================
def check_positions(self):
"""
Called every 5 sec:
1. Position closed? → log
2. TP1 partial fill → SL→BE, start trailing
3. Trailing: track peak, callback → close rest
4. Timeout
"""
if not self.positions:
return
exchange_positions = self.exchange.get_positions()
exchange_map = {p["symbol"]: p for p in exchange_positions}
to_remove = []
for symbol, pos in list(self.positions.items()):
try:
ep = exchange_map.get(symbol)
current_qty = abs(float(ep.get("positionAmt", 0))) if ep else 0
# === 1. Position fully closed ===
if current_qty == 0:
result = self._determine_close_result(pos)
self._log_trade(pos, result)
self.screener.set_cooldown(symbol)
to_remove.append(symbol)
continue
# === 2. TP1 detection (qty dropped ~50%) ===
if not pos.tp1_hit and current_qty < pos.original_qty * 0.8:
pos.tp1_hit = True
pos.qty = current_qty
# Move SL to BE
pos.sl_price = pos.entry_price
mark_price = self.exchange.get_mark_price(symbol)
pos.trail_high = mark_price
# Cancel old orders, place new SL at BE
self.exchange.cancel_all_orders(symbol)
sym_info = pos.symbol_info or self._get_symbol_info(symbol)
close_side = "SELL" if pos.side == "LONG" else "BUY"
try:
self.exchange.place_sl(symbol, close_side, pos.qty, pos.sl_price, sym_info)
pos.sl_order_placed = True
except Exception as e:
logger.error(f"BE SL failed {symbol}: {e}")
self._save_positions()
logger.info(f"TP1 hit {symbol} — SL→BE, trailing started")
self._notify(
f"\U0001f3af *TP1 {pos.side} {symbol}*\n"
f"50% closed at +{TP1_PCT*100:.1f}%\n"
f"SL \\→ BE | Trailing {TRAIL_CALLBACK_PCT*100}%"
)
continue
# === 3. Trailing stop (after TP1) ===
if pos.tp1_hit:
mark_price = self.exchange.get_mark_price(symbol)
if pos.side == "LONG":
if mark_price > pos.trail_high:
pos.trail_high = mark_price
trail_sl = pos.trail_high * (1 - TRAIL_CALLBACK_PCT)
if mark_price <= trail_sl:
self._close_trailing(pos, mark_price, "TRAIL")
to_remove.append(symbol)
continue
else:
if mark_price < pos.trail_high or pos.trail_high == 0:
pos.trail_high = mark_price
trail_sl = pos.trail_high * (1 + TRAIL_CALLBACK_PCT)
if mark_price >= trail_sl:
self._close_trailing(pos, mark_price, "TRAIL")
to_remove.append(symbol)
continue
# === 4. Timeout (MAX_BARS_IN_TRADE minutes) ===
elapsed_sec = time.time() - pos.entry_bar
if elapsed_sec > MAX_BARS_IN_TRADE * 60:
mark_price = self.exchange.get_mark_price(symbol)
if pos.tp1_hit:
self._close_trailing(pos, mark_price, "TIMEOUT")
else:
self._close_position_market(pos, "TIMEOUT", mark_price)
to_remove.append(symbol)
continue
except Exception as e:
logger.error(f"Check error {symbol}: {e}")
for symbol in to_remove:
self.positions.pop(symbol, None)
if to_remove:
self._save_positions()
# ============================================================
# CLOSE HELPERS
# ============================================================
def _close_trailing(self, pos, mark_price, reason="TRAIL"):
"""Close remaining position after TP1."""
try:
close_side = "SELL" if pos.side == "LONG" else "BUY"
self.exchange.cancel_all_orders(pos.symbol)
fill_price = self.exchange.close_position(pos.symbol, close_side, pos.qty)
if pos.side == "LONG":
trail_pnl_pct = (fill_price - pos.entry_price) / pos.entry_price * 100
else:
trail_pnl_pct = (pos.entry_price - fill_price) / pos.entry_price * 100
remaining_ratio = 1 - TP1_CLOSE_RATIO
trail_pnl_usd = TRADE_SIZE_USD * LEVERAGE * remaining_ratio * (trail_pnl_pct / 100)
tp1_pnl_usd = TRADE_SIZE_USD * LEVERAGE * TP1_CLOSE_RATIO * TP1_PCT
total_pnl_usd = tp1_pnl_usd + trail_pnl_usd
total_pnl_pct = TP1_PCT * 100 * TP1_CLOSE_RATIO + trail_pnl_pct * remaining_ratio
trade = self._build_trade_dict(pos, reason, total_pnl_pct, total_pnl_usd, fill_price)
trade["tp1_hit"] = True
self._append_trade_log(trade)
emoji = "\U0001f3c3" if total_pnl_usd > 0 else "\u26a0\ufe0f"
msg = (
f"{emoji} *{reason} {pos.side} {pos.symbol}*\n"
f"TP1: +{TP1_PCT*100:.1f}% (${tp1_pnl_usd:+.2f})\n"
f"Rest: {trail_pnl_pct:+.1f}% (${trail_pnl_usd:+.2f})\n"
f"*Total: ${total_pnl_usd:+.2f}*\n"
f"Peak: {pos.trail_high:.6f} \\→ Exit: {fill_price:.6f}"
)
logger.info(f"{reason} {pos.side} {pos.symbol} total=${total_pnl_usd:+.2f}")
self._notify(msg)
except Exception as e:
logger.error(f"Trail close failed {pos.symbol}: {e}")
self._log_trade(pos, f"{reason}_ERROR")
def _close_position_market(self, pos, reason, mark_price):
"""Close full position via market (timeout, etc.)."""
try:
close_side = "SELL" if pos.side == "LONG" else "BUY"
self.exchange.cancel_all_orders(pos.symbol)
fill_price = self.exchange.close_position(pos.symbol, close_side, pos.qty)
if pos.side == "LONG":
pnl_pct = (fill_price - pos.entry_price) / pos.entry_price * 100
else:
pnl_pct = (pos.entry_price - fill_price) / pos.entry_price * 100
pnl_usd = TRADE_SIZE_USD * LEVERAGE * (pnl_pct / 100)
trade = self._build_trade_dict(pos, reason, pnl_pct, pnl_usd, fill_price)
self._append_trade_log(trade)
emoji = "\u23f1\ufe0f"
msg = (
f"{emoji} *{reason} {pos.side} {pos.symbol}*\n"
f"Entry: {pos.entry_price} \\→ {fill_price}\n"
f"PnL: {pnl_pct:+.1f}% (${pnl_usd:+.2f})"
)
logger.info(f"{reason} {pos.side} {pos.symbol} PnL={pnl_pct:+.1f}%")
self._notify(msg)
except Exception as e:
logger.error(f"Market close failed {pos.symbol}: {e}")
self._log_trade(pos, reason)
def _determine_close_result(self, pos):
"""Determine if closed by SL or TP."""
try:
open_orders = self.exchange.get_open_orders(pos.symbol)
has_tp = any(o.get("type") == "LIMIT" for o in open_orders)
if has_tp:
return "SL"
try:
mark = self.exchange.get_mark_price(pos.symbol)
dist_tp = abs(mark - pos.entry_price * (1 + TP1_PCT if pos.side == "LONG" else 1 - TP1_PCT))
dist_sl = abs(mark - pos.sl_price)
return "TP" if dist_tp < dist_sl else "SL"
except Exception:
return "TP"
except Exception:
return "UNKNOWN"
# ============================================================
# TRADE LOG
# ============================================================
def _log_trade(self, pos, result):
"""Log trade with estimated PnL."""
sig = pos.signal_data or {}
if pos.tp1_hit:
tp1_pnl_usd = TRADE_SIZE_USD * LEVERAGE * TP1_CLOSE_RATIO * TP1_PCT
remaining_ratio = 1 - TP1_CLOSE_RATIO
if result == "SL":
rest_pnl_usd = -(TRADE_SIZE_USD * LEVERAGE * remaining_ratio * SL_CAP_PCT)
total_pnl_usd = tp1_pnl_usd + rest_pnl_usd
total_pnl_pct = TP1_PCT * 100 * TP1_CLOSE_RATIO - SL_CAP_PCT * 100 * remaining_ratio
result = "TP1+SL"
else:
total_pnl_usd = tp1_pnl_usd
total_pnl_pct = TP1_PCT * 100 * TP1_CLOSE_RATIO
result = "TP1+BE"
elif result == "TP":
# Fast TP1 detected (exchange closed TP1+SL before we saw it)
total_pnl_pct = TP1_PCT * 100 * TP1_CLOSE_RATIO
total_pnl_usd = TRADE_SIZE_USD * LEVERAGE * TP1_CLOSE_RATIO * TP1_PCT
result = "TP1+BE (fast)"
logger.info(f"Fast TP1 detected {pos.symbol}")
elif result == "SL":
sl_pct = abs(pos.entry_price - pos.sl_price) / pos.entry_price
total_pnl_pct = -sl_pct * 100
total_pnl_usd = -(TRADE_SIZE_USD * LEVERAGE * sl_pct)
else:
total_pnl_pct = 0
total_pnl_usd = 0
trade = self._build_trade_dict(pos, result, total_pnl_pct, total_pnl_usd)
self._append_trade_log(trade)
emoji = "\u2705" if "TP" in result else "\u274c" if result == "SL" else "\u26a0\ufe0f"
msg = (
f"{emoji} *{result} {pos.side} {pos.symbol}*\n"
f"Entry: {pos.entry_price}\n"
f"PnL: {total_pnl_pct:+.1f}% (${total_pnl_usd:+.2f})\n"
f"Vol: {sig.get('vol_spike_ratio', '?')}x | RSI: {sig.get('rsi', '?')}"
)
logger.info(f"{result} {pos.side} {pos.symbol} PnL={total_pnl_pct:+.1f}%")
self._notify(msg)
def _build_trade_dict(self, pos, result, pnl_pct, pnl_usd, close_price=None):
"""Build trade dict for log + TMM."""
sig = pos.signal_data or {}
return {
"symbol": pos.symbol,
"side": pos.side,
"strategy": STRATEGY_NAME,
"entry_price": pos.entry_price,
"close_price": close_price,
"sl_price": pos.sl_price,
"qty": pos.original_qty,
"result": result,
"pnl_pct": round(pnl_pct, 2),
"pnl_usd": round(pnl_usd, 2),
"tp1_hit": pos.tp1_hit,
"trail_high": pos.trail_high,
# Signal params for TMM description
"vol_spike_ratio": sig.get("vol_spike_ratio"),
"vwap_dist_pct": sig.get("vwap_dist_pct"),
"rsi": sig.get("rsi"),
"sl_pct": sig.get("sl_pct"),
"natr_5m": sig.get("natr_5m"),
"volume_24h": sig.get("volume_24h"),
"oi_change_pct": sig.get("oi_change_pct"),
"opened_at": pos.opened_at,
"closed_at": datetime.now(timezone.utc).isoformat(),
}
def _append_trade_log(self, trade):
"""Atomic append to trade log."""
try:
with open(Z_TRADE_LOG_FILE, "r") as f:
log = json.load(f)
except Exception:
log = []
log.append(trade)
try:
dir_name = os.path.dirname(Z_TRADE_LOG_FILE)
fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
with os.fdopen(fd, "w") as f:
json.dump(log, f, indent=2)
os.replace(tmp_path, Z_TRADE_LOG_FILE)
except Exception as e:
logger.error(f"Error saving trade log: {e}")
# ============================================================
# TMM INTEGRATION
# ============================================================
def _tmm_tag_trade(self, symbol, side, signal):
"""Tag trade in TMM with Zatochki tag + full signal params in description."""
if not self.tmm or not self.tmm.enabled:
return
time.sleep(5) # Wait for TMM to import from Bybit
order_side = "BUY" if side == "LONG" else "SELL"
trade_id = self.tmm.find_recent_trade(symbol, order_side)
if not trade_id:
logger.warning(f"TMM: trade not found {symbol} {side}, will retry")
self.tmm._pending_tags.append({
"symbol": symbol, "side": order_side,
"score": 0, "z_score": 0,
"reasons": [TMM_TAG],
"attempts": 1, "next_retry": time.time() + 15,
"_zatochki_signal": signal,
})
return
self._apply_tmm_tags(trade_id, signal)
def _apply_tmm_tags(self, trade_id, signal):
"""Apply Zatochki tag + detailed description to TMM trade."""
self.tmm.tag_trade(trade_id, TMM_TAG)
# Description with ALL params for future optimization
vol_24h_m = signal.get("volume_24h", 0) / 1e6 if signal.get("volume_24h") else 0
desc = (
f"Zatochki Bot\n"
f"vol_spike: {signal.get('vol_spike_ratio', '?')}x | "
f"vwap_dist: {signal.get('vwap_dist_pct', '?')}% | "
f"rsi: {signal.get('rsi', '?')}\n"
f"sl_pct: {signal.get('sl_pct', '?')}% | "
f"natr_5m: {signal.get('natr_5m', '?')}% | "
f"vol_24h: ${vol_24h_m:.0f}M\n"
f"oi_change: {signal.get('oi_change_pct', 'N/A')}%"
)
self.tmm.update_description(trade_id, desc)
# ============================================================
# POSITIONS PERSISTENCE
# ============================================================
def _save_positions(self):
"""Save positions to file."""
data = {s: p.to_dict() for s, p in self.positions.items()}
try:
dir_name = os.path.dirname(Z_POSITIONS_FILE)
os.makedirs(dir_name, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
with os.fdopen(fd, "w") as f:
json.dump(data, f, indent=2)
os.replace(tmp_path, Z_POSITIONS_FILE)
except Exception as e:
logger.error(f"Save positions error: {e}")
def _load_positions(self):
"""Load positions from file."""
try:
with open(Z_POSITIONS_FILE, "r") as f:
data = json.load(f)
for symbol, d in data.items():
self.positions[symbol] = ZatochkiPosition.from_dict(d, self.exchange)
if self.positions:
logger.info(f"Loaded {len(self.positions)} Zatochki positions")
except (FileNotFoundError, json.JSONDecodeError):
self.positions = {}
# ============================================================
# RECOVERY (after restart)
# ============================================================
def recovery(self):
"""Check saved positions vs exchange after restart."""
if not self.positions:
return
exchange_positions = self.exchange.get_positions()
exchange_map = {p["symbol"]: p for p in exchange_positions}
to_remove = []
for symbol, pos in self.positions.items():
ep = exchange_map.get(symbol)
current_qty = abs(float(ep.get("positionAmt", 0))) if ep else 0
if current_qty == 0:
logger.info(f"Recovery: {symbol} closed while offline")
self._log_trade(pos, "OFFLINE_CLOSE")
to_remove.append(symbol)
continue
# Detect TP1 (qty dropped)
if current_qty < pos.original_qty * 0.8 and not pos.tp1_hit:
pos.tp1_hit = True
pos.qty = current_qty
pos.sl_price = pos.entry_price
mark_price = self.exchange.get_mark_price(symbol)
pos.trail_high = mark_price
logger.info(f"Recovery: {symbol} TP1 detected, SL→BE")
# Re-place orders
sym_info = pos.symbol_info or self._get_symbol_info(symbol)
pos.symbol_info = sym_info
self.exchange.cancel_all_orders(symbol)
close_side = "SELL" if pos.side == "LONG" else "BUY"
try:
self.exchange.place_sl(symbol, close_side, pos.qty, pos.sl_price, sym_info)
pos.sl_order_placed = True
except Exception as e:
logger.error(f"Recovery SL failed {symbol}: {e}")
if not pos.tp1_hit:
tp1_price = pos.entry_price * (1 + TP1_PCT) if pos.side == "LONG" else pos.entry_price * (1 - TP1_PCT)
tp1_qty = self.exchange.round_qty(sym_info, pos.qty * TP1_CLOSE_RATIO)
try:
self.exchange.place_tp(symbol, close_side, tp1_qty, tp1_price, sym_info)
pos.tp_order_placed = True
except Exception as e:
logger.error(f"Recovery TP failed {symbol}: {e}")
for symbol in to_remove:
self.positions.pop(symbol, None)
self._save_positions()
# ============================================================
# HELPERS
# ============================================================
def _get_symbol_info(self, symbol):
"""Get symbol trading info (tick size, qty step, etc.)."""
try:
return self.exchange.get_symbol_info(symbol)
except Exception as e:
logger.error(f"Symbol info error {symbol}: {e}")
return None
def _notify(self, msg):
"""Send Telegram notification."""
if self.notifier:
self.notifier(msg)
# ============================================================
# STATUS (for Telegram commands)
# ============================================================
def get_positions_info(self):
"""Get formatted positions info for /zstatus command."""
if not self.positions:
return "No open Zatochki positions"
lines = [f"{TELEGRAM_PREFIX} *Zatochki Positions ({len(self.positions)})*\n"]
for symbol, pos in self.positions.items():
try:
mark = self.exchange.get_mark_price(symbol)
if pos.side == "LONG":
pnl_pct = (mark - pos.entry_price) / pos.entry_price * 100
else:
pnl_pct = (pos.entry_price - mark) / pos.entry_price * 100
tp1_flag = "\U0001f3af" if pos.tp1_hit else ""
elapsed = (time.time() - pos.entry_bar) / 60
sig = pos.signal_data or {}
lines.append(
f"{'\\-' if pos.side == 'SHORT' else '\\+'} "
f"`{symbol}` {pos.side} {tp1_flag}\n"
f" PnL: {pnl_pct:+.1f}% | {elapsed:.0f}min\n"
f" vol: {sig.get('vol_spike_ratio', '?')}x rsi: {sig.get('rsi', '?')}"
)
except Exception:
lines.append(f"`{symbol}` {pos.side} (error getting price)")
return "\n".join(lines)
def get_stats(self):
"""Get trade stats for /zstats command."""
try:
with open(Z_TRADE_LOG_FILE, "r") as f:
trades = json.load(f)
except Exception:
return "No trade history"
if not trades:
return "No trades yet"
n = len(trades)
wins = [t for t in trades if t.get("pnl_usd", 0) > 0]
losses = [t for t in trades if t.get("pnl_usd", 0) <= 0]
wr = len(wins) / n * 100
total_pnl = sum(t.get("pnl_usd", 0) for t in trades)
return (
f"{TELEGRAM_PREFIX} *Zatochki Stats*\n"
f"Trades: {n} | WR: {wr:.0f}%\n"
f"Wins: {len(wins)} | Losses: {len(losses)}\n"
f"PnL: ${total_pnl:+.2f}"
)