โ ะะฐะทะฐะด"""
AlphaPulse Bot โ Scheduler & Main Loop
"""
import random
import asyncio
import logging
from config import CONFIG, POST_FORMATS, TIME_THEMES
from database import Database
from fetcher import ContentFetcher
from ai import AICommentary
from poster import TelegramPoster
from crypto_history import get_today_events
from utils import now_van, detect_news_format
logger = logging.getLogger(__name__)
class AlphaPulseBot:
# Static polls (used when no market context available)
STATIC_POLLS = [
("Which sector pumps next?", ["BTC", "ETH/L2s", "Solana", "Memecoins", "AI coins"]),
("Biggest market mover?", ["๐ฐ News", "๐ณ Whales", "๐ Technicals", "๐ฃ๏ธ Social"]),
("Best time to DCA?", ["๐ด Extreme Fear", "๐ Neutral", "๐
Same day always", "Never"]),
("Most likely to flip BTC?", ["ETH", "SOL", "Never", "XRP somehow"]),
("Macro is...", ["๐ข Bullish for crypto", "๐ด Bearish", "๐คท Doesn't matter"]),
("Biggest risk right now?", ["๐ Regulation", "๐ฆ Macro rates", "๐ณ Whale dump", "FUD"]),
("Altseason is...", ["โ
Here now", "โณ Coming soon", "โ Cancelled", "Never for alts"]),
]
@staticmethod
def _smart_polls(prices: dict = None, fg: dict = None) -> list:
"""Generate context-aware polls based on market conditions."""
polls = []
if prices:
btc = prices.get('btc_price', 0)
change = prices.get('btc_change', 0)
# Price-based polls
if change > 5:
polls.append((
f"BTC pumped {change:.1f}% today. What's next?",
["๐ More upside", "๐ Pullback incoming", "๐ฆ Consolidation", "๐ Parabolic time"]
))
elif change < -5:
polls.append((
f"BTC dumped {abs(change):.1f}% today. Your move?",
["๐ Buying the dip", "โณ Waiting for lower", "๐ Already sold", "๐ฆ Not touching it"]
))
else:
polls.append((
f"BTC at ${btc:,.0f}. End of week prediction?",
["๐ Higher", "โฌ๏ธ Lower", "๐ฆ Same range", "๐ฅ Surprise move"]
))
if fg:
val = fg.get('value', 50)
if val <= 25:
polls.append((
f"Fear & Greed at {val} (Extreme Fear). What do you do?",
["๐ Buy aggressively", "๐ค Small buys only", "โณ Wait for reversal", "๐ฑ Already panic sold"]
))
elif val >= 75:
polls.append((
f"Fear & Greed at {val} (Extreme Greed). Time to...",
["๐ฐ Take profits", "๐ฒ Ride the wave", "๐ Rotate to stables", "๐คท HODL no matter what"]
))
return polls
def __init__(self):
self.db = Database()
self.fetcher = ContentFetcher()
self.ai = AICommentary(CONFIG['openrouter_key'])
self.telegram = TelegramPoster(CONFIG['bot_token'], CONFIG['channel_id'])
self.cycle_count = 0
def _select_format(self, hour: int) -> str:
# F&G standalone: 30% chance at any non-data hour, if not already included in price snapshot
if not self.db.was_posted_today('fear_greed_posted'):
if random.random() < 0.3:
return 'fear_greed'
return random.choices(list(POST_FORMATS.keys()), weights=list(POST_FORMATS.values()))[0]
# โโ Scheduled posting cycle โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
async def run_cycle(self):
now = now_van()
hour = now.hour
self.cycle_count += 1
# โโ Weekly digest: every Sunday at 08:00 Vancouver โโโโโโโโโโโโโโโโ
if now.weekday() == 6 and hour == 8 and not self.db.was_posted_today('weekly_digest_posted'):
logger.info("Posting weekly digest (Sunday 08:00)")
prices = await self.fetcher.fetch_prices()
fg = await self.fetcher.fetch_fear_greed()
gainers, _ = await self.fetcher.fetch_top_movers()
if prices:
await self.telegram.post_weekly_digest(prices, fg, gainers)
self.db.mark_posted_today('weekly_digest_posted')
self.db.track_post('weekly_digest')
return
# โโ Monday Watchlist at 08:00 Vancouver โโโโโโโโโโโโโโโโโโโโโโโโโโ
if now.weekday() == 0 and hour == 8 and not self.db.was_posted_today('watchlist_posted'):
logger.info("Posting weekly watchlist (Monday 08:00)")
trending, (gainers, _), funding, prices = await asyncio.gather(
self.fetcher.fetch_trending(),
self.fetcher.fetch_top_movers(),
self.fetcher.fetch_funding_rates(),
self.fetcher.fetch_prices(),
)
coins, summary = await self.ai.generate_watchlist(
trending, gainers, funding, prices
)
if coins:
date_str = now.strftime('%b %d, %Y')
await self.telegram.post_watchlist(coins, summary, date_str)
self.db.mark_posted_today('watchlist_posted')
self.db.track_post('watchlist')
return # one post per slot โ no double posting
# โโ "This Day in Crypto" at 11:00 Vancouver โโโโโโโโโโโโโโโโโโโโโโโโ
if hour == 11 and not self.db.was_posted_today('history_posted'):
logger.info("Posting 'This Day in Crypto' (11:00)")
events = get_today_events(now.month, now.day)
if events:
prices = await self.fetcher.fetch_prices()
reflection = await self.ai.generate_history_reflection(events, prices)
date_str = now.strftime('%B %d')
await self.telegram.post_history(events, date_str, reflection)
self.db.mark_posted_today('history_posted')
self.db.track_post('history')
return
# โโ Trending coins at 02:00 Vancouver โโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
if hour == 2 and not self.db.was_posted_today('trending_posted'):
logger.info("Posting trending coins (02:00)")
coins = await self.fetcher.fetch_trending()
if await self.telegram.post_trending(coins):
self.db.mark_posted_today('trending_posted')
self.db.track_post('trending')
return
# โโ Daily price snapshot + dominance + top movers at 15:00 โโโโโโโโ
if hour == 15 and not self.db.was_posted_today('price_posted'):
logger.info("Posting price snapshot + top movers (15:00)")
prices, fg, dominance, sparklines = await asyncio.gather(
self.fetcher.fetch_prices(),
self.fetcher.fetch_fear_greed(),
self.fetcher.fetch_global(),
self.fetcher.fetch_sparklines(),
)
if prices:
await self.telegram.post_price_snapshot(prices, fg, dominance, sparklines)
self.db.mark_posted_today('price_posted')
self.db.track_post('price_snapshot')
if fg:
self.db.mark_posted_today('fear_greed_posted')
# Top movers with 5-min gap to avoid "double post" feel
if not self.db.was_posted_today('top_movers_posted'):
await asyncio.sleep(300)
gainers, losers = await self.fetcher.fetch_top_movers()
if await self.telegram.post_top_movers(gainers, losers):
self.db.mark_posted_today('top_movers_posted')
self.db.track_post('top_movers')
return
# โโ Market Positioning (L/S) at 21:00 Vancouver โโโโโโโโโโโโโโโโโโ
if hour == 21 and not self.db.was_posted_today('long_short_posted'):
logger.info("Posting market positioning L/S (21:00)")
positions = await self.fetcher.fetch_long_short_ratio()
if positions:
await self.telegram.post_long_short(positions)
self.db.mark_posted_today('long_short_posted')
self.db.track_post('long_short')
return
# โโ Funding Rates at 18:00 Vancouver โโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
if hour == 18 and not self.db.was_posted_today('funding_rates_posted'):
logger.info("Posting funding rates (18:00)")
rates = await self.fetcher.fetch_funding_rates()
if rates:
await self.telegram.post_funding_rates(rates)
self.db.mark_posted_today('funding_rates_posted')
self.db.track_post('funding_rates')
return
# โโ Whale Watch at 23:00 Vancouver (every other day) โโโโโโโโโโโโโ
if hour == 23 and not self.db.was_posted_today('whale_posted'):
if now.toordinal() % 2 == 0: # every other day
logger.info("Posting whale alerts (23:00)")
alerts = await self.fetcher.fetch_whale_alerts()
if alerts:
await self.telegram.post_whale_alerts(alerts)
self.db.mark_posted_today('whale_posted')
self.db.track_post('whale_alerts')
return
# โโ Smart Poll every N cycles (after scheduled content, before news) โ
if CONFIG['enable_polls'] and self.cycle_count % CONFIG['poll_frequency'] == 0:
prices = await self.fetcher.fetch_prices()
fg = await self.fetcher.fetch_fear_greed()
smart = self._smart_polls(prices, fg)
if smart:
q, opts = random.choice(smart)
else:
q, opts = random.choice(self.STATIC_POLLS)
await self.telegram.post_poll(q, opts)
return
# โโ Select format โ auto-detect from post title, fallback weighted โ
fmt = self._select_format(hour)
time_theme = TIME_THEMES.get(hour)
# โโ Fear & Greed standalone post โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
if fmt == 'fear_greed':
fg = await self.fetcher.fetch_fear_greed()
if fg:
comment = AICommentary.fear_greed_text(fg['value'], fg['classification'])
await self.telegram.post_fear_greed(
fg['value'], fg['classification'], comment
)
self.db.mark_posted_today('fear_greed_posted')
self.db.track_post('fear_greed')
return
# โโ Fetch content (RSS + Reddit in parallel) โโโโโโโโโโโโโโโโโโโโโโ
rss_posts, reddit_posts = await asyncio.gather(
asyncio.create_task(self.fetcher.fetch_rss()),
asyncio.create_task(self.fetcher.fetch_reddit()),
)
all_posts = rss_posts + reddit_posts
if not all_posts:
logger.warning("No posts fetched from any source")
return
fresh = [p for p in all_posts if not self.db.is_posted(p['id'])]
if not fresh:
logger.info("No fresh posts available")
return
# Pick best: prefer high-score Reddit posts, fallback to random top-10
candidates = fresh[:10]
post = (
max(candidates, key=lambda p: p.get('score', 0))
if any(p.get('score') for p in candidates)
else random.choice(candidates)
)
# โโ Auto-detect format from title โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
detected = detect_news_format(post['title'])
if detected and detected != 'fear_greed':
fmt = detected
logger.info(f"Auto-detected format [{fmt}] from title: {post['title'][:60]}")
# โโ Generate AI commentary โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
prices = await self.fetcher.fetch_prices()
commentary = await self.ai.generate(post['title'], post['source'], fmt, time_theme, prices)
if commentary:
# Affiliate on signal posts + every Nth news post
add_affiliate = (
fmt == 'signal'
or self.cycle_count % CONFIG.get('affiliate_freq', 3) == 0
)
await self.telegram.post(
commentary, post['url'], post['source'], fmt, add_affiliate,
title=post['title']
)
self.db.mark_posted(post['id'], post['title'], post['url'], post['source'], fmt)
self.db.track_post('news', fmt, post['source'], post['title'])
async def run(self):
logger.info("AlphaPulse Bot v8.1 started โ schedule: %s", CONFIG['posting_schedule'])
# Anti-spam restart guard
last = self.db.get_last_post_time()
if last:
mins = (now_van().replace(tzinfo=None) - last.replace(tzinfo=None)).total_seconds() / 60
if mins < 5:
logger.info(f"Posted {mins:.0f} min ago โ waiting 5 min to avoid spam...")
await asyncio.sleep(300)
while True:
try:
now = now_van()
hour = now.hour
if hour in CONFIG['posting_schedule']:
if self.db.was_hour_posted(hour):
logger.debug(f"Hour {hour} already posted, sleeping 5min")
await asyncio.sleep(300)
continue
logger.info(f"=== Cycle start: hour={hour}, day={now.strftime('%a')}, cycle #{self.cycle_count+1} ===")
await self.run_cycle()
self.db.mark_hour_posted(hour)
logger.info(f"=== Cycle done: hour={hour} posted, sleeping 1h ===")
await asyncio.sleep(3600)
else:
await asyncio.sleep(300)
except Exception as e:
logger.error(f"Main loop error: {e}", exc_info=True)
await asyncio.sleep(60)