# (web-viewer navigation residue "← Back" — not part of the module)
"""
AlphaPulse Bot — Content Fetcher (RSS, CoinGecko, Reddit, etc.)
"""

import asyncio
import logging
import feedparser
import requests

from config import RSS_SOURCES
from utils import stable_id

logger = logging.getLogger(__name__)


async def _retry_get(session, url, retries=3, delay=5, timeout=15):
    """GET ``url`` via ``session``, retrying on exceptions and HTTP 429.

    The blocking ``session.get`` call is pushed to a worker thread with
    ``asyncio.to_thread`` so the event loop keeps running while it waits.

    Args:
        session: Object exposing ``get(url, timeout=...)``.
        url: Address to request.
        retries: Maximum number of attempts.
        delay: Base pause in seconds. After a 429 the pause grows linearly
            (``delay * attempt_number``); after an exception it is ``delay``.
        timeout: Per-request timeout handed to ``session.get``.

    Returns:
        The first response whose status is not 429 (other error statuses are
        handed back unchanged for the caller to judge), or ``None`` once
        every attempt has raised or been rate limited.
    """
    for attempt in range(retries):
        final_attempt = attempt == retries - 1
        try:
            resp = await asyncio.to_thread(session.get, url, timeout=timeout)
            rate_limited = resp.status_code == 429
        except Exception as e:
            if final_attempt:
                logger.error(f"Request failed after {retries} attempts: {e}")
                break
            logger.warning(f"Request failed ({e}), retry {attempt+2}/{retries} in {delay}s")
            await asyncio.sleep(delay)
            continue

        if not rate_limited:
            return resp
        if final_attempt:
            # Nothing is left to retry: sleeping here would only postpone
            # the None result, so report the exhaustion and stop.
            logger.error(
                f"Rate limited (429) on {url[:60]}... giving up after {retries} attempts"
            )
            break
        wait = delay * (attempt + 1)
        logger.warning(f"Rate limited (429) on {url[:60]}... retry in {wait}s")
        await asyncio.sleep(wait)
    return None


class ContentFetcher:
    # Lower-case ticker symbols of stablecoins. fetch_top_movers drops any
    # coin whose lower-cased symbol appears in this set, so pegged assets
    # are excluded from the top movers.
    STABLES = {'usdt', 'usdc', 'busd', 'dai', 'tusd', 'usdp', 'fdusd', 'usdd', 'frax', 'lusd'}

    def __init__(self):
        """Create the shared HTTP session and give it a browser-like User-Agent."""
        session = requests.Session()
        session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
        self.session = session

    async def fetch_rss(self):
        """Collect the newest entries from every feed in ``RSS_SOURCES``.

        Each feed is parsed in a worker thread and at most five entries per
        feed are kept. A failure in one feed is logged and does not stop the
        remaining feeds.

        Returns:
            list[dict]: One dict per entry with the keys ``id``, ``title``,
            ``url``, ``source``, ``published`` and ``summary`` (first 200
            characters).
        """
        posts = []
        for source_name, url in RSS_SOURCES.items():
            try:
                logger.info(f"Fetching {source_name}...")
                feed = await asyncio.to_thread(feedparser.parse, url)
                entries = feed.entries[:5]
                if not entries:
                    # feedparser reports download/parse problems through the
                    # result (bozo flag, HTTP status) instead of raising, so
                    # an empty feed is surfaced here rather than lost.
                    logger.warning(
                        f"RSS {source_name}: no entries "
                        f"(status={feed.get('status')}, error={feed.get('bozo_exception')})"
                    )
                for entry in entries:
                    link = entry.get('link')
                    title = entry.get('title')
                    if not link or not title:
                        # Skip only the malformed entry; keep the rest of the feed.
                        continue
                    posts.append({
                        'id':        stable_id(source_name, link),
                        'title':     title,
                        'url':       link,
                        'source':    source_name,
                        'published': entry.get('published', ''),
                        'summary':   entry.get('summary', '')[:200],
                    })
                # Short pause between feeds.
                await asyncio.sleep(0.3)
            except Exception as e:
                logger.error(f"RSS error {source_name}: {e}")
        return posts

    async def fetch_fear_greed(self):
        """Crypto Fear & Greed Index from alternative.me."""
        try:
            resp = await asyncio.to_thread(
                self.session.get, "https://api.alternative.me/fng/?limit=1", timeout=20
            )
            data = resp.json()
            if data and 'data' in data:
                item = data['data'][0]
                return {'value': int(item['value']), 'classification': item['value_classification']}
        except Exception as e:
            logger.error(f"Fear & Greed error: {e}")
        return None

    async def fetch_prices(self):
        """Return BTC, ETH and SOL prices with their 24h and 7d % changes.

        Data comes from CoinGecko ``/coins/markets``. Per the CoinGecko API
        reference, ``/simple/price`` (used previously) has no 7-day change
        flag, so the ``*_7d`` values read from it always fell back to ``0``.

        Returns:
            dict | None: Keys ``btc_price``/``btc_change``/``btc_7d`` and the
            same triple for ``eth`` and ``sol`` (price in USD, 24h %, 7d %).
            A figure missing from the payload is reported as ``0``. ``None``
            when the request fails or the payload is not a list.
        """
        try:
            url = (
                "https://api.coingecko.com/api/v3/coins/markets"
                "?vs_currency=usd&ids=bitcoin,ethereum,solana"
                "&price_change_percentage=24h,7d&sparkline=false"
            )
            resp = await _retry_get(self.session, url)
            if not resp:
                return None
            rows = resp.json()
            if not isinstance(rows, list):
                logger.warning(f"Prices: unexpected response: {str(rows)[:100]}")
                return None
            by_id = {row.get('id'): row for row in rows if isinstance(row, dict)}
            prices = {}
            for coin_id, prefix in (('bitcoin', 'btc'), ('ethereum', 'eth'), ('solana', 'sol')):
                row = by_id.get(coin_id, {})
                # ``or 0`` covers both a missing key and an explicit null.
                prices[f'{prefix}_price'] = row.get('current_price') or 0
                prices[f'{prefix}_change'] = row.get('price_change_percentage_24h') or 0
                prices[f'{prefix}_7d'] = row.get('price_change_percentage_7d_in_currency') or 0
            return prices
        except Exception as e:
            logger.error(f"Price fetch error: {e}")
        return None

    async def fetch_global(self) -> dict | None:
        """Fetch market-wide figures from CoinGecko ``/global``.

        Returns:
            dict | None: ``btc_dominance`` and ``eth_dominance`` (rounded to
            one decimal), ``total_mcap`` (USD) and ``mcap_change`` (24h %),
            each defaulting to ``0`` when absent. ``None`` when the request
            yields no usable response or parsing fails (logged).
        """
        try:
            response = await _retry_get(self.session, "https://api.coingecko.com/api/v3/global")
            if not response:
                return None
            market = response.json().get('data', {})
            btc_share = round(market.get('market_cap_percentage', {}).get('btc', 0), 1)
            eth_share = round(market.get('market_cap_percentage', {}).get('eth', 0), 1)
            total_usd = market.get('total_market_cap', {}).get('usd', 0)
            change_24h = market.get('market_cap_change_percentage_24h_usd', 0)
            return {
                'btc_dominance': btc_share,
                'eth_dominance': eth_share,
                'total_mcap':    total_usd,
                'mcap_change':   change_24h,
            }
        except Exception as e:
            logger.error(f"Global fetch error: {e}")
            return None

    async def fetch_trending(self) -> list:
        """Return up to 7 trending coins from CoinGecko ``/search/trending``.

        Returns:
            list[dict]: Dicts with ``name``, ``symbol`` (upper-cased),
            ``rank`` (market-cap rank, or ``'?'`` when unknown) and
            ``score``. Empty list when the request or parsing fails.
        """
        try:
            resp = await _retry_get(self.session, "https://api.coingecko.com/api/v3/search/trending")
            if not resp:
                return []
            data = resp.json()
            coins = []
            for item in data.get('coins', [])[:7]:
                c = item.get('item', {})
                # A key that is present but null would slip past a .get()
                # default, so the placeholder is applied explicitly.
                rank = c.get('market_cap_rank')
                coins.append({
                    'name':   c.get('name', ''),
                    'symbol': c.get('symbol', '').upper(),
                    'rank':   '?' if rank is None else rank,
                    'score':  c.get('score', 0),
                })
            return coins
        except Exception as e:
            logger.error(f"Trending fetch error: {e}")
        return []

    async def fetch_top_movers(self):
        """Return the top 5 gainers and top 5 losers by 24h % change.

        Requests the 200 largest coins by market cap from CoinGecko, drops
        stablecoins (``STABLES``) and coins without a 24h change, then ranks
        what is left by that change.

        Returns:
            tuple[list, list]: ``(gainers, losers)`` of CoinGecko market
            dicts; gainers are ordered best first, losers worst first. The
            two lists never share a coin. Both are empty on any failure.
        """
        try:
            url = (
                "https://api.coingecko.com/api/v3/coins/markets"
                "?vs_currency=usd&order=market_cap_desc&per_page=200&page=1"
                "&price_change_percentage=24h&sparkline=false"
            )
            resp = await _retry_get(self.session, url, timeout=20)
            if not resp:
                return [], []
            coins = resp.json()
            # CoinGecko returns error dict/string on rate limit
            if not isinstance(coins, list):
                logger.warning(f"Top movers: unexpected response type {type(coins)}: {str(coins)[:100]}")
                return [], []
            # ``or ''`` tolerates an explicit null symbol instead of letting
            # one odd row abort the whole ranking.
            filtered = [
                c for c in coins
                if isinstance(c, dict)
                and (c.get('symbol') or '').lower() not in self.STABLES
                and c.get('price_change_percentage_24h') is not None
            ]
            sorted_coins = sorted(
                filtered,
                key=lambda c: c['price_change_percentage_24h'],
                reverse=True
            )
            gainers = sorted_coins[:5]
            # Losers are drawn only from what remains after the gainers, so a
            # short list (fewer than 10 coins) never reports a coin twice.
            losers = list(reversed(sorted_coins[5:][-5:]))
            return gainers, losers
        except Exception as e:
            logger.error(f"Top movers error: {e}")
        return [], []

    async def fetch_sparklines(self) -> dict:
        """Build sparkline strings from 7 days of BTC/ETH/SOL prices.

        For each coin the CoinGecko ``market_chart`` series is fetched and
        its price column is passed to ``templates.spark_bar``. Coins whose
        request fails or whose series is empty are left out.

        Returns: {'btc': '▁▂▃▅▇', 'eth': '...', 'sol': '...'}
        """
        from templates import spark_bar

        tracked = {'bitcoin': 'btc', 'ethereum': 'eth', 'solana': 'sol'}
        sparklines = {}

        for coin_id, symbol in tracked.items():
            try:
                url = (
                    f"https://api.coingecko.com/api/v3/coins/{coin_id}/market_chart"
                    f"?vs_currency=usd&days=7&interval=daily"
                )
                resp = await _retry_get(self.session, url)
                if resp:
                    # Each point is indexed as [?, price]; only index 1 is used.
                    series = [point[1] for point in resp.json().get('prices', [])]
                    if series:
                        sparklines[symbol] = spark_bar(series)
                    # Pause between coins to stay under the rate limit.
                    await asyncio.sleep(0.3)
            except Exception as e:
                logger.error(f"Sparkline error {coin_id}: {e}")

        return sparklines

    # Upper-case base-asset symbols that fetch_funding_rates may report: a
    # Binance pair is kept only when its name without the "USDT" part is in
    # this set. Hand-maintained list intended to cover roughly the top ~80
    # coins by market cap.
    # NOTE(review): Binance appears to list some of these as 1000-prefixed
    # contracts (e.g. 1000PEPEUSDT, 1000SHIBUSDT); if so, those entries can
    # never match — confirm against the premiumIndex payload.
    MAJOR_COINS = {
        'BTC', 'ETH', 'BNB', 'SOL', 'XRP', 'ADA', 'DOGE', 'AVAX',
        'DOT', 'LINK', 'MATIC', 'SHIB', 'LTC', 'TRX', 'NEAR', 'UNI',
        'ATOM', 'XLM', 'ICP', 'APT', 'ARB', 'OP', 'SUI', 'SEI', 'INJ',
        'TON', 'PEPE', 'WLD', 'RNDR', 'FIL', 'ALGO', 'FTM', 'AAVE',
        'MANA', 'SAND', 'AXS', 'GALA', 'CRV', 'LDO', 'MKR', 'SNX',
        'COMP', 'RUNE', 'THETA', 'FET', 'AGIX', 'OCEAN', 'IMX', 'GMX',
        'DYDX', 'STX', 'ORDI', 'WIF', 'JUP', 'TIA', 'PYTH', 'JTO',
        'BONK', 'FLOKI', 'NOT', 'ENA', 'W', 'ETHFI', 'PENDLE', 'TAO',
        'ONDO', 'STRK', 'ZRO', 'BOME', 'EIGEN', 'MOVE', 'HBAR',
    }

    async def fetch_funding_rates(self) -> list:
        """Top funding rates from Binance Futures (no API key needed).
        Only major coins (top ~80 by mcap). Returns top 10 by abs rate.
        """
        try:
            url = "https://fapi.binance.com/fapi/v1/premiumIndex"
            resp = await asyncio.to_thread(self.session.get, url, timeout=15)
            data = resp.json()
            rates = []
            for item in data:
                rate = float(item.get('lastFundingRate', 0))
                if rate == 0:
                    continue
                symbol = item.get('symbol', '')
                if not symbol.endswith('USDT'):
                    continue
                clean = symbol.replace('USDT', '')
                # Filter: only major coins
                if clean not in self.MAJOR_COINS:
                    continue
                rates.append({
                    'symbol':  clean,
                    'rate':    rate,
                    'rate_pct': rate * 100,
                    'price':   float(item.get('markPrice', 0)),
                })
            rates.sort(key=lambda x: abs(x['rate']), reverse=True)
            return rates[:10]
        except Exception as e:
            logger.error(f"Funding rates error: {e}")
        return []

    async def fetch_long_short_ratio(self) -> list:
        """Fetch Long/Short account ratio + OI for top coins from Binance Futures.
        No API key needed. Returns list of dicts.
        """
        coins = [
            ('BTCUSDT', 'BTC'), ('ETHUSDT', 'ETH'), ('SOLUSDT', 'SOL'),
            ('XRPUSDT', 'XRP'), ('DOGEUSDT', 'DOGE'), ('BNBUSDT', 'BNB'),
            ('ADAUSDT', 'ADA'), ('AVAXUSDT', 'AVAX'), ('LINKUSDT', 'LINK'),
            ('DOTUSDT', 'DOT'),
        ]
        results = []
        for pair, symbol in coins:
            try:
                # Long/Short ratio
                url = (
                    f"https://fapi.binance.com/futures/data/"
                    f"globalLongShortAccountRatio?symbol={pair}&period=1h&limit=1"
                )
                resp = await asyncio.to_thread(self.session.get, url, timeout=10)
                data = resp.json()
                if data:
                    d = data[0]
                    long_pct = float(d.get('longAccount', 0.5))
                    short_pct = float(d.get('shortAccount', 0.5))
                    ratio = float(d.get('longShortRatio', 1.0))
                    results.append({
                        'symbol':    symbol,
                        'long_pct':  long_pct,
                        'short_pct': short_pct,
                        'ratio':     ratio,
                    })
                await asyncio.sleep(0.2)
            except Exception as e:
                logger.error(f"L/S ratio error {symbol}: {e}")
        return results

    async def fetch_whale_alerts(self) -> list:
        """Return large BTC transactions reported by the Blockchair API.

        Only Blockchair is queried here; no whale-alert.io request is made.
        The five transactions with the highest ``output_total`` are requested
        and those of at least 100 BTC are kept.

        NOTE(review): the query sorts by ``output_total`` without any time
        filter, so the result does not look restricted to recent
        transactions — confirm the intended window.

        Returns:
            list[dict]: Up to 5 dicts with ``amount_btc``, ``hash`` (first
            16 characters) and ``block``. Empty list on any failure.
        """
        try:
            endpoint = "https://api.blockchair.com/bitcoin/transactions?s=output_total(desc)&limit=5"
            response = await _retry_get(self.session, endpoint, retries=2, timeout=15)
            if not response:
                return []
            payload = response.json()
            whales = []
            for tx in payload.get('data', []):
                amount = tx.get('output_total', 0) / 1e8  # satoshi → BTC
                if amount >= 100:  # only 100+ BTC
                    whales.append(dict(
                        amount_btc=amount,
                        hash=tx.get('hash', '')[:16],
                        block=tx.get('block_id', 0),
                    ))
            return whales[:5]
        except Exception as e:
            logger.error(f"Whale alerts error: {e}")
            return []

    async def fetch_reddit(self):
        """Return hot posts from r/CryptoCurrency, read through the RSS feed.

        The .json API is IP-blocked for datacenter requests while the RSS
        feed stays open. The hot feed is already ranked, so each post gets a
        synthetic descending score based on its position in the feed.

        Returns:
            list[dict]: Up to 10 dicts with ``id``, ``title``, ``url``,
            ``source`` and ``score`` (100 for the first entry, falling by 10
            per position). Empty list when the feed has no entries or an
            error occurs.
        """
        try:
            url = "https://www.reddit.com/r/CryptoCurrency/hot/.rss"
            feed = await asyncio.to_thread(feedparser.parse, url)
            entries = feed.entries[:10]
            if not entries:
                # feedparser does not raise on a blocked or broken feed; it
                # records the problem on the result, so report it here.
                logger.warning(
                    f"Reddit: no entries "
                    f"(status={feed.get('status')}, error={feed.get('bozo_exception')})"
                )
            posts = []
            for idx, entry in enumerate(entries):
                link = entry.get('link')
                title = entry.get('title')
                if not link or not title:
                    # Drop only the malformed entry; idx still reflects the
                    # entry's position in the feed for scoring.
                    continue
                posts.append({
                    'id':     stable_id('Reddit', link),
                    'title':  title,
                    'url':    link,
                    'source': 'Reddit r/CC',
                    'score':  (10 - idx) * 10,
                })
            return posts
        except Exception as e:
            logger.error(f"Reddit error: {e}")
        return []

# Git history (web-viewer residue, kept as comments so the module parses):
#   022bfb4  fix: switch AI to deepseek-v4-pro + restore Reddit via RSS  (3 weeks ago)
#   a09f02f  chore: initial commit — version control setup  (5 weeks ago)