"""
AlphaPulse Bot — Database (SQLite)
"""
import sqlite3
import logging
from datetime import datetime, timedelta
from config import CONFIG
from utils import now_van
# Module-level logger; records are emitted under this module's name.
logger = logging.getLogger(__name__)
class Database:
    """SQLite-backed persistence for the bot.

    Three tables are managed:

    * ``posted``      - one row per published item, keyed by item id.
    * ``daily_stats`` - one row per date holding 0/1 "already posted" flags
      and a comma-separated list of hours already served.
    * ``analytics``   - append-only log of posts used for reporting.

    NOTE(review): the connection is opened with ``check_same_thread=False``
    and no locking is done in this class; if an instance is shared across
    threads the callers presumably serialize access - confirm.
    """

    # daily_stats columns that may be missing from databases created by older
    # versions, with the declaration used when adding them. Flags are INTEGER:
    # with TEXT affinity SQLite stores ``1`` as ``'1'`` and an ``== 1`` check
    # on the Python side would never match.
    _MIGRATION_COLUMNS = (
        ('posted_hours', "TEXT DEFAULT ''"),
        ('price_posted', 'INTEGER DEFAULT 0'),
        ('top_movers_posted', 'INTEGER DEFAULT 0'),
        ('weekly_digest_posted', 'INTEGER DEFAULT 0'),
        ('trending_posted', 'INTEGER DEFAULT 0'),
        ('funding_rates_posted', 'INTEGER DEFAULT 0'),
        ('watchlist_posted', 'INTEGER DEFAULT 0'),
        ('long_short_posted', 'INTEGER DEFAULT 0'),
        ('history_posted', 'INTEGER DEFAULT 0'),
        ('whale_posted', 'INTEGER DEFAULT 0'),
    )

    def __init__(self, db_path='posts.db'):
        """Open (or create) the database, ensure the schema, prune old rows.

        Args:
            db_path: Path of the SQLite file.
        """
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()
        self._cleanup_old()

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def _init_db(self):
        """Create the tables if needed and add any missing columns."""
        c = self.conn.cursor()
        c.execute("""CREATE TABLE IF NOT EXISTS posted (
            id TEXT PRIMARY KEY,
            title TEXT,
            url TEXT,
            source TEXT,
            post_type TEXT,
            posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""")
        # Single-quoted '' (not "") so the DDL does not depend on SQLite's
        # legacy acceptance of double-quoted string literals.
        c.execute("""CREATE TABLE IF NOT EXISTS daily_stats (
            date TEXT PRIMARY KEY,
            fear_greed_posted INTEGER DEFAULT 0,
            price_posted INTEGER DEFAULT 0,
            top_movers_posted INTEGER DEFAULT 0,
            weekly_digest_posted INTEGER DEFAULT 0,
            trending_posted INTEGER DEFAULT 0,
            posted_hours TEXT DEFAULT ''
        )""")
        self._init_analytics(c)

        # Migrations: look at the actual schema instead of attempting every
        # ALTER and swallowing whatever error comes back.
        c.execute('PRAGMA table_info(daily_stats)')
        existing = {row[1] for row in c.fetchall()}
        for col, decl in self._MIGRATION_COLUMNS:
            if col in existing:
                continue
            try:
                c.execute(f'ALTER TABLE daily_stats ADD COLUMN {col} {decl}')
            except sqlite3.OperationalError as e:
                # Best effort: keep starting up, but do not hide the failure.
                logger.error("DB migration error adding %s: %s", col, e)
        self.conn.commit()

    def _init_analytics(self, c):
        """Create the analytics table and its date index.

        Args:
            c: Cursor on ``self.conn`` used to run the DDL.
        """
        c.execute("""CREATE TABLE IF NOT EXISTS analytics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT NOT NULL,
            hour INTEGER,
            post_type TEXT,
            format TEXT,
            source TEXT,
            title TEXT,
            posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""")
        c.execute('CREATE INDEX IF NOT EXISTS idx_analytics_date ON analytics(date)')
        self.conn.commit()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _today(self) -> str:
        """Return the current date from ``now_van()`` as ``YYYY-MM-DD``."""
        return now_van().strftime('%Y-%m-%d')

    @staticmethod
    def _valid_column(name) -> bool:
        """Return True if *name* is a bare identifier safe to splice into SQL."""
        return isinstance(name, str) and name.isidentifier()

    @staticmethod
    def _flag_is_set(value) -> bool:
        """Return True for a set flag, stored either as ``1`` or as ``'1'``.

        The text form occurs in databases whose flag columns were created
        with TEXT affinity by earlier migrations.
        """
        return value in (1, '1')

    # ------------------------------------------------------------------
    # Analytics
    # ------------------------------------------------------------------
    def track_post(self, post_type: str, fmt: str = '', source: str = '', title: str = ''):
        """Record one post in the analytics table.

        Failures are logged and never raised.

        Args:
            post_type: Kind of post (e.g. ``'news'``).
            fmt: Format label of the post.
            source: Where the content came from.
            title: Post title; only the first 100 characters are stored.
        """
        try:
            # One clock read so date and hour cannot straddle midnight.
            now = now_van()
            self.conn.execute(
                'INSERT INTO analytics (date, hour, post_type, format, source, title) VALUES (?, ?, ?, ?, ?, ?)',
                (now.strftime('%Y-%m-%d'), now.hour, post_type, fmt, source, (title or '')[:100])
            )
            self.conn.commit()
        except Exception as e:
            logger.error("Analytics track error: %s", e)

    def get_analytics(self, days: int = 7) -> dict:
        """Summarize analytics rows dated on or after ``today - days``.

        Args:
            days: How far back the cutoff date lies.

        Returns:
            Dict with ``period_days``, ``total_posts``, ``daily``, ``by_type``,
            ``by_format`` (news posts only) and ``by_source`` (top 10,
            non-empty sources); an empty dict if anything fails.
        """
        try:
            c = self.conn.cursor()
            cutoff = (now_van() - timedelta(days=days)).strftime('%Y-%m-%d')

            def counts(sql):
                # Every query returns (label, count) rows for the same cutoff.
                c.execute(sql, (cutoff,))
                return {row[0]: row[1] for row in c.fetchall()}

            daily = counts('SELECT date, COUNT(*) FROM analytics WHERE date >= ? GROUP BY date ORDER BY date')
            by_type = counts('SELECT post_type, COUNT(*) FROM analytics WHERE date >= ? GROUP BY post_type ORDER BY COUNT(*) DESC')
            by_format = counts("SELECT format, COUNT(*) FROM analytics WHERE date >= ? AND post_type='news' GROUP BY format ORDER BY COUNT(*) DESC")
            by_source = counts("SELECT source, COUNT(*) FROM analytics WHERE date >= ? AND source != '' GROUP BY source ORDER BY COUNT(*) DESC LIMIT 10")

            c.execute('SELECT COUNT(*) FROM analytics WHERE date >= ?', (cutoff,))
            total = c.fetchone()[0]
            return {
                'period_days': days,
                'total_posts': total,
                'daily': daily,
                'by_type': by_type,
                'by_format': by_format,
                'by_source': by_source,
            }
        except Exception as e:
            logger.error("Analytics query error: %s", e)
            return {}

    # ------------------------------------------------------------------
    # Posted items
    # ------------------------------------------------------------------
    def _cleanup_old(self):
        """Delete ``posted`` rows older than ``CONFIG['retention_days']`` days."""
        try:
            c = self.conn.cursor()
            # posted_at is filled by CURRENT_TIMESTAMP (UTC), so the cutoff is
            # computed by SQLite in UTC as well rather than from local time.
            c.execute(
                "DELETE FROM posted WHERE posted_at < datetime('now', ?)",
                (f"-{CONFIG['retention_days']} days",)
            )
            deleted = c.rowcount
            self.conn.commit()
            if deleted:
                logger.info("DB cleanup: removed %s old records", deleted)
        except sqlite3.Error as e:
            logger.error("DB cleanup error: %s", e)

    def is_posted(self, post_id: str) -> bool:
        """Return True if *post_id* exists in ``posted``; False on DB error."""
        try:
            c = self.conn.cursor()
            c.execute('SELECT id FROM posted WHERE id = ?', (post_id,))
            return c.fetchone() is not None
        except sqlite3.Error as e:
            logger.error("DB error is_posted: %s", e)
            return False

    def mark_posted(self, post_id, title, url, source, post_type='regular'):
        """Insert or replace the ``posted`` row for *post_id*.

        Errors are logged and the transaction is rolled back.
        """
        try:
            self.conn.execute(
                'INSERT OR REPLACE INTO posted (id, title, url, source, post_type) VALUES (?, ?, ?, ?, ?)',
                (post_id, title, url, source, post_type)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error("DB error mark_posted: %s", e)
            self.conn.rollback()

    def get_last_post_time(self):
        """Return the newest ``posted_at`` as a datetime, or None.

        None is returned when the table is empty, the query fails, or the
        stored value cannot be parsed.
        """
        try:
            c = self.conn.cursor()
            c.execute('SELECT posted_at FROM posted ORDER BY posted_at DESC LIMIT 1')
            result = c.fetchone()
            if result and result[0]:
                return datetime.fromisoformat(result[0].replace('Z', '+00:00'))
        except (sqlite3.Error, ValueError, TypeError, AttributeError) as e:
            logger.error("DB error get_last_post_time: %s", e)
        return None

    # ------------------------------------------------------------------
    # Daily flags
    # ------------------------------------------------------------------
    def was_posted_today(self, field: str) -> bool:
        """Return True if flag column *field* is set in today's row.

        Args:
            field: Name of a flag column in ``daily_stats``.

        Returns:
            False when the flag is unset, today's row does not exist, the
            name is not a valid identifier, or the query fails.
        """
        # The column name is spliced into the SQL text, so reject anything
        # that is not a plain identifier before building the statement.
        if not self._valid_column(field):
            logger.error("DB error was_posted_today(%s): invalid column name", field)
            return False
        try:
            c = self.conn.cursor()
            c.execute(f'SELECT {field} FROM daily_stats WHERE date = ?', (self._today(),))
            row = c.fetchone()
            return row is not None and self._flag_is_set(row[0])
        except sqlite3.Error as e:
            logger.error("DB error was_posted_today(%s): %s", field, e)
            return False

    def mark_posted_today(self, field: str):
        """Set flag column *field* to 1 in today's row, creating the row if needed.

        Args:
            field: Name of a flag column in ``daily_stats``.
        """
        if not self._valid_column(field):
            logger.error("DB error mark_posted_today(%s): invalid column name", field)
            return
        try:
            self.conn.execute(
                f'INSERT INTO daily_stats (date, {field}) VALUES (?, 1) '
                f'ON CONFLICT(date) DO UPDATE SET {field}=1',
                (self._today(),)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error("DB error mark_posted_today(%s): %s", field, e)
            self.conn.rollback()

    # ------------------------------------------------------------------
    # Hour tracking
    # ------------------------------------------------------------------
    def mark_hour_posted(self, hour: int):
        """Persist that this Vancouver hour was already served today."""
        try:
            today = self._today()  # read once so SELECT and UPSERT agree
            c = self.conn.cursor()
            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (today,))
            row = c.fetchone()
            current = row[0] if row and row[0] else ''  # tolerate NULL
            hours = {h for h in current.split(',') if h}
            hours.add(str(hour))
            # (len, text) orders digit strings numerically ('2' before '10')
            # and cannot raise on unexpected content.
            new_val = ','.join(sorted(hours, key=lambda h: (len(h), h)))
            c.execute(
                'INSERT INTO daily_stats (date, posted_hours) VALUES (?, ?) '
                'ON CONFLICT(date) DO UPDATE SET posted_hours=?',
                (today, new_val, new_val)
            )
            self.conn.commit()
        except sqlite3.Error as e:
            logger.error("DB error mark_hour_posted: %s", e)
            self.conn.rollback()

    def was_hour_posted(self, hour: int) -> bool:
        """Check if this Vancouver hour was already served today."""
        try:
            c = self.conn.cursor()
            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (self._today(),))
            row = c.fetchone()
            if row and row[0]:
                return str(hour) in row[0].split(',')
        except sqlite3.Error as e:
            logger.error("DB error was_hour_posted: %s", e)
        return False

    # ------------------------------------------------------------------
    # Monitoring
    # ------------------------------------------------------------------
    def get_health_stats(self) -> dict:
        """Return bot health stats for monitoring.

        Returns:
            Dict with ``total_posts``, ``hours_posted_today``,
            ``last_post_at`` (raw stored value or None) and ``date``;
            an empty dict if anything fails.
        """
        try:
            today = self._today()
            c = self.conn.cursor()

            c.execute('SELECT COUNT(*) FROM posted')
            total = c.fetchone()[0]

            c.execute('SELECT posted_hours FROM daily_stats WHERE date = ?', (today,))
            row = c.fetchone()
            hours_today = row[0] if row and row[0] else ''

            c.execute('SELECT posted_at FROM posted ORDER BY posted_at DESC LIMIT 1')
            last = c.fetchone()
            last_post = last[0] if last else None

            return {
                'total_posts': total,
                'hours_posted_today': hours_today,
                'last_post_at': last_post,
                'date': today,
            }
        except Exception as e:
            logger.error("Health stats error: %s", e)
            return {}