โ† ะะฐะทะฐะด
""" AlphaPulse Bot โ€” Content Fetcher (RSS, CoinGecko, Reddit, etc.) """ import asyncio import logging import feedparser import requests from config import RSS_SOURCES from utils import stable_id logger = logging.getLogger(__name__) async def _retry_get(session, url, retries=3, delay=5, timeout=15): """GET with retry on failure/rate-limit. Returns response or None.""" for attempt in range(retries): try: resp = await asyncio.to_thread(session.get, url, timeout=timeout) if resp.status_code == 429: wait = delay * (attempt + 1) logger.warning(f"Rate limited (429) on {url[:60]}... retry in {wait}s") await asyncio.sleep(wait) continue return resp except Exception as e: if attempt < retries - 1: logger.warning(f"Request failed ({e}), retry {attempt+2}/{retries} in {delay}s") await asyncio.sleep(delay) else: logger.error(f"Request failed after {retries} attempts: {e}") return None class ContentFetcher: # Stablecoins to exclude from top movers STABLES = {'usdt', 'usdc', 'busd', 'dai', 'tusd', 'usdp', 'fdusd', 'usdd', 'frax', 'lusd'} def __init__(self): self.session = requests.Session() self.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}) async def fetch_rss(self): """Fetch from all 9 RSS sources.""" posts = [] for source_name, url in RSS_SOURCES.items(): try: logger.info(f"Fetching {source_name}...") feed = await asyncio.to_thread(feedparser.parse, url) for entry in feed.entries[:5]: posts.append({ 'id': stable_id(source_name, entry.link), 'title': entry.title, 'url': entry.link, 'source': source_name, 'published': entry.get('published', ''), 'summary': entry.get('summary', '')[:200], }) await asyncio.sleep(0.3) except Exception as e: logger.error(f"RSS error {source_name}: {e}") return posts async def fetch_fear_greed(self): """Crypto Fear & Greed Index from alternative.me.""" try: resp = await asyncio.to_thread( self.session.get, "https://api.alternative.me/fng/?limit=1", timeout=20 ) data = resp.json() if data and 'data' in data: item = data['data'][0] return {'value': int(item['value']), 'classification': item['value_classification']} except Exception as e: logger.error(f"Fear & Greed error: {e}") return None async def fetch_prices(self): """BTC + ETH + SOL prices with 24h and 7d changes.""" try: url = ( "https://api.coingecko.com/api/v3/simple/price" "?ids=bitcoin,ethereum,solana&vs_currencies=usd" "&include_24hr_change=true&include_7d_change=true" ) resp = await _retry_get(self.session, url) if not resp: return None data = resp.json() btc = data.get('bitcoin', {}) eth = data.get('ethereum', {}) sol = data.get('solana', {}) return { 'btc_price': btc.get('usd', 0), 'btc_change': btc.get('usd_24h_change', 0), 'btc_7d': btc.get('usd_7d_change', 0), 'eth_price': eth.get('usd', 0), 'eth_change': eth.get('usd_24h_change', 0), 'eth_7d': eth.get('usd_7d_change', 0), 'sol_price': sol.get('usd', 0), 'sol_change': sol.get('usd_24h_change', 0), 'sol_7d': sol.get('usd_7d_change', 0), } except Exception as e: logger.error(f"Price fetch error: {e}") return None async def fetch_global(self) -> dict | None: """Global crypto market data: BTC dominance, total mcap.""" try: resp = await _retry_get(self.session, "https://api.coingecko.com/api/v3/global") if not resp: return None data = resp.json().get('data', {}) return { 'btc_dominance': round(data.get('market_cap_percentage', {}).get('btc', 0), 1), 'eth_dominance': round(data.get('market_cap_percentage', {}).get('eth', 0), 1), 'total_mcap': data.get('total_market_cap', {}).get('usd', 0), 'mcap_change': data.get('market_cap_change_percentage_24h_usd', 0), } except Exception as e: logger.error(f"Global fetch error: {e}") return None async def fetch_trending(self) -> list: """Top 7 trending coins from CoinGecko /search/trending.""" try: resp = await _retry_get(self.session, "https://api.coingecko.com/api/v3/search/trending") if not resp: return [] data = resp.json() coins = [] for item in data.get('coins', [])[:7]: c = item.get('item', {}) coins.append({ 'name': c.get('name', ''), 'symbol': c.get('symbol', '').upper(), 'rank': c.get('market_cap_rank', '?'), 'score': c.get('score', 0), }) return coins except Exception as e: logger.error(f"Trending fetch error: {e}") return [] async def fetch_top_movers(self): """Top 5 gainers and top 5 losers by 24h % from CoinGecko top-200.""" try: url = ( "https://api.coingecko.com/api/v3/coins/markets" "?vs_currency=usd&order=market_cap_desc&per_page=200&page=1" "&price_change_percentage=24h&sparkline=false" ) resp = await _retry_get(self.session, url, timeout=20) if not resp: return [], [] coins = resp.json() # CoinGecko returns error dict/string on rate limit if not isinstance(coins, list): logger.warning(f"Top movers: unexpected response type {type(coins)}: {str(coins)[:100]}") return [], [] filtered = [ c for c in coins if isinstance(c, dict) and c.get('symbol', '').lower() not in self.STABLES and c.get('price_change_percentage_24h') is not None ] sorted_coins = sorted( filtered, key=lambda c: c['price_change_percentage_24h'] or 0, reverse=True ) gainers = sorted_coins[:5] losers = list(reversed(sorted_coins[-5:])) return gainers, losers except Exception as e: logger.error(f"Top movers error: {e}") return [], [] async def fetch_sparklines(self) -> dict: """Fetch 7-day price data for BTC/ETH/SOL and return sparkline strings. Returns: {'btc': 'โ–โ–‚โ–ƒโ–…โ–‡', 'eth': '...', 'sol': '...'} """ from templates import spark_bar result = {} coins = [('bitcoin', 'btc'), ('ethereum', 'eth'), ('solana', 'sol')] for coin_id, symbol in coins: try: url = ( f"https://api.coingecko.com/api/v3/coins/{coin_id}/market_chart" f"?vs_currency=usd&days=7&interval=daily" ) resp = await _retry_get(self.session, url) if not resp: continue data = resp.json() prices = [p[1] for p in data.get('prices', [])] if prices: result[symbol] = spark_bar(prices) await asyncio.sleep(0.3) # rate limit except Exception as e: logger.error(f"Sparkline error {coin_id}: {e}") return result # Major coins to include in funding rates (top ~80 by mcap) MAJOR_COINS = { 'BTC', 'ETH', 'BNB', 'SOL', 'XRP', 'ADA', 'DOGE', 'AVAX', 'DOT', 'LINK', 'MATIC', 'SHIB', 'LTC', 'TRX', 'NEAR', 'UNI', 'ATOM', 'XLM', 'ICP', 'APT', 'ARB', 'OP', 'SUI', 'SEI', 'INJ', 'TON', 'PEPE', 'WLD', 'RNDR', 'FIL', 'ALGO', 'FTM', 'AAVE', 'MANA', 'SAND', 'AXS', 'GALA', 'CRV', 'LDO', 'MKR', 'SNX', 'COMP', 'RUNE', 'THETA', 'FET', 'AGIX', 'OCEAN', 'IMX', 'GMX', 'DYDX', 'STX', 'ORDI', 'WIF', 'JUP', 'TIA', 'PYTH', 'JTO', 'BONK', 'FLOKI', 'NOT', 'ENA', 'W', 'ETHFI', 'PENDLE', 'TAO', 'ONDO', 'STRK', 'ZRO', 'BOME', 'EIGEN', 'MOVE', 'HBAR', } async def fetch_funding_rates(self) -> list: """Top funding rates from Binance Futures (no API key needed). Only major coins (top ~80 by mcap). Returns top 10 by abs rate. """ try: url = "https://fapi.binance.com/fapi/v1/premiumIndex" resp = await asyncio.to_thread(self.session.get, url, timeout=15) data = resp.json() rates = [] for item in data: rate = float(item.get('lastFundingRate', 0)) if rate == 0: continue symbol = item.get('symbol', '') if not symbol.endswith('USDT'): continue clean = symbol.replace('USDT', '') # Filter: only major coins if clean not in self.MAJOR_COINS: continue rates.append({ 'symbol': clean, 'rate': rate, 'rate_pct': rate * 100, 'price': float(item.get('markPrice', 0)), }) rates.sort(key=lambda x: abs(x['rate']), reverse=True) return rates[:10] except Exception as e: logger.error(f"Funding rates error: {e}") return [] async def fetch_long_short_ratio(self) -> list: """Fetch Long/Short account ratio + OI for top coins from Binance Futures. No API key needed. Returns list of dicts. """ coins = [ ('BTCUSDT', 'BTC'), ('ETHUSDT', 'ETH'), ('SOLUSDT', 'SOL'), ('XRPUSDT', 'XRP'), ('DOGEUSDT', 'DOGE'), ('BNBUSDT', 'BNB'), ('ADAUSDT', 'ADA'), ('AVAXUSDT', 'AVAX'), ('LINKUSDT', 'LINK'), ('DOTUSDT', 'DOT'), ] results = [] for pair, symbol in coins: try: # Long/Short ratio url = ( f"https://fapi.binance.com/futures/data/" f"globalLongShortAccountRatio?symbol={pair}&period=1h&limit=1" ) resp = await asyncio.to_thread(self.session.get, url, timeout=10) data = resp.json() if data: d = data[0] long_pct = float(d.get('longAccount', 0.5)) short_pct = float(d.get('shortAccount', 0.5)) ratio = float(d.get('longShortRatio', 1.0)) results.append({ 'symbol': symbol, 'long_pct': long_pct, 'short_pct': short_pct, 'ratio': ratio, }) await asyncio.sleep(0.2) except Exception as e: logger.error(f"L/S ratio error {symbol}: {e}") return results async def fetch_whale_alerts(self) -> list: """Fetch recent large transactions from whale-alert.io free API (no key, limited). Falls back to Blockchair if unavailable. """ try: # Blockchair free: largest BTC transactions in last 24h url = "https://api.blockchair.com/bitcoin/transactions?s=output_total(desc)&limit=5" resp = await _retry_get(self.session, url, retries=2, timeout=15) if not resp: return [] data = resp.json() txs = data.get('data', []) alerts = [] for tx in txs: btc_amount = tx.get('output_total', 0) / 1e8 # satoshi โ†’ BTC if btc_amount >= 100: # only 100+ BTC alerts.append({ 'amount_btc': btc_amount, 'hash': tx.get('hash', '')[:16], 'block': tx.get('block_id', 0), }) return alerts[:5] except Exception as e: logger.error(f"Whale alerts error: {e}") return [] async def fetch_reddit(self): """Hot posts from r/CryptoCurrency (score > 100 only).""" try: url = "https://www.reddit.com/r/CryptoCurrency/hot.json?limit=10" headers = {'User-Agent': 'AlphaPulseBot/4.0'} resp = await asyncio.to_thread( lambda: self.session.get(url, headers=headers, timeout=20) ) data = resp.json() posts = [] for item in data.get('data', {}).get('children', []): pd = item['data'] if pd.get('score', 0) > 100: posts.append({ 'id': stable_id('Reddit', pd['id']), 'title': pd['title'], 'url': f"https://reddit.com{pd['permalink']}", 'source': 'Reddit r/CC', 'score': pd['score'], }) return posts except Exception as e: logger.error(f"Reddit error: {e}") return []