const Database = require('better-sqlite3');
const path = require('path');
// Location of the SQLite file: <directory of this module>/data/kz-channels.db.
const DB_PATH = path.join(__dirname, 'data', 'kz-channels.db');
// Shared connection handle; assigned by init() and read by every query helper in this module.
let db;
/**
 * Open the SQLite database and prepare it for use.
 *
 * Steps: create the data directory, open the file at DB_PATH, enable WAL
 * journaling and foreign keys, run a startup WAL checkpoint, schedule an
 * hourly checkpoint, create tables/indexes if missing, and seed `sources`
 * from config on first run.
 *
 * @returns {object} the open better-sqlite3 connection (also stored in the
 *   module-level `db`).
 */
function init() {
  const fs = require('fs');
  fs.mkdirSync(path.dirname(DB_PATH), { recursive: true });

  const conn = new Database(DB_PATH);
  db = conn;
  conn.pragma('journal_mode = WAL');
  conn.pragma('foreign_keys = ON');

  // WAL checkpoint at startup — meant to prevent "database or disk is full".
  // Best-effort: a failure is reported but must not abort startup.
  try {
    conn.pragma('wal_checkpoint(TRUNCATE)');
  } catch (err) {
    console.warn('[db] startup WAL checkpoint failed:', err);
  }

  // Periodic WAL checkpoint, once per hour. The timer is bound to this
  // specific connection and cancels itself once that connection is closed,
  // instead of failing silently on a closed handle forever.
  const checkpointTimer = setInterval(() => {
    if (!conn.open) {
      clearInterval(checkpointTimer);
      return;
    }
    try {
      conn.pragma('wal_checkpoint(PASSIVE)');
    } catch (err) {
      console.warn('[db] periodic WAL checkpoint failed:', err);
    }
  }, 60 * 60 * 1000);

  conn.exec(`
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id TEXT NOT NULL,
source_url TEXT,
title TEXT,
content TEXT,
image_url TEXT,
post_type TEXT NOT NULL DEFAULT 'content',
telegram_message_id INTEGER,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(channel_id, source_url)
);
CREATE TABLE IF NOT EXISTS stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id TEXT NOT NULL,
date TEXT NOT NULL,
posts_count INTEGER DEFAULT 0,
ad_posts_count INTEGER DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(channel_id, date)
);
CREATE TABLE IF NOT EXISTS sources (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id TEXT NOT NULL,
name TEXT NOT NULL,
url TEXT NOT NULL,
type TEXT NOT NULL DEFAULT 'rss',
status TEXT NOT NULL DEFAULT 'pending',
added_by TEXT DEFAULT 'system',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
approved_at TEXT,
UNIQUE(channel_id, url)
);
CREATE TABLE IF NOT EXISTS post_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id TEXT NOT NULL,
source_id INTEGER,
source_url TEXT,
title TEXT,
content TEXT,
image_path TEXT,
status TEXT NOT NULL DEFAULT 'pending',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
reviewed_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_queue_status ON post_queue(status);
CREATE INDEX IF NOT EXISTS idx_posts_channel ON posts(channel_id);
CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at);
CREATE INDEX IF NOT EXISTS idx_stats_date ON stats(date);
CREATE INDEX IF NOT EXISTS idx_sources_channel ON sources(channel_id);
CREATE INDEX IF NOT EXISTS idx_sources_status ON sources(status);
`);

  // Migration: copy sources from config.js into the DB (first run only).
  migrateSources();
  return db;
}
function migrateSources() {
const { channels } = require('./config');
const count = db.prepare('SELECT COUNT(*) as c FROM sources').get().c;
if (count > 0) return; // Уже мигрировано
const stmt = db.prepare(`
INSERT OR IGNORE INTO sources (channel_id, name, url, type, status, added_by)
VALUES (?, ?, ?, ?, 'approved', 'system')
`);
for (const [key, ch] of Object.entries(channels)) {
for (const src of (ch.sources || [])) {
stmt.run(key, src.name, src.url, src.type || 'rss');
}
}
}
/**
 * Tell whether a post with this source URL was already saved for the channel.
 *
 * @param {string} channelId
 * @param {string} [sourceUrl] - a falsy URL is never treated as a duplicate.
 * @returns {boolean}
 */
function isDuplicate(channelId, sourceUrl) {
  if (sourceUrl) {
    const lookup = db.prepare('SELECT id FROM posts WHERE channel_id = ? AND source_url = ?');
    const existing = lookup.get(channelId, sourceUrl);
    return Boolean(existing);
  }
  return false;
}
/**
 * Insert a published post. Uses INSERT OR IGNORE, so a row that collides with
 * the table's UNIQUE(channel_id, source_url) constraint is skipped.
 *
 * Falsy sourceUrl / imageUrl / telegramMessageId are stored as NULL; a falsy
 * postType is stored as 'content'.
 *
 * @param {object} post
 * @returns {object} whatever the statement's run() returns.
 */
function savePost({ channelId, sourceUrl, title, content, imageUrl, postType, telegramMessageId }) {
  const insertPost = db.prepare(`
INSERT OR IGNORE INTO posts (channel_id, source_url, title, content, image_url, post_type, telegram_message_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
`);
  const values = [
    channelId,
    sourceUrl || null,
    title,
    content,
    imageUrl || null,
    postType || 'content',
    telegramMessageId || null,
  ];
  return insertPost.run(...values);
}
// Calendar date in Kazakhstan (time zone Asia/Almaty) — the audience lives
// there, so statistics are bucketed by their days.
/**
 * Format a moment as a YYYY-MM-DD date in the Asia/Almaty time zone.
 *
 * The string is assembled from explicit year/month/day parts rather than
 * taken from a locale's default date layout, so it does not depend on which
 * locale data the Node build ships.
 *
 * @param {Date} [date=new Date()] - moment to convert; defaults to "now".
 * @returns {string} date formatted as YYYY-MM-DD.
 */
function getKZDate(date = new Date()) {
  const parts = new Intl.DateTimeFormat('en-US', {
    timeZone: 'Asia/Almaty',
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
  }).formatToParts(date);
  const pick = (type) => parts.find((part) => part.type === type).value;
  return `${pick('year')}-${pick('month')}-${pick('day')}`;
}
/**
 * Add one to today's counter for a channel, creating the stats row if needed.
 * "Today" is whatever getKZDate() returns.
 *
 * @param {string} channelId
 * @param {boolean} [isAd=false] - truthy bumps ad_posts_count, otherwise posts_count.
 */
function incrementStats(channelId, isAd = false) {
  const day = getKZDate();
  // The column name comes from this fixed pair only, never from caller input.
  let field = 'posts_count';
  if (isAd) {
    field = 'ad_posts_count';
  }
  const upsert = db.prepare(`
INSERT INTO stats (channel_id, date, ${field})
VALUES (?, ?, 1)
ON CONFLICT(channel_id, date)
DO UPDATE SET ${field} = ${field} + 1
`);
  upsert.run(channelId, day);
}
/**
 * Fetch the most recent stats rows for a channel, newest date first.
 *
 * @param {string} channelId
 * @param {number} [days=7] - used as the row LIMIT (one row exists per channel and date).
 * @returns {Array<object>} rows with date, posts_count and ad_posts_count.
 */
function getStats(channelId, days = 7) {
  const recentStats = db.prepare(`
SELECT date, posts_count, ad_posts_count
FROM stats
WHERE channel_id = ?
ORDER BY date DESC
LIMIT ?
`);
  const rows = recentStats.all(channelId, days);
  return rows;
}
/**
 * Number of regular posts recorded for the channel on today's date
 * (as returned by getKZDate()).
 *
 * @param {string} channelId
 * @returns {number} posts_count of today's stats row, or 0 when no row exists.
 */
function getTodayPostCount(channelId) {
  const day = getKZDate();
  const todayRow = db
    .prepare('SELECT posts_count FROM stats WHERE channel_id = ? AND date = ?')
    .get(channelId, day);
  if (!todayRow) {
    return 0;
  }
  return todayRow.posts_count;
}
/**
 * Number of ad posts recorded for the channel on today's date
 * (as returned by getKZDate()).
 *
 * @param {string} channelId
 * @returns {number} ad_posts_count of today's stats row, or 0 when no row exists.
 */
function getTodayAdCount(channelId) {
  const day = getKZDate();
  const todayRow = db
    .prepare('SELECT ad_posts_count FROM stats WHERE channel_id = ? AND date = ?')
    .get(channelId, day);
  if (!todayRow) {
    return 0;
  }
  return todayRow.ad_posts_count;
}
// All post URLs from the last N days (for global dedup across channels).
/**
 * @param {number} [days=3] - how far back to look, in days.
 * @returns {Set<string>} distinct non-null source URLs from `posts`.
 */
function getRecentUrls(days = 3) {
  // Bound as e.g. '-3'; SQL appends ' days' to form the datetime modifier.
  const offset = `-${days}`;
  const recent = db.prepare(`
SELECT DISTINCT source_url FROM posts
WHERE source_url IS NOT NULL
AND created_at > datetime('now', ? || ' days')
`);
  const urls = new Set();
  for (const { source_url: url } of recent.all(offset)) {
    urls.add(url);
  }
  return urls;
}
// --- Sources CRUD ---
/**
 * List a channel's sources.
 *
 * @param {string} channelId
 * @param {string} [statusFilter] - when truthy, only sources with this status
 *   are returned (ordered by created_at); otherwise all sources, ordered by
 *   status and then created_at.
 * @returns {Array<object>} rows from `sources`.
 */
function getSources(channelId, statusFilter) {
  if (!statusFilter) {
    const everySource = db.prepare('SELECT * FROM sources WHERE channel_id = ? ORDER BY status, created_at');
    return everySource.all(channelId);
  }
  const byStatus = db.prepare('SELECT * FROM sources WHERE channel_id = ? AND status = ? ORDER BY created_at');
  return byStatus.all(channelId, statusFilter);
}
/**
 * List the sources of a channel whose status is 'approved'.
 *
 * @param {string} channelId
 * @returns {Array<object>} rows from `sources`.
 */
function getApprovedSources(channelId) {
  const approvedQuery = db.prepare('SELECT * FROM sources WHERE channel_id = ? AND status = ?');
  return approvedQuery.all(channelId, 'approved');
}
/**
 * List every source, across all channels, whose status is 'pending',
 * ordered by created_at.
 *
 * @returns {Array<object>} rows from `sources`.
 */
function getAllPending() {
  const pendingQuery = db.prepare('SELECT * FROM sources WHERE status = ? ORDER BY created_at');
  return pendingQuery.all('pending');
}
/**
 * Register a new source with status 'pending'. Uses INSERT OR IGNORE, so a
 * row colliding with UNIQUE(channel_id, url) is skipped.
 *
 * @param {object} source
 * @param {string} source.channelId
 * @param {string} source.name
 * @param {string} source.url
 * @param {string} [source.type] - falsy values become 'rss'.
 * @param {string} [source.addedBy] - falsy values become 'dashboard'.
 * @returns {object} whatever the statement's run() returns.
 */
function addSource({ channelId, name, url, type, addedBy }) {
  const insertSource = db.prepare(`
INSERT OR IGNORE INTO sources (channel_id, name, url, type, status, added_by)
VALUES (?, ?, ?, ?, 'pending', ?)
`);
  const sourceType = type || 'rss';
  const author = addedBy || 'dashboard';
  return insertSource.run(channelId, name, url, sourceType, author);
}
/**
 * Mark a source as 'approved' and stamp approved_at with the current time.
 *
 * @param {number} id - primary key in `sources`.
 * @returns {object} whatever the statement's run() returns.
 */
function approveSource(id) {
  const approve = db.prepare("UPDATE sources SET status = 'approved', approved_at = datetime('now') WHERE id = ?");
  return approve.run(id);
}
/**
 * Mark a source as 'rejected'.
 *
 * @param {number} id - primary key in `sources`.
 * @returns {object} whatever the statement's run() returns.
 */
function rejectSource(id) {
  const reject = db.prepare("UPDATE sources SET status = 'rejected' WHERE id = ?");
  return reject.run(id);
}
/**
 * Remove a source row.
 *
 * @param {number} id - primary key in `sources`.
 * @returns {object} whatever the statement's run() returns.
 */
function deleteSource(id) {
  const remove = db.prepare('DELETE FROM sources WHERE id = ?');
  return remove.run(id);
}
// --- Post Queue (moderation) ---
/**
 * Put a candidate post into the moderation queue (status defaults to
 * 'pending' in the schema).
 *
 * Falsy sourceId / sourceUrl / imagePath are stored as NULL; a falsy title is
 * stored as an empty string. `content` is passed through unchanged.
 *
 * @param {object} item
 * @returns {object} whatever the statement's run() returns.
 */
function addToQueue({ channelId, sourceId, sourceUrl, title, content, imagePath }) {
  const enqueue = db.prepare(`
INSERT INTO post_queue (channel_id, source_id, source_url, title, content, image_path)
VALUES (?, ?, ?, ?, ?, ?)
`);
  const values = [
    channelId,
    sourceId || null,
    sourceUrl || null,
    title || '',
    content,
    imagePath || null,
  ];
  return enqueue.run(...values);
}
/**
 * List queue items with the given status, newest first. Each row also
 * carries `source_name`, taken from the joined `sources` row (LEFT JOIN, so
 * it is NULL when no source matches).
 *
 * @param {string} [status='pending']
 * @returns {Array<object>}
 */
function getQueue(status = 'pending') {
  const queueQuery = db.prepare(`
SELECT pq.*, s.name as source_name
FROM post_queue pq
LEFT JOIN sources s ON pq.source_id = s.id
WHERE pq.status = ?
ORDER BY pq.created_at DESC
`);
  const items = queueQuery.all(status);
  return items;
}
/**
 * Fetch a single queue item by primary key.
 *
 * @param {number} id
 * @returns {object|undefined} whatever the statement's get() returns for that id.
 */
function getQueueItem(id) {
  const byId = db.prepare('SELECT * FROM post_queue WHERE id = ?');
  return byId.get(id);
}
/**
 * Mark a queue item as 'approved' and stamp reviewed_at with the current time.
 *
 * @param {number} id - primary key in `post_queue`.
 * @returns {object} whatever the statement's run() returns.
 */
function approveQueueItem(id) {
  const approve = db.prepare("UPDATE post_queue SET status = 'approved', reviewed_at = datetime('now') WHERE id = ?");
  return approve.run(id);
}
/**
 * Mark a queue item as 'rejected' and stamp reviewed_at with the current time.
 *
 * @param {number} id - primary key in `post_queue`.
 * @returns {object} whatever the statement's run() returns.
 */
function rejectQueueItem(id) {
  const reject = db.prepare("UPDATE post_queue SET status = 'rejected', reviewed_at = datetime('now') WHERE id = ?");
  return reject.run(id);
}
/**
 * Count queue items whose status is 'pending'.
 *
 * @returns {number}
 */
function getQueueCount() {
  const { c: pendingCount } = db
    .prepare("SELECT COUNT(*) as c FROM post_queue WHERE status = 'pending'")
    .get();
  return pendingCount;
}
// Check: do posts from this source require moderation?
/**
 * A source needs moderation when its row exists and its `added_by` is
 * anything other than 'system'.
 *
 * @param {number} [sourceId] - primary key in `sources`; falsy means "no source".
 * @returns {boolean} always a real boolean — false for a falsy id, an unknown
 *   id, or a source added by 'system'.
 */
function sourceNeedsModeration(sourceId) {
  if (!sourceId) return false;
  const row = db.prepare('SELECT added_by FROM sources WHERE id = ?').get(sourceId);
  // Coerce explicitly: a missing row must yield false, not undefined.
  return Boolean(row) && row.added_by !== 'system';
}
/**
 * List per-channel, per-date ad counters (only rows with ad_posts_count > 0),
 * newest date first, across all channels.
 *
 * @param {number} [days=7] - the query returns at most `days * 5` rows.
 * @returns {Array<object>} rows with channel_id, date and ad_posts_count.
 */
function getAdStats(days = 7) {
  // NOTE(review): the factor 5 presumably matches the number of channels — confirm.
  const rowLimit = days * 5;
  const adQuery = db.prepare(`
SELECT channel_id, date, ad_posts_count
FROM stats WHERE ad_posts_count > 0
ORDER BY date DESC
LIMIT ?
`);
  return adQuery.all(rowLimit);
}
/**
 * Sum of ad_posts_count over every row of `stats`.
 *
 * @returns {number} the total, or 0 when the query yields no row or a falsy sum.
 */
function getAdTotals() {
  const totals = db.prepare('SELECT SUM(ad_posts_count) as total FROM stats').get();
  if (!totals) {
    return 0;
  }
  return totals.total || 0;
}
/**
 * Count ad posts grouped by title, most frequent first. The title column is
 * exposed as `template_id`; posts with an empty title are excluded.
 *
 * @returns {Array<object>} rows with template_id and count.
 */
function getAdTemplateStats() {
  const byTemplate = db.prepare(`
SELECT title as template_id, COUNT(*) as count
FROM posts WHERE post_type = 'ad' AND title != ''
GROUP BY title ORDER BY count DESC
`);
  return byTemplate.all();
}
/**
 * created_at of the channel's post with the highest id.
 *
 * @param {string} channelId
 * @returns {string|null} the stored created_at value, or null when the channel has no posts.
 */
function getLastPostTime(channelId) {
  const latest = db
    .prepare('SELECT created_at FROM posts WHERE channel_id = ? ORDER BY id DESC LIMIT 1')
    .get(channelId);
  if (latest) {
    return latest.created_at;
  }
  return null;
}
/**
 * Fetch the channel's latest posts (highest id first).
 *
 * @param {string} channelId
 * @param {number} [limit=3] - maximum number of rows.
 * @returns {Array<object>} rows with content, post_type and created_at.
 */
function getRecentPosts(channelId, limit = 3) {
  const latestPosts = db.prepare('SELECT content, post_type, created_at FROM posts WHERE channel_id = ? ORDER BY id DESC LIMIT ?');
  return latestPosts.all(channelId, limit);
}
/**
 * Close the database connection if one was ever assigned; otherwise do nothing.
 */
function close() {
  if (!db) {
    return;
  }
  db.close();
}
module.exports = {
init, isDuplicate, savePost, incrementStats, getStats,
getTodayPostCount, getTodayAdCount, getRecentUrls, getLastPostTime, getRecentPosts, getAdStats, getAdTotals, getAdTemplateStats,
getSources, getApprovedSources, getAllPending, addSource, approveSource, rejectSource, deleteSource,
addToQueue, getQueue, getQueueItem, approveQueueItem, rejectQueueItem, getQueueCount, sourceNeedsModeration,
getKZDate, close
};
// 📜 Git History
// ecd6693  feat: утренняя рубрика Курс валют от Нацбанка РК (B3) — 3 weeks ago
// 3fdc0b1  fix: 4 бага — WAL checkpoint, fallback контент, HTML sanitize, логи — 7 weeks ago
// 045688a  feat: dashboard v0.3 — 9 новых источников + 10 улучшений дашборда — 7 weeks ago
// 5481def  feat: post moderation queue for new sources — 7 weeks ago
// 6ff5fa6  feat: source management with moderation via dashboard modal — 7 weeks ago
// 2757087  init: KZ Channels bot — 5 Telegram channels for Kazakhstan — 7 weeks ago