"""
Squeeze-VWAP Bot — Trade Manager
===================================
Entry: score >= 3 from the combo signal (Squeeze + Z-VWAP + Waddah + ADX)
Exit: TP1 partial (50% at 1.2%) + trailing (0.7% callback) on rest
Dynamic TP still active on remaining 50% (Z fair value)
Time stop on remaining 50% if no movement (currently disabled in check_positions)
v2 (4 Apr 2026): TP1 partial + trailing replaces old BE-only logic.
"""
import json
import time
import logging
import tempfile
import os
from datetime import datetime, timezone
from src.config import (
SL_PCT, MAX_TP_PCT, TRADE_SIZE_USD, LEVERAGE, MAX_POSITIONS,
TIMEFRAME, POSITIONS_FILE, TRADE_LOG_FILE,
ZVWAP_EXIT_THRESHOLD, BE_TRIGGER_PCT,
TP1_PCT, TP1_CLOSE_RATIO, TRAIL_CALLBACK_PCT,
)
from src.indicators import calc_combo_signal, calc_z_vwap
# Module-level logger shared by everything in this file.
logger = logging.getLogger("manager")
MIN_ENTRY_SCORE = 3 # Minimum combo-signal score required to open a position
class Position:
    """State of a single open position tracked by the trade manager."""

    def __init__(self, symbol, side, entry_price, qty, sl_price, tp_price,
                 symbol_info, direction, z_score, score, reasons,
                 opened_at=None):
        # Default the open timestamp to "now" (UTC, ISO-8601) when not supplied.
        if not opened_at:
            opened_at = datetime.now(timezone.utc).isoformat()

        # Identity and sizing
        self.symbol = symbol
        self.side = side                  # "LONG" or "SHORT"
        self.direction = direction        # 1 = long, -1 = short
        self.entry_price = entry_price
        self.qty = qty                    # currently open quantity
        self.original_qty = qty           # full quantity at entry
        self.symbol_info = symbol_info
        self.opened_at = opened_at

        # Protective price levels
        self.sl_price = sl_price
        self.tp_price = tp_price

        # Signal snapshot taken at entry
        self.z_score = z_score            # Z-VWAP at entry
        self.score = score
        self.reasons = reasons

        # Order / lifecycle flags
        self.sl_order_placed = False
        self.tp_order_placed = False
        self.moved_to_be = False          # stop moved to the entry price
        self.tp1_hit = False              # set once the TP1 partial has filled
        self.trail_high = 0.0             # best price seen since TP1

    def to_dict(self):
        """Return the JSON-serialisable state (symbol_info is not included)."""
        keys = (
            "symbol", "side", "entry_price", "qty", "original_qty",
            "sl_price", "tp_price", "direction", "z_score", "score",
            "reasons", "opened_at", "sl_order_placed", "tp_order_placed",
            "moved_to_be", "tp1_hit", "trail_high",
        )
        return {key: getattr(self, key) for key in keys}

    @classmethod
    def from_dict(cls, d, symbol_info=None):
        """Rebuild a Position from to_dict() output, defaulting absent optional keys."""
        required = {
            name: d[name]
            for name in ("symbol", "side", "entry_price", "qty", "sl_price", "tp_price")
        }
        restored = cls(
            **required,
            symbol_info=symbol_info,
            direction=d.get("direction", 0),
            z_score=d.get("z_score", 0),
            score=d.get("score", 0),
            reasons=d.get("reasons", []),
            opened_at=d.get("opened_at"),
        )
        restored.original_qty = d.get("original_qty", d["qty"])
        for flag in ("sl_order_placed", "tp_order_placed", "moved_to_be", "tp1_hit"):
            setattr(restored, flag, d.get(flag, False))
        restored.trail_high = d.get("trail_high", 0.0)
        return restored
class TradeManager:
def __init__(self, exchange, screener, notifier=None, tmm=None):
    """Store collaborators and restore any positions saved on disk.

    exchange: client used for every market-data and order call in this class.
    screener: source of the watchlist; also receives cooldown/removal calls.
    notifier: optional callable invoked with each notification text.
    tmm: optional journal client; only on_trade_opened() is called on it here.
    """
    # In-memory state has to exist before _load_positions() populates it.
    self._symbol_info_cache = {}
    self.positions = {}
    # External collaborators.
    self.exchange, self.screener = exchange, screener
    self.notifier, self.tmm = notifier, tmm
    self._load_positions()
# ============================================================
# MAIN: Check watchlist for entry signals
# ============================================================
def check_watchlist_for_entries(self):
    """
    Walk the screener watchlist and open positions for symbols that still qualify.

    For each watchlist entry that is not already held, the combo signal is
    recomputed on fresh klines. A position is opened only when the fresh score
    is at least MIN_ENTRY_SCORE and the fresh direction matches the direction
    stored in the watchlist entry. Stops as soon as MAX_POSITIONS is reached.
    Errors for one symbol are logged and do not stop the scan.
    """
    watchlist = self.screener.get_watchlist()
    if not watchlist:
        return
    if len(self.positions) >= MAX_POSITIONS:
        return
    # Iterate over a snapshot: remove_from_watchlist() is called inside the
    # loop, and if get_watchlist() hands back the screener's live list,
    # removing from it mid-iteration would silently skip the next entry.
    for entry in list(watchlist):
        symbol = entry["symbol"]
        if symbol in self.positions:
            continue
        if len(self.positions) >= MAX_POSITIONS:
            break
        try:
            # Recompute the combo signal on fresh data
            klines = self.exchange.get_klines(symbol, TIMEFRAME, limit=300)
            if len(klines) < 100:
                continue
            combo = calc_combo_signal(klines)
            if combo is None:
                continue
            # Score must reach the entry threshold
            if combo['score'] < MIN_ENTRY_SCORE:
                continue
            # Direction must still match the watchlist entry
            if combo['direction'] != entry['direction']:
                logger.info(f"Direction changed for {symbol}, removing from watchlist")
                self.screener.remove_from_watchlist(symbol)
                continue
            # Enter
            side = "LONG" if combo['direction'] == 1 else "SHORT"
            self._open_position(
                symbol=symbol,
                side=side,
                z_score=combo['zvwap']['z_score'],
                score=combo['score'],
                reasons=combo['reasons'],
                direction=combo['direction'],
            )
            self.screener.remove_from_watchlist(symbol)
            time.sleep(0.2)
        except Exception as e:
            logger.error(f"Entry check error {symbol}: {e}")
            continue
# ============================================================
# OPEN POSITION
# ============================================================
def _open_position(self, symbol, side, z_score, score, reasons, direction):
    """Open a position: market entry, then a full-size SL and a partial TP1 order.

    symbol / side: instrument and "LONG" or "SHORT".
    z_score, score, reasons, direction: signal snapshot stored on the Position
        and included in the notification.

    Returns None in every case. Skips silently (with a log line) when the
    exchange already shows a position on the symbol, symbol info is
    unavailable, or the rounded quantity is below the symbol minimum. Any
    exception is logged and reported through the notifier.
    """
    try:
        # Safety: never stack onto a position the exchange already holds.
        for ep in self.exchange.get_positions():
            if ep["symbol"] == symbol and float(ep["positionAmt"]) != 0:
                logger.warning(f"Already have position on {symbol}, skipping")
                return
        sym_info = self._get_symbol_info(symbol)
        if not sym_info:
            return
        actual_leverage = self.exchange.set_leverage(symbol) or LEVERAGE
        self.exchange.set_margin_type(symbol)
        # Size from the configured notional, not from the leverage actually
        # granted, so the target notional stays constant if leverage is capped.
        mark_price = self.exchange.get_mark_price(symbol)
        target_notional = TRADE_SIZE_USD * LEVERAGE
        qty = target_notional / mark_price
        qty = self.exchange.round_qty(sym_info, qty)
        if sym_info["min_qty"] and qty < sym_info["min_qty"]:
            logger.warning(f"Qty {qty} below min for {symbol}")
            return
        # Market order
        order_side = "BUY" if side == "LONG" else "SELL"
        _order, fill_price = self.exchange.open_market(symbol, order_side, qty)
        # Treat a missing fill price (None) like 0: without it no SL/TP level
        # can be computed, so flatten immediately instead of leaving the
        # position open without protective orders.
        if not fill_price:
            logger.error(f"Fill price 0 for {symbol}!")
            try:
                close_side = "SELL" if order_side == "BUY" else "BUY"
                self.exchange.close_position(symbol, close_side, qty)
            except Exception as close_err:
                # Best effort, but never silent: a failure here may mean an
                # unprotected position is still open on the exchange.
                logger.error(f"Emergency close failed {symbol}: {close_err}")
            return
        # SL on the full position, TP1 at TP1_PCT from the fill
        if side == "LONG":
            sl_price = fill_price * (1 - SL_PCT)
            tp1_price = fill_price * (1 + TP1_PCT)
        else:
            sl_price = fill_price * (1 + SL_PCT)
            tp1_price = fill_price * (1 - TP1_PCT)
        pos = Position(
            symbol=symbol, side=side,
            entry_price=fill_price, qty=qty,
            sl_price=sl_price, tp_price=tp1_price,  # tp_price = TP1 level
            symbol_info=sym_info,
            direction=direction, z_score=z_score,
            score=score, reasons=reasons,
        )
        # TP1 closes TP1_CLOSE_RATIO of the position
        tp1_qty = self.exchange.round_qty(sym_info, qty * TP1_CLOSE_RATIO)
        # Place SL (full) + TP1 (partial) on the exchange
        self._place_sl_tp(pos, tp_qty=tp1_qty)
        self.positions[symbol] = pos
        self._save_positions()
        reasons_str = " | ".join(reasons[:3])
        msg = (
            f"{'🟢' if side == 'LONG' else '🔴'} *{side} {symbol}*\n"
            f"Score: {score}/5\n"
            f"Entry: {fill_price}\n"
            f"SL: {sl_price:.6f} (-{SL_PCT*100}%)\n"
            f"TP1: {tp1_price:.6f} (+{TP1_PCT*100}%) → {TP1_CLOSE_RATIO*100:.0f}% close\n"
            f"Trail: {TRAIL_CALLBACK_PCT*100}% callback after TP1\n"
            f"Z-VWAP: {z_score:+.2f}\n"
            f"Qty: {qty} (${target_notional:.0f} notional, {actual_leverage}x)\n"
            f"📋 {reasons_str}"
        )
        logger.info(f"OPEN {side} {symbol} score={score} Z={z_score:+.2f}")
        self._notify(msg)
        # TMM: tag the trade in the journal (failure here must not undo the open)
        if self.tmm:
            try:
                self.tmm.on_trade_opened(symbol, side, score, z_score, reasons)
            except Exception as te:
                logger.warning(f"TMM tag error {symbol}: {te}")
    except Exception as e:
        logger.error(f"Open position failed {symbol}: {e}")
        self._notify(f"❌ Open failed {symbol}: {e}")
def _place_sl_tp(self, pos, tp_qty=None):
"""
Place SL and TP orders on exchange.
tp_qty: qty for TP order (TP1 partial = 50%). None after TP1 = SL only.
"""
sym_info = pos.symbol_info or self._get_symbol_info(pos.symbol)
close_side = "SELL" if pos.side == "LONG" else "BUY"
# SL — always on full current qty
try:
self.exchange.place_sl(pos.symbol, close_side, pos.qty, pos.sl_price, sym_info)
pos.sl_order_placed = True
except Exception as e:
logger.error(f"SL failed {pos.symbol}: {e}")
pos.sl_order_placed = False
# TP — only pre-TP1 (after TP1: trailing handles exit)
if not pos.tp1_hit:
actual_tp_qty = tp_qty or pos.qty
try:
self.exchange.place_tp(pos.symbol, close_side, actual_tp_qty, pos.tp_price, sym_info)
pos.tp_order_placed = True
except Exception as e:
logger.error(f"TP failed {pos.symbol}: {e}")
pos.tp_order_placed = False
else:
pos.tp_order_placed = True # suppress re-place warnings
# ============================================================
# MONITOR POSITIONS
# ============================================================
def check_positions(self):
    """
    Poll every tracked position and manage its exits.

    The original note says this runs every 5 seconds. Per position:
      1. Gone from the exchange -> classify the close and log it.
      2. TP1 detection (pre-TP1): size dropped to the post-TP1 remainder ->
         move the stop to entry, cancel old orders, re-place the SL only and
         start trailing.
      3. Trailing (post-TP1): track the best mark price; a pullback of
         TRAIL_CALLBACK_PCT from it closes the remainder at market.
      4. Dynamic TP: Z-VWAP back at fair value while more than 0.2% in profit
         -> market close (whole position pre-TP1, remainder post-TP1).
      5. Time stop: disabled.
      6. Order health check (pre-TP1 only, at most once per 60 s): re-place
         SL/TP when orders are missing.

    Closed positions are dropped, their orders cancelled and a screener
    cooldown added; state is saved whenever anything changed.
    """
    if not self.positions:
        return
    exchange_positions = self.exchange.get_positions()
    exchange_map = {p["symbol"]: p for p in exchange_positions}
    closed = []
    changed = False
    for symbol, pos in self.positions.items():
        ex_pos = exchange_map.get(symbol)
        actual_qty = abs(float(ex_pos["positionAmt"])) if ex_pos else 0
        # ── 1. Position fully closed ──
        if actual_qty == 0:
            result = self._determine_close_result(pos)
            self._log_trade(pos, result)
            closed.append(symbol)
            continue
        mark_price = float(ex_pos.get("markPrice", 0))
        if mark_price == 0:
            try:
                mark_price = self.exchange.get_mark_price(symbol)
            except Exception:
                continue
        # ── 2. TP1 detection: qty dropped = partial fill ──
        if not pos.tp1_hit:
            sym_info = pos.symbol_info or self._get_symbol_info(symbol)
            expected_after_tp1 = self.exchange.round_qty(
                sym_info, pos.original_qty * (1 - TP1_CLOSE_RATIO)
            )
            if actual_qty <= expected_after_tp1 + 0.0001 and actual_qty < pos.qty - 0.0001:
                pos.tp1_hit = True
                pos.qty = actual_qty
                pos.sl_price = pos.entry_price  # SL → BE
                pos.moved_to_be = True
                pos.trail_high = mark_price
                # Cancel all, re-place SL at BE only (no TP — trailing now)
                self.exchange.cancel_all_orders(symbol)
                self._place_sl_tp(pos)
                changed = True
                tp1_pnl_usd = TRADE_SIZE_USD * LEVERAGE * TP1_PCT * TP1_CLOSE_RATIO
                msg = (
                    f"🎯 *TP1 {pos.side} {symbol}* +{TP1_PCT*100:.1f}%\n"
                    f"Closed {TP1_CLOSE_RATIO*100:.0f}% (${tp1_pnl_usd:+.2f})\n"
                    f"SL → BE | Trail {TRAIL_CALLBACK_PCT*100}% on rest ({actual_qty})"
                )
                logger.info(f"TP1 {pos.side} {symbol} +{TP1_PCT*100:.1f}% | trail rest")
                self._notify(msg)
        # ── 3. Trailing stop (after TP1) ──
        if pos.tp1_hit:
            if pos.side == "LONG":
                if mark_price > pos.trail_high:
                    pos.trail_high = mark_price
                    changed = True
                trail_sl = pos.trail_high * (1 - TRAIL_CALLBACK_PCT)
                # Only trail once the peak is beyond entry.
                if mark_price <= trail_sl and pos.trail_high > pos.entry_price:
                    self._close_trailing(pos, mark_price)
                    closed.append(symbol)
                    continue
            else:  # SHORT: trail_high holds the lowest price seen
                if pos.trail_high == 0 or mark_price < pos.trail_high:
                    pos.trail_high = mark_price
                    changed = True
                trail_sl = pos.trail_high * (1 + TRAIL_CALLBACK_PCT)
                if mark_price >= trail_sl and pos.trail_high < pos.entry_price:
                    self._close_trailing(pos, mark_price)
                    closed.append(symbol)
                    continue
        # ── 4. Dynamic TP: Z-VWAP returned to fair value ──
        # Same trigger before and after TP1; only the close routine differs.
        try:
            exit_z = self._dynamic_tp_z(pos, mark_price)
            if exit_z is not None:
                if pos.tp1_hit:
                    logger.info(f"Dynamic TP (post-TP1) {symbol}: Z={exit_z:.2f}")
                    self._close_trailing(pos, mark_price, reason="DYNAMIC_TP")
                else:
                    logger.info(f"Dynamic TP {symbol}: Z={exit_z:.2f}")
                    self._close_position_market(pos, "DYNAMIC_TP", mark_price)
                closed.append(symbol)
                continue
        except Exception as e:
            logger.debug(f"Z-VWAP check error {symbol}: {e}")
        # ── 5. Time stop — DISABLED (let trailing handle exit) ──
        # ── 6. Order health check (pre-TP1 only, every 60s) ──
        if not pos.tp1_hit and pos.sl_order_placed and pos.tp_order_placed:
            last_check = getattr(pos, '_last_order_check', 0)
            now_ts = time.time()
            if now_ts - last_check >= 60:
                pos._last_order_check = now_ts
                try:
                    open_orders = self.exchange.get_open_orders(symbol)
                    missing = None
                    if open_orders is not None and len(open_orders) == 0:
                        missing = "Orders"
                    elif open_orders and not any(o["type"] == "LIMIT" for o in open_orders):
                        missing = "TP"
                    if missing:
                        logger.warning(f"{missing} missing {symbol}, re-placing")
                        self.exchange.cancel_all_orders(symbol)
                        sym_info = pos.symbol_info or self._get_symbol_info(symbol)
                        tp1_qty = self.exchange.round_qty(sym_info, pos.original_qty * TP1_CLOSE_RATIO)
                        self._place_sl_tp(pos, tp_qty=tp1_qty)
                        changed = True
                except Exception as e:
                    logger.debug(f"Order check error {symbol}: {e}")
    # Cleanup
    for symbol in closed:
        self.positions.pop(symbol, None)
        try:
            self.exchange.cancel_all_orders(symbol)
        except Exception as e:
            # A failed cancel must not stop the remaining cleanup or the save.
            logger.warning(f"Cancel orders failed {symbol}: {e}")
        self.screener.add_cooldown(symbol)
    if closed or changed:
        self._save_positions()

def _dynamic_tp_z(self, pos, mark_price):
    """Return the current Z-VWAP score when the dynamic-TP exit applies, else None.

    The exit applies when at least 60 klines are available, the Z score has
    crossed the ZVWAP_EXIT_THRESHOLD band towards fair value for the
    position's side, and the position is more than 0.2% in profit at
    mark_price. Exceptions from the exchange or indicator propagate.
    """
    klines = self.exchange.get_klines(pos.symbol, TIMEFRAME, limit=100)
    if len(klines) < 60:
        return None
    zvwap = calc_z_vwap(klines)
    if not zvwap:
        return None
    current_z = zvwap['z_score']
    if pos.side == "LONG" and current_z > -ZVWAP_EXIT_THRESHOLD:
        pnl_pct = (mark_price - pos.entry_price) / pos.entry_price * 100
    elif pos.side == "SHORT" and current_z < ZVWAP_EXIT_THRESHOLD:
        pnl_pct = (pos.entry_price - mark_price) / pos.entry_price * 100
    else:
        return None
    return current_z if pnl_pct > 0.2 else None
def _close_trailing(self, pos, mark_price, reason="TRAIL"):
    """Market-close the remainder of a position after TP1 and log the whole trade.

    pos: the position; pos.qty is the size sent to close_position().
    mark_price: mark price that triggered the exit; used as the close price
        when the exchange call does not report a usable fill price.
    reason: result label written to the trade log ("TRAIL", "DYNAMIC_TP", ...).

    The logged PnL combines the TP1 partial (TP1_PCT on TP1_CLOSE_RATIO of the
    configured notional) with the realised move on the remainder. On any
    failure the trade is logged through _log_trade() as "<reason>_ERROR".
    """
    try:
        close_side = "SELL" if pos.side == "LONG" else "BUY"
        fill_price = self.exchange.close_position(pos.symbol, close_side, pos.qty)
        if not fill_price:
            # No fill price reported (None/0): computing PnL from it would
            # either raise or log a bogus ±100% move, so use the trigger price.
            logger.warning(f"No fill price for {pos.symbol} close, using mark {mark_price}")
            fill_price = mark_price
        if pos.side == "LONG":
            trail_pnl_pct = (fill_price - pos.entry_price) / pos.entry_price * 100
        else:
            trail_pnl_pct = (pos.entry_price - fill_price) / pos.entry_price * 100
        remaining_ratio = 1 - TP1_CLOSE_RATIO
        trail_pnl_usd = TRADE_SIZE_USD * LEVERAGE * remaining_ratio * (trail_pnl_pct / 100)
        tp1_pnl_usd = TRADE_SIZE_USD * LEVERAGE * TP1_CLOSE_RATIO * TP1_PCT
        total_pnl_usd = tp1_pnl_usd + trail_pnl_usd
        total_pnl_pct = TP1_PCT * 100 * TP1_CLOSE_RATIO + trail_pnl_pct * remaining_ratio
        trade = {
            "symbol": pos.symbol,
            "side": pos.side,
            "entry_price": pos.entry_price,
            "close_price": fill_price,
            "sl_price": pos.sl_price,
            "tp_price": pos.tp_price,
            "qty": pos.original_qty,
            "result": reason,
            "pnl_pct": round(total_pnl_pct, 2),
            "pnl_usd": round(total_pnl_usd, 2),
            "trail_high": pos.trail_high,
            "tp1_hit": True,
            "z_score_entry": pos.z_score,
            "score": pos.score,
            "reasons": pos.reasons,
            "opened_at": pos.opened_at,
            "closed_at": datetime.now(timezone.utc).isoformat(),
        }
        self._append_trade_log(trade)
        emoji = "🏃" if total_pnl_usd > 0 else "⚠️"
        msg = (
            f"{emoji} *{reason} {pos.side} {pos.symbol}*\n"
            f"TP1: +{TP1_PCT*100:.1f}% (${tp1_pnl_usd:+.2f})\n"
            f"Rest: {trail_pnl_pct:+.1f}% (${trail_pnl_usd:+.2f})\n"
            f"*Total: ${total_pnl_usd:+.2f}*\n"
            f"Peak: {pos.trail_high:.6f} → Exit: {fill_price:.6f}"
        )
        logger.info(f"{reason} {pos.side} {pos.symbol} total=${total_pnl_usd:+.2f}")
        self._notify(msg)
    except Exception as e:
        logger.error(f"Trail close failed {pos.symbol}: {e}")
        self._log_trade(pos, f"{reason}_ERROR")
def _close_position_market(self, pos, reason, mark_price):
    """Market-close the whole position and log it with the realised PnL.

    pos: the position; pos.qty is the size sent to close_position().
    reason: result label for the trade log (e.g. "DYNAMIC_TP").
    mark_price: mark price that triggered the exit; used as the close price
        when the exchange call does not report a usable fill price.

    On any failure the trade is logged through _log_trade() with an
    estimated PnL instead.
    """
    try:
        close_side = "SELL" if pos.side == "LONG" else "BUY"
        fill_price = self.exchange.close_position(pos.symbol, close_side, pos.qty)
        if not fill_price:
            # No fill price reported (None/0): fall back to the trigger price
            # rather than raising or logging a bogus ±100% move.
            logger.warning(f"No fill price for {pos.symbol} close, using mark {mark_price}")
            fill_price = mark_price
        if pos.side == "LONG":
            pnl_pct = (fill_price - pos.entry_price) / pos.entry_price * 100
        else:
            pnl_pct = (pos.entry_price - fill_price) / pos.entry_price * 100
        pnl_usd = TRADE_SIZE_USD * LEVERAGE * (pnl_pct / 100)
        self._log_trade_manual(pos, reason, pnl_pct, pnl_usd, fill_price)
    except Exception as e:
        logger.error(f"Market close failed {pos.symbol}: {e}")
        self._log_trade(pos, reason)
def _determine_close_result(self, pos):
"""SL or TP? Check if TP order still hanging → SL hit."""
try:
open_orders = self.exchange.get_open_orders(pos.symbol)
has_tp = any(o["type"] == "LIMIT" for o in open_orders)
if has_tp:
return "SL"
try:
mark = self.exchange.get_mark_price(pos.symbol)
dist_tp = abs(mark - pos.tp_price)
dist_sl = abs(mark - pos.sl_price)
return "TP" if dist_tp < dist_sl else "SL"
except Exception:
return "TP"
except Exception:
return "UNKNOWN"
# ============================================================
# TRADE LOG
# ============================================================
def _log_trade_manual(self, pos, result, pnl_pct, pnl_usd, close_price):
    """Record a bot-initiated market close using the PnL supplied by the caller.

    Appends the trade to the trade log, writes an info log line and sends a
    notification. pnl_pct / pnl_usd are rounded to 2 decimals in the log.
    """
    record = dict(
        symbol=pos.symbol,
        side=pos.side,
        entry_price=pos.entry_price,
        close_price=close_price,
        sl_price=pos.sl_price,
        tp_price=pos.tp_price,
        qty=pos.qty,
        result=result,
        pnl_pct=round(pnl_pct, 2),
        pnl_usd=round(pnl_usd, 2),
        z_score_entry=pos.z_score,
        score=pos.score,
        reasons=pos.reasons,
        opened_at=pos.opened_at,
        closed_at=datetime.now(timezone.utc).isoformat(),
    )
    self._append_trade_log(record)

    # Icon per result type; anything unrecognised gets the warning sign.
    if result == "DYNAMIC_TP":
        icon = "🎯"
    elif result == "TIME_STOP":
        icon = "⏱️"
    else:
        icon = "⚠️"
    text = "\n".join([
        f"{icon} *{result} {pos.side} {pos.symbol}*",
        f"Entry: {pos.entry_price} → {close_price}",
        f"PnL: {pnl_pct:+.1f}% (${pnl_usd:+.2f})",
        f"Z entry: {pos.z_score:+.2f} | Score: {pos.score}/5",
    ])
    logger.info(f"{result} {pos.side} {pos.symbol} PnL={pnl_pct:+.1f}%")
    self._notify(text)
def _log_trade(self, pos, result):
    """Log a trade that closed on the exchange side, with an estimated PnL.

    pos: the closed position.
    result: "TP", "SL" or anything else (treated as unknown).

    Estimates, all based on the configured notional (TRADE_SIZE_USD * LEVERAGE):
      * after TP1, result "SL": TP1 profit plus the move from entry to the
        stored stop on the remainder (0 when the stop sits at entry); logged
        as "TP1+SL".
      * after TP1, anything else: TP1 profit only; logged as "TP1+BE".
      * before TP1, "TP": move from entry to the stored tp_price.
      * before TP1, "SL": -SL_PCT.
      * otherwise: 0.
    """
    notional = TRADE_SIZE_USD * LEVERAGE

    def favourable_move_pct(price):
        """Signed % move from entry to *price*, positive when it favours the position."""
        if not pos.entry_price:
            return 0.0
        move = (price - pos.entry_price) / pos.entry_price * 100
        return move if pos.side == "LONG" else -move

    if pos.tp1_hit:
        tp1_pnl_usd = notional * TP1_CLOSE_RATIO * TP1_PCT
        tp1_pnl_pct = TP1_PCT * 100 * TP1_CLOSE_RATIO
        remaining_ratio = 1 - TP1_CLOSE_RATIO
        if result == "SL":
            # The stop is moved to entry when TP1 is detected, so the
            # remainder exits at the stored stop price, not at the original
            # -SL_PCT level. Deriving it from sl_price yields 0 at break-even.
            rest_pct = favourable_move_pct(pos.sl_price)
            total_pnl_usd = tp1_pnl_usd + notional * remaining_ratio * rest_pct / 100
            total_pnl_pct = tp1_pnl_pct + rest_pct * remaining_ratio
            result = "TP1+SL"
        else:
            # BE or unknown — assume the remainder closed flat
            total_pnl_usd = tp1_pnl_usd
            total_pnl_pct = tp1_pnl_pct
            result = "TP1+BE"
    elif result == "TP":
        # Use the TP level stored on the position instead of MAX_TP_PCT:
        # the TP order this manager places sits at tp_price.
        total_pnl_pct = favourable_move_pct(pos.tp_price)
        total_pnl_usd = notional * total_pnl_pct / 100
    elif result == "SL":
        total_pnl_pct = -SL_PCT * 100
        total_pnl_usd = -(notional * SL_PCT)
    else:
        total_pnl_pct = 0
        total_pnl_usd = 0
    trade = {
        "symbol": pos.symbol,
        "side": pos.side,
        "entry_price": pos.entry_price,
        "sl_price": pos.sl_price,
        "tp_price": pos.tp_price,
        "qty": pos.original_qty,
        "result": result,
        "pnl_pct": round(total_pnl_pct, 2),
        "pnl_usd": round(total_pnl_usd, 2),
        "tp1_hit": pos.tp1_hit,
        "z_score_entry": pos.z_score,
        "score": pos.score,
        "reasons": pos.reasons,  # kept in line with the other trade-log writers
        "opened_at": pos.opened_at,
        "closed_at": datetime.now(timezone.utc).isoformat(),
    }
    self._append_trade_log(trade)
    emoji = "✅" if "TP" in result else "❌" if result == "SL" else "⚠️"
    msg = (
        f"{emoji} *{result} {pos.side} {pos.symbol}*\n"
        f"Entry: {pos.entry_price}\n"
        f"PnL: {total_pnl_pct:+.1f}% (${total_pnl_usd:+.2f})\n"
        f"Z entry: {pos.z_score:+.2f} | Score: {pos.score}/5"
    )
    logger.info(f"{result} {pos.side} {pos.symbol} PnL={total_pnl_pct:+.1f}%")
    self._notify(msg)
def _append_trade_log(self, trade):
    """Append *trade* to the JSON trade log, replacing the file atomically.

    The whole log is re-read, extended and written to a temp file in the same
    directory, then moved over TRADE_LOG_FILE with os.replace(). A missing
    log starts a new one. An unreadable or malformed log is moved aside to
    "<TRADE_LOG_FILE>.corrupt-<unix time>" before a new log is started, so
    earlier history is not silently overwritten. Write errors are logged,
    never raised.
    """
    try:
        with open(TRADE_LOG_FILE, "r") as f:
            log = json.load(f)
        if not isinstance(log, list):
            raise ValueError("trade log root is not a list")
    except FileNotFoundError:
        log = []
    except Exception as e:
        backup_path = f"{TRADE_LOG_FILE}.corrupt-{int(time.time())}"
        logger.error(f"Trade log unreadable ({e}); moving it to {backup_path}")
        try:
            os.replace(TRADE_LOG_FILE, backup_path)
        except OSError as backup_err:
            logger.error(f"Could not back up trade log: {backup_err}")
        log = []
    log.append(trade)
    tmp_path = None
    try:
        # dirname is "" for a bare filename; use the current directory then.
        dir_name = os.path.dirname(TRADE_LOG_FILE) or "."
        fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            json.dump(log, f, indent=2)
        os.replace(tmp_path, TRADE_LOG_FILE)
        tmp_path = None  # moved into place; nothing left to clean up
    except Exception as e:
        logger.error(f"Error saving trade log: {e}")
    finally:
        if tmp_path is not None:
            # Failed before the replace: remove the orphaned temp file.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass  # best-effort cleanup only
# ============================================================
# STATS
# ============================================================
def get_pnl_summary(self):
    """Build a text summary of the trade log.

    Returns "No trades yet." when the log is missing, unreadable, not a list
    or contains no trade records. Otherwise returns trade count, win/loss
    split (a trade with pnl_usd <= 0 counts as a loss), win rate, total PnL
    and a count per result type, including the v2 results "TRAIL", "TP1+BE"
    and "TP1+SL".
    """
    try:
        with open(TRADE_LOG_FILE, "r") as f:
            log = json.load(f)
    except Exception:
        return "No trades yet."
    if not isinstance(log, list):
        return "No trades yet."
    # Ignore anything that is not a trade record so .get() below is safe.
    trades = [t for t in log if isinstance(t, dict)]
    if not trades:
        return "No trades yet."
    total_pnl = sum(t.get("pnl_usd", 0) for t in trades)
    win_count = sum(1 for t in trades if t.get("pnl_usd", 0) > 0)
    loss_count = len(trades) - win_count
    wr = win_count / len(trades) * 100
    # Count trades per result type in one pass
    by_result = {}
    for t in trades:
        key = t.get("result")
        by_result[key] = by_result.get(key, 0) + 1
    return (
        f"📊 *Squeeze-VWAP PnL*\n"
        f"Trades: {len(trades)} ({win_count}W / {loss_count}L)\n"
        f"Win Rate: {wr:.1f}%\n"
        f"Total PnL: ${total_pnl:+.2f}\n"
        f"🎯 Dynamic TP: {by_result.get('DYNAMIC_TP', 0)} | "
        f"⏱️ Time Stop: {by_result.get('TIME_STOP', 0)}\n"
        f"❌ SL: {by_result.get('SL', 0)} | ✅ TP Cap: {by_result.get('TP', 0)}\n"
        f"🏃 Trail: {by_result.get('TRAIL', 0)} | "
        f"TP1+BE: {by_result.get('TP1+BE', 0)} | "
        f"TP1+SL: {by_result.get('TP1+SL', 0)}"
    )
def get_positions_info(self):
    """Return a text overview of the open positions.

    Each position gets its live PnL % (from the current mark price), TP1/BE
    markers, age in minutes when opened_at parses, the entry Z score, the
    score and, after TP1, the tracked trail price. If anything fails for a
    position, only its side marker and symbol are listed.
    """
    if not self.positions:
        return "No open positions."
    report = ["📊 *Open Positions*\n"]
    for symbol, pos in self.positions.items():
        marker = "🟢" if pos.side == "LONG" else "🔴"
        try:
            mark = self.exchange.get_mark_price(symbol)
            if pos.side == "LONG":
                move = mark - pos.entry_price
            else:
                move = pos.entry_price - mark
            pnl = move / pos.entry_price * 100

            # Age is optional: an unparsable timestamp just omits it.
            try:
                opened = datetime.fromisoformat(pos.opened_at)
                minutes = (datetime.now(timezone.utc) - opened).total_seconds() / 60
            except Exception:
                age = ""
            else:
                age = f" ({minutes:.0f}min)"

            flags = ""
            if pos.tp1_hit:
                flags += " 🎯TP1"
            if pos.moved_to_be:
                flags += " 🔄BE"
            trail = f" trail:{pos.trail_high:.4f}" if pos.tp1_hit else ""
            report.append(
                f"{marker} {symbol} "
                f"PnL:{pnl:+.1f}%{flags}{age}\n"
                f" Z:{pos.z_score:+.2f} Score:{pos.score}/5{trail}"
            )
        except Exception:
            report.append(f"{marker} {symbol}")
    return "\n".join(report)
# ============================================================
# PERSISTENCE
# ============================================================
def _save_positions(self):
    """Persist all open positions to POSITIONS_FILE.

    The JSON is written to a temp file in the same directory and moved into
    place with os.replace(), so the previous file stays intact until the new
    one is complete. Errors are logged, never raised; a temp file left behind
    by a failed write is removed.
    """
    tmp_path = None
    try:
        data = {s: p.to_dict() for s, p in self.positions.items()}
        # dirname is "" for a bare filename; use the current directory then.
        dir_name = os.path.dirname(POSITIONS_FILE) or "."
        fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, POSITIONS_FILE)
        tmp_path = None  # moved into place; nothing left to clean up
    except Exception as e:
        logger.error(f"Error saving positions: {e}")
    finally:
        if tmp_path is not None:
            # Failed before the replace: without this, every failed save
            # would leave another *.tmp file in the directory.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass  # best-effort cleanup only
def _load_positions(self):
    """Restore saved positions from POSITIONS_FILE into self.positions.

    A missing file is the normal first-run case and leaves no positions. Any
    other read/parse failure is logged as an error, because it means
    previously saved positions are no longer tracked. Records are loaded one
    by one: a malformed record is logged and skipped without discarding the
    others, and a failed symbol-info lookup loads the position with
    symbol_info=None instead of dropping it.
    """
    try:
        with open(POSITIONS_FILE, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        self.positions = {}
        return
    except Exception as e:
        logger.error(f"Error loading positions from {POSITIONS_FILE}: {e}")
        self.positions = {}
        return
    if not isinstance(data, dict) or not data:
        return
    for symbol, d in data.items():
        try:
            sym_info = self._get_symbol_info(symbol)
        except Exception as e:
            # Other methods re-query symbol info when pos.symbol_info is empty.
            logger.warning(f"Symbol info unavailable for {symbol}: {e}")
            sym_info = None
        try:
            self.positions[symbol] = Position.from_dict(d, sym_info)
        except Exception as e:
            logger.error(f"Skipping unreadable saved position {symbol}: {e}")
    logger.info(f"Loaded {len(self.positions)} positions")
def _get_symbol_info(self, symbol):
if symbol not in self._symbol_info_cache:
info = self.exchange.get_symbol_info(symbol)
if info:
self._symbol_info_cache[symbol] = info
return self._symbol_info_cache.get(symbol)
# ============================================================
# RECOVERY
# ============================================================
def recovery(self):
    """Reconcile saved positions with the exchange after a restart.

    * A saved position with no non-zero exchange position is logged as closed
      with result "UNKNOWN" and dropped.
    * A pre-TP1 position whose live size is below 75% of its original size is
      treated as having hit TP1: stop moved to entry, trailing reference set
      to the mark price (or the entry price when no mark price is reported).
    * Any other size difference is adopted as the new quantity.
    * For every surviving position all orders are cancelled and re-placed
      (SL only after TP1, SL + TP1 otherwise). A failure for one symbol is
      logged and does not stop the others.

    The reconciled state is always saved, so a TP1 detected here survives a
    further restart.
    """
    if not self.positions:
        return
    logger.info(f"Recovery: checking {len(self.positions)} saved positions...")
    # First non-zero exchange row per symbol. Rows with a zero amount are
    # ignored so they cannot shadow the real position.
    live = {}
    for ep in self.exchange.get_positions():
        if float(ep["positionAmt"]) != 0:
            live.setdefault(ep["symbol"], ep)
    closed = []
    for symbol, pos in self.positions.items():
        ep = live.get(symbol)
        if ep is None:
            logger.info(f"Recovery: {symbol} closed while bot was down")
            self._log_trade(pos, "UNKNOWN")
            closed.append(symbol)
            continue
        # Update qty + detect TP1 partial fill
        actual_qty = abs(float(ep["positionAmt"]))
        if actual_qty < pos.original_qty * 0.75 and not pos.tp1_hit:
            # Qty dropped significantly → TP1 was hit
            pos.tp1_hit = True
            pos.qty = actual_qty
            pos.sl_price = pos.entry_price  # SL → BE
            pos.moved_to_be = True
            mark = float(ep.get("markPrice", 0))
            pos.trail_high = mark if mark > 0 else pos.entry_price
            logger.info(f"Recovery: {symbol} TP1 detected (qty {pos.original_qty}→{actual_qty})")
        elif actual_qty != pos.qty:
            pos.qty = actual_qty
        # Re-place orders
        logger.info(f"Recovery: re-placing orders for {symbol}")
        try:
            self.exchange.cancel_all_orders(symbol)
            if pos.tp1_hit:
                self._place_sl_tp(pos)  # SL only (no TP after TP1)
            else:
                sym_info = pos.symbol_info or self._get_symbol_info(symbol)
                tp1_qty = self.exchange.round_qty(sym_info, pos.original_qty * TP1_CLOSE_RATIO)
                self._place_sl_tp(pos, tp_qty=tp1_qty)
        except Exception as e:
            # Nothing is re-placed when the cancel fails, which avoids
            # stacking duplicate orders on top of the existing ones.
            logger.error(f"Recovery: could not re-place orders for {symbol}: {e}")
    for symbol in closed:
        del self.positions[symbol]
        try:
            self.exchange.cancel_all_orders(symbol)
        except Exception as e:
            logger.warning(f"Recovery: cancel orders failed {symbol}: {e}")
    # Save unconditionally: qty, TP1 state and order flags may have changed
    # even when no position was closed.
    self._save_positions()
def get_open_positions(self):
    """Expose the live symbol -> Position mapping (the same dict object, not a copy)."""
    open_positions = self.positions
    return open_positions
# ============================================================
# NOTIFY
# ============================================================
def _notify(self, msg):
if self.notifier:
try:
self.notifier(msg)
except Exception as e:
logger.debug(f"Notify error: {e}")