← Back
"""
AlphaPulse Bot — Database (SQLite)
"""

import sqlite3
import logging
from datetime import datetime, timedelta

from config import CONFIG
from utils import now_van

logger = logging.getLogger(__name__)


class Database:
    def __init__(self, db_path='posts.db'):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()
        self._cleanup_old()

    def _init_db(self):
        c = self.conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS posted (
            id TEXT PRIMARY KEY,
            title TEXT,
            url TEXT,
            source TEXT,
            post_type TEXT,
            posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS daily_stats (
            date TEXT PRIMARY KEY,
            fear_greed_posted INTEGER DEFAULT 0,
            price_posted INTEGER DEFAULT 0,
            top_movers_posted INTEGER DEFAULT 0,
            weekly_digest_posted INTEGER DEFAULT 0,
            trending_posted INTEGER DEFAULT 0,
            posted_hours TEXT DEFAULT ""
        )''')
        self._init_analytics(c)
        # Migrations — add new columns if upgrading from older versions
        for col, default in [
            ('posted_hours',         '""'),
            ('price_posted',         '0'),
            ('top_movers_posted',    '0'),
            ('weekly_digest_posted', '0'),
            ('trending_posted',      '0'),
            ('funding_rates_posted', '0'),
            ('watchlist_posted',     '0'),
            ('long_short_posted',    '0'),
            ('history_posted',       '0'),
            ('whale_posted',         '0'),
        ]:
            try:
                c.execute(f'ALTER TABLE daily_stats ADD COLUMN {col} TEXT DEFAULT {default}')
                self.conn.commit()
            except sqlite3.OperationalError:
                pass  # column already exists
        self.conn.commit()

    def _init_analytics(self, c):
        """Analytics table for post tracking."""
        c.execute('''CREATE TABLE IF NOT EXISTS analytics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT NOT NULL,
            hour INTEGER,
            post_type TEXT,
            format TEXT,
            source TEXT,
            title TEXT,
            posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )''')
        c.execute('''CREATE INDEX IF NOT EXISTS idx_analytics_date ON analytics(date)''')
        self.conn.commit()

    def track_post(self, post_type: str, fmt: str = '', source: str = '', title: str = ''):
        """Track a post for analytics."""
        try:
            now = now_van()
            c = self.conn.cursor()
            c.execute(
                'INSERT INTO analytics (date, hour, post_type, format, source, title) VALUES (?, ?, ?, ?, ?, ?)',
                (self._today(), now.hour, post_type, fmt, source, title[:100])
            )
            self.conn.commit()
        except Exception as e:
            logger.error(f"Analytics track error: {e}")

    def get_analytics(self, days: int = 7) -> dict:
        """Get posting analytics for last N days."""
        try:
            c = self.conn.cursor()
            from datetime import timedelta
            cutoff = (now_van() - timedelta(days=days)).strftime('%Y-%m-%d')

            # Total posts per day
            c.execute('SELECT date, COUNT(*) FROM analytics WHERE date >= ? GROUP BY date ORDER BY date', (cutoff,))
            daily = {row[0]: row[1] for row in c.fetchall()}

            # Posts by type
            c.execute('SELECT post_type, COUNT(*) FROM analytics WHERE date >= ? GROUP BY post_type ORDER BY COUNT(*) DESC', (cutoff,))
            by_type = {row[0]: row[1] for row in c.fetchall()}

            # Posts by format (news only)
            c.execute("SELECT format, COUNT(*) FROM analytics WHERE date >= ? AND post_type='news' GROUP BY format ORDER BY COUNT(*) DESC", (cutoff,))
            by_format = {row[0]: row[1] for row in c.fetchall()}

            # Posts by source
            c.execute("SELECT source, COUNT(*) FROM analytics WHERE date >= ? AND source != '' GROUP BY source ORDER BY COUNT(*) DESC LIMIT 10", (cutoff,))
            by_source = {row[0]: row[1] for row in c.fetchall()}

            # Total
            c.execute('SELECT COUNT(*) FROM analytics WHERE date >= ?', (cutoff,))
            total = c.fetchone()[0]

            return {
                'period_days': days,
                'total_posts': total,
                'daily': daily,
                'by_type': by_type,
                'by_format': by_format,
                'by_source': by_source,
            }
        except Exception as e:
            logger.error(f"Analytics query error: {e}")
            return {}

    def _cleanup_old(self):
        try:
            cutoff = (now_van() - timedelta(days=CONFIG['retention_days'])).strftime('%Y-%m-%d %H:%M:%S')
            c = self.conn.cursor()
            c.execute('DELETE FROM posted WHERE posted_at < ?', (cutoff,))
            deleted = c.rowcount
            self.conn.commit()
            if deleted:
                logger.info(f"DB cleanup: removed {deleted} old records")
        except sqlite3.Error as e:
            logger.error(f"DB cleanup error: {e}")

    def _today(self) -> str:
        return now_van().strftime('%Y-%m-%d')

    def is_posted(self, post_id: str) -> bool:
        try:
            c = self.conn.cursor()
            c.execute('SELECT id FROM posted WHERE id = ?', (post_id,))
            return c.fetchone() is not None
        except sqlite3.Error as e:
            logger.error(f"DB error is_posted: {e}")
            return False

    def mark_posted(self, post_id, title, url, source, post_type='regular'):
        try:
            c = self.conn.cursor()
            c.execute(
                'INSERT OR REPLACE INTO posted (id, title, url, source, post_type) VALUES (?, ?, ?, ?, ?)',
                (post_id, title, url, source, post_type)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error(f"DB error mark_posted: {e}")
            self.conn.rollback()

    def was_posted_today(self, field: str) -> bool:
        try:
            c = self.conn.cursor()
            c.execute(f'SELECT {field} FROM daily_stats WHERE date = ?', (self._today(),))
            result = c.fetchone()
            return bool(result and result[0] == 1)
        except sqlite3.Error as e:
            logger.error(f"DB error was_posted_today({field}): {e}")
            return False

    def mark_posted_today(self, field: str):
        try:
            c = self.conn.cursor()
            c.execute(
                f'INSERT INTO daily_stats (date, {field}) VALUES (?, 1) '
                f'ON CONFLICT(date) DO UPDATE SET {field}=1',
                (self._today(),)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error(f"DB error mark_posted_today({field}): {e}")
            self.conn.rollback()

    def mark_hour_posted(self, hour: int):
        """Persist that this Vancouver hour was already served today."""
        try:
            c = self.conn.cursor()
            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (self._today(),))
            row = c.fetchone()
            current = row[0] if row else ''
            hours = set(h for h in current.split(',') if h)
            hours.add(str(hour))
            new_val = ','.join(sorted(hours))
            c.execute(
                'INSERT INTO daily_stats (date, posted_hours) VALUES (?, ?) '
                'ON CONFLICT(date) DO UPDATE SET posted_hours=?',
                (self._today(), new_val, new_val)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error(f"DB error mark_hour_posted: {e}")

    def was_hour_posted(self, hour: int) -> bool:
        """Check if this Vancouver hour was already served today."""
        try:
            c = self.conn.cursor()
            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (self._today(),))
            row = c.fetchone()
            if row and row[0]:
                return str(hour) in row[0].split(',')
        except sqlite3.Error as e:
            logger.error(f"DB error was_hour_posted: {e}")
        return False

    def get_health_stats(self) -> dict:
        """Return bot health stats for monitoring."""
        try:
            c = self.conn.cursor()
            # Total posts
            c.execute('SELECT COUNT(*) FROM posted')
            total = c.fetchone()[0]
            # Today's posted hours
            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (self._today(),))
            row = c.fetchone()
            hours_today = row[0] if row and row[0] else ''
            # Last post time
            c.execute('SELECT posted_at FROM posted ORDER BY posted_at DESC LIMIT 1')
            last = c.fetchone()
            last_post = last[0] if last else None
            return {
                'total_posts': total,
                'hours_posted_today': hours_today,
                'last_post_at': last_post,
                'date': self._today(),
            }
        except Exception as e:
            logger.error(f"Health stats error: {e}")
            return {}

    def get_last_post_time(self):
        try:
            c = self.conn.cursor()
            c.execute('SELECT posted_at FROM posted ORDER BY posted_at DESC LIMIT 1')
            result = c.fetchone()
            if result:
                return datetime.fromisoformat(result[0].replace('Z', '+00:00'))
        except sqlite3.Error as e:
            logger.error(f"DB error get_last_post_time: {e}")
        return None

📜 Git History

a09f02fchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...