"""
AlphaPulse Bot — Content Fetcher (RSS, CoinGecko, Reddit, etc.)
"""
import asyncio
import logging
import feedparser
import requests
from config import RSS_SOURCES
from utils import stable_id
logger = logging.getLogger(__name__)
async def _retry_get(session, url, retries=3, delay=5, timeout=15):
    """GET ``url`` through ``session`` with retries; return the response or None.

    The blocking ``session.get`` call is run in a worker thread via
    ``asyncio.to_thread`` so the event loop is not blocked.

    Args:
        session: Object exposing ``get(url, timeout=...)`` whose result has a
            ``status_code`` attribute (a ``requests.Session`` in this module).
        url: Address to request.
        retries: Maximum number of attempts.
        delay: Base pause in seconds. A 429 response waits
            ``delay * attempt_number``; a raised exception waits ``delay``.
        timeout: Per-request timeout forwarded to ``session.get``.

    Returns:
        The first response whose status is not 429 (other error statuses are
        returned unchanged for the caller to inspect), or ``None`` when every
        attempt was rate-limited or raised.
    """
    for attempt in range(retries):
        is_last = attempt == retries - 1
        try:
            resp = await asyncio.to_thread(session.get, url, timeout=timeout)
        except Exception as e:
            if is_last:
                logger.error(f"Request failed after {retries} attempts: {e}")
            else:
                logger.warning(f"Request failed ({e}), retry {attempt+2}/{retries} in {delay}s")
                await asyncio.sleep(delay)
            continue

        if resp.status_code != 429:
            return resp

        if is_last:
            # Nothing left to retry: report and return without a pointless sleep.
            logger.error(f"Rate limited (429) on {url[:60]}... giving up after {retries} attempts")
        else:
            wait = delay * (attempt + 1)
            logger.warning(f"Rate limited (429) on {url[:60]}... retry in {wait}s")
            await asyncio.sleep(wait)
    return None
class ContentFetcher:
# Stablecoins to exclude from top movers
STABLES = {'usdt', 'usdc', 'busd', 'dai', 'tusd', 'usdp', 'fdusd', 'usdd', 'frax', 'lusd'}
def __init__(self):
self.session = requests.Session()
self.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
async def fetch_rss(self):
"""Fetch from all 9 RSS sources."""
posts = []
for source_name, url in RSS_SOURCES.items():
try:
logger.info(f"Fetching {source_name}...")
feed = await asyncio.to_thread(feedparser.parse, url)
for entry in feed.entries[:5]:
posts.append({
'id': stable_id(source_name, entry.link),
'title': entry.title,
'url': entry.link,
'source': source_name,
'published': entry.get('published', ''),
'summary': entry.get('summary', '')[:200],
})
await asyncio.sleep(0.3)
except Exception as e:
logger.error(f"RSS error {source_name}: {e}")
return posts
async def fetch_fear_greed(self):
"""Crypto Fear & Greed Index from alternative.me."""
try:
resp = await asyncio.to_thread(
self.session.get, "https://api.alternative.me/fng/?limit=1", timeout=20
)
data = resp.json()
if data and 'data' in data:
item = data['data'][0]
return {'value': int(item['value']), 'classification': item['value_classification']}
except Exception as e:
logger.error(f"Fear & Greed error: {e}")
return None
async def fetch_prices(self):
"""BTC + ETH + SOL prices with 24h and 7d changes."""
try:
url = (
"https://api.coingecko.com/api/v3/simple/price"
"?ids=bitcoin,ethereum,solana&vs_currencies=usd"
"&include_24hr_change=true&include_7d_change=true"
)
resp = await _retry_get(self.session, url)
if not resp:
return None
data = resp.json()
btc = data.get('bitcoin', {})
eth = data.get('ethereum', {})
sol = data.get('solana', {})
return {
'btc_price': btc.get('usd', 0),
'btc_change': btc.get('usd_24h_change', 0),
'btc_7d': btc.get('usd_7d_change', 0),
'eth_price': eth.get('usd', 0),
'eth_change': eth.get('usd_24h_change', 0),
'eth_7d': eth.get('usd_7d_change', 0),
'sol_price': sol.get('usd', 0),
'sol_change': sol.get('usd_24h_change', 0),
'sol_7d': sol.get('usd_7d_change', 0),
}
except Exception as e:
logger.error(f"Price fetch error: {e}")
return None
async def fetch_global(self) -> dict | None:
"""Global crypto market data: BTC dominance, total mcap."""
try:
resp = await _retry_get(self.session, "https://api.coingecko.com/api/v3/global")
if not resp:
return None
data = resp.json().get('data', {})
return {
'btc_dominance': round(data.get('market_cap_percentage', {}).get('btc', 0), 1),
'eth_dominance': round(data.get('market_cap_percentage', {}).get('eth', 0), 1),
'total_mcap': data.get('total_market_cap', {}).get('usd', 0),
'mcap_change': data.get('market_cap_change_percentage_24h_usd', 0),
}
except Exception as e:
logger.error(f"Global fetch error: {e}")
return None
async def fetch_trending(self) -> list:
"""Top 7 trending coins from CoinGecko /search/trending."""
try:
resp = await _retry_get(self.session, "https://api.coingecko.com/api/v3/search/trending")
if not resp:
return []
data = resp.json()
coins = []
for item in data.get('coins', [])[:7]:
c = item.get('item', {})
coins.append({
'name': c.get('name', ''),
'symbol': c.get('symbol', '').upper(),
'rank': c.get('market_cap_rank', '?'),
'score': c.get('score', 0),
})
return coins
except Exception as e:
logger.error(f"Trending fetch error: {e}")
return []
async def fetch_top_movers(self):
"""Top 5 gainers and top 5 losers by 24h % from CoinGecko top-200."""
try:
url = (
"https://api.coingecko.com/api/v3/coins/markets"
"?vs_currency=usd&order=market_cap_desc&per_page=200&page=1"
"&price_change_percentage=24h&sparkline=false"
)
resp = await _retry_get(self.session, url, timeout=20)
if not resp:
return [], []
coins = resp.json()
# CoinGecko returns error dict/string on rate limit
if not isinstance(coins, list):
logger.warning(f"Top movers: unexpected response type {type(coins)}: {str(coins)[:100]}")
return [], []
filtered = [
c for c in coins
if isinstance(c, dict)
and c.get('symbol', '').lower() not in self.STABLES
and c.get('price_change_percentage_24h') is not None
]
sorted_coins = sorted(
filtered,
key=lambda c: c['price_change_percentage_24h'] or 0,
reverse=True
)
gainers = sorted_coins[:5]
losers = list(reversed(sorted_coins[-5:]))
return gainers, losers
except Exception as e:
logger.error(f"Top movers error: {e}")
return [], []
async def fetch_sparklines(self) -> dict:
"""Fetch 7-day price data for BTC/ETH/SOL and return sparkline strings.
Returns: {'btc': 'โโโโ
โ', 'eth': '...', 'sol': '...'}
"""
from templates import spark_bar
result = {}
coins = [('bitcoin', 'btc'), ('ethereum', 'eth'), ('solana', 'sol')]
for coin_id, symbol in coins:
try:
url = (
f"https://api.coingecko.com/api/v3/coins/{coin_id}/market_chart"
f"?vs_currency=usd&days=7&interval=daily"
)
resp = await _retry_get(self.session, url)
if not resp:
continue
data = resp.json()
prices = [p[1] for p in data.get('prices', [])]
if prices:
result[symbol] = spark_bar(prices)
await asyncio.sleep(0.3) # rate limit
except Exception as e:
logger.error(f"Sparkline error {coin_id}: {e}")
return result
# Major coins to include in funding rates (top ~80 by mcap)
MAJOR_COINS = {
'BTC', 'ETH', 'BNB', 'SOL', 'XRP', 'ADA', 'DOGE', 'AVAX',
'DOT', 'LINK', 'MATIC', 'SHIB', 'LTC', 'TRX', 'NEAR', 'UNI',
'ATOM', 'XLM', 'ICP', 'APT', 'ARB', 'OP', 'SUI', 'SEI', 'INJ',
'TON', 'PEPE', 'WLD', 'RNDR', 'FIL', 'ALGO', 'FTM', 'AAVE',
'MANA', 'SAND', 'AXS', 'GALA', 'CRV', 'LDO', 'MKR', 'SNX',
'COMP', 'RUNE', 'THETA', 'FET', 'AGIX', 'OCEAN', 'IMX', 'GMX',
'DYDX', 'STX', 'ORDI', 'WIF', 'JUP', 'TIA', 'PYTH', 'JTO',
'BONK', 'FLOKI', 'NOT', 'ENA', 'W', 'ETHFI', 'PENDLE', 'TAO',
'ONDO', 'STRK', 'ZRO', 'BOME', 'EIGEN', 'MOVE', 'HBAR',
}
async def fetch_funding_rates(self) -> list:
"""Top funding rates from Binance Futures (no API key needed).
Only major coins (top ~80 by mcap). Returns top 10 by abs rate.
"""
try:
url = "https://fapi.binance.com/fapi/v1/premiumIndex"
resp = await asyncio.to_thread(self.session.get, url, timeout=15)
data = resp.json()
rates = []
for item in data:
rate = float(item.get('lastFundingRate', 0))
if rate == 0:
continue
symbol = item.get('symbol', '')
if not symbol.endswith('USDT'):
continue
clean = symbol.replace('USDT', '')
# Filter: only major coins
if clean not in self.MAJOR_COINS:
continue
rates.append({
'symbol': clean,
'rate': rate,
'rate_pct': rate * 100,
'price': float(item.get('markPrice', 0)),
})
rates.sort(key=lambda x: abs(x['rate']), reverse=True)
return rates[:10]
except Exception as e:
logger.error(f"Funding rates error: {e}")
return []
async def fetch_long_short_ratio(self) -> list:
"""Fetch Long/Short account ratio + OI for top coins from Binance Futures.
No API key needed. Returns list of dicts.
"""
coins = [
('BTCUSDT', 'BTC'), ('ETHUSDT', 'ETH'), ('SOLUSDT', 'SOL'),
('XRPUSDT', 'XRP'), ('DOGEUSDT', 'DOGE'), ('BNBUSDT', 'BNB'),
('ADAUSDT', 'ADA'), ('AVAXUSDT', 'AVAX'), ('LINKUSDT', 'LINK'),
('DOTUSDT', 'DOT'),
]
results = []
for pair, symbol in coins:
try:
# Long/Short ratio
url = (
f"https://fapi.binance.com/futures/data/"
f"globalLongShortAccountRatio?symbol={pair}&period=1h&limit=1"
)
resp = await asyncio.to_thread(self.session.get, url, timeout=10)
data = resp.json()
if data:
d = data[0]
long_pct = float(d.get('longAccount', 0.5))
short_pct = float(d.get('shortAccount', 0.5))
ratio = float(d.get('longShortRatio', 1.0))
results.append({
'symbol': symbol,
'long_pct': long_pct,
'short_pct': short_pct,
'ratio': ratio,
})
await asyncio.sleep(0.2)
except Exception as e:
logger.error(f"L/S ratio error {symbol}: {e}")
return results
async def fetch_whale_alerts(self) -> list:
"""Fetch recent large transactions from whale-alert.io free API (no key, limited).
Falls back to Blockchair if unavailable.
"""
try:
# Blockchair free: largest BTC transactions in last 24h
url = "https://api.blockchair.com/bitcoin/transactions?s=output_total(desc)&limit=5"
resp = await _retry_get(self.session, url, retries=2, timeout=15)
if not resp:
return []
data = resp.json()
txs = data.get('data', [])
alerts = []
for tx in txs:
btc_amount = tx.get('output_total', 0) / 1e8 # satoshi โ BTC
if btc_amount >= 100: # only 100+ BTC
alerts.append({
'amount_btc': btc_amount,
'hash': tx.get('hash', '')[:16],
'block': tx.get('block_id', 0),
})
return alerts[:5]
except Exception as e:
logger.error(f"Whale alerts error: {e}")
return []
async def fetch_reddit(self):
"""Hot posts from r/CryptoCurrency (score > 100 only)."""
try:
url = "https://www.reddit.com/r/CryptoCurrency/hot.json?limit=10"
headers = {'User-Agent': 'AlphaPulseBot/4.0'}
resp = await asyncio.to_thread(
lambda: self.session.get(url, headers=headers, timeout=20)
)
data = resp.json()
posts = []
for item in data.get('data', {}).get('children', []):
pd = item['data']
if pd.get('score', 0) > 100:
posts.append({
'id': stable_id('Reddit', pd['id']),
'title': pd['title'],
'url': f"https://reddit.com{pd['permalink']}",
'source': 'Reddit r/CC',
'score': pd['score'],
})
return posts
except Exception as e:
logger.error(f"Reddit error: {e}")
return []