← Назад
const fastify = require('fastify')({ logger: true }) // CORS — whitelist known origins only const ALLOWED_ORIGINS = [ 'https://futures-screener.szhub.space', 'http://localhost:3200', 'http://127.0.0.1:3200' ] const cors = require('@fastify/cors') fastify.register(cors, { origin: (origin, cb) => { // Allow same-origin requests (no origin header) and whitelisted origins if (!origin || ALLOWED_ORIGINS.includes(origin)) { cb(null, true) } else { cb(null, false) } }, methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], allowedHeaders: ['Content-Type', 'Authorization'], credentials: true }) // Rate limiting const rateLimit = require('@fastify/rate-limit') fastify.register(rateLimit, { max: 100, // 100 requests per minute (global default) timeWindow: 60000, keyGenerator: (req) => req.ip, addHeadersOnExceeding: { 'x-ratelimit-limit': true, 'x-ratelimit-remaining': true }, addHeaders: { 'x-ratelimit-limit': true, 'x-ratelimit-remaining': true, 'retry-after': true } }) // Binance Futures (USDT-M) REST base const BINANCE_FAPI = 'https://fapi.binance.com' // Custom Modules const wsManager = require('./ws'); const stateManager = require('./state'); const { binLevels } = require('./logic'); const { analyzeBehavior } = require('./scorer'); const densityV2 = require('./densityV2'); const auth = require('./auth'); const signals = require('./signals'); const push = require('./push'); const klinesCache = require('./klines-cache'); // WS connects lazily on first subscribe() — no eager connect needed // ---- helpers ---- const FETCH_TIMEOUT_MS = 15000 // 15s timeout for all Binance requests // ---- Global Binance Rate Limiter ---- const rateLimiter = { usedWeight: 0, // last known X-MBX-USED-WEIGHT-1M weightUpdatedAt: 0, // when usedWeight was last updated pauseUntil: 0, // global pause timestamp (after 429) WEIGHT_SOFT_LIMIT: 1800, // start throttling at 1800/2400 (75%) WEIGHT_HARD_LIMIT: 2200, // hard pause at 2200/2400 (92%) THROTTLE_DELAY_MS: 500, // delay when approaching soft limit /** Update weight from Binance response headers */ updateWeight(headers) { const w = parseInt(headers.get('x-mbx-used-weight-1m') || '0', 10) if (w > 0) { this.usedWeight = w this.weightUpdatedAt = Date.now() } }, /** Set global pause (all requests wait) */ setPause(ms) { const until = Date.now() + ms if (until > this.pauseUntil) { this.pauseUntil = until console.log(`[rate-limiter] ⚠️ Global pause ${ms}ms (until ${new Date(until).toISOString().slice(11,19)})`) } }, /** Wait if paused or throttled. Call before every request. */ async waitIfNeeded() { // Global pause (after 429) const now = Date.now() if (now < this.pauseUntil) { const wait = this.pauseUntil - now console.log(`[rate-limiter] Waiting ${wait}ms (global pause)`) await new Promise(r => setTimeout(r, wait)) } // Weight is stale after 60s — Binance resets every minute if (Date.now() - this.weightUpdatedAt > 60000) { this.usedWeight = 0 } // Hard limit — pause 5s if (this.usedWeight >= this.WEIGHT_HARD_LIMIT) { console.log(`[rate-limiter] Weight ${this.usedWeight} >= ${this.WEIGHT_HARD_LIMIT}, pausing 5s`) await new Promise(r => setTimeout(r, 5000)) } // Soft limit — small delay else if (this.usedWeight >= this.WEIGHT_SOFT_LIMIT) { await new Promise(r => setTimeout(r, this.THROTTLE_DELAY_MS)) } }, /** Get current status for logging */ status() { return `weight=${this.usedWeight}/2400, pause=${this.pauseUntil > Date.now() ? (this.pauseUntil - Date.now()) + 'ms' : 'none'}` } } class RateLimitError extends Error { constructor(path, retryAfterMs, usedWeight) { super(`Binance 429 rate limited: ${path}`) this.name = 'RateLimitError' this.retryAfterMs = retryAfterMs this.usedWeight = usedWeight } } async function bget(path) { // Wait if globally paused or approaching weight limit await rateLimiter.waitIfNeeded() const controller = new AbortController() const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS) try { const res = await fetch(BINANCE_FAPI + path, { method: 'GET', signal: controller.signal }) // Always track weight from response rateLimiter.updateWeight(res.headers) if (res.status === 429 || res.status === 418) { // 429 = rate limited, 418 = IP ban (temp) const retryAfter = parseInt(res.headers.get('retry-after') || '0', 10) const retryMs = retryAfter > 0 ? retryAfter * 1000 : (res.status === 418 ? 120000 : 30000) rateLimiter.setPause(retryMs) throw new RateLimitError(path, retryMs, rateLimiter.usedWeight) } if (!res.ok) { const txt = await res.text().catch(() => '') throw new Error(`Binance GET ${path} failed: ${res.status} ${txt}`) } return res.json() } finally { clearTimeout(timeoutId) } } function toNumber(x) { return Number(x) } // K-lines timeframe (5 minutes in ms) const KLINE_INTERVAL = '5m' const KLINE_LIMIT = 20 // Binance K-lines order: index 0 = oldest, last = newest // After reverse(): bars[0] = newest (t), bars[1] = prev (t-1), bars[2] = oldest (t-2) // So: vol1 = newest (t), vol2 = prev (t-1), vol3 = oldest (t-2) // Note: Variable names now match time order, not index order function filterLevelsByWindow(levels, markPrice, windowPct) { return levels.filter(level => { const distPct = Math.abs(level.price - markPrice) / markPrice * 100; return distPct <= windowPct; }); } // Group close levels into clusters (for MM detection) // levels: array of {price, notional, distancePct} // maxGapPct: max distance between levels in same cluster (%) function groupCloseLevels(levels, maxGapPct = 0.2) { if (!levels || levels.length === 0) return [] // Sort by price const sorted = [...levels].sort((a, b) => a.price - b.price) const clusters = [] let currentCluster = [sorted[0]] for (let i = 1; i < sorted.length; i++) { const level = sorted[i] const prevLevel = sorted[i - 1] // Calculate gap in % relative to price const gapPct = Math.abs(level.price - prevLevel.price) / prevLevel.price * 100 if (gapPct <= maxGapPct) { // Add to current cluster currentCluster.push(level) } else { // Close current cluster and start new one if (currentCluster.length >= 2) { clusters.push(currentCluster) } currentCluster = [level] } } // Don't forget the last cluster if (currentCluster.length >= 2) { clusters.push(currentCluster) } return clusters } function calcNearestDensities({ price, bids, asks, minNotional, windowPct }) { // bids/asks are arrays: [priceStr, qtyStr] const filteredLevels = []; for (const [pStr, qStr] of bids) { const p = toNumber(pStr), q = toNumber(qStr) const notional = p * q if (notional >= minNotional) { const distPct = Math.abs((price - p) / price) * 100 if (distPct <= windowPct) { filteredLevels.push({ side: 'bid', price: p, qty: q, notional, distancePct: distPct }) } } } for (const [pStr, qStr] of asks) { const p = toNumber(pStr), q = toNumber(qStr) const notional = p * q if (notional >= minNotional) { const distPct = Math.abs((p - price) / price) * 100 if (distPct <= windowPct) { filteredLevels.push({ side: 'ask', price: p, qty: q, notional, distancePct: distPct }) } } } const bidLevels = filteredLevels.filter(l => l.side === 'bid'); const askLevels = filteredLevels.filter(l => l.side === 'ask'); return { filteredLevels, bidLevels, askLevels }; } // Simple concurrency limiter (no deps) with optional per-item delay async function mapLimit(items, limit, fn, delayMs = 0) { const out = new Array(items.length) let i = 0 const workers = new Array(Math.min(limit, items.length)).fill(0).map(async () => { while (true) { const idx = i++ if (idx >= items.length) break out[idx] = await fn(items[idx], idx) if (delayMs > 0) await new Promise(r => setTimeout(r, delayMs)) } }) await Promise.all(workers) return out } // Density endpoint result cache (avoid re-scanning 500+ symbols on every request) let densityCache = { data: null, meta: null, ts: 0 } const DENSITY_CACHE_TTL = 60000 // 60 seconds (was 30s) // Disk cache helpers for density results (survive PM2 restarts) const DENSITY_CACHE_FILE = require('path').join(__dirname, '..', 'data', 'density-cache.json') function saveDensityToDisk(data, meta) { try { const dir = require('path').dirname(DENSITY_CACHE_FILE) if (!require('fs').existsSync(dir)) require('fs').mkdirSync(dir, { recursive: true }) // Non-blocking write (fire-and-forget) require('fs').promises.writeFile(DENSITY_CACHE_FILE, JSON.stringify({ data, meta, ts: Date.now() })) .catch(e => console.log('[density-cache] disk save error:', e.message)) } catch (e) { console.log('[density-cache] disk save error:', e.message) } } function loadDensityFromDisk() { try { if (!require('fs').existsSync(DENSITY_CACHE_FILE)) return null // Sync read OK here — only called once at startup const raw = JSON.parse(require('fs').readFileSync(DENSITY_CACHE_FILE, 'utf8')) // Accept disk cache up to 10 minutes old (stale but better than nothing) if (raw && raw.data && (Date.now() - raw.ts) < 600000) { console.log(`[density-cache] Loaded ${raw.data.length} walls from disk (age: ${((Date.now() - raw.ts) / 1000).toFixed(0)}s)`) return raw } } catch (e) { console.log('[density-cache] disk load error:', e.message) } return null } // Load disk cache on startup const diskCache = loadDensityFromDisk() if (diskCache) { densityCache = { data: diskCache.data, meta: diskCache.meta, ts: diskCache.ts } } // Scoring function: enhanced with Time To Eat, NATR, and lifetime function calcScore({ notional, distancePct, isMM, timeToEatMinutes, natr, lifetimeSec }) { let score = notional / 1000000; // Base score in millions score = score / (1 + distancePct); // Penalty for distance if (timeToEatMinutes > 60) score *= 1.5; // Huge wall compared to 25m passing volume if (natr > 1.0) score *= 1.2; // Bonus for high volatility coins if (lifetimeSec > 300) score *= 1.2; // Bonus for proven walls (5+ mins old) if (isMM) score *= 1.5; // Bonus for clustered MM levels return score; } // Track all intervals for graceful shutdown cleanup const _intervals = [] // In-memory cache (TTL: 3 seconds) const cache = new Map() const CACHE_TTL_MS = 3000 // Cleanup expired cache entries every 10 seconds _intervals.push(setInterval(() => { const now = Date.now() for (const [key, entry] of cache.entries()) { if (now - entry.ts > CACHE_TTL_MS) cache.delete(key) } }, 10000)) // --- Level History State --- const levelHistory = new Map() // Очистка старых уровней (TTL: 1 минута без обновлений) _intervals.push(setInterval(() => { const now = Date.now() for (const [key, val] of levelHistory.entries()) { if (now - val.lastUpdate > 60000) { levelHistory.delete(key) } } }, 30000)) // --- Telegram Alerts Scaffold --- const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN || '' const TELEGRAM_CHAT_ID = process.env.TELEGRAM_CHAT_ID || '' async function sendTelegramAlert(msg) { if (!TELEGRAM_BOT_TOKEN || !TELEGRAM_CHAT_ID) return const url = `https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage` try { await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ chat_id: TELEGRAM_CHAT_ID, text: msg, parse_mode: 'HTML' }) }) } catch (e) { console.error('Telegram Error:', e.message) } } function getCacheKey(req) { return JSON.stringify({ symbols: req.query.symbols || 'all', minNotional: req.query.minNotional || 50000, depthLimit: req.query.depthLimit || 100, windowPct: req.query.windowPct || 1.0, minScore: req.query.minScore || 0, concurrency: req.query.concurrency || 6 }) } function getCached(req) { const key = getCacheKey(req) const entry = cache.get(key) if (entry && Date.now() - entry.ts < CACHE_TTL_MS) { return entry.data } return null } function setCached(req, data) { const key = getCacheKey(req) cache.set(key, { data, ts: Date.now() }) } // Retry/backoff helper for Binance requests (rate-limit aware) async function bgetWithRetry(path, maxRetries = 3, baseDelay = 500) { let rateLimitRetries = 0 const MAX_RATE_LIMIT_RETRIES = 3 // max 3 rate-limit retries on top of regular retries for (let attempt = 1; attempt <= maxRetries; attempt++) { try { return await bget(path) } catch (err) { if (err instanceof RateLimitError) { rateLimitRetries++ if (rateLimitRetries > MAX_RATE_LIMIT_RETRIES) { throw new Error(`Binance GET ${path} rate limited ${rateLimitRetries}x, giving up (${rateLimiter.status()})`) } // 429/418: global pause already set by bget(), wait and retry (don't count as regular attempt) console.log(`[bgetWithRetry] 429 on ${path.slice(0, 60)}, retry ${rateLimitRetries}/${MAX_RATE_LIMIT_RETRIES}, waiting ${err.retryAfterMs}ms`) await new Promise(r => setTimeout(r, err.retryAfterMs)) attempt-- // don't count rate limit as a regular retry attempt continue } if (attempt === maxRetries) { throw new Error(`Binance GET ${path} failed after ${maxRetries} attempts: ${err.message}`) } const delay = baseDelay * Math.pow(2, attempt - 1) await new Promise(resolve => setTimeout(resolve, delay)) } } } // Klines stats cache (TTL 60s) to avoid hammering Binance const klinesStatsCache = new Map() const KLINES_STATS_TTL = 60000 // Periodic cleanup of stale klinesStats entries (every 5min) _intervals.push(setInterval(() => { const now = Date.now() let evicted = 0 for (const [symbol, entry] of klinesStatsCache) { if (now - entry.ts > KLINES_STATS_TTL * 5) { // 5x TTL = 5min klinesStatsCache.delete(symbol) evicted++ } } if (evicted > 0) console.log(`[klines-stats] Cache cleanup: evicted ${evicted}, ${klinesStatsCache.size} remaining`) }, 5 * 60_000)) // Получить K-lines и рассчитать объёмы + ATR async function getKlinesWithStats(symbol) { const cached = klinesStatsCache.get(symbol) if (cached && (Date.now() - cached.ts) < KLINES_STATS_TTL) return cached.data try { // Получаем K-lines параллельно: 1d (для NATR) и 5m (для объемов) const [klines1d, klines5m] = await Promise.all([ bgetWithRetry(`/fapi/v1/klines?symbol=${symbol}&interval=1d&limit=${14}`), bgetWithRetry(`/fapi/v1/klines?symbol=${symbol}&interval=5m&limit=${5}`) ]) let natr = 0; // Расчет NATR по 1-дневным свечам if (klines1d && klines1d.length > 0) { const convert = (k) => ({ high: toNumber(k[2]), low: toNumber(k[3]), close: toNumber(k[4]) }) const bars = klines1d.map(convert) const trValues = [] // True Range for (let i = 1; i < bars.length; i++) { const highLow = bars[i].high - bars[i].low const highPrevClose = Math.abs(bars[i].high - bars[i - 1].close) const lowPrevClose = Math.abs(bars[i].low - bars[i - 1].close) const tr = Math.max(highLow, highPrevClose, lowPrevClose) trValues.push(tr) } if (trValues.length > 0) { const atr = trValues.reduce((a, b) => a + b, 0) / trValues.length const latestClose = bars[bars.length - 1].close natr = latestClose > 0 ? (atr / latestClose) * 100 : 0 } else if (bars.length === 1) { // Если монета совсем новая const latestClose = bars[0].close const highLow = bars[0].high - bars[0].low natr = latestClose > 0 ? (highLow / latestClose) * 100 : 0 } } let vol1 = 0, vol2 = 0, vol3 = 0, vol4 = 0, vol5 = 0; // Объемы по 5-минутным свечам if (klines5m && klines5m.length > 0) { const convert5m = (k) => ({ volume: toNumber(k[7]) }) const bars5m = klines5m.map(convert5m).reverse() // [newest, prev, oldest...] vol1 = bars5m[0] ? bars5m[0].volume : 0 vol2 = bars5m[1] ? bars5m[1].volume : 0 vol3 = bars5m[2] ? bars5m[2].volume : 0 vol4 = bars5m[3] ? bars5m[3].volume : 0 vol5 = bars5m[4] ? bars5m[4].volume : 0 } const result = { vol1, vol2, vol3, vol4, vol5, natr } klinesStatsCache.set(symbol, { data: result, ts: Date.now() }) return result } catch (err) { console.warn(`[klines-stats] ${symbol} failed: ${err.message}`) return null // caller must handle null (skip symbol) } } // ---- UI (static files from ../app, cached in memory at startup) ---- const path = require('path') const fs = require('fs') const APP_DIR = path.resolve(__dirname, '..', 'app') // Pre-load static files into memory (never changes at runtime) const staticCache = new Map() function getStatic(relPath) { if (staticCache.has(relPath)) return staticCache.get(relPath) const p = path.join(APP_DIR, relPath) const buf = fs.readFileSync(p) staticCache.set(relPath, buf) return buf } // Pre-warm all static files at module load const STATIC_FILES = [ 'index.html', 'app.js', 'densities.js', 'mini-charts.js', 'auth.js', 'drawing-manager.js', 'signals.js', 'settings.js', 'styles.css', 'manifest.json', 'sw.js', 'icon-192.svg', 'icon-512.svg' ] for (const f of STATIC_FILES) { try { getStatic(f) } catch (e) { console.warn(`[Static] Failed to pre-load ${f}: ${e.message}`) } } // Also cache the UMD library const LWC_DRAWING_PATH = path.resolve(__dirname, '..', 'node_modules', 'lightweight-charts-drawing', 'dist', 'lightweight-charts-drawing.umd.js') let lwcDrawingBuf try { lwcDrawingBuf = fs.readFileSync(LWC_DRAWING_PATH) } catch (e) { console.warn('[Static] lightweight-charts-drawing UMD not found') } // Cache headers: HTML = must-revalidate (cache-buster URLs update), assets = 1 day (have ?v= buster) const ASSET_CACHE = 'public, max-age=86400' // 1 day const HTML_CACHE = 'no-cache' // always revalidate fastify.get('/', async (req, reply) => { reply.header('Cache-Control', HTML_CACHE).type('text/html; charset=utf-8').send(getStatic('index.html')) }) fastify.get('/app.js', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('app.js')) }) fastify.get('/densities.js', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('densities.js')) }) fastify.get('/mini-charts.js', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('mini-charts.js')) }) fastify.get('/auth.js', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('auth.js')) }) fastify.get('/drawing-manager.js', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('drawing-manager.js')) }) fastify.get('/signals.js', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('signals.js')) }) fastify.get('/settings.js', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('settings.js')) }) fastify.get('/styles.css', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('text/css; charset=utf-8').send(getStatic('styles.css')) }) fastify.get('/lightweight-charts-drawing.umd.js', async (req, reply) => { if (!lwcDrawingBuf) return reply.code(404).send('Not found') reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(lwcDrawingBuf) }) fastify.get('/favicon.ico', async (req, reply) => { reply.code(204).send() }) fastify.get('/manifest.json', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('application/manifest+json; charset=utf-8').send(getStatic('manifest.json')) }) fastify.get('/sw.js', async (req, reply) => { reply.header('Cache-Control', 'no-cache').type('application/javascript; charset=utf-8').header('Service-Worker-Allowed', '/').send(getStatic('sw.js')) }) fastify.get('/icon-192.svg', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('image/svg+xml').send(getStatic('icon-192.svg')) }) fastify.get('/icon-512.svg', async (req, reply) => { reply.header('Cache-Control', ASSET_CACHE).type('image/svg+xml').send(getStatic('icon-512.svg')) }) // ---- Auth routes ---- // Attach user to every request (non-blocking) fastify.addHook('onRequest', async (req) => { auth.authHook(req) }) fastify.post('/api/auth/register', { config: { rateLimit: { max: 10, timeWindow: 60000 } } }, async (req, reply) => { const { email, password, name } = req.body || {} const result = auth.register(email, password, name) if (result.error) return reply.code(400).send(result) return result }) fastify.post('/api/auth/login', { config: { rateLimit: { max: 15, timeWindow: 60000 } } }, async (req, reply) => { const { email, password } = req.body || {} const result = auth.login(email, password) if (result.error) return reply.code(401).send(result) return result }) fastify.get('/api/auth/me', async (req, reply) => { if (!req.user) return reply.code(401).send({ error: 'Not authenticated' }) return { success: true, user: req.user } }) // Google OAuth fastify.get('/api/auth/google/url', async () => { const url = auth.getGoogleAuthUrl() if (!url) return { error: 'Google OAuth not configured' } return { url } }) fastify.post('/api/auth/google/callback', async (req, reply) => { const { code } = req.body || {} if (!code) return reply.code(400).send({ error: 'Code required' }) const result = await auth.googleAuth(code) if (result.error) return reply.code(400).send(result) return result }) // Admin: set user tier (requires admin tier) fastify.post('/api/auth/set-tier', async (req, reply) => { if (!req.user || req.user.tier !== 'admin') { return reply.code(403).send({ error: 'Admin only' }) } const { userId, tier } = req.body || {} if (!userId || !['free', 'pro', 'admin'].includes(tier)) { return reply.code(400).send({ error: 'Invalid userId or tier' }) } const user = auth.setTier(userId, tier) return { success: true, user } }) // Admin: list users fastify.get('/api/auth/users', async (req, reply) => { if (!req.user || req.user.tier !== 'admin') { return reply.code(403).send({ error: 'Admin only' }) } return { users: auth.listUsers(), count: auth.getUserCount() } }) // ---- User Data routes (settings, watchlists, layouts, alerts) ---- // Settings fastify.get('/api/settings', async (req, reply) => { if (!auth.requireAuth(req, reply)) return return { success: true, settings: auth.getSettings(req.user.id) } }) fastify.put('/api/settings', async (req, reply) => { if (!auth.requireAuth(req, reply)) return const settings = req.body || {} auth.saveSettings(req.user.id, settings) return { success: true } }) // Watchlists fastify.get('/api/watchlist', async (req, reply) => { if (!auth.requireAuth(req, reply)) return return { success: true, watchlist: auth.getWatchlist(req.user.id) } }) fastify.post('/api/watchlist', async (req, reply) => { if (!auth.requireAuth(req, reply)) return const { symbol, color, sort_order } = req.body || {} if (!symbol) return reply.code(400).send({ error: 'Symbol required' }) auth.addToWatchlist(req.user.id, symbol, color, sort_order) return { success: true } }) fastify.delete('/api/watchlist/:symbol', async (req, reply) => { if (!auth.requireAuth(req, reply)) return auth.removeFromWatchlist(req.user.id, req.params.symbol) return { success: true } }) // Layouts fastify.get('/api/layouts', async (req, reply) => { if (!auth.requireAuth(req, reply)) return return { success: true, layouts: auth.getLayouts(req.user.id), active: auth.getActiveLayout(req.user.id) } }) fastify.post('/api/layouts', async (req, reply) => { if (!auth.requireAuth(req, reply)) return const { name, layout_type, config } = req.body || {} const result = auth.createLayout(req.user.id, name || 'Layout', layout_type || '1', config || {}) return { success: true, id: result.lastInsertRowid } }) fastify.put('/api/layouts/:id', async (req, reply) => { if (!auth.requireAuth(req, reply)) return const { config, layout_type } = req.body || {} auth.updateLayout(Number(req.params.id), req.user.id, config, layout_type) return { success: true } }) // Alerts fastify.get('/api/alerts', async (req, reply) => { if (!auth.requireAuth(req, reply)) return return { success: true, alerts: auth.getUserAlerts(req.user.id) } }) fastify.post('/api/alerts', async (req, reply) => { if (!auth.requireAuth(req, reply)) return const { type, symbol, condition, cooldown_sec } = req.body || {} if (!type) return reply.code(400).send({ error: 'Alert type required' }) const result = auth.createUserAlert(req.user.id, type, symbol, condition || {}, cooldown_sec || 300) return { success: true, id: result.lastInsertRowid } }) fastify.delete('/api/alerts/:id', async (req, reply) => { if (!auth.requireAuth(req, reply)) return auth.stmts.deleteAlert.run(Number(req.params.id), req.user.id) return { success: true } }) fastify.get('/api/alerts/triggers', async (req, reply) => { if (!auth.requireAuth(req, reply)) return const limit = Number(req.query.limit || 50) return { success: true, triggers: auth.getAlertTriggers(req.user.id, limit) } }) // Signal stats (public) fastify.get('/api/signals/stats', async () => { return { success: true, stats: auth.getSignalStats(), recent: auth.getRecentSignals(20) } }) // Live signals (in-memory, real-time) fastify.get('/api/signals/live', async (req) => { const { type, symbol, direction, minConfidence, limit, hours } = req.query const data = signals.getLiveSignals({ type, symbol, direction, minConfidence, limit, hours }) return { success: true, count: data.length, data } }) // Signal summary (counts, types) fastify.get('/api/signals/summary', async () => { return { success: true, ...signals.getSignalSummary() } }) // Outcome stats (WIN/LOSS by type) fastify.get('/api/signals/outcomes', async () => { return { success: true, stats: signals.getOutcomeStats() } }) // Signal history (from DB, with pagination) fastify.get('/api/signals/history', async (req) => { const limit = Math.min(Number(req.query.limit || 50), 200) const recent = auth.getRecentSignals(limit) return { success: true, count: recent.length, data: recent } }) // ---- API routes ---- fastify.get('/health', async () => { return { status: 'ok', service: process.env.SERVICE_NAME || 'futures-screener', users: auth.getUserCount() } }) fastify.get('/symbols', async () => { const info = await bgetWithRetry('/fapi/v1/exchangeInfo') const symbols = (info.symbols || []) .filter(s => s.contractType === 'PERPETUAL' && s.quoteAsset === 'USDT' && s.status === 'TRADING') .map(s => s.symbol) return { count: symbols.length, symbols } }) fastify.get('/depth/:symbol', async (req) => { const symbol = String(req.params.symbol || '').toUpperCase() const limit = Number(req.query.limit || 100) const ob = await bgetWithRetry(`/fapi/v1/depth?symbol=${encodeURIComponent(symbol)}&limit=${limit}`) return { symbol, lastUpdateId: ob.lastUpdateId, bids: ob.bids, asks: ob.asks } }) // NEW: simple flat output for UI (scoring, sorting, cache) fastify.get('/densities/simple', async (req, reply) => { const minNotional = Number(req.query.minNotional || 0) const depthLimit = Number(req.query.depthLimit || 100) const mmMode = req.query.mmMode === 'true' const windowPct = Number(req.query.windowPct || 5.0) // 5% по умолчанию const mmMultiplier = Number(req.query.mmMultiplier || 4) // 4x по умолчанию const xFilter = Number(req.query.xFilter || 0) // фильтр по x (0 = без фильтра) const natrFilter = Number(req.query.natrFilter || 0) // фильтр по NATR (0 = без фильтра) const minScore = Number(req.query.minScore || 0) // фильтр по Score const concurrency = Number(req.query.concurrency || 3) // parallel Binance requests (3 to stay under rate limit) const isSpecificSymbols = !!req.query.symbols let symbols if (isSpecificSymbols) { symbols = String(req.query.symbols).split(',').map(s => s.trim().toUpperCase()).filter(s => s) } else { // Full scan — always return from cache (warmup populates it) if (densityCache.data) { let finalData = [...densityCache.data] if (xFilter > 0) finalData = finalData.filter(d => d.xMult >= xFilter) if (natrFilter > 0) finalData = finalData.filter(d => d.natr !== null && d.natr >= natrFilter) if (minScore > 0) finalData = finalData.filter(d => d.score >= minScore) const ageSec = ((Date.now() - densityCache.ts) / 1000).toFixed(0) return { ...densityCache.meta, xFilter, natrFilter, data: finalData, cached: true, cacheAgeSec: Number(ageSec) } } // No cache at all — return empty (warmup will fill it) reply.code(503) return { count: 0, data: [], cached: false, message: 'Warming up, try again in 30s' } } // Optional limit (0 = no limit, scan all) const limitSymbols = Number(req.query.limitSymbols || 0) if (limitSymbols > 0 && symbols.length > limitSymbols) { symbols = symbols.slice(0, limitSymbols) } const marks = await bgetWithRetry('/fapi/v1/premiumIndex') const markMap = new Map(marks.map(m => [m.symbol, Number(m.markPrice)])) // Delay between items to stay under Binance rate limit (2400/min) // Full scan: 500 symbols × ~3 calls each = 1500 calls, concurrency 3, delay 500ms // → ~6 calls/sec → ~360/min (safe margin) const itemDelay = isSpecificSymbols ? 0 : 500 const rowsArr = await mapLimit(symbols, concurrency, async (sym) => { const price = markMap.get(sym) if (!price) return [] // 1. If not yet WS-subscribed: for full scans, skip (no data yet). // For specific symbol queries (charts), fetch depth on demand. if (!wsManager.callbacks.has(sym)) { if (!isSpecificSymbols) { return []; // Full scan: skip unsubscribed symbols, they'll warm up via chart views } try { const ob = await bgetWithRetry(`/fapi/v1/depth?symbol=${encodeURIComponent(sym)}&limit=1000`); stateManager.initBook(sym, ob.bids, ob.asks); wsManager.subscribe(sym, (payload) => { stateManager.processDelta(sym, payload); }); } catch (err) { console.log(`[density] Skip ${sym}: ${err.message.slice(0, 80)}`); return []; } } // 2. Get local state from memory (from WS deltas) const bidLevelsRaw = stateManager.getTopLevels(sym, 'bid', price, minNotional, depthLimit, windowPct); const askLevelsRaw = stateManager.getTopLevels(sym, 'ask', price, minNotional, depthLimit, windowPct); // Получить K-lines для объёмов и ATR const klinesStats = await getKlinesWithStats(sym) if (!klinesStats) return [] // skip symbol if klines unavailable // 3. Binning & Density Analysis v2 (x-multiplier based) const avg5mVol = (klinesStats.vol1 + klinesStats.vol2 + klinesStats.vol3 + klinesStats.vol4 + klinesStats.vol5) / 5; const processSide = (levels, sideKey) => { const BIN_SIZE_PCT = 0.1; const rawBins = binLevels(levels, BIN_SIZE_PCT); const validBins = rawBins.filter(b => b.notional >= minNotional); const trackedBins = stateManager.trackAndEnrichBins(sym, sideKey, validBins, price); const scoredBins = trackedBins.map(bin => { const behavior = analyzeBehavior(bin, price, klinesStats.natr, avg5mVol); // x-multiplier filter: only walls >= xFilter (default x4) const minX = xFilter > 0 ? xFilter : 4; if (behavior.xMult < minX) return null; let tte = Infinity; if (avg5mVol > 0) { tte = bin.notional / (avg5mVol / 5); } return { symbol: sym, sideKey, price: Math.round(bin.anchorPrice * 10000) / 10000, notional: bin.notional, distancePct: Math.round(behavior.distancePct * 100) / 100, lifetimeMins: Math.round(behavior.lifetimeMins * 10) / 10, score: behavior.trustScore, xMult: Math.round(behavior.xMult * 10) / 10, severity: behavior.severity, tags: behavior.tags, levelsCount: bin.levelsCount, natr: klinesStats.natr, avg5mVol: Math.round(avg5mVol), vol1: klinesStats.vol1, vol2: klinesStats.vol2, vol3: klinesStats.vol3, vol4: klinesStats.vol4, vol5: klinesStats.vol5, timeToEatMinutes: tte }; }).filter(Boolean); scoredBins.sort((a, b) => { if (b.score !== a.score) return b.score - a.score; return a.distancePct - b.distancePct; }); // Top 2 per side (best bid wall + best ask wall) return scoredBins.slice(0, 2); }; const bidResult = processSide(bidLevelsRaw, 'bid'); const askResult = processSide(askLevelsRaw, 'ask'); return [...bidResult, ...askResult]; }, itemDelay); // All levels flat, sorted by score desc const allLevels = rowsArr.flat().sort((a, b) => b.score - a.score); // Top 3 per symbol (best bid + best ask + next best) const perSymbol = {}; for (const entry of allLevels) { if (!perSymbol[entry.symbol]) perSymbol[entry.symbol] = []; if (perSymbol[entry.symbol].length < 3) { perSymbol[entry.symbol].push(entry); } } let finalData = Object.values(perSymbol).flat(); finalData.sort((a, b) => b.score - a.score); // Фильтрация по NATR (если natrFilter > 0, показываем только уровни с natr >= natrFilter%) if (natrFilter > 0) { finalData = finalData.filter(d => d.natr !== null && d.natr >= natrFilter) } // Фильтрация по Score if (minScore > 0) { finalData = finalData.filter(d => d.score >= minScore) } // Cache full unfiltered data for subsequent requests if (!isSpecificSymbols) { // Store unfiltered data (before xFilter/natrFilter/minScore applied by params) // allLevels already has all scored walls, perSymbol top 3 = finalData before natr/score filters const unfilteredData = Object.values(perSymbol).flat() unfilteredData.sort((a, b) => b.score - a.score) const meta = { count: unfilteredData.length, minNotional, depthLimit, concurrency, mmMode, windowPct, mmMultiplier } densityCache = { data: unfilteredData, meta, ts: Date.now() } // Persist to disk so data survives PM2 restarts saveDensityToDisk(unfilteredData, meta) } const result = { count: finalData.length, minNotional, depthLimit, concurrency, mmMode, windowPct, mmMultiplier, xFilter, natrFilter, data: finalData } return result }) // ---- Density V2: Statistical Walls + Imbalance ---- const densityV2PersistenceMap = new Map() let densityV2Cache = { data: null, ts: 0 } const DENSITY_V2_CACHE_TTL = 15000 // 15 seconds // Persistence map disk save/load (survive PM2 restarts) const PERSISTENCE_MAP_FILE = require('path').join(__dirname, '..', 'data', 'density-persistence.json') function savePersistenceMapToDisk() { try { const dir = require('path').dirname(PERSISTENCE_MAP_FILE) if (!require('fs').existsSync(dir)) require('fs').mkdirSync(dir, { recursive: true }) const obj = {} for (const [key, val] of densityV2PersistenceMap.entries()) { obj[key] = val } require('fs').promises.writeFile(PERSISTENCE_MAP_FILE, JSON.stringify({ map: obj, ts: Date.now() })) .catch(e => console.log('[persistence-map] disk save error:', e.message)) } catch (e) { console.log('[persistence-map] disk save error:', e.message) } } function loadPersistenceMapFromDisk() { try { if (!require('fs').existsSync(PERSISTENCE_MAP_FILE)) return const raw = JSON.parse(require('fs').readFileSync(PERSISTENCE_MAP_FILE, 'utf8')) // Accept up to 30 minutes old (walls older than that are stale anyway) if (!raw || !raw.map || (Date.now() - raw.ts) > 1800000) return const now = Date.now() let loaded = 0 for (const [key, val] of Object.entries(raw.map)) { // Skip entries not seen in last 5 min (same as cleanupPersistence) if (val.lastSeen && (now - val.lastSeen) > 300000) continue densityV2PersistenceMap.set(key, val) loaded++ } if (loaded > 0) console.log(`[persistence-map] Loaded ${loaded} wall records from disk (age: ${((Date.now() - raw.ts) / 1000).toFixed(0)}s)`) } catch (e) { console.log('[persistence-map] disk load error:', e.message) } } // Load on startup loadPersistenceMapFromDisk() // Save every 30s (lightweight — typically a few hundred entries) _intervals.push(setInterval(savePersistenceMapToDisk, 30000)) // Cleanup persistence every 60s _intervals.push(setInterval(() => densityV2.cleanupPersistence(densityV2PersistenceMap), 60000)) fastify.get('/densities/v2', async (req) => { const windowPct = Number(req.query.windowPct || 2) const nSigma = Number(req.query.nSigma || 2) const minVolume24h = Number(req.query.minVolume24h || 50000000) // $50M default const minImbalance = Number(req.query.minImbalance || 0) // 0 = show all const isSpecific = !!req.query.symbols const forceRefresh = req.query.force === 'true' // Return cached data if fresh enough (full scan only) if (!isSpecific && !forceRefresh && densityV2Cache.data && (Date.now() - densityV2Cache.ts) < DENSITY_V2_CACHE_TTL) { let filtered = [...densityV2Cache.data] if (minImbalance > 0) filtered = filtered.filter(d => Math.abs(d.imbalance) >= minImbalance) return { count: filtered.length, data: filtered, cached: true, cacheAgeSec: Math.round((Date.now() - densityV2Cache.ts) / 1000) } } // Get 24h ticker for volume filter let ticker24h try { ticker24h = await bgetWithRetry('/fapi/v1/ticker/24hr') } catch (err) { return { count: 0, data: [], error: 'Ticker data temporarily unavailable' } } const volumeMap = new Map(ticker24h.map(t => [t.symbol, Number(t.quoteVolume)])) // Get mark prices const marks = await bgetWithRetry('/fapi/v1/premiumIndex') const markMap = new Map(marks.map(m => [m.symbol, Number(m.markPrice)])) // Determine symbols to analyze let symbols if (isSpecific) { symbols = String(req.query.symbols).split(',').map(s => s.trim().toUpperCase()).filter(Boolean) } else { // All subscribed symbols filtered by volume symbols = [...wsManager.callbacks.keys()].filter(sym => { const vol = volumeMap.get(sym) || 0 return vol >= minVolume24h && markMap.has(sym) }) } const results = [] for (const sym of symbols) { const price = markMap.get(sym) if (!price) continue // If not yet WS-subscribed and specific symbol requested: fetch depth on demand if (!wsManager.callbacks.has(sym)) { if (!isSpecific) continue // Full scan: skip unsubscribed try { const ob = await bgetWithRetry(`/fapi/v1/depth?symbol=${encodeURIComponent(sym)}&limit=1000`) stateManager.initBook(sym, ob.bids, ob.asks) wsManager.subscribe(sym, (payload) => { stateManager.processDelta(sym, payload) }) } catch (err) { continue } } // Get raw levels from WS state (wider window for analysis, windowPct for filtering) const bidLevels = stateManager.getTopLevels(sym, 'bid', price, 0, 500, windowPct + 1) const askLevels = stateManager.getTopLevels(sym, 'ask', price, 0, 500, windowPct + 1) if (bidLevels.length < 3 && askLevels.length < 3) continue // not enough data try { const analysis = densityV2.analyzeSymbol({ symbol: sym, markPrice: price, bidLevels, askLevels, persistenceMap: densityV2PersistenceMap, windowPct, nSigma }) // Only include if has at least one wall if (analysis.wallCount > 0) { // Add volume info analysis.volume24h = volumeMap.get(sym) || 0 // Calculate erosion time for each wall (avg 5m volume from last 14 candles) let avgVol5m = 0 try { const candles5m = klinesCache.getCandles(sym, '5m', 14) if (candles5m && candles5m.length >= 3) { avgVol5m = candles5m.reduce((sum, c) => sum + c.volume, 0) / candles5m.length } } catch (_) {} const addErosion = (wall) => { if (!wall) return wall wall.erosionMins = avgVol5m > 0 ? Math.round(wall.notional * 5 / avgVol5m * 10) / 10 : null return wall } addErosion(analysis.support) addErosion(analysis.resistance) if (analysis.bidWalls) analysis.bidWalls.forEach(addErosion) if (analysis.askWalls) analysis.askWalls.forEach(addErosion) results.push(analysis) } } catch (err) { // Skip problematic symbols silently continue } } // Sort by strongest wall score (support or resistance, whichever is bigger) results.sort((a, b) => { const scoreA = Math.max(a.support?.score || 0, a.resistance?.score || 0) const scoreB = Math.max(b.support?.score || 0, b.resistance?.score || 0) return scoreB - scoreA }) // Cache for full scans if (!isSpecific) { densityV2Cache = { data: results, ts: Date.now() } } let finalData = results if (minImbalance > 0) finalData = finalData.filter(d => Math.abs(d.imbalance) >= minImbalance) return { count: finalData.length, data: finalData, cached: false } }) // Cache stats endpoint fastify.get('/_cache/stats', async () => ({ size: cache.size, keys: [...cache.keys()] })) // ---- Binance Proxy for Mini-Charts (cached) ---- const proxyCache = new Map() const PROXY_MAX_TTL_MS = 600000 // 10 min max TTL for cleanup (NATR read TTL is 600s, must survive refresh gaps) // Cleanup expired proxy cache entries every 30 seconds _intervals.push(setInterval(() => { const now = Date.now() for (const [key, entry] of proxyCache.entries()) { if (now - entry.ts > PROXY_MAX_TTL_MS) proxyCache.delete(key) } }, 30000)) function getProxyCached(key, ttlMs) { const entry = proxyCache.get(key) if (entry && Date.now() - entry.ts < ttlMs) return entry.data return null } function setProxyCached(key, data) { proxyCache.set(key, { data, ts: Date.now() }) } // 24hr ticker — cached 30s (all pairs, heavy endpoint) fastify.get('/api/ticker24hr', async () => { const cached = getProxyCached('ticker24hr', 60000) if (cached) return cached const data = await bgetWithRetry('/fapi/v1/ticker/24hr') setProxyCached('ticker24hr', data) return data }) // Klines — SQLite cache first, Binance fallback fastify.get('/api/klines', async (req) => { const symbol = String(req.query.symbol || '').toUpperCase() const interval = String(req.query.interval || '15m') const limit = Math.min(Number(req.query.limit || 200), 1500) const endTime = req.query.endTime ? Number(req.query.endTime) : null const startTime = req.query.startTime ? Number(req.query.startTime) : null if (!symbol) return { error: 'symbol required' } // Delta request: return only candles after startTime (for client cache sync) if (startTime) { const delta = klinesCache.getCandlesAfter(symbol, interval, Math.floor(startTime / 1000)) if (delta.length > 0) return { cached: true, data: delta } // Fallback: fetch from Binance const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=${limit}&startTime=${startTime}`) if (Array.isArray(data)) klinesCache.storeCandles(symbol, interval, data) return data } // Historical pagination (endTime) — check SQLite first if (endTime) { const beforeSec = Math.floor(endTime / 1000) const cached = klinesCache.getCandlesBefore(symbol, interval, beforeSec, limit) if (cached.length >= limit * 0.8) { // Return in Binance-compatible format for client parseKlines() return cached.map(c => [c.time * 1000, String(c.open), String(c.high), String(c.low), String(c.close), String(c.volume)]) } // Not enough in cache — fetch from Binance and cache const key = `klines:${symbol}:${interval}:${limit}:end${endTime}` const proxyCached = getProxyCached(key, 300000) if (proxyCached) return proxyCached const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=${limit}&endTime=${endTime}`) if (Array.isArray(data)) { setProxyCached(key, data) klinesCache.storeCandles(symbol, interval, data) } return data } // Latest candles — try SQLite cache (fresh or stale with background refresh) const cachedCount = klinesCache.getCount(symbol, interval) if (cachedCount >= limit) { const latestTime = klinesCache.getLatestTime(symbol, interval) const age = Date.now() / 1000 - (latestTime || 0) if (age < 300) { const rows = klinesCache.getCandles(symbol, interval, limit) const result = rows.map(c => [c.time * 1000, String(c.open), String(c.high), String(c.low), String(c.close), String(c.volume)]) // Stale cache (>60s) — return immediately but trigger background refresh if (age >= 60) { bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=3`) .then(data => { if (Array.isArray(data)) klinesCache.storeCandles(symbol, interval, data) }) .catch(() => {}) } return result } } // Cache stale or miss — fetch from Binance, update SQLite const key = `klines:${symbol}:${interval}:${limit}` const proxyCached = getProxyCached(key, 10000) if (proxyCached) return proxyCached const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=${limit}`) if (Array.isArray(data)) { setProxyCached(key, data) klinesCache.storeCandles(symbol, interval, data) } return data }) // Test signal — inject a fake signal for notification testing (auto-expires in 60s) // ---- Web Push API ---- // Get VAPID public key (client needs this to subscribe) fastify.get('/api/push/vapid-key', async () => { const key = push.getVapidPublicKey() if (!key) return { success: false, error: 'Push not configured' } return { success: true, key } }) // Subscribe to push notifications fastify.post('/api/push/subscribe', async (req) => { const { subscription, filters } = req.body || {} if (!subscription?.endpoint || !subscription?.keys?.p256dh || !subscription?.keys?.auth) { return { success: false, error: 'Invalid subscription' } } auth.stmts.upsertPushSub.run( subscription.endpoint, subscription.keys.p256dh, subscription.keys.auth, JSON.stringify(filters || {}) ) const count = auth.stmts.countPushSubs.get()?.count || 0 console.log(`[Push] New subscription registered (total: ${count})`) return { success: true, total: count } }) // Unsubscribe from push notifications fastify.post('/api/push/unsubscribe', async (req) => { const { endpoint } = req.body || {} if (!endpoint) return { success: false, error: 'Missing endpoint' } auth.stmts.deletePushSub.run(endpoint) return { success: true } }) // ---- Test Signal ---- fastify.get('/api/signals/test', async (req, reply) => { // Require authenticated user to prevent public abuse if (!req.user) return reply.code(401).send({ error: 'Auth required' }) const sig = { id: `test-${Date.now()}`, type: 'volume_spike', symbol: 'BTCUSDT', direction: 'LONG', price: 94500, confidence: 85, description: 'Test signal — Volume 5.2x above SMA20 average', metadata: { ratio: 5.2, currentVol: 12000000, avgVol: 2300000 }, created_at: new Date().toISOString(), } // Remove any old test signals first (splice to keep array reference — don't reassign!) for (let i = signals.liveSignals.length - 1; i >= 0; i--) { if (String(signals.liveSignals[i].id).startsWith('test-')) signals.liveSignals.splice(i, 1) } signals.liveSignals.unshift(sig) // Auto-remove after 60s (splice to keep reference) setTimeout(() => { const idx = signals.liveSignals.indexOf(sig) if (idx >= 0) signals.liveSignals.splice(idx, 1) }, 60_000) // Test signals do NOT trigger push — only real signals do return { success: true, signal: sig, pushEnabled: push.isEnabled() } }) // OI history — proxied from Binance /futures/data/openInterestHist fastify.get('/api/oi-history', async (req, reply) => { const symbol = String(req.query.symbol || '').toUpperCase() const period = String(req.query.period || '5m') const limit = Math.min(Number(req.query.limit || 500), 500) if (!symbol) return { error: 'symbol required' } const key = `oiHist:${symbol}:${period}:${limit}` const cached = getProxyCached(key, 60000) // cache 1 min if (cached) return cached try { const url = `/futures/data/openInterestHist?symbol=${encodeURIComponent(symbol)}&period=${period}&limit=${limit}` const data = await bgetWithRetry(url) setProxyCached(key, data) return data } catch (e) { reply.code(503) return { error: 'Failed to fetch OI history', message: e.message } } }) // Batch klines — SQLite cache first, Binance fallback fastify.post('/api/klines-batch', async (req) => { const symbols = req.body?.symbols const interval = String(req.body?.interval || '15m') const limit = Math.min(Number(req.body?.limit || 200), 1500) if (!Array.isArray(symbols) || symbols.length === 0) return { error: 'symbols[] required' } const syms = symbols.slice(0, 30).map(s => String(s).toUpperCase()) const result = {} const needFetch = [] // Try SQLite cache first for each symbol (fresh <60s instant, stale <300s with bg refresh) const nowSec = Math.floor(Date.now() / 1000) const bgRefresh = [] // symbols with stale cache — refresh in background for (const symbol of syms) { const cachedCount = klinesCache.getCount(symbol, interval) if (cachedCount >= limit) { const latestTime = klinesCache.getLatestTime(symbol, interval) const age = latestTime ? nowSec - latestTime : Infinity if (age <= 300) { const rows = klinesCache.getCandles(symbol, interval, limit) result[symbol] = rows.map(c => [c.time * 1000, String(c.open), String(c.high), String(c.low), String(c.close), String(c.volume)]) if (age > 60) bgRefresh.push(symbol) // schedule background update } else { needFetch.push(symbol) // too stale (>5min) — must refetch } } else { needFetch.push(symbol) } } // Background refresh for stale-but-usable cache (non-blocking) if (bgRefresh.length > 0) { Promise.allSettled(bgRefresh.map(symbol => bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=3`) .then(data => { if (Array.isArray(data)) klinesCache.storeCandles(symbol, interval, data) }) )).catch(() => {}) } // Fetch uncached from Binance in parallel if (needFetch.length > 0) { const promises = needFetch.map(async (symbol) => { try { const key = `klines:${symbol}:${interval}:${limit}` let data = getProxyCached(key, 10000) if (!data) { data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=${limit}`) if (data) { setProxyCached(key, data) klinesCache.storeCandles(symbol, interval, data) } } if (Array.isArray(data)) result[symbol] = data } catch(e) { /* skip */ } }) await Promise.all(promises).catch(e => console.error('[klines-batch] Error:', e.message)) } return result }) // NATR(14) for all USDT pairs — cached 5min fastify.get('/api/natr', async (req) => { const interval = String(req.query.interval || '15m') const cached = getProxyCached(`natr:${interval}`, 300000) // 5 min cache if (cached) return cached // Get all USDT pairs from ticker const tickerCached = getProxyCached('ticker24hr', 30000) const ticker = tickerCached || await bgetWithRetry('/fapi/v1/ticker/24hr') if (!tickerCached) setProxyCached('ticker24hr', ticker) const usdtPairs = ticker .filter(t => t.symbol.endsWith('USDT') && !t.symbol.includes('_')) .filter(t => parseFloat(t.quoteVolume) > 10000000) // >$10M vol only .sort((a, b) => parseFloat(b.quoteVolume) - parseFloat(a.quoteVolume)) .slice(0, 200) // top 200 by volume // Fetch klines in parallel batches of 20 const result = {} const batchSize = 20 for (let i = 0; i < usdtPairs.length; i += batchSize) { const batch = usdtPairs.slice(i, i + batchSize) const promises = batch.map(async (t) => { try { const key = `klines:${t.symbol}:${interval}:50` let klines = getProxyCached(key, 10000) if (!klines) { klines = await bgetWithRetry(`/fapi/v1/klines?symbol=${t.symbol}&interval=${interval}&limit=50`) setProxyCached(key, klines) } if (!Array.isArray(klines) || klines.length < 15) return // Calculate ATR(14) const candles = klines.map(k => ({ high: parseFloat(k[2]), low: parseFloat(k[3]), close: parseFloat(k[4]), })) let trSum = 0 for (let j = candles.length - 14; j < candles.length; j++) { const h = candles[j].high const l = candles[j].low const pc = candles[j - 1].close trSum += Math.max(h - l, Math.abs(h - pc), Math.abs(l - pc)) } const atr = trSum / 14 const lastClose = candles[candles.length - 1].close if (lastClose > 0) result[t.symbol] = parseFloat(((atr / lastClose) * 100).toFixed(2)) } catch(e) { /* skip pair */ } }) await Promise.all(promises).catch(e => console.error('[natr] Unexpected error:', e.message)) // Small delay between batches to avoid rate limits if (i + batchSize < usdtPairs.length) await new Promise(r => setTimeout(r, 200)) } setProxyCached(`natr:${interval}`, result) return result }) // Klines cache stats fastify.get('/api/klines-cache/stats', async () => { try { return klinesCache.getStats() } catch(e) { return { error: e.message } } }) // Background klines updater — refreshes cached symbols every 30s let _klinesUpdaterInterval = null function startKlinesUpdater() { const UPDATE_INTERVAL = 30000 // 30s const BATCH_SIZE = 10 // symbols per batch const BATCH_DELAY = 2000 // 2s between batches (rate-limit safe) _klinesUpdaterInterval = setInterval(async () => { try { // Get the current TF from most common cached interval const intervals = ['1m', '5m', '15m', '1h', '4h'] // all used TFs for (const interval of intervals) { const symbols = klinesCache.getCachedSymbols(interval) if (symbols.length === 0) continue // Process in batches for (let i = 0; i < symbols.length; i += BATCH_SIZE) { const batch = symbols.slice(i, i + BATCH_SIZE) const promises = batch.map(async (symbol) => { try { // Fetch only last 3 candles (latest + 2 for safety) const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=3`) if (Array.isArray(data) && data.length > 0) { klinesCache.storeCandles(symbol, interval, data) } } catch(e) { /* skip individual failures */ } }) await Promise.all(promises) if (i + BATCH_SIZE < symbols.length) { await new Promise(r => setTimeout(r, BATCH_DELAY)) } } } } catch(e) { console.error('[KlinesUpdater] Error:', e.message) } }, UPDATE_INTERVAL) console.log('[KlinesUpdater] Started (30s interval)') } // ---- rate limiter status endpoint ---- fastify.get('/api/rate-limiter', async () => ({ usedWeight: rateLimiter.usedWeight, weightUpdatedAt: rateLimiter.weightUpdatedAt, weightAge: Date.now() - rateLimiter.weightUpdatedAt, pauseUntil: rateLimiter.pauseUntil, pauseRemaining: Math.max(0, rateLimiter.pauseUntil - Date.now()), softLimit: rateLimiter.WEIGHT_SOFT_LIMIT, hardLimit: rateLimiter.WEIGHT_HARD_LIMIT, binanceLimit: 2400, status: rateLimiter.pauseUntil > Date.now() ? 'PAUSED' : rateLimiter.usedWeight >= rateLimiter.WEIGHT_HARD_LIMIT ? 'HARD_THROTTLE' : rateLimiter.usedWeight >= rateLimiter.WEIGHT_SOFT_LIMIT ? 'SOFT_THROTTLE' : 'OK' })) // ---- start ---- const port = Number(process.env.PORT || 3200) const start = async () => { try { await fastify.listen({ port, host: '0.0.0.0' }) fastify.log.info(`listening on 0.0.0.0:${port}`) // Init signals scanner (after server up so proxyCache is available) push.init({ stmts: auth.stmts }) // Init klines SQLite cache (before signals so liq_sweep can use it) klinesCache.initDB() signals.init({ getProxyCached, setProxyCached, bgetWithRetry, auth, push, klinesCache, stateManager, densityV2, persistenceMap: densityV2PersistenceMap }) // Start background klines updater (every 30s, updates cached symbols) startKlinesUpdater() // Pre-warm NATR cache so signals scanner has data from first scan setTimeout(async () => { try { console.log('[startup] Pre-warming NATR cache (15m)...') await fastify.inject({ method: 'GET', url: '/api/natr?interval=15m' }) console.log('[startup] NATR cache warmed ✓') } catch (e) { console.warn('[startup] NATR warmup failed:', e.message) } }, 5_000) // 5s after start — let ticker cache populate first // Periodic NATR refresh every 5 min (cache TTL is 5min, so re-compute before expiry) _intervals.push(setInterval(async () => { try { await fastify.inject({ method: 'GET', url: '/api/natr?interval=15m' }) } catch (e) { console.warn('[NATR-refresh] failed:', e.message) } }, 270_000)) // 4.5 min (slightly before 5min TTL expiry) // Background warmup: subscribe top symbols to WS gradually (rate-limit safe) warmupDensitySubscriptions() } catch (err) { fastify.log.error(err) process.exit(1) } } // Graceful shutdown — clean up resources on PM2 restart / kill async function gracefulShutdown(signal) { console.log(`[shutdown] ${signal} received, closing gracefully...`) try { // Stop all intervals for (const id of _intervals) clearInterval(id) if (_klinesUpdaterInterval) clearInterval(_klinesUpdaterInterval) // Stop signal scanners try { signals.stop() } catch (_) {} console.log(`[shutdown] Cleared ${_intervals.length + 1} interval(s) + signal scanners`) // Close Fastify (stop accepting new requests, finish in-flight) await fastify.close() // Close WebSocket connections if (wsManager.connections && wsManager.connections.length > 0) { console.log(`[shutdown] Closing ${wsManager.connections.length} WS connection(s)...`) for (const conn of wsManager.connections) { conn.destroy() } wsManager.connections = [] wsManager.streamToConn.clear() wsManager.callbacks.clear() } // Save density cache + persistence map to disk before exit if (densityCache.data) { saveDensityToDisk(densityCache.data, densityCache.meta) } if (densityV2PersistenceMap.size > 0) { savePersistenceMapToDisk() } // Give async writes a moment to flush await new Promise(r => setTimeout(r, 300)) console.log(`[shutdown] Clean exit.`) process.exit(0) } catch (err) { console.error('[shutdown] Error during cleanup:', err.message) process.exit(1) } } process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) process.on('SIGINT', () => gracefulShutdown('SIGINT')) // Gradually subscribe symbols to depth WS and build density cache async function warmupDensitySubscriptions() { try { const info = await bgetWithRetry('/fapi/v1/exchangeInfo') const allSymbols = (info.symbols || []) .filter(s => s.contractType === 'PERPETUAL' && s.quoteAsset === 'USDT' && s.status === 'TRADING') .map(s => s.symbol) const BATCH = 10 const BATCH_PAUSE = 20000 // 20s between batches const ITEM_DELAY = 300 // 300ms between items console.log(`[warmup] ${allSymbols.length} symbols, ${BATCH}/batch, ${BATCH_PAUSE/1000}s pause`) let subscribed = 0 let batchRetries = 0 const MAX_BATCH_RETRIES = 3 for (let i = 0; i < allSymbols.length; i += BATCH) { const batch = allSymbols.slice(i, i + BATCH) let batchFailed = false for (const sym of batch) { if (wsManager.callbacks.has(sym)) { subscribed++; continue } try { const ob = await bgetWithRetry(`/fapi/v1/depth?symbol=${encodeURIComponent(sym)}&limit=1000`) stateManager.initBook(sym, ob.bids, ob.asks) wsManager.subscribe(sym, (payload) => { stateManager.processDelta(sym, payload) }) subscribed++ } catch (err) { console.log(`[warmup] Error ${sym}: ${err.message.slice(0, 60)}, pausing 60s... (retry ${batchRetries + 1}/${MAX_BATCH_RETRIES})`) await new Promise(r => setTimeout(r, 60000)) if (batchRetries < MAX_BATCH_RETRIES) { batchRetries++ i -= BATCH // retry this batch } else { console.log(`[warmup] Skipping batch after ${MAX_BATCH_RETRIES} retries`) batchRetries = 0 } batchFailed = true break } await new Promise(r => setTimeout(r, ITEM_DELAY)) } if (!batchFailed) batchRetries = 0 // After every 5 batches (50 symbols), rebuild density cache const batchNum = Math.floor(i / BATCH) + 1 if (batchNum % 5 === 0 || i + BATCH >= allSymbols.length) { try { await rebuildDensityCache(allSymbols) } catch (_) {} console.log(`[warmup] ${subscribed}/${allSymbols.length} subscribed, cache: ${densityCache.data ? densityCache.data.length : 0} walls`) } if (i + BATCH < allSymbols.length) { await new Promise(r => setTimeout(r, BATCH_PAUSE)) } } console.log(`[warmup] Done: ${subscribed} symbols`) // After density warmup, pre-warm klines cache for fast chart loads warmupKlinesCache() } catch (err) { console.log(`[warmup] Failed: ${err.message.slice(0, 100)}`) } } // Pre-warm klines cache: top 200 by volume × main TFs → instant chart opens async function warmupKlinesCache() { try { // Get top 200 symbols by 24h volume const ticker = await bgetWithRetry('/fapi/v1/ticker/24hr') const sorted = ticker .filter(t => t.symbol.endsWith('USDT')) .sort((a, b) => Number(b.quoteVolume) - Number(a.quoteVolume)) .slice(0, 200) .map(t => t.symbol) const TFS = ['15m', '1h', '4h'] const LIMITS = { '15m': 1500, '1h': 1500, '4h': 1500 } const total = sorted.length * TFS.length let done = 0, cached = 0 console.log(`[klines-warmup] Starting: ${sorted.length} symbols × ${TFS.length} TFs = ${total} requests`) for (const tf of TFS) { for (let i = 0; i < sorted.length; i++) { const sym = sorted[i] const limit = LIMITS[tf] const key = `klines:${sym}:${tf}:${limit}` // Skip if already cached if (getProxyCached(key, 300000)) { done++; cached++; continue } const MAX_SYMBOL_RETRIES = 3 let symbolRetries = 0 let success = false while (!success && symbolRetries <= MAX_SYMBOL_RETRIES) { try { const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(sym)}&interval=${tf}&limit=${limit}`) setProxyCached(key, data) // Persist to SQLite so data survives proxy cache expiry and PM2 restarts if (klinesCache && Array.isArray(data)) { try { klinesCache.storeCandles(sym, tf, data) } catch (_) {} } done++ success = true } catch (err) { symbolRetries++ if (symbolRetries > MAX_SYMBOL_RETRIES) { console.log(`[klines-warmup] Skipping ${sym}:${tf} after ${MAX_SYMBOL_RETRIES} retries`) done++ break } console.log(`[klines-warmup] Rate limited at ${done}/${total} (${sym} retry ${symbolRetries}/${MAX_SYMBOL_RETRIES}), pausing 30s...`) await new Promise(r => setTimeout(r, 30000)) } } // Throttle: 150ms between requests (concurrency 1, ~400 req/min safe) await new Promise(r => setTimeout(r, 150)) // Progress every 100 requests if (done % 100 === 0) { console.log(`[klines-warmup] ${done}/${total} (${cached} from cache)`) } } } console.log(`[klines-warmup] Done: ${done}/${total} cached (${cached} already had)`) } catch (err) { console.log(`[klines-warmup] Failed: ${err.message.slice(0, 100)}`) } } // Rebuild density cache from currently subscribed symbols (no Binance depth calls) async function rebuildDensityCache(allSymbols) { const marks = await bgetWithRetry('/fapi/v1/premiumIndex') const markMap = new Map(marks.map(m => [m.symbol, Number(m.markPrice)])) const subscribedSyms = allSymbols.filter(s => wsManager.callbacks.has(s)) const allWalls = [] let skipped = 0 for (const sym of subscribedSyms) { const price = markMap.get(sym) if (!price) continue const bidLevelsRaw = stateManager.getTopLevels(sym, 'bid', price, 0, 100, 5.0) const askLevelsRaw = stateManager.getTopLevels(sym, 'ask', price, 0, 100, 5.0) const klinesStats = await getKlinesWithStats(sym) if (!klinesStats) { skipped++; continue } const avg5mVol = (klinesStats.vol1 + klinesStats.vol2 + klinesStats.vol3 + klinesStats.vol4 + klinesStats.vol5) / 5 const processSide = (levels, sideKey) => { const BIN_SIZE_PCT = 0.1 const rawBins = binLevels(levels, BIN_SIZE_PCT) const validBins = rawBins.filter(b => b.notional >= 0) const trackedBins = stateManager.trackAndEnrichBins(sym, sideKey, validBins, price) return trackedBins.map(bin => { const behavior = analyzeBehavior(bin, price, klinesStats.natr, avg5mVol) if (behavior.xMult < 4) return null let tte = avg5mVol > 0 ? bin.notional / (avg5mVol / 5) : Infinity return { symbol: sym, sideKey, price: Math.round(bin.anchorPrice * 10000) / 10000, notional: bin.notional, distancePct: Math.round(behavior.distancePct * 100) / 100, lifetimeMins: Math.round(behavior.lifetimeMins * 10) / 10, score: behavior.trustScore, xMult: Math.round(behavior.xMult * 10) / 10, severity: behavior.severity, tags: behavior.tags, levelsCount: bin.levelsCount, natr: klinesStats.natr, avg5mVol: Math.round(avg5mVol), vol1: klinesStats.vol1, vol2: klinesStats.vol2, vol3: klinesStats.vol3, vol4: klinesStats.vol4, vol5: klinesStats.vol5, timeToEatMinutes: tte } }).filter(Boolean).sort((a, b) => b.score - a.score).slice(0, 2) } allWalls.push(...processSide(bidLevelsRaw, 'bid'), ...processSide(askLevelsRaw, 'ask')) } allWalls.sort((a, b) => b.score - a.score) // Top 3 per symbol const perSym = {} for (const w of allWalls) { if (!perSym[w.symbol]) perSym[w.symbol] = [] if (perSym[w.symbol].length < 3) perSym[w.symbol].push(w) } const finalData = Object.values(perSym).flat().sort((a, b) => b.score - a.score) const meta = { count: finalData.length, minNotional: 0, depthLimit: 100, concurrency: 0, mmMode: false, windowPct: 5, mmMultiplier: 4 } densityCache = { data: finalData, meta, ts: Date.now() } saveDensityToDisk(finalData, meta) if (skipped) console.warn(`[density] Rebuild: ${skipped}/${subscribedSyms.length} symbols skipped (klines unavailable)`) } start()