"""
OF-Trader β Telegram Bot
========================
Reuses grid-bot's Telegram bot (its own token, NOT Bender). Sends open/close/skip
notifications and exposes read + control commands. /kill and /resume toggle the
kill-switch file the RiskGate honours.
"""
import logging
import os
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
from src.config import (
TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, DRY_RUN, BINANCE_TESTNET,
OF_TP_PCT, OF_SL_PCT, OF_HOLD_MAX_MIN, MAX_CONCURRENT_POSITIONS,
DAILY_LOSS_LIMIT_USD, KILL_SWITCH_FILE,
)
logger = logging.getLogger("telegram")
def format_event(ev):
"""Pure formatter (no telegram dep) β testable in isolation."""
typ = ev.get("type")
sym = ev.get("symbol")
tag = "π§ͺDRY" if ev.get("dry") else "π΄LIVE"
if typ == "open":
return (f"π’ *OPEN* {tag} {ev['direction']} {sym} #{ev['id']}\n"
f"entryβ{ev['entry']} | TP={ev['tp']} SL={ev['sl']}\n"
f"qty={ev['qty']} (${ev.get('notional', 0):.0f})")
if typ == "close":
pnl = ev.get("pnl", 0.0)
emoji = "π’" if pnl >= 0 else "π΄"
reason_map = {
"TP": "β
take-profit", "SL": "π stop-loss",
"time_stop": "β± time-stop", "bracket": "bracket",
}
reason = reason_map.get(ev.get("reason"), ev.get("reason"))
pnl_line = "" if ev.get("dry") else f" | PnL ${pnl:+.2f}"
return f"{emoji} *CLOSE* {tag} {sym} #{ev['id']} β {reason}{pnl_line}"
if typ == "skip":
return f"βͺ skip {sym} #{ev.get('id')} β {ev.get('reason')}"
return f"βΉοΈ {ev}"
class TelegramBot:
def __init__(self, trader=None):
self.trader = trader # OFTrader (gives manager + risk)
self.app = None
self._chat_id = TELEGRAM_CHAT_ID
async def start(self):
if not TELEGRAM_BOT_TOKEN:
logger.warning("Telegram disabled (no token)")
return
self.app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
for cmd, h in [
("start", self._cmd_status), ("help", self._cmd_help),
("status", self._cmd_status), ("positions", self._cmd_positions),
("pnl", self._cmd_pnl), ("kill", self._cmd_kill), ("resume", self._cmd_resume),
]:
self.app.add_handler(CommandHandler(cmd, h))
await self.app.initialize()
await self.app.start()
await self.app.updater.start_polling(drop_pending_updates=True)
logger.info("Telegram bot started")
async def stop(self):
if self.app:
await self.app.updater.stop()
await self.app.stop()
await self.app.shutdown()
async def send_message(self, text):
if not self.app or not self._chat_id:
return
try:
await self.app.bot.send_message(chat_id=self._chat_id, text=text, parse_mode="Markdown")
except Exception as e:
logger.error(f"TG send error: {e}")
async def send_event(self, ev):
await self.send_message(format_event(ev))
# ---- commands ----
def _mode(self):
m = "DRY-RUN" if DRY_RUN else "LIVE"
net = "testnet" if BINANCE_TESTNET else "mainnet"
return f"{m} ({net})"
async def _cmd_status(self, update: Update, ctx: ContextTypes.DEFAULT_TYPE):
mgr = self.trader.manager if self.trader else None
risk = self.trader.risk if self.trader else None
killed = "β KILLED" if os.path.exists(KILL_SWITCH_FILE) else "β
armed"
n = len(mgr.positions) if mgr else 0
dl = risk.daily_loss_usd if risk else 0
await update.message.reply_text(
f"π€ *OF-Trader* β {self._mode()}\n"
f"State: {killed}\n"
f"Open: {n}/{MAX_CONCURRENT_POSITIONS}\n"
f"Daily loss: ${dl:.2f}/${DAILY_LOSS_LIMIT_USD:.0f}\n"
f"Bracket: TP{OF_TP_PCT}% SL{OF_SL_PCT}% hold{OF_HOLD_MAX_MIN}m",
parse_mode="Markdown",
)
async def _cmd_positions(self, update: Update, ctx: ContextTypes.DEFAULT_TYPE):
mgr = self.trader.manager if self.trader else None
if not mgr or not mgr.positions:
await update.message.reply_text("No open positions")
return
lines = [f"{p['direction']} {s} #{p['id']} entry={p['entry']} TP={p['tp']} SL={p['sl']}"
for s, p in mgr.positions.items()]
await update.message.reply_text("π *Positions*\n" + "\n".join(lines), parse_mode="Markdown")
async def _cmd_pnl(self, update: Update, ctx: ContextTypes.DEFAULT_TYPE):
risk = self.trader.risk if self.trader else None
dl = risk.daily_loss_usd if risk else 0
await update.message.reply_text(f"Daily loss: ${dl:.2f}/${DAILY_LOSS_LIMIT_USD:.0f}")
async def _cmd_kill(self, update: Update, ctx: ContextTypes.DEFAULT_TYPE):
os.makedirs(os.path.dirname(KILL_SWITCH_FILE), exist_ok=True)
open(KILL_SWITCH_FILE, "w").close()
await update.message.reply_text("β KILL switch ON β no new positions will open")
async def _cmd_resume(self, update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if os.path.exists(KILL_SWITCH_FILE):
os.remove(KILL_SWITCH_FILE)
await update.message.reply_text("β
KILL switch OFF β trading armed")
async def _cmd_help(self, update: Update, ctx: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("/status /positions /pnl /kill /resume")
π Git History
12371c2feat(of-trader): show real close reason (TP/SL/time-stop) instead of 'bracket'4 weeks ago
793f4d9feat(of-trader): telegram notifications + control (chunk 3a)4 weeks ago
120af73chore: archive grid-v3, scaffold of-trader from grid infra4 weeks ago
Show last diff
Loading...