"""
Bybit Trading Bot — Entry Point.
Initializes exchange client, Telegram bot, and strategy loops.
Strategies are pluggable: add them to STRATEGIES list below.
"""
import asyncio
import logging
import signal
import sys
import os
# Add project root to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.config import validate, LOG_LEVEL, BYBIT_TESTNET
from src.utils.logger import setup_logging
from src.exchange.client import BybitFuturesClient
from src.exchange.order_manager import OrderManager
from src.exchange.symbols import preload_instruments
from src.core.position import PositionTracker
from src.core.risk import RiskManager
from src.bot.telegram_bot import TelegramBot
# ── Register strategies here ────────────────────────────────
from src.strategies.dca_zvwap import DCAZVWAPStrategy
logger = logging.getLogger(__name__)
async def main():
"""Main async entry point."""
setup_logging(LOG_LEVEL)
logger.info("=" * 50)
logger.info("Bybit Trading Bot starting...")
logger.info("=" * 50)
# 1. Validate config
try:
validate()
except ValueError as e:
logger.error(f"Configuration error:\n{e}")
logger.error("Copy .env.example to .env and fill in your API keys")
sys.exit(1)
# 2. Initialize exchange client
client = BybitFuturesClient()
mode = "TESTNET" if BYBIT_TESTNET else "MAINNET"
logger.info(f"Exchange: Bybit {mode}")
# 3. Test connection
balance = client.get_account_balance()
equity = client.get_total_equity()
logger.info(f"Balance: ${balance:.2f} | Equity: ${equity:.2f}")
if balance <= 0 and not BYBIT_TESTNET:
logger.warning("Zero balance on mainnet — check your account")
# 4. Set position mode to one-way
client.set_position_mode(mode=0)
# 5. Initialize core components (before cleanup — need tracker for recovery)
tracker = PositionTracker()
order_mgr = OrderManager(client)
risk_mgr = RiskManager(client, tracker)
# 6. Initialize strategies
strategies = []
# DCA Z-VWAP — mean reversion with safety orders
async def notify(text: str):
await tg_bot.send_message(text)
dca = DCAZVWAPStrategy(client, order_mgr, tracker, risk_mgr, notify)
strategies.append(dca)
# 7. Recover existing positions BEFORE cleanup
# This rebuilds deals dict so cleanup knows what NOT to touch
await dca.recover_deals()
# 8. Startup cleanup — cancel orphaned orders on symbols WITHOUT active deals
active_symbols = set(dca.deals.keys())
all_orders = client.session.get_open_orders(
category="linear", settleCoin="USDT"
)
if all_orders and "result" in all_orders:
orphan_symbols = set()
for o in all_orders["result"]["list"]:
sym = o["symbol"]
if sym not in active_symbols:
orphan_symbols.add(sym)
for sym in orphan_symbols:
client.cancel_all_orders(sym)
logger.info(f"Startup cleanup: cancelled orphaned orders on {sym}")
if not orphan_symbols:
logger.info("Startup cleanup: no orphaned orders")
# 8. Start Telegram bot
tg_bot = TelegramBot(client, tracker, strategies)
await tg_bot.start()
# Link tg_bot to strategies (for /pause check)
for strat in strategies:
strat.tg_bot = tg_bot
# 10. Startup notification
strat_names = ", ".join(s.name for s in strategies) if strategies else "none (add strategies!)"
await notify(
f"🚀 Bybit Bot Started ({mode})\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"💰 Balance: ${balance:.2f}\n"
f"💎 Equity: ${equity:.2f}\n"
f"📊 Strategies: {strat_names}\n"
f"━━━━━━━━━━━━━━━━━━━━"
)
# 11. Start strategy loops
tasks = []
for strat in strategies:
tasks.append(asyncio.create_task(strat.scan_loop()))
tasks.append(asyncio.create_task(strat.monitor_loop()))
logger.info(f"Strategy loop started: {strat.name}")
if not strategies:
logger.info("No strategies registered — bot is running in monitor-only mode")
logger.info("Add strategies in src/main.py and restart")
# 12. Keep alive
try:
if tasks:
await asyncio.gather(*tasks)
else:
# No strategies, just keep telegram bot running
while True:
await asyncio.sleep(60)
except asyncio.CancelledError:
logger.info("Tasks cancelled, shutting down...")
finally:
await tg_bot.stop()
logger.info("Bybit Trading Bot stopped")
def run():
"""Sync entry point."""
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Interrupted by user")
if __name__ == "__main__":
run()