"""
Scalp Position Manager — Quick Take strategy.
Simple: +1% TP, -0.75% SL, 15min time stop (defaults; overridable via SCALP_* env vars).
No partial closes — full in, full out.
Separate PnL tracking from WT strategy.
"""
import asyncio
import logging
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from trader import BinanceFuturesTrader
from trade_log import log_event
from order_placer import ExchangeOrderManager, MAKER_FEE_PCT
# Module-level logger; handlers and levels are left to the application.
logger = logging.getLogger(__name__)

# Fixed UTC-7 offset used for every timestamp created in this module.
# NOTE(review): a fixed offset does not follow daylight-saving changes, so
# wall-clock values may be an hour off real Vancouver time for part of the
# year. Durations computed from aware datetimes are unaffected.
VANCOUVER_TZ = timezone(timedelta(hours=-7))

# === Scalp Config (every value can be overridden through the environment) ===
# Master switch; not read anywhere else in this module.
SCALP_ENABLED = os.getenv("SCALP_ENABLED", "false").lower() == "true"
# Size passed to trader.open_position and minimum balance required to open.
SCALP_SIZE_USDT = float(os.getenv("SCALP_SIZE_USDT", "10"))
# Leverage passed to trader.open_position.
SCALP_LEVERAGE = int(os.getenv("SCALP_LEVERAGE", "5"))
# Upper bound on simultaneously tracked scalp positions.
SCALP_MAX_POSITIONS = int(os.getenv("SCALP_MAX_POSITIONS", "3"))
# Take-profit / stop-loss distance from entry, in percent.
SCALP_TP_PCT = float(os.getenv("SCALP_TP_PCT", "1.0"))
SCALP_SL_PCT = float(os.getenv("SCALP_SL_PCT", "0.75"))
# Position age (minutes) after which it is closed at market.
SCALP_TIME_STOP_MIN = int(os.getenv("SCALP_TIME_STOP_MIN", "15"))
# Sleep (seconds) between market scans and between position checks.
SCALP_SCAN_INTERVAL = int(os.getenv("SCALP_SCAN_INTERVAL", "60"))
SCALP_CHECK_INTERVAL = int(os.getenv("SCALP_CHECK_INTERVAL", "3"))
# Minutes a symbol is blocked from re-entry after a close.
SCALP_COOLDOWN_MIN = int(os.getenv("SCALP_COOLDOWN_MIN", "30"))
# Unrealized profit (percent) at which SL is moved to entry; <= 0 disables it.
SCALP_BE_TRIGGER_PCT = float(os.getenv("SCALP_BE_TRIGGER_PCT", "0.5"))

# Exchange-side TP/SL (limit TP + stop-market SL on Binance)
SCALP_USE_EXCHANGE_ORDERS = os.getenv("SCALP_USE_EXCHANGE_ORDERS", "true").lower() == "true"

# Taker fee, in percent of notional (divided by 100 where fees are computed).
TAKER_FEE_PCT = 0.04
@dataclass
class ScalpPosition:
"""A scalp trade β simple TP/SL/Time stop."""
symbol: str
side: str
entry_price: float
quantity: float
tp_price: float
sl_price: float
opened_at: datetime
trade_id: str
signal_data: dict = field(default_factory=dict)
# Exchange-side order IDs (None = using polling fallback)
tp_order_id: int | None = None
sl_order_id: int | None = None
use_exchange_orders: bool = False # True if TP/SL are on Binance
moved_to_be: bool = False # True once SL moved to breakeven
@property
def age_minutes(self) -> float:
now = datetime.now(VANCOUVER_TZ)
return (now - self.opened_at).total_seconds() / 60
class ScalpManager:
"""Manages Quick Take scalp positions."""
def __init__(self, trader: BinanceFuturesTrader, notify_fn, tmm=None):
    """Store collaborators and start with empty in-memory state.

    Args:
        trader: exchange client used for prices, positions and market orders.
        notify_fn: awaitable callable taking one message string.
        tmm: optional trade journal; when set, ``on_trade_opened`` is called
            on it after each successful entry.
    """
    # Collaborators
    self.trader = trader
    self.notify = notify_fn
    self.tmm = tmm
    self.order_mgr = ExchangeOrderManager(trader)

    # Tracked state, both keyed by symbol
    self.positions: dict[str, ScalpPosition] = {}
    self.cooldowns: dict[str, datetime] = {}  # symbol -> cooldown expiry

    # Running counters for this process (in-memory only)
    self.stats = dict(wins=0, losses=0, time_stops=0, total_pnl=0.0)
def recover_positions(self):
    """Rebuild tracked scalp positions after a restart.

    Replays the trade log and keeps, per symbol, the latest ``SCALP_ENTRY``
    that has no later close event. Each candidate is kept only if Binance
    still reports a position for the symbol. TP/SL prices are recomputed
    from the entry price and the *current* ``SCALP_TP_PCT``/``SCALP_SL_PCT``;
    a breakeven move made before the restart is not restored.

    When exchange orders are enabled and no open LIMIT order is found for
    the symbol, all orders are cancelled and TP/SL are placed again. That
    path blocks for two seconds (``time.sleep``).
    """
    import time as _time
    from trade_log import load_trade_log

    close_events = {"SCALP_TP", "SCALP_SL", "SCALP_TIME_STOP", "SCALP_MANUAL"}

    # Find SCALP_ENTRY without matching close
    open_scalps = {}
    for event in load_trade_log():
        symbol = event.get("symbol", "")
        evt = event.get("event", "")
        if evt == "SCALP_ENTRY":
            open_scalps[symbol] = event
        elif evt in close_events and symbol in open_scalps:
            del open_scalps[symbol]

    if not open_scalps:
        logger.info("No scalp positions to recover")
        return

    for symbol, entry in open_scalps.items():
        # Verify position exists on Binance
        binance_pos = self.trader.get_position(symbol)
        if not binance_pos:
            logger.info(f"Scalp log has {symbol} but no Binance position β skip")
            continue

        entry_price = entry.get("entry_price", binance_pos["entry_price"])
        side = entry.get("side", binance_pos["side"])
        qty = binance_pos["quantity"]

        # Reconstruct TP/SL
        if side == "BUY":
            tp = entry_price * (1 + SCALP_TP_PCT / 100)
            sl = entry_price * (1 - SCALP_SL_PCT / 100)
        else:
            tp = entry_price * (1 - SCALP_TP_PCT / 100)
            sl = entry_price * (1 + SCALP_SL_PCT / 100)

        # Parse opened_at from timestamp; a missing or malformed value
        # restarts the time-stop clock from now.
        try:
            opened_at = datetime.fromisoformat(entry.get("timestamp", ""))
        except (TypeError, ValueError):
            opened_at = datetime.now(VANCOUVER_TZ)
        # A timestamp stored without an offset parses as a naive datetime;
        # age_minutes subtracts it from an aware "now", which would raise
        # TypeError. Interpret naive values in the module timezone.
        if opened_at.utcoffset() is None:
            opened_at = opened_at.replace(tzinfo=VANCOUVER_TZ)

        trade_id = entry.get("trade_id", f"QT_{symbol}_recovered")
        pos = ScalpPosition(
            symbol=symbol,
            side=side,
            entry_price=entry_price,
            quantity=qty,
            tp_price=tp,
            sl_price=sl,
            opened_at=opened_at,
            trade_id=trade_id,
        )

        # Try to find existing exchange TP orders (LIMIT reduceOnly)
        # Note: STOP_MARKET SL are algo orders, not visible in get_open_orders
        if SCALP_USE_EXCHANGE_ORDERS:
            open_orders = self.trader.get_open_orders(symbol)
            for o in open_orders:
                if o["type"] == "LIMIT" and o["status"] == "NEW":
                    pos.tp_order_id = o["orderId"]
            # NOTE(review): a found TP is taken as proof that the SL is
            # still live as well; the SL itself cannot be checked here.
            pos.use_exchange_orders = bool(pos.tp_order_id)

        # If orders missing, cancel stale and re-place
        if not pos.use_exchange_orders and SCALP_USE_EXCHANGE_ORDERS:
            self.order_mgr.cancel_all_for_symbol(symbol)
            _time.sleep(2.0)  # Let Binance fully process algo cancellation
            orders = self.order_mgr.place_tp_sl_orders(
                symbol, side, sl_price=sl, sl_quantity=qty,
                tp_levels=[(tp, qty)],
            )
            pos.tp_order_id = orders["tp_order_ids"][0] if orders["tp_order_ids"] else None
            pos.sl_order_id = orders["sl_order_id"]
            pos.use_exchange_orders = bool(pos.tp_order_id and pos.sl_order_id)
            if pos.use_exchange_orders:
                logger.info(f"Re-placed exchange orders for recovered scalp {symbol}")

        self.positions[symbol] = pos
        logger.info(
            f"Recovered scalp: {side} {symbol} @ {entry_price}, "
            f"qty={qty}, age={pos.age_minutes:.0f}min, "
            f"exchange_orders={'YES' if pos.use_exchange_orders else 'NO'}"
        )
def can_open(self, symbol: str) -> tuple[bool, str]:
if symbol in self.positions:
return False, f"Already in {symbol}"
# Cooldown check β prevent re-entering same symbol too soon
now = datetime.now(VANCOUVER_TZ)
cd = self.cooldowns.get(symbol)
if cd and now < cd:
remaining = (cd - now).total_seconds() / 60
return False, f"Cooldown {remaining:.0f}min left"
# Double-check Binance β prevents duplicates after restarts
existing = self.trader.get_position(symbol)
if existing:
logger.warning(f"Scalp {symbol}: position exists on Binance but not in manager β skip")
return False, f"Position exists on Binance (untracked)"
if len(self.positions) >= SCALP_MAX_POSITIONS:
return False, f"Max scalp positions ({SCALP_MAX_POSITIONS})"
balance = self.trader.get_account_balance()
if balance < SCALP_SIZE_USDT:
return False, f"Low balance: ${balance:.2f}"
return True, "OK"
async def open_scalp(self, signal: dict) -> bool:
    """Open a scalp trade from a scanner signal.

    Args:
        signal: must contain ``"symbol"`` and ``"side"``; ``"rsi"``,
            ``"volume_ratio"`` and ``"bb_bandwidth_pct"`` are logged when
            present.

    Returns:
        True when the market entry was filled and the position is now
        tracked, False when the entry was skipped or the order failed.
    """
    symbol = signal["symbol"]
    side = signal["side"]
    can, reason = self.can_open(symbol)
    if not can:
        logger.info(f"Scalp skip {symbol}: {reason}")
        return False

    # Execute on Binance
    result = self.trader.open_position(symbol, side, SCALP_SIZE_USDT, SCALP_LEVERAGE)
    if not result:
        await self.notify(f"β Scalp: Π½Π΅ ΡΠ΄Π°Π»ΠΎΡΡ ΠΎΡΠΊΡΡΡΡ {side} {symbol}")
        return False

    entry = result["fill_price"]
    qty = result["quantity"]

    # Calculate TP/SL
    if side == "BUY":
        tp = entry * (1 + SCALP_TP_PCT / 100)
        sl = entry * (1 - SCALP_SL_PCT / 100)
    else:
        tp = entry * (1 - SCALP_TP_PCT / 100)
        sl = entry * (1 + SCALP_SL_PCT / 100)

    now = datetime.now(VANCOUVER_TZ)
    trade_id = f"QT_{symbol}_{now.strftime('%Y%m%d_%H%M%S')}"
    pos = ScalpPosition(
        symbol=symbol,
        side=side,
        entry_price=entry,
        quantity=qty,
        tp_price=tp,
        sl_price=sl,
        opened_at=now,
        trade_id=trade_id,
        signal_data=signal,
    )

    # Place TP/SL on Binance (exchange-side execution)
    if SCALP_USE_EXCHANGE_ORDERS:
        try:
            orders = self.order_mgr.place_tp_sl_orders(
                symbol, side, sl_price=sl, sl_quantity=qty,
                tp_levels=[(tp, qty)],
            )
        except Exception as e:
            # The market entry is already filled. Letting this propagate
            # would leave a live position that is neither tracked nor
            # logged, so treat it as a failed placement and let the
            # polling fallback enforce TP/SL.
            logger.error(f"Scalp {symbol}: placing exchange orders raised: {e}", exc_info=True)
            orders = {"tp_order_ids": [], "sl_order_id": None}
        pos.tp_order_id = orders["tp_order_ids"][0] if orders["tp_order_ids"] else None
        pos.sl_order_id = orders["sl_order_id"]
        pos.use_exchange_orders = bool(pos.tp_order_id and pos.sl_order_id)
        if pos.use_exchange_orders:
            logger.info(f"Scalp {symbol}: exchange orders placed (TP #{pos.tp_order_id}, SL #{pos.sl_order_id})")
        else:
            logger.warning(f"Scalp {symbol}: exchange orders partially failed, using polling fallback")

    self.positions[symbol] = pos
    log_event("SCALP_ENTRY", {
        "trade_id": trade_id,
        "symbol": symbol,
        "side": side,
        "entry_price": entry,
        "quantity": qty,
        "leverage": SCALP_LEVERAGE,
        "tp_price": tp,
        "sl_price": sl,
        "rsi": signal.get("rsi"),
        "volume_ratio": signal.get("volume_ratio"),
        "bb_bandwidth_pct": signal.get("bb_bandwidth_pct"),
    })

    direction = "LONG" if side == "BUY" else "SHORT"
    notional = qty * entry
    msg = (
        f"β‘ SCALP {direction} {symbol}\n"
        f"ββββββββββββββββββββ\n"
        f"π° Entry: ${entry:.6f}\n"
        f"π Size: {qty} ({SCALP_LEVERAGE}x, ${notional:.2f})\n"
        f"π― TP: ${tp:.6f} (+{SCALP_TP_PCT}%)\n"
        f"π SL: ${sl:.6f} (-{SCALP_SL_PCT}%)\n"
        f"β± Time stop: {SCALP_TIME_STOP_MIN}ΠΌΠΈΠ½\n"
        f"π RSI={signal.get('rsi', '?')} Vol={signal.get('volume_ratio', '?')}x\n"
        f"ββββββββββββββββββββ"
    )
    await self.notify(msg)
    logger.info(f"Scalp opened: {direction} {symbol} @ {entry}")

    # TMM journal: tag trade
    if self.tmm:
        self.tmm.on_trade_opened(symbol, side, "SCALP",
                                 signal_info=f"[Scalp] {direction} RSI+BB+Vol+EMA")
    return True
async def _close_position(self, pos: ScalpPosition, reason: str, current_price: float):
    """Close a scalp position at market and record the result.

    Args:
        pos: the tracked position to close.
        reason: ``"TP"``, ``"SL"``, ``"TIME_STOP"`` or ``"MANUAL"``; it is
            used as the ``SCALP_<reason>`` event name.
        current_price: mark price seen by the caller; used as the exit
            price when the close result carries no fill price.

    The position is dropped from tracking and put on cooldown *before* the
    notification is awaited, so a failing notifier cannot leave an already
    closed position tracked (it would be closed and counted again on the
    next check).
    """
    result = self.trader.close_full(pos.symbol, pos.side)
    # NOTE(review): a falsy result is recorded as a close at mark price.
    # If close_full returns a falsy value when the order FAILED, the
    # position would stay open on the exchange while being dropped here.
    # Confirm close_full's contract.

    # Use actual fill price from Binance instead of mark price
    got_fill = bool(result and result.get("fill_price"))
    fill_price = result["fill_price"] if got_fill else current_price

    # Slippage of the fill against the mark price, guarded against a zero mark
    slip_pct = ((fill_price - current_price) / current_price * 100) if current_price else 0
    if got_fill:
        logger.info(f"Scalp close {pos.symbol}: mark=${current_price:.6f} fill=${fill_price:.6f} slip={slip_pct:+.3f}%")

    # PnL based on actual fill price
    if pos.side == "BUY":
        pnl_pct = ((fill_price - pos.entry_price) / pos.entry_price) * 100
    else:
        pnl_pct = ((pos.entry_price - fill_price) / pos.entry_price) * 100
    pnl_usdt = pos.quantity * abs(fill_price - pos.entry_price)
    if pnl_pct < 0:
        pnl_usdt = -pnl_usdt

    # Deduct fees (open + close), both charged at the taker rate
    open_fee = pos.quantity * pos.entry_price * TAKER_FEE_PCT / 100
    close_fee = pos.quantity * fill_price * TAKER_FEE_PCT / 100
    pnl_usdt -= (open_fee + close_fee)

    # Update stats
    if reason == "TP":
        self.stats["wins"] += 1
        emoji = "π―"
    elif reason == "SL":
        self.stats["losses"] += 1
        emoji = "π"
    else:  # TIME_STOP or MANUAL: win/loss decided by the sign of the move
        if pnl_pct >= 0:
            self.stats["wins"] += 1
        else:
            self.stats["losses"] += 1
        # Only real time stops count here; manual closes used to inflate it.
        if reason == "TIME_STOP":
            self.stats["time_stops"] += 1
        emoji = "β±"
    self.stats["total_pnl"] += pnl_usdt

    age = pos.age_minutes
    log_event(f"SCALP_{reason}", {
        "trade_id": pos.trade_id,
        "symbol": pos.symbol,
        "side": pos.side,
        "entry_price": pos.entry_price,
        "exit_price": fill_price,
        "mark_price": current_price,
        "slippage_pct": round(slip_pct, 3),
        "pnl_pct": round(pnl_pct, 2),
        "pnl_usdt": round(pnl_usdt, 4),
        "age_minutes": round(age, 1),
        "fees": round(open_fee + close_fee, 4),
    })

    total = self.stats["wins"] + self.stats["losses"]
    wr = (self.stats["wins"] / total * 100) if total > 0 else 0

    # The slippage line is shown only when the fill is more than 0.1% off the mark
    show_slip = bool(current_price) and abs(fill_price - current_price) / current_price > 0.001
    slip_line = ('β οΈ Slip: ' + f'{slip_pct:+.2f}%' + chr(10)) if show_slip else ''
    msg = (
        f"{emoji} SCALP {reason}: {pos.symbol}\n"
        f"ββββββββββββββββββββ\n"
        f"Entry: ${pos.entry_price:.6f}\n"
        f"Exit: ${fill_price:.6f} ({pnl_pct:+.2f}%)\n"
        f"{slip_line}"
        f"π΅ PnL: ${pnl_usdt:+.4f}\n"
        f"β± {age:.0f}ΠΌΠΈΠ½\n"
        f"π WR: {wr:.0f}% ({self.stats['wins']}W/{self.stats['losses']}L)\n"
        f"π° Total: ${self.stats['total_pnl']:+.2f}\n"
        f"ββββββββββββββββββββ"
    )

    # Finalize state first; the notification is best-effort and comes last.
    self.positions.pop(pos.symbol, None)
    self.cooldowns[pos.symbol] = datetime.now(VANCOUVER_TZ) + timedelta(minutes=SCALP_COOLDOWN_MIN)
    logger.info(f"Scalp {reason}: {pos.symbol} {pnl_pct:+.2f}% ${pnl_usdt:+.4f} ({age:.0f}min) [cd {SCALP_COOLDOWN_MIN}m]")
    await self.notify(msg)
async def _handle_exchange_tp_fill(self, pos: ScalpPosition, fill_price: float, mark_price: float):
    """Record a take-profit that was filled by the exchange-side LIMIT order.

    No order is sent from here; the position is already closed on the
    exchange. The trade is counted as a win, logged as ``SCALP_TP``, removed
    from tracking and put on cooldown, and only then is the notification
    awaited, so a failing notifier cannot leave the closed position tracked.

    Args:
        pos: the tracked position whose TP order filled.
        fill_price: price used as the exit price.
        mark_price: mark price at detection time (logged only).
    """
    if pos.side == "BUY":
        pnl_pct = ((fill_price - pos.entry_price) / pos.entry_price) * 100
    else:
        pnl_pct = ((pos.entry_price - fill_price) / pos.entry_price) * 100
    pnl_usdt = pos.quantity * abs(fill_price - pos.entry_price)
    if pnl_pct < 0:
        pnl_usdt = -pnl_usdt

    # Entry is charged at the taker rate, the TP limit exit at the maker rate
    open_fee = pos.quantity * pos.entry_price * TAKER_FEE_PCT / 100
    close_fee = pos.quantity * fill_price * MAKER_FEE_PCT / 100
    pnl_usdt -= (open_fee + close_fee)

    self.stats["wins"] += 1
    self.stats["total_pnl"] += pnl_usdt

    age = pos.age_minutes
    log_event("SCALP_TP", {
        "trade_id": pos.trade_id,
        "symbol": pos.symbol,
        "side": pos.side,
        "entry_price": pos.entry_price,
        "exit_price": fill_price,
        "mark_price": mark_price,
        "pnl_pct": round(pnl_pct, 2),
        "pnl_usdt": round(pnl_usdt, 4),
        "age_minutes": round(age, 1),
        "fees": round(open_fee + close_fee, 4),
        "order_type": "EXCHANGE_LIMIT",
    })

    total = self.stats["wins"] + self.stats["losses"]
    wr = (self.stats["wins"] / total * 100) if total > 0 else 0
    msg = (
        f"π― SCALP TP: {pos.symbol}\n"
        f"ββββββββββββββββββββ\n"
        f"Entry: ${pos.entry_price:.6f}\n"
        f"Exit: ${fill_price:.6f} ({pnl_pct:+.2f}%) [LIMIT]\n"
        f"π΅ PnL: ${pnl_usdt:+.4f}\n"
        f"β± {age:.0f}ΠΌΠΈΠ½\n"
        f"π WR: {wr:.0f}% ({self.stats['wins']}W/{self.stats['losses']}L)\n"
        f"π° Total: ${self.stats['total_pnl']:+.2f}\n"
        f"ββββββββββββββββββββ"
    )

    # Finalize state first; the notification is best-effort and comes last.
    self.positions.pop(pos.symbol, None)
    self.cooldowns[pos.symbol] = datetime.now(VANCOUVER_TZ) + timedelta(minutes=SCALP_COOLDOWN_MIN)
    logger.info(f"Scalp TP (exchange): {pos.symbol} {pnl_pct:+.2f}% ${pnl_usdt:+.4f} [cd {SCALP_COOLDOWN_MIN}m]")
    await self.notify(msg)
async def _handle_exchange_sl_fill(self, pos: ScalpPosition, fill_price: float, mark_price: float):
    """Record a stop-loss inferred from the position having disappeared.

    The caller reaches this when the exchange no longer reports the
    position and the TP order was not filled. The trade is counted as a
    loss, logged as ``SCALP_SL``, removed from tracking and put on cooldown,
    and only then is the notification awaited, so a failing notifier cannot
    leave the closed position tracked.

    Args:
        pos: the tracked position that is gone from the exchange.
        fill_price: price used as the exit price (the caller in this file
            passes the mark price, so the result is an estimate).
        mark_price: mark price at detection time (logged only).
    """
    if pos.side == "BUY":
        pnl_pct = ((fill_price - pos.entry_price) / pos.entry_price) * 100
    else:
        pnl_pct = ((pos.entry_price - fill_price) / pos.entry_price) * 100
    pnl_usdt = pos.quantity * abs(fill_price - pos.entry_price)
    if pnl_pct < 0:
        pnl_usdt = -pnl_usdt

    # SL is STOP_MARKET = taker fee both sides
    open_fee = pos.quantity * pos.entry_price * TAKER_FEE_PCT / 100
    close_fee = pos.quantity * fill_price * TAKER_FEE_PCT / 100
    pnl_usdt -= (open_fee + close_fee)

    self.stats["losses"] += 1
    self.stats["total_pnl"] += pnl_usdt

    age = pos.age_minutes
    log_event("SCALP_SL", {
        "trade_id": pos.trade_id,
        "symbol": pos.symbol,
        "side": pos.side,
        "entry_price": pos.entry_price,
        "exit_price": fill_price,
        "mark_price": mark_price,
        "pnl_pct": round(pnl_pct, 2),
        "pnl_usdt": round(pnl_usdt, 4),
        "age_minutes": round(age, 1),
        "fees": round(open_fee + close_fee, 4),
        "order_type": "EXCHANGE_STOP",
    })

    total = self.stats["wins"] + self.stats["losses"]
    wr = (self.stats["wins"] / total * 100) if total > 0 else 0
    msg = (
        f"π SCALP SL: {pos.symbol}\n"
        f"ββββββββββββββββββββ\n"
        f"Entry: ${pos.entry_price:.6f}\n"
        f"Exit: ${fill_price:.6f} ({pnl_pct:+.2f}%) [STOP]\n"
        f"π΅ PnL: ${pnl_usdt:+.4f}\n"
        f"β± {age:.0f}ΠΌΠΈΠ½\n"
        f"π WR: {wr:.0f}% ({self.stats['wins']}W/{self.stats['losses']}L)\n"
        f"π° Total: ${self.stats['total_pnl']:+.2f}\n"
        f"ββββββββββββββββββββ"
    )

    # Finalize state first; the notification is best-effort and comes last.
    self.positions.pop(pos.symbol, None)
    self.cooldowns[pos.symbol] = datetime.now(VANCOUVER_TZ) + timedelta(minutes=SCALP_COOLDOWN_MIN)
    logger.info(f"Scalp SL (exchange): {pos.symbol} {pnl_pct:+.2f}% ${pnl_usdt:+.4f} [cd {SCALP_COOLDOWN_MIN}m]")
    await self.notify(msg)
async def check_position(self, pos: ScalpPosition):
    """Run one monitoring pass over a single position.

    Does nothing when no mark price is available.

    Exchange-order mode (``pos.use_exchange_orders``), in order:
      0. TP order cancelled/expired/rejected -> re-place SL + TP, stop.
      1. TP order filled -> cancel leftovers, record the win, stop.
      2. Unrealized profit reached the breakeven trigger -> move SL to entry.
      3. Age reached the time stop -> cancel orders, close at market, stop.
      4. Position gone from the exchange -> record TP if its order turns
         out filled, otherwise record it as a stop-loss.

    Polling mode, in order: externally closed position, TP, breakeven,
    SL, time stop. The levels are compared against the mark price.
    """
    price = self.trader.get_mark_price(pos.symbol)
    if not price:
        return

    # === Exchange-side order mode ===
    # TP = regular LIMIT (queryable), SL = STOP_MARKET algo (not queryable)
    # Detection: TP filled? -> win. Position gone + TP not filled? -> SL fired.
    if pos.use_exchange_orders:
        if pos.tp_order_id:
            # One status query serves both the "cancelled" and "filled" checks.
            tp_status = self.trader.get_order_status(pos.symbol, pos.tp_order_id)
            status = tp_status["status"] if tp_status else None

            # 0. Detect externally cancelled orders -> re-place
            if status in ("CANCELED", "CANCELLED", "EXPIRED", "REJECTED"):
                logger.warning(f"Scalp {pos.symbol}: orders cancelled externally, re-placing")
                remaining_tps = [(pos.tp_price, pos.quantity)]
                orders = self.order_mgr.replace_sl_and_tps(
                    pos.symbol, pos.side, pos.sl_price, pos.quantity, remaining_tps
                )
                pos.sl_order_id = orders["sl_order_id"]
                pos.tp_order_id = orders["tp_order_ids"][0] if orders["tp_order_ids"] else None
                await self.notify(f"π Scalp {pos.symbol}: ΠΎΡΠ΄Π΅ΡΠ° ΠΏΠ΅ΡΠ΅ΡΡΠ°Π²Π»Π΅Π½Ρ (Π±ΡΠ»ΠΈ ΠΎΡΠΌΠ΅Π½Π΅Π½Ρ)")
                logger.info(f"Scalp {pos.symbol}: re-placed SL + TP")
                return

            # 1. TP order filled (regular LIMIT, queryable)
            if status == "FILLED":
                fp = tp_status["fill_price"] if tp_status["fill_price"] > 0 else pos.tp_price
                # Cancel SL algo order via cancel_all
                self.order_mgr.cancel_all_for_symbol(pos.symbol)
                await self._handle_exchange_tp_fill(pos, fp, price)
                return

        # 2. Breakeven: move SL to entry once unrealized profit >= trigger
        if not pos.moved_to_be and SCALP_BE_TRIGGER_PCT > 0:
            if pos.side == "BUY":
                unreal_pct = ((price - pos.entry_price) / pos.entry_price) * 100
            else:
                unreal_pct = ((pos.entry_price - price) / pos.entry_price) * 100
            if unreal_pct >= SCALP_BE_TRIGGER_PCT:
                new_sl = pos.entry_price
                remaining_tps = [(pos.tp_price, pos.quantity)]
                try:
                    orders = self.order_mgr.replace_sl_and_tps(
                        pos.symbol, pos.side, new_sl, pos.quantity, remaining_tps
                    )
                    pos.sl_price = new_sl
                    pos.sl_order_id = orders["sl_order_id"]
                    pos.tp_order_id = orders["tp_order_ids"][0] if orders["tp_order_ids"] else pos.tp_order_id
                    pos.moved_to_be = True
                    logger.info(f"Scalp {pos.symbol}: SL moved to BE @ {new_sl} (was +{unreal_pct:.2f}%)")
                    await self.notify(f"π Scalp {pos.symbol}: SL β breakeven (profit +{unreal_pct:.1f}%)")
                except Exception as e:
                    # Best effort: moved_to_be stays False, so the move is retried next pass.
                    logger.error(f"Scalp {pos.symbol}: failed to move SL to BE: {e}")

        # 3. Time stop: cancel all orders, close market
        if pos.age_minutes >= SCALP_TIME_STOP_MIN:
            self.order_mgr.cancel_all_for_symbol(pos.symbol)
            await self._close_position(pos, "TIME_STOP", price)
            return

        # 4. Position gone? Either SL fired or manual close
        binance_pos = self.trader.get_position(pos.symbol)
        if not binance_pos:
            # Clean up any remaining orders
            self.order_mgr.cancel_all_for_symbol(pos.symbol)
            # Check if TP was filled (might have been filled between checks)
            if pos.tp_order_id:
                tp_status = self.trader.get_order_status(pos.symbol, pos.tp_order_id)
                if tp_status and tp_status["status"] == "FILLED":
                    fp = tp_status["fill_price"] if tp_status["fill_price"] > 0 else pos.tp_price
                    await self._handle_exchange_tp_fill(pos, fp, price)
                    return
            # TP not filled -> treated as SL; the mark price stands in for the fill price
            await self._handle_exchange_sl_fill(pos, price, price)
            return
        return

    # === Polling fallback mode (original logic) ===
    # Safety: detect if position was closed externally (manually on exchange)
    binance_pos = self.trader.get_position(pos.symbol)
    if not binance_pos:
        if pos.side == "BUY":
            pnl_pct = ((price - pos.entry_price) / pos.entry_price) * 100
        else:
            pnl_pct = ((pos.entry_price - price) / pos.entry_price) * 100
        logger.warning(f"Scalp {pos.symbol} no longer on Binance β closed externally")
        log_event("SCALP_MANUAL", {
            "trade_id": pos.trade_id,
            "symbol": pos.symbol,
            "side": pos.side,
            "entry_price": pos.entry_price,
            "exit_price": price,
            "pnl_pct": round(pnl_pct, 2),
            "pnl_usdt": 0,  # unknown exact PnL
            "age_minutes": round(pos.age_minutes, 1),
            "note": "closed_externally_on_exchange",
        })
        # Drop the position before awaiting the notifier: if notify raised
        # first, the position would stay tracked and this branch would log
        # SCALP_MANUAL again on every pass.
        self.positions.pop(pos.symbol, None)
        self.cooldowns[pos.symbol] = datetime.now(VANCOUVER_TZ) + timedelta(minutes=SCALP_COOLDOWN_MIN)
        await self.notify(
            f"π Scalp {pos.symbol} Π·Π°ΠΊΡΡΡΠ° Π½Π° Π±ΠΈΡΠΆΠ΅ Π²ΡΡΡΠ½ΡΡ\n"
            f"Π£Π±ΠΈΡΠ°Ρ ΠΈΠ· ΠΌΠΎΠ½ΠΈΡΠΎΡΠΈΠ½Π³Π°.\n"
            f"ΠΠΎΡΠ»Π΅Π΄Π½ΡΡ ΡΠ΅Π½Π°: ${price:.6f} ({pnl_pct:+.2f}%)"
        )
        return

    is_long = pos.side == "BUY"

    # 1. Check TP
    tp_hit = (price >= pos.tp_price) if is_long else (price <= pos.tp_price)
    if tp_hit:
        await self._close_position(pos, "TP", price)
        return

    # 2. Breakeven (polling mode)
    if not pos.moved_to_be and SCALP_BE_TRIGGER_PCT > 0:
        unreal_pct = ((price - pos.entry_price) / pos.entry_price * 100) if is_long else ((pos.entry_price - price) / pos.entry_price * 100)
        if unreal_pct >= SCALP_BE_TRIGGER_PCT:
            pos.sl_price = pos.entry_price
            pos.moved_to_be = True
            logger.info(f"Scalp {pos.symbol}: SL moved to BE @ {pos.entry_price} [polling] (+{unreal_pct:.2f}%)")

    # 3. Check SL
    sl_hit = (price <= pos.sl_price) if is_long else (price >= pos.sl_price)
    if sl_hit:
        await self._close_position(pos, "SL", price)
        return

    # 4. Time stop
    if pos.age_minutes >= SCALP_TIME_STOP_MIN:
        await self._close_position(pos, "TIME_STOP", price)
        return
async def monitor_loop(self):
    """Check every tracked scalp position, forever.

    Each position is checked inside its own ``try`` so that one symbol
    that keeps failing cannot stop the positions after it from being
    checked. Errors are logged and the loop continues; the sweep repeats
    after ``SCALP_CHECK_INTERVAL`` seconds. Task cancellation is not
    swallowed (``asyncio.CancelledError`` is not an ``Exception`` subclass
    on Python 3.8+).
    """
    logger.info(f"Scalp monitor started (TP={SCALP_TP_PCT}%, SL={SCALP_SL_PCT}%, TimeStop={SCALP_TIME_STOP_MIN}min, BE@+{SCALP_BE_TRIGGER_PCT}%, cd={SCALP_COOLDOWN_MIN}m)")
    while True:
        # Snapshot the keys: check_position may remove entries while we iterate.
        for symbol in list(self.positions):
            pos = self.positions.get(symbol)
            if not pos:
                continue
            try:
                await self.check_position(pos)
            except Exception as e:
                logger.error(f"Scalp monitor error ({symbol}): {e}", exc_info=True)
        await asyncio.sleep(SCALP_CHECK_INTERVAL)
async def scan_loop(self, wt_positions: dict):
    """Scan the market for Quick Take entries, forever.

    Waits ten seconds once, then repeatedly calls ``scan_market`` with a
    callback that opens a scalp for each reported signal, sleeping
    ``SCALP_SCAN_INTERVAL`` seconds between scans. Scan errors are logged
    and do not stop the loop.

    Args:
        wt_positions: dict of WT strategy positions, keyed by symbol; it is
            re-read before every scan so that those symbols are skipped.
    """
    from scalp_scanner import scan_market

    async def on_signal(signal):
        await self.open_scalp(signal)

    logger.info(f"Scalp scanner started (interval={SCALP_SCAN_INTERVAL}s, top 40 pairs)")

    # Give the rest of the bot time to initialize before the first scan
    await asyncio.sleep(10)

    while True:
        try:
            # Symbols already held by either strategy are excluded from the scan
            skip = {*self.positions, *wt_positions}
            await scan_market(on_signal, skip_symbols=skip)
        except Exception as e:
            logger.error(f"Scalp scan error: {e}", exc_info=True)
        await asyncio.sleep(SCALP_SCAN_INTERVAL)
def format_positions_message(self) -> str:
"""Format scalp positions for Telegram."""
if not self.positions:
return "β‘ ΠΠ΅Ρ ΡΠΊΠ°Π»ΡΠΏ-ΠΏΠΎΠ·ΠΈΡΠΈΠΉ"
lines = ["β‘ Scalp ΠΏΠΎΠ·ΠΈΡΠΈΠΈ:\nββββββββββββββββββββ"]
for symbol, pos in self.positions.items():
price = self.trader.get_mark_price(symbol) or pos.entry_price
if pos.side == "BUY":
pnl_pct = ((price - pos.entry_price) / pos.entry_price) * 100
else:
pnl_pct = ((pos.entry_price - price) / pos.entry_price) * 100
direction = "L" if pos.side == "BUY" else "S"
emoji = "π’" if pnl_pct >= 0 else "π΄"
age = pos.age_minutes
lines.append(
f"{emoji} {direction} {symbol} | {pnl_pct:+.2f}% | "
f"${price:.6f} | {age:.0f}ΠΌΠΈΠ½/{SCALP_TIME_STOP_MIN}"
)
total = self.stats["wins"] + self.stats["losses"]
wr = (self.stats["wins"] / total * 100) if total > 0 else 0
lines.append(f"\nπ WR: {wr:.0f}% ({self.stats['wins']}W/{self.stats['losses']}L)")
lines.append(f"π° PnL: ${self.stats['total_pnl']:+.2f}")
lines.append("ββββββββββββββββββββ")
return "\n".join(lines)
async def close_manual(self, symbol: str) -> bool:
"""Manual close from /scalp_close command."""
if symbol not in self.positions:
return False
pos = self.positions[symbol]
price = self.trader.get_mark_price(symbol)
if not price:
return False
# Cancel exchange orders before closing
if pos.use_exchange_orders:
self.order_mgr.cancel_all_for_symbol(symbol)
await self._close_position(pos, "MANUAL", price)
return True