"""
Telegram Bot — alerts and commands for Bybit trading bot.
"""
import asyncio
import logging
import time
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, ContextTypes
from src.config import TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID
from src.exchange.client import BybitFuturesClient
from src.core.position import PositionTracker
from src.core.trade_log import calculate_strategy_summary
from src.bot.formatters import format_balance
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TelegramBot:
    """Telegram bot for trading alerts and commands.

    Builds a python-telegram-bot ``Application``, registers the slash-command
    handlers, polls for updates, and sends alert messages to the chat
    configured in ``TELEGRAM_CHAT_ID``.  Every command handler silently
    ignores updates that do not come from that owner.
    """

    # NOTE(review): the emoji, dashes and rule characters in the user-facing
    # strings below were restored from mis-encoded (mojibake) text.  Glyphs
    # whose bytes were completely lost are best-effort choices — confirm them
    # against the intended originals.
    _RULE = "━" * 20

    def __init__(
        self,
        client: BybitFuturesClient,
        tracker: PositionTracker,
        strategies: list | None = None,
    ):
        """Store collaborators; the Telegram application is built in start().

        Args:
            client: Exchange client; read for balance, equity and the
                ``testnet`` flag.
            tracker: Position tracker queried via ``count()`` and ``all()``.
            strategies: Strategy objects exposing ``name``, ``event_prefix``
                and ``format_positions()``.  Defaults to an empty list.
        """
        self.client = client
        self.tracker = tracker
        self.strategies = strategies or []
        self.app: Application | None = None
        self.bot: Bot | None = None
        self._paused = False

    async def start(self):
        """Initialize and start the Telegram bot.

        Logs a warning and returns without starting anything when
        ``TELEGRAM_BOT_TOKEN`` is not set.
        """
        if not TELEGRAM_BOT_TOKEN:
            logger.warning("TELEGRAM_BOT_TOKEN not set, bot disabled")
            return

        self.app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

        # Register commands (dict order == registration order).
        commands = {
            "start": self._cmd_start,
            "status": self._cmd_status,
            "balance": self._cmd_balance,
            "positions": self._cmd_positions,
            "deals": self._cmd_deals,
            "pnl": self._cmd_pnl,
            "close": self._cmd_close,
            "config": self._cmd_config,
            "pause": self._cmd_pause,
            "resume": self._cmd_resume,
            "help": self._cmd_help,
        }
        for command, callback in commands.items():
            self.app.add_handler(CommandHandler(command, callback))

        await self.app.initialize()
        await self.app.start()
        await self.app.updater.start_polling(drop_pending_updates=True)
        self.bot = self.app.bot
        logger.info("Telegram bot started")

    async def stop(self):
        """Stop the Telegram bot; a no-op when it was never built."""
        if not self.app:
            return

        # Only stop the parts that are actually running: stopping an updater
        # or application that never started raises RuntimeError, which would
        # otherwise happen whenever start() failed part-way through.
        updater = self.app.updater
        if updater is not None and updater.running:
            await updater.stop()
        if self.app.running:
            await self.app.stop()
        await self.app.shutdown()
        logger.info("Telegram bot stopped")

    async def send_message(self, text: str):
        """Send a message to the owner.

        When the bot has not been started or no chat id is configured the
        text is only logged.  Delivery errors are logged, never raised.
        """
        if not self.bot or not TELEGRAM_CHAT_ID:
            logger.info("[TG disabled] %s", text)
            return
        try:
            await self.bot.send_message(
                chat_id=TELEGRAM_CHAT_ID,
                text=text,
                parse_mode=None,
            )
        except Exception as e:
            logger.error("Failed to send Telegram message: %s", e)

    @property
    def is_paused(self) -> bool:
        """True after /pause and until /resume."""
        return self._paused

    # ── Helpers ──────────────────────────────────────────────

    def _is_owner(self, update: Update) -> bool:
        """Return True only when the update was sent by the configured owner."""
        user = update.effective_user
        if user is None or not TELEGRAM_CHAT_ID:
            return False
        # Compare as strings so the check works whether TELEGRAM_CHAT_ID is
        # configured as an int or as a string; Telegram user ids are ints.
        return str(user.id) == str(TELEGRAM_CHAT_ID).strip()

    @staticmethod
    async def _reply(update: Update, text: str) -> None:
        """Reply to the message that triggered the command, if there is one.

        Uses ``effective_message`` because ``update.message`` is None for
        edited messages, which CommandHandler also dispatches by default.
        """
        message = update.effective_message
        if message is not None:
            await message.reply_text(text)

    def _status_label(self) -> str:
        """Return the paused/active label shown by /start and /status."""
        return "⏸ PAUSED" if self._paused else "✅ Active"

    def _mode_label(self) -> str:
        """Return TESTNET or MAINNET according to the client's testnet flag."""
        return "TESTNET" if self.client.testnet else "MAINNET"

    def _deal_strategy(self):
        """Return the first strategy when it has a ``deals`` attribute, else None."""
        first = self.strategies[0] if self.strategies else None
        if not first or not hasattr(first, "deals"):
            return None
        return first

    # ── Commands ─────────────────────────────────────────────

    async def _cmd_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """/start — banner with mode, strategy names and paused state."""
        if not self._is_owner(update):
            return
        mode = self._mode_label()
        strats = ", ".join(s.name for s in self.strategies) if self.strategies else "none"
        text = "\n".join(
            [
                "🤖 Bybit Trading Bot",
                self._RULE,
                f"Mode: {mode}",
                f"Strategies: {strats}",
                f"Status: {self._status_label()}",
                self._RULE,
                "/help for commands",
            ]
        )
        await self._reply(update, text)

    async def _cmd_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """/status — balance, equity, open-position count and per-strategy PnL."""
        if not self._is_owner(update):
            return
        balance = self.client.get_account_balance()
        equity = self.client.get_total_equity()
        pos_count = self.tracker.count()
        mode = self._mode_label()
        lines = [
            f"📊 Status ({mode})",
            self._RULE,
            f"💰 Balance: ${balance:.2f}",
            f"📈 Equity: ${equity:.2f}",
            f"📂 Open positions: {pos_count}",
            f"Status: {self._status_label()}",
        ]
        # Strategy summaries — only strategies that have recorded entries.
        for strat in self.strategies:
            s = calculate_strategy_summary(strat.event_prefix)
            if s["total_entries"] > 0:
                lines.append(
                    f"\n{strat.name}: {s['wins']}W/{s['losses']}L "
                    f"({s['win_rate']}%) ${s['total_pnl_usdt']:+.2f}"
                )
        lines.append(self._RULE)
        await self._reply(update, "\n".join(lines))

    async def _cmd_balance(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """/balance — wallet balance and equity rendered by format_balance."""
        if not self._is_owner(update):
            return
        balance = self.client.get_account_balance()
        equity = self.client.get_total_equity()
        await self._reply(update, format_balance(balance, equity))

    async def _cmd_positions(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """/positions — concatenated format_positions() output of every strategy."""
        if not self._is_owner(update):
            return
        if not self.tracker.all():
            await self._reply(update, "📭 No open positions")
            return
        # Messages containing "No " are treated as a strategy's
        # "nothing open" placeholder and are left out.
        parts = [
            msg
            for msg in (strat.format_positions() for strat in self.strategies)
            if "No " not in msg
        ]
        if not parts:
            await self._reply(update, "📭 No open positions")
        else:
            await self._reply(update, "\n\n".join(parts))

    async def _cmd_pnl(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """/pnl [period] — per-strategy and total PnL.

        The first argument (default ``"all"``) is passed unchanged to
        ``calculate_strategy_summary`` as the period.
        """
        if not self._is_owner(update):
            return
        args = context.args
        period = args[0] if args else "all"
        lines = [f"📊 PnL ({period})", self._RULE]
        total_pnl = 0
        for strat in self.strategies:
            s = calculate_strategy_summary(strat.event_prefix, period)
            total_pnl += s["total_pnl_usdt"]
            lines.append(
                f"{strat.name}: {s['wins']}W/{s['losses']}L "
                f"({s['win_rate']}%) ${s['total_pnl_usdt']:+.2f}"
            )
        lines.append(f"\n💰 Total: ${total_pnl:+.2f}")
        lines.append(self._RULE)
        await self._reply(update, "\n".join(lines))

    async def _cmd_pause(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """/pause — set the paused flag exposed through ``is_paused``."""
        if not self._is_owner(update):
            return
        self._paused = True
        await self._reply(update, "⏸ Bot paused — no new trades will be opened")

    async def _cmd_resume(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """/resume — clear the paused flag."""
        if not self._is_owner(update):
            return
        self._paused = False
        await self._reply(update, "▶️ Bot resumed — scanning for trades")

    async def _cmd_deals(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Detailed DCA deals with entry, avg, TP, SL, PnL$, MaxDD."""
        if not self._is_owner(update):
            return
        dca = self._deal_strategy()
        if dca is None or not dca.deals:
            await self._reply(update, "📭 No active DCA deals")
            return

        from src.strategies.dca_zvwap import DCA_MAX_SAFETY_ORDERS

        lines = ["📋 DCA Deals (detailed)", self._RULE]
        now = time.time()
        for symbol, deal in dca.deals.items():
            # Fall back to the average entry when no mark price is available.
            price = dca.client.get_mark_price(symbol) or deal.avg_entry
            avg = deal.avg_entry
            is_long = deal.side == "BUY"
            # Favourable price move per unit: up for longs, down for shorts.
            move = (price - avg) if is_long else (avg - price)
            pnl_pct = move / avg * 100 if avg else 0.0
            pnl_usd = deal.total_qty * move
            emoji = "🟢" if pnl_pct >= 0 else "🔴"
            d = "LONG" if is_long else "SHORT"
            z = dca._last_z.get(symbol, 0)
            dur = (now - deal.created_at) / 60  # minutes since creation
            # Guard against a deal whose fill list is still empty.
            entry = deal.fills[0]["price"] if deal.fills else avg
            lines.append(
                f"\n{emoji} {d} {symbol}\n"
                f" Entry: ${entry:.6f}\n"
                f" Avg: ${avg:.6f} | Now: ${price:.6f}\n"
                f" PnL: ${pnl_usd:+.4f} ({pnl_pct:+.2f}%)\n"
                f" TP: ${deal.get_tp_price():.6f} | SL: ${deal.get_sl_price():.6f}\n"
                f" SOs: {deal.so_filled_count}/{DCA_MAX_SAFETY_ORDERS} | "
                f"Invested: ${deal.total_invested_usd:.1f}\n"
                f" MaxDD: {deal.max_dd_pct:+.1f}% | Z: {z:.2f} | {dur:.0f}min"
            )
        lines.append(f"\n{self._RULE}")
        await self._reply(update, "\n".join(lines))

    async def _cmd_close(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Manually close a DCA deal: /close REDUSDT"""
        if not self._is_owner(update):
            return
        args = context.args
        if not args:
            await self._reply(update, "Usage: /close REDUSDT")
            return

        # Accept both "RED" and "REDUSDT".
        symbol = args[0].upper()
        if not symbol.endswith("USDT"):
            symbol += "USDT"

        dca = self._deal_strategy()
        if dca is None:
            await self._reply(update, "❌ No strategy running")
            return
        deal = dca.deals.get(symbol)
        if not deal:
            await self._reply(update, f"❌ No active deal for {symbol}")
            return

        try:
            price = dca.client.get_mark_price(symbol) or deal.avg_entry
            pnl = dca._estimate_pnl(deal, price)
            await dca._close_deal(deal, price, "manual", pnl)
            dca.deals.pop(symbol, None)
        except Exception as e:
            logger.exception("Manual close of %s failed", symbol)
            await self._reply(update, f"❌ Error closing {symbol}: {e}")
            return
        # Sent outside the try so a failed reply is not reported as a failed close.
        await self._reply(update, f"✅ Closed {symbol} | PnL: ${pnl:+.4f}")

    async def _cmd_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Show current DCA config."""
        if not self._is_owner(update):
            return
        from src.strategies.dca_zvwap import (
            DCA_BASE_ORDER_USD, DCA_SAFETY_ORDER_USD, DCA_MAX_SAFETY_ORDERS,
            DCA_NATR_FACTOR, DCA_STEP_SCALE, DCA_VOLUME_SCALE,
            DCA_TAKE_PROFIT_PCT, DCA_STOP_LOSS_PCT, DCA_LEVERAGE,
            DCA_COOLDOWN_SEC, SCREENER_MAX_DEALS, SCREENER_MIN_VOLUME_24H,
            ZVWAP_ENTRY_THRESHOLD, MIN_NATR_5M_PCT,
        )
        text = "\n".join(
            [
                "⚙️ DCA Config",
                self._RULE,
                f"BO: ${DCA_BASE_ORDER_USD} | SO: ${DCA_SAFETY_ORDER_USD}",
                f"Max SOs: {DCA_MAX_SAFETY_ORDERS} | Lev: {DCA_LEVERAGE}x",
                f"SO spacing: NATR×{DCA_NATR_FACTOR}×{DCA_STEP_SCALE}^n",
                f"Vol scale: {DCA_VOLUME_SCALE}x",
                f"TP: {DCA_TAKE_PROFIT_PCT}% | SL: {DCA_STOP_LOSS_PCT}%",
                f"Max deals: {SCREENER_MAX_DEALS}",
                f"Z threshold: ±{ZVWAP_ENTRY_THRESHOLD}",
                f"Min NATR: {MIN_NATR_5M_PCT}%",
                f"Min volume: ${SCREENER_MIN_VOLUME_24H/1e6:.0f}M",
                f"Cooldown: {DCA_COOLDOWN_SEC}s",
                self._RULE,
            ]
        )
        await self._reply(update, text)

    async def _cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """/help — list the available commands."""
        if not self._is_owner(update):
            return
        text = "\n".join(
            [
                "🤖 Bybit Bot Commands",
                self._RULE,
                "/status — overview + PnL",
                "/balance — wallet balance",
                "/positions — open positions (compact)",
                "/deals — detailed deals (entry, avg, PnL$, MaxDD)",
                "/pnl [today|week|all] — PnL stats",
                "/close <SYMBOL> — manually close a deal",
                "/config — current bot config",
                "/pause — stop opening new trades",
                "/resume — resume trading",
                "/help — this message",
                self._RULE,
            ]
        )
        await self._reply(update, text)