← Back
const Database = require('better-sqlite3');
const path = require('path');

// Location of the SQLite database file: <directory of this module>/data/kz-channels.db.
const DB_PATH = path.join(__dirname, 'data', 'kz-channels.db');

// Module-wide database connection shared by every function below.
// It stays undefined until init() assigns it.
let db;

// Handle of the periodic WAL checkpoint timer. Kept at module level so that a
// repeated init() replaces the previous timer instead of stacking another one.
let walCheckpointTimer = null;

/**
 * Opens the SQLite database at DB_PATH and prepares it for use.
 *
 * Steps, in order:
 *  1. creates the directory that holds the database file (recursively);
 *  2. opens the connection and enables WAL journaling and foreign keys;
 *  3. runs a TRUNCATE WAL checkpoint immediately and schedules an hourly
 *     PASSIVE checkpoint;
 *  4. creates the posts, stats, sources and post_queue tables and their
 *     indexes if they do not exist yet;
 *  5. runs migrateSources().
 *
 * The opened connection is stored in the module-level `db` variable.
 *
 * @returns {object} The opened better-sqlite3 Database instance.
 */
function init() {
  const fs = require('fs');
  fs.mkdirSync(path.dirname(DB_PATH), { recursive: true });

  db = new Database(DB_PATH);
  db.pragma('journal_mode = WAL');
  db.pragma('foreign_keys = ON');

  // WAL checkpoint on startup — prevents "database or disk is full".
  // Best-effort: a failure is reported but must not block startup.
  try {
    db.pragma('wal_checkpoint(TRUNCATE)');
  } catch (err) {
    console.warn('[db] startup WAL checkpoint failed:', err);
  }

  // Periodic WAL checkpoint every hour. Any timer left over from an earlier
  // init() call is cleared first so that timers never accumulate.
  if (walCheckpointTimer) {
    clearInterval(walCheckpointTimer);
  }
  walCheckpointTimer = setInterval(() => {
    // Nothing to checkpoint once the connection has been closed.
    if (!db || !db.open) return;
    try {
      db.pragma('wal_checkpoint(PASSIVE)');
    } catch (err) {
      console.warn('[db] periodic WAL checkpoint failed:', err);
    }
  }, 60 * 60 * 1000);
  // A maintenance timer alone must not keep the Node.js process alive.
  walCheckpointTimer.unref();

  db.exec(`
    CREATE TABLE IF NOT EXISTS posts (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      channel_id TEXT NOT NULL,
      source_url TEXT,
      title TEXT,
      content TEXT,
      image_url TEXT,
      post_type TEXT NOT NULL DEFAULT 'content',
      telegram_message_id INTEGER,
      created_at TEXT NOT NULL DEFAULT (datetime('now')),
      UNIQUE(channel_id, source_url)
    );

    CREATE TABLE IF NOT EXISTS stats (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      channel_id TEXT NOT NULL,
      date TEXT NOT NULL,
      posts_count INTEGER DEFAULT 0,
      ad_posts_count INTEGER DEFAULT 0,
      created_at TEXT NOT NULL DEFAULT (datetime('now')),
      UNIQUE(channel_id, date)
    );

    CREATE TABLE IF NOT EXISTS sources (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      channel_id TEXT NOT NULL,
      name TEXT NOT NULL,
      url TEXT NOT NULL,
      type TEXT NOT NULL DEFAULT 'rss',
      status TEXT NOT NULL DEFAULT 'pending',
      added_by TEXT DEFAULT 'system',
      created_at TEXT NOT NULL DEFAULT (datetime('now')),
      approved_at TEXT,
      UNIQUE(channel_id, url)
    );

    CREATE TABLE IF NOT EXISTS post_queue (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      channel_id TEXT NOT NULL,
      source_id INTEGER,
      source_url TEXT,
      title TEXT,
      content TEXT,
      image_path TEXT,
      status TEXT NOT NULL DEFAULT 'pending',
      created_at TEXT NOT NULL DEFAULT (datetime('now')),
      reviewed_at TEXT
    );

    CREATE INDEX IF NOT EXISTS idx_queue_status ON post_queue(status);
    CREATE INDEX IF NOT EXISTS idx_posts_channel ON posts(channel_id);
    CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at);
    CREATE INDEX IF NOT EXISTS idx_stats_date ON stats(date);
    CREATE INDEX IF NOT EXISTS idx_sources_channel ON sources(channel_id);
    CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
  `);

  // Migration: copy the sources from config.js into the DB (first run only).
  migrateSources();

  return db;
}

/**
 * One-time migration of the sources declared in ./config into the `sources`
 * table.
 *
 * Does nothing when the table already contains at least one row. Otherwise
 * every `channels[key].sources[]` entry is inserted as an approved source
 * added by 'system'; the entry's `type` falls back to 'rss'.
 *
 * All inserts run inside a single transaction: if the process dies or an
 * insert throws half-way, nothing is committed, so the "table is not empty"
 * guard cannot mistake a partial migration for a finished one.
 */
function migrateSources() {
  const { channels } = require('./config');
  const existing = db.prepare('SELECT COUNT(*) as c FROM sources').get().c;
  if (existing > 0) return; // already migrated

  const insert = db.prepare(`
    INSERT OR IGNORE INTO sources (channel_id, name, url, type, status, added_by)
    VALUES (?, ?, ?, ?, 'approved', 'system')
  `);

  const insertAll = db.transaction((channelMap) => {
    for (const [key, ch] of Object.entries(channelMap)) {
      // A channel without a `sources` list contributes nothing.
      for (const src of ((ch && ch.sources) || [])) {
        insert.run(key, src.name, src.url, src.type || 'rss');
      }
    }
  });

  // Tolerate a config that exports no `channels` object at all.
  insertAll(channels || {});
}

/**
 * Tells whether a post with the given source URL is already stored for the
 * channel.
 *
 * @param {string} channelId - Channel identifier matched against posts.channel_id.
 * @param {string} [sourceUrl] - Source URL matched against posts.source_url.
 * @returns {boolean} true when a matching row exists; always false for a
 *   falsy sourceUrl (no query is made in that case).
 */
function isDuplicate(channelId, sourceUrl) {
  if (!sourceUrl) {
    return false;
  }

  const lookup = db.prepare('SELECT id FROM posts WHERE channel_id = ? AND source_url = ?');
  const existing = lookup.get(channelId, sourceUrl);
  return Boolean(existing);
}

/**
 * Inserts a post row; the statement is INSERT OR IGNORE, so a row that
 * violates a constraint is skipped rather than raising.
 *
 * @param {object} post
 * @param {string} post.channelId - Stored in channel_id.
 * @param {string} [post.sourceUrl] - Stored in source_url; falsy becomes NULL.
 * @param {string} post.title - Stored as given.
 * @param {string} post.content - Stored as given.
 * @param {string} [post.imageUrl] - Stored in image_url; falsy becomes NULL.
 * @param {string} [post.postType] - Stored in post_type; falsy becomes 'content'.
 * @param {number} [post.telegramMessageId] - Falsy becomes NULL.
 * @returns {*} Whatever the prepared statement's run() returns.
 */
function savePost({ channelId, sourceUrl, title, content, imageUrl, postType, telegramMessageId }) {
  const insert = db.prepare(`
    INSERT OR IGNORE INTO posts (channel_id, source_url, title, content, image_url, post_type, telegram_message_id)
    VALUES (?, ?, ?, ?, ?, ?, ?)
  `);

  // Positional values, in the column order of the INSERT above.
  const values = [
    channelId,
    sourceUrl || null,
    title,
    content,
    imageUrl || null,
    postType || 'content',
    telegramMessageId || null,
  ];
  return insert.run(...values);
}

/**
 * Calendar date in Kazakhstan (time zone Asia/Almaty) as `YYYY-MM-DD`.
 * The audience lives there, so statistics are bucketed by their days.
 *
 * The string is assembled from the individual date parts rather than taken
 * from a locale's default layout, so the result does not depend on how a
 * particular locale happens to format dates in the running ICU version.
 *
 * @param {Date} [date=new Date()] - Instant to convert; defaults to now.
 * @returns {string} The date in Asia/Almaty, formatted as YYYY-MM-DD.
 * @throws {RangeError} If `date` is an invalid Date.
 */
function getKZDate(date = new Date()) {
  const parts = new Intl.DateTimeFormat('en-US', {
    timeZone: 'Asia/Almaty',
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
  }).formatToParts(date);

  const pick = (type) => parts.find((part) => part.type === type).value;
  return `${pick('year')}-${pick('month')}-${pick('day')}`;
}

/**
 * Adds one to today's counter for a channel, creating the stats row for
 * (channel, today) when it does not exist yet. "Today" comes from getKZDate().
 *
 * @param {string} channelId - Channel whose counter is bumped.
 * @param {boolean} [isAd=false] - When truthy the ad_posts_count column is
 *   incremented, otherwise posts_count.
 * @returns {void}
 */
function incrementStats(channelId, isAd = false) {
  const kzToday = getKZDate();

  // Only these two hard-coded column names are ever interpolated into the SQL.
  let counterColumn;
  if (isAd) {
    counterColumn = 'ad_posts_count';
  } else {
    counterColumn = 'posts_count';
  }

  const upsert = db.prepare(`
    INSERT INTO stats (channel_id, date, ${counterColumn})
    VALUES (?, ?, 1)
    ON CONFLICT(channel_id, date)
    DO UPDATE SET ${counterColumn} = ${counterColumn} + 1
  `);
  upsert.run(channelId, kzToday);
}

/**
 * Reads the most recent stats rows of a channel, newest date first.
 *
 * @param {string} channelId - Channel to read stats for.
 * @param {number} [days=7] - Maximum number of rows returned (used as LIMIT).
 * @returns {Array<{date: string, posts_count: number, ad_posts_count: number}>}
 */
function getStats(channelId, days = 7) {
  const query = db.prepare(`
    SELECT date, posts_count, ad_posts_count
    FROM stats
    WHERE channel_id = ?
    ORDER BY date DESC
    LIMIT ?
  `);
  return query.all(channelId, days);
}

/**
 * Number of regular posts counted for the channel today (date from getKZDate()).
 *
 * @param {string} channelId - Channel to look up.
 * @returns {number} posts_count of today's stats row, or 0 when there is no row.
 */
function getTodayPostCount(channelId) {
  const kzToday = getKZDate();
  const statsRow = db
    .prepare('SELECT posts_count FROM stats WHERE channel_id = ? AND date = ?')
    .get(channelId, kzToday);

  if (!statsRow) {
    return 0;
  }
  return statsRow.posts_count;
}

/**
 * Number of ad posts counted for the channel today (date from getKZDate()).
 *
 * @param {string} channelId - Channel to look up.
 * @returns {number} ad_posts_count of today's stats row, or 0 when there is no row.
 */
function getTodayAdCount(channelId) {
  const kzToday = getKZDate();
  const statsRow = db
    .prepare('SELECT ad_posts_count FROM stats WHERE channel_id = ? AND date = ?')
    .get(channelId, kzToday);

  if (!statsRow) {
    return 0;
  }
  return statsRow.ad_posts_count;
}

/**
 * Collects every distinct non-NULL source URL of posts created within the
 * last N days, across all channels (used for global de-duplication between
 * channels).
 *
 * @param {number} [days=3] - Size of the look-back window in days.
 * @returns {Set<string>} The distinct source URLs found.
 */
function getRecentUrls(days = 3) {
  // SQLite datetime modifier is assembled as "-<days> days" inside the query.
  const dayOffset = `-${days}`;
  const query = db.prepare(`
    SELECT DISTINCT source_url FROM posts
    WHERE source_url IS NOT NULL
    AND created_at > datetime('now', ? || ' days')
  `);

  const urls = new Set();
  for (const { source_url: url } of query.all(dayOffset)) {
    urls.add(url);
  }
  return urls;
}

// --- Sources CRUD ---

/**
 * Lists the sources of a channel.
 *
 * @param {string} channelId - Channel whose sources are listed.
 * @param {string} [statusFilter] - When truthy, only sources with this status
 *   are returned, ordered by created_at; otherwise all sources are returned,
 *   ordered by status and then created_at.
 * @returns {object[]} Full rows of the sources table.
 */
function getSources(channelId, statusFilter) {
  if (!statusFilter) {
    const everything = db.prepare('SELECT * FROM sources WHERE channel_id = ? ORDER BY status, created_at');
    return everything.all(channelId);
  }

  const filtered = db.prepare('SELECT * FROM sources WHERE channel_id = ? AND status = ? ORDER BY created_at');
  return filtered.all(channelId, statusFilter);
}

/**
 * Lists the sources of a channel whose status is 'approved'.
 *
 * @param {string} channelId - Channel whose approved sources are listed.
 * @returns {object[]} Full rows of the sources table.
 */
function getApprovedSources(channelId) {
  const approvedStatus = 'approved';
  const query = db.prepare('SELECT * FROM sources WHERE channel_id = ? AND status = ?');
  return query.all(channelId, approvedStatus);
}

/**
 * Lists every source with status 'pending', across all channels, oldest first.
 *
 * @returns {object[]} Full rows of the sources table.
 */
function getAllPending() {
  const pendingStatus = 'pending';
  const query = db.prepare('SELECT * FROM sources WHERE status = ? ORDER BY created_at');
  return query.all(pendingStatus);
}

/**
 * Registers a new source with status 'pending'. The statement is
 * INSERT OR IGNORE, so a row that violates a constraint is skipped.
 *
 * @param {object} source
 * @param {string} source.channelId - Channel the source belongs to.
 * @param {string} source.name - Display name of the source.
 * @param {string} source.url - URL of the source.
 * @param {string} [source.type] - Source type; falsy becomes 'rss'.
 * @param {string} [source.addedBy] - Who added it; falsy becomes 'dashboard'.
 * @returns {*} Whatever the prepared statement's run() returns.
 */
function addSource({ channelId, name, url, type, addedBy }) {
  const insert = db.prepare(`
    INSERT OR IGNORE INTO sources (channel_id, name, url, type, status, added_by)
    VALUES (?, ?, ?, ?, 'pending', ?)
  `);
  const sourceType = type || 'rss';
  const author = addedBy || 'dashboard';
  return insert.run(channelId, name, url, sourceType, author);
}

/**
 * Marks a source as 'approved' and stamps approved_at with the current time.
 *
 * @param {number} id - Primary key of the source row.
 * @returns {*} Whatever the prepared statement's run() returns.
 */
function approveSource(id) {
  const update = db.prepare("UPDATE sources SET status = 'approved', approved_at = datetime('now') WHERE id = ?");
  return update.run(id);
}

/**
 * Marks a source as 'rejected'.
 *
 * @param {number} id - Primary key of the source row.
 * @returns {*} Whatever the prepared statement's run() returns.
 */
function rejectSource(id) {
  const update = db.prepare("UPDATE sources SET status = 'rejected' WHERE id = ?");
  return update.run(id);
}

/**
 * Deletes a source row.
 *
 * @param {number} id - Primary key of the source row.
 * @returns {*} Whatever the prepared statement's run() returns.
 */
function deleteSource(id) {
  const removal = db.prepare('DELETE FROM sources WHERE id = ?');
  return removal.run(id);
}

// --- Post Queue (moderation) ---

/**
 * Appends a post to the moderation queue. The status column is not set here,
 * so the table default applies.
 *
 * @param {object} item
 * @param {string} item.channelId - Target channel.
 * @param {number} [item.sourceId] - Originating source id; falsy becomes NULL.
 * @param {string} [item.sourceUrl] - Original URL; falsy becomes NULL.
 * @param {string} [item.title] - Title; falsy becomes an empty string.
 * @param {string} item.content - Post body, stored as given.
 * @param {string} [item.imagePath] - Image path; falsy becomes NULL.
 * @returns {*} Whatever the prepared statement's run() returns.
 */
function addToQueue({ channelId, sourceId, sourceUrl, title, content, imagePath }) {
  const insert = db.prepare(`
    INSERT INTO post_queue (channel_id, source_id, source_url, title, content, image_path)
    VALUES (?, ?, ?, ?, ?, ?)
  `);

  // Positional values, in the column order of the INSERT above.
  const values = [
    channelId,
    sourceId || null,
    sourceUrl || null,
    title || '',
    content,
    imagePath || null,
  ];
  return insert.run(...values);
}

/**
 * Lists queue items with the given status, newest first. Each row carries the
 * name of its source as `source_name` (NULL when no source row matches).
 *
 * @param {string} [status='pending'] - Queue status to filter by.
 * @returns {object[]} post_queue rows extended with source_name.
 */
function getQueue(status = 'pending') {
  const query = db.prepare(`
    SELECT pq.*, s.name as source_name
    FROM post_queue pq
    LEFT JOIN sources s ON pq.source_id = s.id
    WHERE pq.status = ?
    ORDER BY pq.created_at DESC
  `);
  return query.all(status);
}

/**
 * Fetches a single queue item by primary key.
 *
 * @param {number} id - Primary key of the post_queue row.
 * @returns {object|undefined} The row as returned by the statement's get().
 */
function getQueueItem(id) {
  const lookup = db.prepare('SELECT * FROM post_queue WHERE id = ?');
  return lookup.get(id);
}

/**
 * Marks a queue item as 'approved' and stamps reviewed_at with the current time.
 *
 * @param {number} id - Primary key of the post_queue row.
 * @returns {*} Whatever the prepared statement's run() returns.
 */
function approveQueueItem(id) {
  const update = db.prepare("UPDATE post_queue SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?");
  return update.run(id);
}

/**
 * Marks a queue item as 'rejected' and stamps reviewed_at with the current time.
 *
 * @param {number} id - Primary key of the post_queue row.
 * @returns {*} Whatever the prepared statement's run() returns.
 */
function rejectQueueItem(id) {
  const update = db.prepare("UPDATE post_queue SET status = 'rejected', reviewed_at = datetime('now') WHERE id = ?");
  return update.run(id);
}

/**
 * Counts the queue items whose status is 'pending'.
 *
 * @returns {number} The value of COUNT(*) reported by the query.
 */
function getQueueCount() {
  const { c: pendingCount } = db
    .prepare("SELECT COUNT(*) as c FROM post_queue WHERE status = 'pending'")
    .get();
  return pendingCount;
}

/**
 * Checks whether posts coming from a source have to go through moderation.
 * A source needs moderation when its row exists and was not added by 'system'.
 *
 * @param {number} [sourceId] - Primary key of the source row.
 * @returns {boolean} true when the source exists and added_by is not 'system';
 *   false for a falsy id, an unknown id, or a system-added source. The result
 *   is always a real boolean, including when no row is found.
 */
function sourceNeedsModeration(sourceId) {
  if (!sourceId) return false;
  const row = db.prepare('SELECT added_by FROM sources WHERE id = ?').get(sourceId);
  // Boolean(row) keeps the "row not found" case from leaking `undefined` to callers.
  return Boolean(row) && row.added_by !== 'system';
}

/**
 * Lists the stats rows that recorded at least one ad post, across all
 * channels, newest date first.
 *
 * @param {number} [days=7] - Scales the row limit: at most `days * 5` rows are
 *   returned. NOTE(review): the factor 5 presumably stands for the number of
 *   channels — confirm against the channel configuration.
 * @returns {Array<{channel_id: string, date: string, ad_posts_count: number}>}
 */
function getAdStats(days = 7) {
  const rowLimit = days * 5;
  const query = db.prepare(`
    SELECT channel_id, date, ad_posts_count
    FROM stats WHERE ad_posts_count > 0
    ORDER BY date DESC
    LIMIT ?
  `);
  return query.all(rowLimit);
}

/**
 * Sums ad_posts_count over every stats row.
 *
 * @returns {number} The total, or 0 when the query yields no row or a
 *   NULL/zero sum.
 */
function getAdTotals() {
  const summary = db.prepare('SELECT SUM(ad_posts_count) as total FROM stats').get();
  if (!summary) {
    return 0;
  }
  return summary.total || 0;
}

/**
 * Counts ad posts grouped by title, most frequent first. Posts with an empty
 * title are excluded; the title is reported under the alias `template_id`.
 *
 * @returns {Array<{template_id: string, count: number}>}
 */
function getAdTemplateStats() {
  const query = db.prepare(`
    SELECT title as template_id, COUNT(*) as count
    FROM posts WHERE post_type = 'ad' AND title != ''
    GROUP BY title ORDER BY count DESC
  `);
  return query.all();
}

/**
 * Returns the created_at value of the channel's post with the highest id.
 *
 * @param {string} channelId - Channel to look up.
 * @returns {string|null} created_at of that post, or null when the channel
 *   has no posts.
 */
function getLastPostTime(channelId) {
  const latest = db
    .prepare('SELECT created_at FROM posts WHERE channel_id = ? ORDER BY id DESC LIMIT 1')
    .get(channelId);

  if (!latest) {
    return null;
  }
  return latest.created_at;
}

/**
 * Returns the channel's latest posts, highest id first.
 *
 * @param {string} channelId - Channel to read posts from.
 * @param {number} [limit=3] - Maximum number of posts returned.
 * @returns {Array<{content: string, post_type: string, created_at: string}>}
 */
function getRecentPosts(channelId, limit = 3) {
  const query = db.prepare('SELECT content, post_type, created_at FROM posts WHERE channel_id = ? ORDER BY id DESC LIMIT ?');
  return query.all(channelId, limit);
}

/**
 * Closes the database connection if one has been assigned to `db`.
 * Does nothing when `db` is still unset.
 *
 * @returns {void}
 */
function close() {
  if (!db) {
    return;
  }
  db.close();
}

// Public API of this module. Every function except getKZDate works on the
// module-level `db` connection, so init() has to be called first.
module.exports = {
  // Setup, duplicate check, post storage and per-day counters
  init, isDuplicate, savePost, incrementStats, getStats,
  // Read-only queries over posts and stats (today's counts, recent posts, ad figures)
  getTodayPostCount, getTodayAdCount, getRecentUrls, getLastPostTime, getRecentPosts, getAdStats, getAdTotals, getAdTemplateStats,
  // Sources CRUD and approval workflow
  getSources, getApprovedSources, getAllPending, addSource, approveSource, rejectSource, deleteSource,
  // Post moderation queue
  addToQueue, getQueue, getQueueItem, approveQueueItem, rejectQueueItem, getQueueCount, sourceNeedsModeration,
  // Date helper and connection teardown
  getKZDate, close
};

📜 Git History

ecd6693 — feat: утренняя рубрика Курс валют от Нацбанка РК (B3) — 3 weeks ago
3fdc0b1 — fix: 4 бага — WAL checkpoint, fallback контент, HTML sanitize, логи — 7 weeks ago
045688a — feat: dashboard v0.3 — 9 новых источников + 10 улучшений дашборда — 7 weeks ago
5481def — feat: post moderation queue for new sources — 7 weeks ago
6ff5fa6 — feat: source management with moderation via dashboard modal — 7 weeks ago
2757087 — init: KZ Channels bot — 5 Telegram channels for Kazakhstan — 7 weeks ago
Show last diff
Loading...