← Back
"""
AlphaPulse Bot — Scheduler & Main Loop
"""

import random
import asyncio
import logging

from config import CONFIG, POST_FORMATS, TIME_THEMES
from database import Database
from fetcher import ContentFetcher
from ai import AICommentary
from poster import TelegramPoster
from crypto_history import get_today_events
from utils import now_van, detect_news_format

logger = logging.getLogger(__name__)


class AlphaPulseBot:
    # Static polls (used when no market context available)
    STATIC_POLLS = [
        ("Which sector pumps next?",    ["BTC", "ETH/L2s", "Solana", "Memecoins", "AI coins"]),
        ("Biggest market mover?",       ["📰 News", "🐳 Whales", "📊 Technicals", "🗣️ Social"]),
        ("Best time to DCA?",           ["🔴 Extreme Fear", "😐 Neutral", "📅 Same day always", "Never"]),
        ("Most likely to flip BTC?",    ["ETH", "SOL", "Never", "XRP somehow"]),
        ("Macro is...",                 ["🟢 Bullish for crypto", "🔴 Bearish", "🤷 Doesn't matter"]),
        ("Biggest risk right now?",     ["📉 Regulation", "🏦 Macro rates", "🐳 Whale dump", "FUD"]),
        ("Altseason is...",             ["✅ Here now", "⏳ Coming soon", "❌ Cancelled", "Never for alts"]),
    ]

    @staticmethod
    def _smart_polls(prices: dict = None, fg: dict = None) -> list:
        """Generate context-aware polls based on market conditions."""
        polls = []

        if prices:
            btc = prices.get('btc_price', 0)
            change = prices.get('btc_change', 0)

            # Price-based polls
            if change > 5:
                polls.append((
                    f"BTC pumped {change:.1f}% today. What's next?",
                    ["📈 More upside", "🔄 Pullback incoming", "🦀 Consolidation", "🚀 Parabolic time"]
                ))
            elif change < -5:
                polls.append((
                    f"BTC dumped {abs(change):.1f}% today. Your move?",
                    ["🛒 Buying the dip", "⏳ Waiting for lower", "🏃 Already sold", "🦀 Not touching it"]
                ))
            else:
                polls.append((
                    f"BTC at ${btc:,.0f}. End of week prediction?",
                    ["🆙 Higher", "⬇️ Lower", "🦀 Same range", "💥 Surprise move"]
                ))

        if fg:
            val = fg.get('value', 50)
            if val <= 25:
                polls.append((
                    f"Fear & Greed at {val} (Extreme Fear). What do you do?",
                    ["🛒 Buy aggressively", "🤏 Small buys only", "⏳ Wait for reversal", "😱 Already panic sold"]
                ))
            elif val >= 75:
                polls.append((
                    f"Fear & Greed at {val} (Extreme Greed). Time to...",
                    ["💰 Take profits", "🎲 Ride the wave", "🔄 Rotate to stables", "🤷 HODL no matter what"]
                ))

        return polls

    def __init__(self):
        self.db       = Database()
        self.fetcher  = ContentFetcher()
        self.ai       = AICommentary(CONFIG['openrouter_key'])
        self.telegram = TelegramPoster(CONFIG['bot_token'], CONFIG['channel_id'])
        self.cycle_count = 0

    def _select_format(self, hour: int) -> str:
        # F&G standalone: 30% chance at any non-data hour, if not already included in price snapshot
        if not self.db.was_posted_today('fear_greed_posted'):
            if random.random() < 0.3:
                return 'fear_greed'
        return random.choices(list(POST_FORMATS.keys()), weights=list(POST_FORMATS.values()))[0]

    # ── Scheduled posting cycle ───────────────────────────────────────────────

    async def run_cycle(self):
        now  = now_van()
        hour = now.hour
        self.cycle_count += 1

        # ── Weekly digest: every Sunday at 08:00 Vancouver ────────────────
        if now.weekday() == 6 and hour == 8 and not self.db.was_posted_today('weekly_digest_posted'):
            logger.info("Posting weekly digest (Sunday 08:00)")
            prices         = await self.fetcher.fetch_prices()
            fg             = await self.fetcher.fetch_fear_greed()
            gainers, _     = await self.fetcher.fetch_top_movers()
            if prices:
                await self.telegram.post_weekly_digest(prices, fg, gainers)
                self.db.mark_posted_today('weekly_digest_posted')
                self.db.track_post('weekly_digest')
            return

        # ── Monday Watchlist at 08:00 Vancouver ──────────────────────────
        if now.weekday() == 0 and hour == 8 and not self.db.was_posted_today('watchlist_posted'):
            logger.info("Posting weekly watchlist (Monday 08:00)")
            trending, (gainers, _), funding, prices = await asyncio.gather(
                self.fetcher.fetch_trending(),
                self.fetcher.fetch_top_movers(),
                self.fetcher.fetch_funding_rates(),
                self.fetcher.fetch_prices(),
            )
            coins, summary = await self.ai.generate_watchlist(
                trending, gainers, funding, prices
            )
            if coins:
                date_str = now.strftime('%b %d, %Y')
                await self.telegram.post_watchlist(coins, summary, date_str)
                self.db.mark_posted_today('watchlist_posted')
                self.db.track_post('watchlist')
                return  # one post per slot — no double posting

        # ── "This Day in Crypto" at 11:00 Vancouver ────────────────────────
        if hour == 11 and not self.db.was_posted_today('history_posted'):
            logger.info("Posting 'This Day in Crypto' (11:00)")
            events = get_today_events(now.month, now.day)
            if events:
                prices = await self.fetcher.fetch_prices()
                reflection = await self.ai.generate_history_reflection(events, prices)
                date_str = now.strftime('%B %d')
                await self.telegram.post_history(events, date_str, reflection)
                self.db.mark_posted_today('history_posted')
                self.db.track_post('history')
            return

        # ── Trending coins at 02:00 Vancouver ─────────────────────────────
        if hour == 2 and not self.db.was_posted_today('trending_posted'):
            logger.info("Posting trending coins (02:00)")
            coins = await self.fetcher.fetch_trending()
            if await self.telegram.post_trending(coins):
                self.db.mark_posted_today('trending_posted')
                self.db.track_post('trending')
            return

        # ── Daily price snapshot + dominance + top movers at 15:00 ────────
        if hour == 15 and not self.db.was_posted_today('price_posted'):
            logger.info("Posting price snapshot + top movers (15:00)")
            prices, fg, dominance, sparklines = await asyncio.gather(
                self.fetcher.fetch_prices(),
                self.fetcher.fetch_fear_greed(),
                self.fetcher.fetch_global(),
                self.fetcher.fetch_sparklines(),
            )
            if prices:
                await self.telegram.post_price_snapshot(prices, fg, dominance, sparklines)
                self.db.mark_posted_today('price_posted')
                self.db.track_post('price_snapshot')
                if fg:
                    self.db.mark_posted_today('fear_greed_posted')

                # Top movers with 5-min gap to avoid "double post" feel
                if not self.db.was_posted_today('top_movers_posted'):
                    await asyncio.sleep(300)
                    gainers, losers = await self.fetcher.fetch_top_movers()
                    if await self.telegram.post_top_movers(gainers, losers):
                        self.db.mark_posted_today('top_movers_posted')
                        self.db.track_post('top_movers')
            return

        # ── Market Positioning (L/S) at 21:00 Vancouver ──────────────────
        if hour == 21 and not self.db.was_posted_today('long_short_posted'):
            logger.info("Posting market positioning L/S (21:00)")
            positions = await self.fetcher.fetch_long_short_ratio()
            if positions:
                await self.telegram.post_long_short(positions)
                self.db.mark_posted_today('long_short_posted')
                self.db.track_post('long_short')
            return

        # ── Funding Rates at 18:00 Vancouver ─────────────────────────────
        if hour == 18 and not self.db.was_posted_today('funding_rates_posted'):
            logger.info("Posting funding rates (18:00)")
            rates = await self.fetcher.fetch_funding_rates()
            if rates:
                await self.telegram.post_funding_rates(rates)
                self.db.mark_posted_today('funding_rates_posted')
                self.db.track_post('funding_rates')
            return

        # ── Whale Watch at 23:00 Vancouver (every other day) ─────────────
        if hour == 23 and not self.db.was_posted_today('whale_posted'):
            if now.toordinal() % 2 == 0:  # every other day
                logger.info("Posting whale alerts (23:00)")
                alerts = await self.fetcher.fetch_whale_alerts()
                if alerts:
                    await self.telegram.post_whale_alerts(alerts)
                    self.db.mark_posted_today('whale_posted')
                    self.db.track_post('whale_alerts')
                    return

        # ── Smart Poll every N cycles (after scheduled content, before news) ─
        if CONFIG['enable_polls'] and self.cycle_count % CONFIG['poll_frequency'] == 0:
            prices = await self.fetcher.fetch_prices()
            fg = await self.fetcher.fetch_fear_greed()
            smart = self._smart_polls(prices, fg)
            if smart:
                q, opts = random.choice(smart)
            else:
                q, opts = random.choice(self.STATIC_POLLS)
            await self.telegram.post_poll(q, opts)
            return

        # ── Select format — auto-detect from post title, fallback weighted ─
        fmt        = self._select_format(hour)
        time_theme = TIME_THEMES.get(hour)

        # ── Fear & Greed standalone post ──────────────────────────────────
        if fmt == 'fear_greed':
            fg = await self.fetcher.fetch_fear_greed()
            if fg:
                comment = AICommentary.fear_greed_text(fg['value'], fg['classification'])
                await self.telegram.post_fear_greed(
                    fg['value'], fg['classification'], comment
                )
                self.db.mark_posted_today('fear_greed_posted')
                self.db.track_post('fear_greed')
            return

        # ── Fetch content (RSS + Reddit in parallel) ──────────────────────
        rss_posts, reddit_posts = await asyncio.gather(
            asyncio.create_task(self.fetcher.fetch_rss()),
            asyncio.create_task(self.fetcher.fetch_reddit()),
        )

        all_posts = rss_posts + reddit_posts
        if not all_posts:
            logger.warning("No posts fetched from any source")
            return

        fresh = [p for p in all_posts if not self.db.is_posted(p['id'])]
        if not fresh:
            logger.info("No fresh posts available")
            return

        # Pick best: prefer high-score Reddit posts, fallback to random top-10
        candidates = fresh[:10]
        post = (
            max(candidates, key=lambda p: p.get('score', 0))
            if any(p.get('score') for p in candidates)
            else random.choice(candidates)
        )

        # ── Auto-detect format from title ─────────────────────────────────
        detected = detect_news_format(post['title'])
        if detected and detected != 'fear_greed':
            fmt = detected
            logger.info(f"Auto-detected format [{fmt}] from title: {post['title'][:60]}")

        # ── Generate AI commentary ────────────────────────────────────────
        prices     = await self.fetcher.fetch_prices()
        commentary = await self.ai.generate(post['title'], post['source'], fmt, time_theme, prices)

        if commentary:
            # Affiliate on signal posts + every Nth news post
            add_affiliate = (
                fmt == 'signal'
                or self.cycle_count % CONFIG.get('affiliate_freq', 3) == 0
            )
            await self.telegram.post(
                commentary, post['url'], post['source'], fmt, add_affiliate,
                title=post['title']
            )
            self.db.mark_posted(post['id'], post['title'], post['url'], post['source'], fmt)
            self.db.track_post('news', fmt, post['source'], post['title'])

    async def run(self):
        logger.info("AlphaPulse Bot v8.1 started — schedule: %s", CONFIG['posting_schedule'])

        # Anti-spam restart guard
        last = self.db.get_last_post_time()
        if last:
            mins = (now_van().replace(tzinfo=None) - last.replace(tzinfo=None)).total_seconds() / 60
            if mins < 5:
                logger.info(f"Posted {mins:.0f} min ago — waiting 5 min to avoid spam...")
                await asyncio.sleep(300)

        while True:
            try:
                now  = now_van()
                hour = now.hour

                if hour in CONFIG['posting_schedule']:
                    if self.db.was_hour_posted(hour):
                        logger.debug(f"Hour {hour} already posted, sleeping 5min")
                        await asyncio.sleep(300)
                        continue
                    logger.info(f"=== Cycle start: hour={hour}, day={now.strftime('%a')}, cycle #{self.cycle_count+1} ===")
                    await self.run_cycle()
                    self.db.mark_hour_posted(hour)
                    logger.info(f"=== Cycle done: hour={hour} posted, sleeping 1h ===")
                    await asyncio.sleep(3600)
                else:
                    await asyncio.sleep(300)

            except Exception as e:
                logger.error(f"Main loop error: {e}", exc_info=True)
                await asyncio.sleep(60)

📜 Git History

a09f02fchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...