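# "← Back" — NOTE(review): this looks like stray page-navigation text picked up when the file was copied; it is not Python and can be removed once confirmed.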
"""AlphaPulse Bot — Database (SQLite).

Persistence layer built on a single SQLite connection. It stores:

* ``posted``      — one row per published item (used for de-duplication),
* ``daily_stats`` — one row per calendar day with 0/1 "already posted" flags
                    and the comma-separated list of hours already served,
* ``analytics``   — an append-only log of every tracked post.
"""

import logging
import sqlite3
from datetime import datetime, timedelta
from typing import Optional

from config import CONFIG
from utils import now_van

logger = logging.getLogger(__name__)


class Database:
    """Wrapper around the bot's SQLite database.

    The connection is opened with ``check_same_thread=False`` and is shared by
    every method of the instance. Read/write helpers catch database errors,
    log them and return a neutral value (``False``, ``None`` or ``{}``) rather
    than raising.
    """

    # Columns that may be missing from a ``daily_stats`` table created by an
    # older version: (column name, SQL type, default literal).
    # Flag columns must be INTEGER: with TEXT affinity SQLite stores the
    # value 1 as the string '1', which never compares equal to the int 1.
    _DAILY_STATS_MIGRATIONS = (
        ('posted_hours', 'TEXT', "''"),
        ('price_posted', 'INTEGER', '0'),
        ('top_movers_posted', 'INTEGER', '0'),
        ('weekly_digest_posted', 'INTEGER', '0'),
        ('trending_posted', 'INTEGER', '0'),
        ('funding_rates_posted', 'INTEGER', '0'),
        ('watchlist_posted', 'INTEGER', '0'),
        ('long_short_posted', 'INTEGER', '0'),
        ('history_posted', 'INTEGER', '0'),
        ('whale_posted', 'INTEGER', '0'),
    )

    def __init__(self, db_path='posts.db'):
        """Open (or create) the database, ensure the schema, purge old rows.

        Args:
            db_path: Path of the SQLite file; passed to ``sqlite3.connect``.
        """
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()
        self._cleanup_old()

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    def _init_db(self) -> None:
        """Create the tables if needed and add columns missing from old DBs."""
        c = self.conn.cursor()
        c.execute("""CREATE TABLE IF NOT EXISTS posted (
            id TEXT PRIMARY KEY,
            title TEXT,
            url TEXT,
            source TEXT,
            post_type TEXT,
            posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""")
        c.execute("""CREATE TABLE IF NOT EXISTS daily_stats (
            date TEXT PRIMARY KEY,
            fear_greed_posted INTEGER DEFAULT 0,
            price_posted INTEGER DEFAULT 0,
            top_movers_posted INTEGER DEFAULT 0,
            weekly_digest_posted INTEGER DEFAULT 0,
            trending_posted INTEGER DEFAULT 0,
            posted_hours TEXT DEFAULT ''
        )""")
        self._init_analytics(c)

        # Migrations — add new columns if upgrading from older versions.
        # Inspect the live schema first so that only genuinely missing
        # columns are added and real failures are not mistaken for
        # "column already exists".
        c.execute('PRAGMA table_info(daily_stats)')
        existing = {row[1] for row in c.fetchall()}  # row[1] is the column name
        for col, sql_type, default in self._DAILY_STATS_MIGRATIONS:
            if col in existing:
                continue
            try:
                c.execute(
                    f'ALTER TABLE daily_stats ADD COLUMN {col} {sql_type} DEFAULT {default}'
                )
            except sqlite3.OperationalError as e:
                # A concurrent process may have added the column meanwhile;
                # that case is harmless, anything else is worth reporting.
                if 'duplicate column' not in str(e).lower():
                    logger.error(f"DB migration error adding {col}: {e}")
        self.conn.commit()

    def _init_analytics(self, c) -> None:
        """Create the analytics table and its date index.

        Args:
            c: Cursor on ``self.conn`` used to run the DDL statements.
        """
        c.execute("""CREATE TABLE IF NOT EXISTS analytics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT NOT NULL,
            hour INTEGER,
            post_type TEXT,
            format TEXT,
            source TEXT,
            title TEXT,
            posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""")
        c.execute('''CREATE INDEX IF NOT EXISTS idx_analytics_date ON analytics(date)''')
        self.conn.commit()

    # ------------------------------------------------------------------
    # Analytics
    # ------------------------------------------------------------------

    def track_post(self, post_type: str, fmt: str = '', source: str = '', title: str = ''):
        """Append one row to the analytics log.

        Args:
            post_type: Category of the post (stored in ``post_type``).
            fmt: Format label (stored in ``format``).
            source: Source label.
            title: Post title; only the first 100 characters are stored.

        Any failure is logged and swallowed: analytics must never break
        posting.
        """
        try:
            # Read the clock once so date and hour always belong together,
            # even when the call straddles midnight.
            now = now_van()
            c = self.conn.cursor()
            c.execute(
                'INSERT INTO analytics (date, hour, post_type, format, source, title) VALUES (?, ?, ?, ?, ?, ?)',
                (now.strftime('%Y-%m-%d'), now.hour, post_type, fmt, source, title[:100])
            )
            self.conn.commit()
        except Exception as e:
            logger.error(f"Analytics track error: {e}")

    def get_analytics(self, days: int = 7) -> dict:
        """Aggregate the analytics log from ``days`` days ago until now.

        Args:
            days: Size of the look-back window in days.

        Returns:
            A dict with keys ``period_days``, ``total_posts``, ``daily``
            (date -> count), ``by_type``, ``by_format`` (rows with
            ``post_type='news'`` only) and ``by_source`` (top 10 non-empty
            sources). An empty dict if anything fails.
        """
        try:
            c = self.conn.cursor()
            cutoff = (now_van() - timedelta(days=days)).strftime('%Y-%m-%d')

            # Total posts per day
            c.execute(
                'SELECT date, COUNT(*) FROM analytics WHERE date >= ? GROUP BY date ORDER BY date',
                (cutoff,)
            )
            daily = {row[0]: row[1] for row in c.fetchall()}

            # Posts by type
            c.execute(
                'SELECT post_type, COUNT(*) FROM analytics WHERE date >= ? GROUP BY post_type ORDER BY COUNT(*) DESC',
                (cutoff,)
            )
            by_type = {row[0]: row[1] for row in c.fetchall()}

            # Posts by format (news only)
            c.execute(
                "SELECT format, COUNT(*) FROM analytics WHERE date >= ? AND post_type='news' GROUP BY format ORDER BY COUNT(*) DESC",
                (cutoff,)
            )
            by_format = {row[0]: row[1] for row in c.fetchall()}

            # Posts by source
            c.execute(
                "SELECT source, COUNT(*) FROM analytics WHERE date >= ? AND source != '' GROUP BY source ORDER BY COUNT(*) DESC LIMIT 10",
                (cutoff,)
            )
            by_source = {row[0]: row[1] for row in c.fetchall()}

            # Total
            c.execute('SELECT COUNT(*) FROM analytics WHERE date >= ?', (cutoff,))
            total = c.fetchone()[0]

            return {
                'period_days': days,
                'total_posts': total,
                'daily': daily,
                'by_type': by_type,
                'by_format': by_format,
                'by_source': by_source,
            }
        except Exception as e:
            logger.error(f"Analytics query error: {e}")
            return {}

    # ------------------------------------------------------------------
    # Housekeeping / helpers
    # ------------------------------------------------------------------

    def _cleanup_old(self) -> None:
        """Delete ``posted`` rows older than ``CONFIG['retention_days']`` days."""
        try:
            # NOTE(review): the cutoff is built from now_van() while posted_at
            # defaults to SQLite's CURRENT_TIMESTAMP — confirm both use the
            # same timezone; if not, retention is off by the UTC offset.
            cutoff = (now_van() - timedelta(days=CONFIG['retention_days'])).strftime('%Y-%m-%d %H:%M:%S')
            c = self.conn.cursor()
            c.execute('DELETE FROM posted WHERE posted_at < ?', (cutoff,))
            deleted = c.rowcount
            self.conn.commit()
            if deleted:
                logger.info(f"DB cleanup: removed {deleted} old records")
        except sqlite3.Error as e:
            logger.error(f"DB cleanup error: {e}")

    def _today(self) -> str:
        """Return the current date from ``now_van()`` as ``YYYY-MM-DD``."""
        return now_van().strftime('%Y-%m-%d')

    @staticmethod
    def _is_safe_column(field) -> bool:
        """Return True if ``field`` is a plain ASCII identifier.

        Column names cannot be bound as SQL parameters, so they are
        interpolated into the statement; this guard keeps quotes, spaces and
        other SQL syntax out of that interpolation.
        """
        return isinstance(field, str) and field.isascii() and field.isidentifier()

    @staticmethod
    def _hour_sort_key(value: str):
        """Sort key ordering numeric hour strings numerically, others last."""
        if value.isdecimal():
            return (0, int(value), value)
        return (1, 0, value)

    # ------------------------------------------------------------------
    # Posted items
    # ------------------------------------------------------------------

    def is_posted(self, post_id: str) -> bool:
        """Return True if ``post_id`` exists in ``posted``; False on DB error."""
        try:
            c = self.conn.cursor()
            c.execute('SELECT id FROM posted WHERE id = ?', (post_id,))
            return c.fetchone() is not None
        except sqlite3.Error as e:
            logger.error(f"DB error is_posted: {e}")
            return False

    def mark_posted(self, post_id, title, url, source, post_type='regular'):
        """Insert (or replace) the ``posted`` row for ``post_id``.

        On a database error the error is logged and the transaction is rolled
        back.
        """
        try:
            c = self.conn.cursor()
            c.execute(
                'INSERT OR REPLACE INTO posted (id, title, url, source, post_type) VALUES (?, ?, ?, ?, ?)',
                (post_id, title, url, source, post_type)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error(f"DB error mark_posted: {e}")
            self.conn.rollback()

    # ------------------------------------------------------------------
    # Daily flags
    # ------------------------------------------------------------------

    def was_posted_today(self, field: str) -> bool:
        """Return True if today's ``daily_stats`` flag ``field`` is set to 1.

        Args:
            field: Name of a flag column in ``daily_stats``.

        Returns:
            False when there is no row for today, the flag is not 1, the
            column name is invalid, or a database error occurs.
        """
        if not self._is_safe_column(field):
            logger.error(f"DB error was_posted_today({field}): invalid column name")
            return False
        try:
            c = self.conn.cursor()
            c.execute(f'SELECT {field} FROM daily_stats WHERE date = ?', (self._today(),))
            result = c.fetchone()
        except sqlite3.Error as e:
            logger.error(f"DB error was_posted_today({field}): {e}")
            return False
        if not result:
            return False
        # Databases migrated by older versions declared the flag columns as
        # TEXT, so the stored value may be the string '1' rather than int 1.
        try:
            return int(result[0]) == 1
        except (TypeError, ValueError):
            return False

    def mark_posted_today(self, field: str):
        """Set today's ``daily_stats`` flag ``field`` to 1 (upsert).

        Args:
            field: Name of a flag column in ``daily_stats``.

        Invalid column names and database errors are logged; a failed
        statement is rolled back.
        """
        if not self._is_safe_column(field):
            logger.error(f"DB error mark_posted_today({field}): invalid column name")
            return
        try:
            c = self.conn.cursor()
            c.execute(
                f'INSERT INTO daily_stats (date, {field}) VALUES (?, 1) '
                f'ON CONFLICT(date) DO UPDATE SET {field}=1',
                (self._today(),)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error(f"DB error mark_posted_today({field}): {e}")
            self.conn.rollback()

    # ------------------------------------------------------------------
    # Hour tracking
    # ------------------------------------------------------------------

    def mark_hour_posted(self, hour: int):
        """Persist that this Vancouver hour was already served today.

        The hours are kept as a comma-separated, numerically sorted string in
        ``daily_stats.posted_hours``.
        """
        try:
            today = self._today()
            c = self.conn.cursor()
            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (today,))
            row = c.fetchone()
            # Treat both "no row yet" and a NULL column as an empty list.
            current = row[0] if row and row[0] else ''
            hours = {h for h in current.split(',') if h}
            hours.add(str(hour))
            new_val = ','.join(sorted(hours, key=self._hour_sort_key))
            c.execute(
                'INSERT INTO daily_stats (date, posted_hours) VALUES (?, ?) '
                'ON CONFLICT(date) DO UPDATE SET posted_hours=?',
                (today, new_val, new_val)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error(f"DB error mark_hour_posted: {e}")
            self.conn.rollback()

    def was_hour_posted(self, hour: int) -> bool:
        """Check if this Vancouver hour was already served today."""
        try:
            c = self.conn.cursor()
            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (self._today(),))
            row = c.fetchone()
            if row and row[0]:
                return str(hour) in row[0].split(',')
        except sqlite3.Error as e:
            logger.error(f"DB error was_hour_posted: {e}")
        return False

    # ------------------------------------------------------------------
    # Monitoring
    # ------------------------------------------------------------------

    def get_health_stats(self) -> dict:
        """Return bot health stats for monitoring.

        Returns:
            A dict with ``total_posts`` (row count of ``posted``),
            ``hours_posted_today`` (raw comma-separated string),
            ``last_post_at`` (latest ``posted_at`` value or None) and
            ``date``. An empty dict if anything fails.
        """
        try:
            c = self.conn.cursor()

            # Total posts
            c.execute('SELECT COUNT(*) FROM posted')
            total = c.fetchone()[0]

            # Today's posted hours
            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (self._today(),))
            row = c.fetchone()
            hours_today = row[0] if row and row[0] else ''

            # Last post time
            c.execute('SELECT posted_at FROM posted ORDER BY posted_at DESC LIMIT 1')
            last = c.fetchone()
            last_post = last[0] if last else None

            return {
                'total_posts': total,
                'hours_posted_today': hours_today,
                'last_post_at': last_post,
                'date': self._today(),
            }
        except Exception as e:
            logger.error(f"Health stats error: {e}")
            return {}

    def get_last_post_time(self) -> Optional[datetime]:
        """Return the most recent ``posted_at`` as a ``datetime``.

        Returns:
            The parsed timestamp, or None when the table is empty, the stored
            value is NULL or unparsable, or a database error occurs.
        """
        try:
            c = self.conn.cursor()
            c.execute('SELECT posted_at FROM posted ORDER BY posted_at DESC LIMIT 1')
            result = c.fetchone()
            if result and result[0]:
                # fromisoformat() rejects a trailing 'Z' on older Pythons.
                return datetime.fromisoformat(result[0].replace('Z', '+00:00'))
        except sqlite3.Error as e:
            logger.error(f"DB error get_last_post_time: {e}")
        except (ValueError, TypeError, AttributeError) as e:
            # Malformed or non-string timestamp: report it instead of raising.
            logger.error(f"DB error get_last_post_time: {e}")
        return None