// NOTE: removed stray "← Назад" text that had been pasted in front of the
// first statement (web-page copy artifact) — it was a syntax error.
const fastify = require('fastify')({ logger: true })
// CORS — whitelist known origins only
const ALLOWED_ORIGINS = [
  'https://futures-screener.szhub.space',
  'http://localhost:3200',
  'http://127.0.0.1:3200'
]
const cors = require('@fastify/cors')
fastify.register(cors, {
  // Allow same-origin requests (no Origin header, e.g. curl) and whitelisted
  // origins; everything else gets CORS denied (cb(null, false)).
  origin: (origin, cb) => {
    if (!origin || ALLOWED_ORIGINS.includes(origin)) {
      cb(null, true)
    } else {
      cb(null, false)
    }
  },
  methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
  allowedHeaders: ['Content-Type', 'Authorization'],
  credentials: true
})
// Rate limiting
const rateLimit = require('@fastify/rate-limit')
// Global default: 100 requests/minute keyed by client IP. Individual routes
// override this via `config.rateLimit` (see the auth routes below).
fastify.register(rateLimit, {
  max: 100, // 100 requests per minute (global default)
  timeWindow: 60000,
  keyGenerator: (req) => req.ip,
  // Expose limit headers both while within the quota and once exceeded
  addHeadersOnExceeding: { 'x-ratelimit-limit': true, 'x-ratelimit-remaining': true },
  addHeaders: { 'x-ratelimit-limit': true, 'x-ratelimit-remaining': true, 'retry-after': true }
})
// Binance Futures (USDT-M) REST base
const BINANCE_FAPI = 'https://fapi.binance.com'
// Custom Modules
const wsManager = require('./ws');
const stateManager = require('./state');
const { binLevels } = require('./logic');
const { analyzeBehavior } = require('./scorer');
const densityV2 = require('./densityV2');
const auth = require('./auth');
const signals = require('./signals');
const push = require('./push');
const klinesCache = require('./klines-cache');
// WS connects lazily on first subscribe() — no eager connect needed
// ---- helpers ----
// Abort budget applied to every outgoing Binance request (see bget below)
const FETCH_TIMEOUT_MS = 15000 // 15s timeout for all Binance requests
// ---- Global Binance Rate Limiter ----
// Tracks the X-MBX-USED-WEIGHT-1M header reported by Binance and slows down
// (or pauses) outgoing requests before Binance starts rejecting them.
const rateLimiter = {
  usedWeight: 0,      // last known X-MBX-USED-WEIGHT-1M
  weightUpdatedAt: 0, // timestamp of the last usedWeight update
  pauseUntil: 0,      // global pause deadline (set after a 429/418)
  WEIGHT_SOFT_LIMIT: 1800, // start throttling at 1800/2400 (75%)
  WEIGHT_HARD_LIMIT: 2200, // hard pause at 2200/2400 (92%)
  THROTTLE_DELAY_MS: 500,  // delay applied when near the soft limit

  /** Record the used weight from a Binance response's headers. */
  updateWeight(headers) {
    const reported = parseInt(headers.get('x-mbx-used-weight-1m') || '0', 10)
    if (reported > 0) {
      this.usedWeight = reported
      this.weightUpdatedAt = Date.now()
    }
  },

  /** Extend the global pause window (all requests wait until it passes). */
  setPause(ms) {
    const deadline = Date.now() + ms
    if (deadline > this.pauseUntil) {
      this.pauseUntil = deadline
      console.log(`[rate-limiter] ⚠️ Global pause ${ms}ms (until ${new Date(deadline).toISOString().slice(11,19)})`)
    }
  },

  /** Block until it is safe to fire a request. Call before every request. */
  async waitIfNeeded() {
    const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
    // Honour a global pause set after a 429/418 response.
    const now = Date.now()
    if (now < this.pauseUntil) {
      const wait = this.pauseUntil - now
      console.log(`[rate-limiter] Waiting ${wait}ms (global pause)`)
      await sleep(wait)
    }
    // Binance resets the counter every minute — a >60s-old reading is stale.
    if (Date.now() - this.weightUpdatedAt > 60000) {
      this.usedWeight = 0
    }
    if (this.usedWeight >= this.WEIGHT_HARD_LIMIT) {
      // Hard limit — pause 5s
      console.log(`[rate-limiter] Weight ${this.usedWeight} >= ${this.WEIGHT_HARD_LIMIT}, pausing 5s`)
      await sleep(5000)
    } else if (this.usedWeight >= this.WEIGHT_SOFT_LIMIT) {
      // Soft limit — small delay to let the window roll over
      await sleep(this.THROTTLE_DELAY_MS)
    }
  },

  /** One-line status string for log messages. */
  status() {
    const pauseInfo = this.pauseUntil > Date.now() ? (this.pauseUntil - Date.now()) + 'ms' : 'none'
    return `weight=${this.usedWeight}/2400, pause=${pauseInfo}`
  }
}
/**
 * Thrown when Binance answers 429 (rate limited) or 418 (temporary IP ban).
 * Carries the suggested back-off (ms) and the used weight seen at the time.
 */
class RateLimitError extends Error {
  constructor(path, retryAfterMs, usedWeight) {
    super(`Binance 429 rate limited: ${path}`)
    Object.assign(this, { name: 'RateLimitError', retryAfterMs, usedWeight })
  }
}
/**
 * GET a path from the Binance Futures REST API with timeout and
 * rate-limit tracking.
 *
 * Waits on the global rateLimiter first, aborts after FETCH_TIMEOUT_MS,
 * and records the used-weight header from every response.
 *
 * @param {string} path - path + query string, e.g. '/fapi/v1/depth?...'
 * @returns {Promise<any>} parsed JSON body
 * @throws {RateLimitError} on HTTP 429 (rate limit) or 418 (temporary ban)
 * @throws {Error} on any other non-2xx response, or AbortError on timeout
 */
async function bget(path) {
  // Wait if globally paused or approaching weight limit
  await rateLimiter.waitIfNeeded()
  const controller = new AbortController()
  const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS)
  try {
    const res = await fetch(BINANCE_FAPI + path, { method: 'GET', signal: controller.signal })
    // Always track weight from response
    rateLimiter.updateWeight(res.headers)
    if (res.status === 429 || res.status === 418) {
      // 429 = rate limited, 418 = IP ban (temp)
      const retryAfter = parseInt(res.headers.get('retry-after') || '0', 10)
      // No Retry-After header: fall back to 120s for a ban, 30s for a rate limit
      const retryMs = retryAfter > 0 ? retryAfter * 1000 : (res.status === 418 ? 120000 : 30000)
      rateLimiter.setPause(retryMs)
      throw new RateLimitError(path, retryMs, rateLimiter.usedWeight)
    }
    if (!res.ok) {
      // Best-effort body capture for the error message
      const txt = await res.text().catch(() => '')
      throw new Error(`Binance GET ${path} failed: ${res.status} ${txt}`)
    }
    return res.json()
  } finally {
    // Always clear the abort timer, success or failure
    clearTimeout(timeoutId)
  }
}
/** Coerce a Binance string field (price/qty/volume) to a number. */
function toNumber(x) {
  return Number(x)
}
// K-lines timeframe (Binance interval string, 5-minute candles)
// NOTE(review): KLINE_INTERVAL / KLINE_LIMIT are not referenced anywhere in
// this section — the klines requests below hardcode their own interval and
// limit. Verify against the rest of the file before removing.
const KLINE_INTERVAL = '5m'
const KLINE_LIMIT = 20
// Binance K-lines order: index 0 = oldest, last = newest
// After reverse(): bars[0] = newest (t), bars[1] = prev (t-1), bars[2] = oldest (t-2)
// So: vol1 = newest (t), vol2 = prev (t-1), vol3 = oldest (t-2)
// Note: Variable names now match time order, not index order
/**
 * Keep only order-book levels whose distance from the mark price is within
 * windowPct percent (inclusive).
 * @param {{price:number}[]} levels
 * @param {number} markPrice
 * @param {number} windowPct - max distance in %
 */
function filterLevelsByWindow(levels, markPrice, windowPct) {
  const within = [];
  for (const level of levels) {
    const distPct = Math.abs(level.price - markPrice) / markPrice * 100;
    if (distPct <= windowPct) within.push(level);
  }
  return within;
}
// Group close levels into clusters (for MM detection)
// levels: array of {price, notional, distancePct}
// maxGapPct: max distance between levels in same cluster (%)
// Returns only clusters of 2+ levels; singletons are dropped.
function groupCloseLevels(levels, maxGapPct = 0.2) {
  if (!levels || levels.length === 0) return []
  // Work on a price-sorted copy so adjacent entries are price-neighbours
  const byPrice = [...levels].sort((a, b) => a.price - b.price)
  const clusters = []
  let run = [byPrice[0]]
  const flushRun = () => {
    // A lone level is not a cluster
    if (run.length >= 2) clusters.push(run)
  }
  for (let i = 1; i < byPrice.length; i++) {
    // Gap in % relative to the previous (lower) price
    const gapPct = Math.abs(byPrice[i].price - byPrice[i - 1].price) / byPrice[i - 1].price * 100
    if (gapPct <= maxGapPct) {
      run.push(byPrice[i])
    } else {
      flushRun()
      run = [byPrice[i]]
    }
  }
  // Don't forget the trailing run
  flushRun()
  return clusters
}
/**
 * Filter raw depth levels down to "densities": levels whose notional is at
 * least minNotional and whose distance from `price` is within windowPct (%).
 * bids/asks are Binance depth tuples: [priceStr, qtyStr].
 * Returns { filteredLevels, bidLevels, askLevels } (bids first, then asks).
 */
function calcNearestDensities({ price, bids, asks, minNotional, windowPct }) {
  const filteredLevels = [];
  // Shared per-side scan; |price - p| / price is identical for both sides.
  const collect = (entries, side) => {
    for (const [pStr, qStr] of entries) {
      const p = toNumber(pStr)
      const q = toNumber(qStr)
      const notional = p * q
      if (notional < minNotional) continue
      const distPct = Math.abs(price - p) / price * 100
      if (distPct > windowPct) continue
      filteredLevels.push({ side, price: p, qty: q, notional, distancePct: distPct })
    }
  }
  collect(bids, 'bid')
  collect(asks, 'ask')
  const bidLevels = filteredLevels.filter((l) => l.side === 'bid');
  const askLevels = filteredLevels.filter((l) => l.side === 'ask');
  return { filteredLevels, bidLevels, askLevels };
}
// Simple concurrency limiter (no deps) with optional per-item delay
// Runs fn(item, idx) over items with at most `limit` in flight; results keep
// input order. delayMs inserts a pause after each item (rate-limit friendly).
async function mapLimit(items, limit, fn, delayMs = 0) {
  const results = new Array(items.length)
  let next = 0 // shared cursor — safe: idx claim (next++) is synchronous
  async function worker() {
    for (;;) {
      const idx = next++
      if (idx >= items.length) return
      results[idx] = await fn(items[idx], idx)
      if (delayMs > 0) await new Promise((resolve) => setTimeout(resolve, delayMs))
    }
  }
  const workerCount = Math.min(limit, items.length)
  await Promise.all(Array.from({ length: workerCount }, worker))
  return results
}
// Density endpoint result cache (avoid re-scanning 500+ symbols on every request)
let densityCache = { data: null, meta: null, ts: 0 }
const DENSITY_CACHE_TTL = 60000 // 60 seconds (was 30s)
// Disk cache helpers for density results (survive PM2 restarts)
const DENSITY_CACHE_FILE = require('path').join(__dirname, '..', 'data', 'density-cache.json')
/**
 * Persist a density scan result to disk so it survives PM2 restarts.
 * Directory creation is sync (cheap, rare); the write itself is async
 * fire-and-forget — failures are logged, never thrown.
 */
function saveDensityToDisk(data, meta) {
  try {
    const dir = require('path').dirname(DENSITY_CACHE_FILE)
    if (!require('fs').existsSync(dir)) require('fs').mkdirSync(dir, { recursive: true })
    // Non-blocking write (fire-and-forget)
    require('fs').promises.writeFile(DENSITY_CACHE_FILE, JSON.stringify({ data, meta, ts: Date.now() }))
      .catch(e => console.log('[density-cache] disk save error:', e.message))
  } catch (e) { console.log('[density-cache] disk save error:', e.message) }
}
/**
 * Load the density disk cache written by saveDensityToDisk().
 * @returns {{data:Array, meta:Object, ts:number}|null} null when the file is
 *          missing, unreadable, malformed, or older than 10 minutes.
 */
function loadDensityFromDisk() {
  try {
    if (!require('fs').existsSync(DENSITY_CACHE_FILE)) return null
    // Sync read OK here — only called once at startup
    const raw = JSON.parse(require('fs').readFileSync(DENSITY_CACHE_FILE, 'utf8'))
    // Accept disk cache up to 10 minutes old (stale but better than nothing)
    if (raw && raw.data && (Date.now() - raw.ts) < 600000) {
      console.log(`[density-cache] Loaded ${raw.data.length} walls from disk (age: ${((Date.now() - raw.ts) / 1000).toFixed(0)}s)`)
      return raw
    }
  } catch (e) { console.log('[density-cache] disk load error:', e.message) }
  return null
}
// Load disk cache on startup so the first /densities request can be served
const diskCache = loadDensityFromDisk()
if (diskCache) {
  densityCache = { data: diskCache.data, meta: diskCache.meta, ts: diskCache.ts }
}
// Scoring function: enhanced with Time To Eat, NATR, and lifetime
// Base = notional in millions, discounted by distance from price; then a
// chain of multiplicative bonuses (order kept stable for float determinism).
function calcScore({ notional, distancePct, isMM, timeToEatMinutes, natr, lifetimeSec }) {
  let score = notional / 1000000;   // base score in millions of USDT
  score = score / (1 + distancePct); // farther walls score less
  if (timeToEatMinutes > 60) score *= 1.5; // wall is huge vs. recent traded volume
  if (natr > 1.0) score *= 1.2;            // high-volatility coins
  if (lifetimeSec > 300) score *= 1.2;     // proven walls (5+ minutes old)
  if (isMM) score *= 1.5;                  // clustered MM levels
  return score;
}
// Track all intervals for graceful shutdown cleanup
const _intervals = []
// In-memory response cache (TTL: 3 seconds)
const cache = new Map()
const CACHE_TTL_MS = 3000
// Evict expired cache entries every 10 seconds
_intervals.push(setInterval(() => {
  const now = Date.now()
  for (const [key, entry] of cache.entries()) {
    if (now - entry.ts > CACHE_TTL_MS) cache.delete(key)
  }
}, 10000))
// --- Level History State ---
const levelHistory = new Map()
// Evict levels that have not been updated for 1 minute (checked every 30s)
_intervals.push(setInterval(() => {
  const now = Date.now()
  for (const [key, val] of levelHistory.entries()) {
    if (now - val.lastUpdate > 60000) {
      levelHistory.delete(key)
    }
  }
}, 30000))
// --- Telegram Alerts Scaffold ---
const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN || ''
const TELEGRAM_CHAT_ID = process.env.TELEGRAM_CHAT_ID || ''
/**
 * Fire-and-forget Telegram notification. No-ops silently when the bot token
 * or chat id is not configured; network errors are logged, never thrown.
 * @param {string} msg - message text (Telegram HTML parse mode)
 */
async function sendTelegramAlert(msg) {
  if (!TELEGRAM_BOT_TOKEN || !TELEGRAM_CHAT_ID) return
  const url = `https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage`
  const payload = { chat_id: TELEGRAM_CHAT_ID, text: msg, parse_mode: 'HTML' }
  try {
    await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload)
    })
  } catch (e) {
    console.error('Telegram Error:', e.message)
  }
}
/**
 * Build a stable cache key from the density-query parameters.
 * `||` deliberately maps missing/empty params to defaults, so requests that
 * spell defaults differently may still produce distinct keys (strings vs
 * numbers are kept as-is).
 */
function getCacheKey(req) {
  const q = req.query
  return JSON.stringify({
    symbols: q.symbols || 'all',
    minNotional: q.minNotional || 50000,
    depthLimit: q.depthLimit || 100,
    windowPct: q.windowPct || 1.0,
    minScore: q.minScore || 0,
    concurrency: q.concurrency || 6
  })
}
/** Return the cached response for this request, or null when missing/expired. */
function getCached(req) {
  const entry = cache.get(getCacheKey(req))
  if (!entry) return null
  return (Date.now() - entry.ts < CACHE_TTL_MS) ? entry.data : null
}
/** Store a response under this request's cache key with a fresh timestamp. */
function setCached(req, data) {
  cache.set(getCacheKey(req), { data, ts: Date.now() })
}
// Retry/backoff helper for Binance requests (rate-limit aware)
/**
 * bget() with exponential backoff, plus special handling for 429/418:
 * rate-limit hits wait out the advertised retry window and do NOT consume a
 * regular attempt (they are capped separately at MAX_RATE_LIMIT_RETRIES).
 *
 * @param {string} path       Binance REST path + query
 * @param {number} maxRetries regular attempts for non-rate-limit errors
 * @param {number} baseDelay  backoff base in ms (doubles each attempt)
 * @throws {Error} when all retries (of either kind) are exhausted
 */
async function bgetWithRetry(path, maxRetries = 3, baseDelay = 500) {
  let rateLimitRetries = 0
  const MAX_RATE_LIMIT_RETRIES = 3 // max 3 rate-limit retries on top of regular retries
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await bget(path)
    } catch (err) {
      if (err instanceof RateLimitError) {
        rateLimitRetries++
        if (rateLimitRetries > MAX_RATE_LIMIT_RETRIES) {
          throw new Error(`Binance GET ${path} rate limited ${rateLimitRetries}x, giving up (${rateLimiter.status()})`)
        }
        // 429/418: global pause already set by bget(), wait and retry (don't count as regular attempt)
        console.log(`[bgetWithRetry] 429 on ${path.slice(0, 60)}, retry ${rateLimitRetries}/${MAX_RATE_LIMIT_RETRIES}, waiting ${err.retryAfterMs}ms`)
        await new Promise(r => setTimeout(r, err.retryAfterMs))
        attempt-- // don't count rate limit as a regular retry attempt
        continue
      }
      if (attempt === maxRetries) {
        throw new Error(`Binance GET ${path} failed after ${maxRetries} attempts: ${err.message}`)
      }
      // Exponential backoff: baseDelay, 2x, 4x, ...
      const delay = baseDelay * Math.pow(2, attempt - 1)
      await new Promise(resolve => setTimeout(resolve, delay))
    }
  }
}
// Klines stats cache (TTL 60s) to avoid hammering Binance
const klinesStatsCache = new Map()
const KLINES_STATS_TTL = 60000
// Periodic cleanup of stale klinesStats entries (every 5min).
// Entries are kept 5x their TTL so a recently-expired symbol can still be
// inspected before eviction.
_intervals.push(setInterval(() => {
  const now = Date.now()
  let evicted = 0
  for (const [symbol, entry] of klinesStatsCache) {
    if (now - entry.ts > KLINES_STATS_TTL * 5) { // 5x TTL = 5min
      klinesStatsCache.delete(symbol)
      evicted++
    }
  }
  if (evicted > 0) console.log(`[klines-stats] Cache cleanup: evicted ${evicted}, ${klinesStatsCache.size} remaining`)
}, 5 * 60_000))
// Fetch K-lines and compute recent volumes + NATR for one symbol.
/**
 * @param {string} symbol - Binance futures symbol, e.g. 'BTCUSDT'
 * @returns {Promise<{vol1:number,vol2:number,vol3:number,vol4:number,vol5:number,natr:number}|null>}
 *          vol1..vol5 = last five 5m quote volumes (vol1 newest);
 *          natr = normalized ATR (%) over 14 daily candles.
 *          Returns null on any Binance failure — callers must skip the symbol.
 */
async function getKlinesWithStats(symbol) {
  const cached = klinesStatsCache.get(symbol)
  if (cached && (Date.now() - cached.ts) < KLINES_STATS_TTL) return cached.data
  try {
    // Fetch K-lines in parallel: 1d (for NATR) and 5m (for volumes)
    const [klines1d, klines5m] = await Promise.all([
      bgetWithRetry(`/fapi/v1/klines?symbol=${symbol}&interval=1d&limit=${14}`),
      bgetWithRetry(`/fapi/v1/klines?symbol=${symbol}&interval=5m&limit=${5}`)
    ])
    let natr = 0;
    // NATR from daily candles
    if (klines1d && klines1d.length > 0) {
      // Binance kline array fields: [2]=high, [3]=low, [4]=close (strings)
      const convert = (k) => ({
        high: toNumber(k[2]),
        low: toNumber(k[3]),
        close: toNumber(k[4])
      })
      const bars = klines1d.map(convert)
      const trValues = []
      // True Range
      for (let i = 1; i < bars.length; i++) {
        const highLow = bars[i].high - bars[i].low
        const highPrevClose = Math.abs(bars[i].high - bars[i - 1].close)
        const lowPrevClose = Math.abs(bars[i].low - bars[i - 1].close)
        const tr = Math.max(highLow, highPrevClose, lowPrevClose)
        trValues.push(tr)
      }
      if (trValues.length > 0) {
        // ATR = simple mean of TR, normalized by the latest close
        const atr = trValues.reduce((a, b) => a + b, 0) / trValues.length
        const latestClose = bars[bars.length - 1].close
        natr = latestClose > 0 ? (atr / latestClose) * 100 : 0
      } else if (bars.length === 1) { // Brand-new listing: fall back to the single candle's range
        const latestClose = bars[0].close
        const highLow = bars[0].high - bars[0].low
        natr = latestClose > 0 ? (highLow / latestClose) * 100 : 0
      }
    }
    let vol1 = 0, vol2 = 0, vol3 = 0, vol4 = 0, vol5 = 0;
    // Volumes from 5-minute candles; k[7] = quote-asset volume (USDT)
    if (klines5m && klines5m.length > 0) {
      const convert5m = (k) => ({ volume: toNumber(k[7]) })
      const bars5m = klines5m.map(convert5m).reverse() // [newest, prev, oldest...]
      vol1 = bars5m[0] ? bars5m[0].volume : 0
      vol2 = bars5m[1] ? bars5m[1].volume : 0
      vol3 = bars5m[2] ? bars5m[2].volume : 0
      vol4 = bars5m[3] ? bars5m[3].volume : 0
      vol5 = bars5m[4] ? bars5m[4].volume : 0
    }
    const result = { vol1, vol2, vol3, vol4, vol5, natr }
    klinesStatsCache.set(symbol, { data: result, ts: Date.now() })
    return result
  } catch (err) {
    console.warn(`[klines-stats] ${symbol} failed: ${err.message}`)
    return null // caller must handle null (skip symbol)
  }
}
// ---- UI (static files from ../app, cached in memory at startup) ----
const path = require('path')
const fs = require('fs')
const APP_DIR = path.resolve(__dirname, '..', 'app')
// Pre-load static files into memory (files never change at runtime)
const staticCache = new Map()
/**
 * Return a static file's Buffer, reading from disk once and caching forever.
 * Sync read is acceptable: misses normally happen only during startup
 * pre-warm (see STATIC_FILES below).
 * @throws if the file does not exist on disk
 */
function getStatic(relPath) {
  if (staticCache.has(relPath)) return staticCache.get(relPath)
  const p = path.join(APP_DIR, relPath)
  const buf = fs.readFileSync(p)
  staticCache.set(relPath, buf)
  return buf
}
// Pre-warm all static files at module load
const STATIC_FILES = [
  'index.html', 'app.js', 'densities.js', 'mini-charts.js', 'auth.js',
  'drawing-manager.js', 'signals.js', 'settings.js', 'styles.css',
  'manifest.json', 'sw.js', 'icon-192.svg', 'icon-512.svg'
]
for (const f of STATIC_FILES) {
  // A missing file is logged, not fatal — the serving route will throw on first hit
  try { getStatic(f) } catch (e) { console.warn(`[Static] Failed to pre-load ${f}: ${e.message}`) }
}
// Also cache the UMD library
const LWC_DRAWING_PATH = path.resolve(__dirname, '..', 'node_modules', 'lightweight-charts-drawing', 'dist', 'lightweight-charts-drawing.umd.js')
let lwcDrawingBuf
try { lwcDrawingBuf = fs.readFileSync(LWC_DRAWING_PATH) } catch (e) { console.warn('[Static] lightweight-charts-drawing UMD not found') }
// Cache headers: HTML = must-revalidate (cache-buster URLs update), assets = 1 day (have ?v= buster)
const ASSET_CACHE = 'public, max-age=86400' // 1 day
const HTML_CACHE = 'no-cache' // always revalidate
// ---- Static routes (all served from the in-memory staticCache) ----
fastify.get('/', async (req, reply) => {
  reply.header('Cache-Control', HTML_CACHE).type('text/html; charset=utf-8').send(getStatic('index.html'))
})
fastify.get('/app.js', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('app.js'))
})
fastify.get('/densities.js', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('densities.js'))
})
fastify.get('/mini-charts.js', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('mini-charts.js'))
})
fastify.get('/auth.js', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('auth.js'))
})
fastify.get('/drawing-manager.js', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('drawing-manager.js'))
})
fastify.get('/signals.js', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('signals.js'))
})
fastify.get('/settings.js', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(getStatic('settings.js'))
})
fastify.get('/styles.css', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('text/css; charset=utf-8').send(getStatic('styles.css'))
})
// Vendored charting lib; 404s when the package was not installed at startup
fastify.get('/lightweight-charts-drawing.umd.js', async (req, reply) => {
  if (!lwcDrawingBuf) return reply.code(404).send('Not found')
  reply.header('Cache-Control', ASSET_CACHE).type('application/javascript; charset=utf-8').send(lwcDrawingBuf)
})
// No favicon shipped — a 204 keeps browser requests quiet
fastify.get('/favicon.ico', async (req, reply) => {
  reply.code(204).send()
})
fastify.get('/manifest.json', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('application/manifest+json; charset=utf-8').send(getStatic('manifest.json'))
})
// Service worker: never cached; Service-Worker-Allowed header widens its scope to '/'
fastify.get('/sw.js', async (req, reply) => {
  reply.header('Cache-Control', 'no-cache').type('application/javascript; charset=utf-8').header('Service-Worker-Allowed', '/').send(getStatic('sw.js'))
})
fastify.get('/icon-192.svg', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('image/svg+xml').send(getStatic('icon-192.svg'))
})
fastify.get('/icon-512.svg', async (req, reply) => {
  reply.header('Cache-Control', ASSET_CACHE).type('image/svg+xml').send(getStatic('icon-512.svg'))
})
// ---- Auth routes ----
// Attach user to every request (non-blocking)
fastify.addHook('onRequest', async (req) => {
  auth.authHook(req)
})
// Stricter per-route rate limits guard the credential endpoints
fastify.post('/api/auth/register', { config: { rateLimit: { max: 10, timeWindow: 60000 } } }, async (req, reply) => {
  const { email, password, name } = req.body || {}
  const result = auth.register(email, password, name)
  if (result.error) return reply.code(400).send(result)
  return result
})
fastify.post('/api/auth/login', { config: { rateLimit: { max: 15, timeWindow: 60000 } } }, async (req, reply) => {
  const { email, password } = req.body || {}
  const result = auth.login(email, password)
  if (result.error) return reply.code(401).send(result)
  return result
})
// Current session's user (set by the onRequest hook above)
fastify.get('/api/auth/me', async (req, reply) => {
  if (!req.user) return reply.code(401).send({ error: 'Not authenticated' })
  return { success: true, user: req.user }
})
// Google OAuth
fastify.get('/api/auth/google/url', async () => {
  const url = auth.getGoogleAuthUrl()
  if (!url) return { error: 'Google OAuth not configured' }
  return { url }
})
fastify.post('/api/auth/google/callback', async (req, reply) => {
  const { code } = req.body || {}
  if (!code) return reply.code(400).send({ error: 'Code required' })
  const result = await auth.googleAuth(code)
  if (result.error) return reply.code(400).send(result)
  return result
})
// Admin: set user tier (requires admin tier)
fastify.post('/api/auth/set-tier', async (req, reply) => {
  if (!req.user || req.user.tier !== 'admin') {
    return reply.code(403).send({ error: 'Admin only' })
  }
  const { userId, tier } = req.body || {}
  if (!userId || !['free', 'pro', 'admin'].includes(tier)) {
    return reply.code(400).send({ error: 'Invalid userId or tier' })
  }
  const user = auth.setTier(userId, tier)
  return { success: true, user }
})
// Admin: list users
fastify.get('/api/auth/users', async (req, reply) => {
  if (!req.user || req.user.tier !== 'admin') {
    return reply.code(403).send({ error: 'Admin only' })
  }
  return { users: auth.listUsers(), count: auth.getUserCount() }
})
// ---- User Data routes (settings, watchlists, layouts, alerts) ----
// Every route below bails out when auth.requireAuth(req, reply) is falsy
// (presumably requireAuth writes the 401 response itself — verify in ./auth).
// Settings
fastify.get('/api/settings', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  return { success: true, settings: auth.getSettings(req.user.id) }
})
fastify.put('/api/settings', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  const settings = req.body || {}
  auth.saveSettings(req.user.id, settings)
  return { success: true }
})
// Watchlists
fastify.get('/api/watchlist', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  return { success: true, watchlist: auth.getWatchlist(req.user.id) }
})
fastify.post('/api/watchlist', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  const { symbol, color, sort_order } = req.body || {}
  if (!symbol) return reply.code(400).send({ error: 'Symbol required' })
  auth.addToWatchlist(req.user.id, symbol, color, sort_order)
  return { success: true }
})
fastify.delete('/api/watchlist/:symbol', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  auth.removeFromWatchlist(req.user.id, req.params.symbol)
  return { success: true }
})
// Layouts
fastify.get('/api/layouts', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  return { success: true, layouts: auth.getLayouts(req.user.id), active: auth.getActiveLayout(req.user.id) }
})
fastify.post('/api/layouts', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  const { name, layout_type, config } = req.body || {}
  const result = auth.createLayout(req.user.id, name || 'Layout', layout_type || '1', config || {})
  return { success: true, id: result.lastInsertRowid }
})
fastify.put('/api/layouts/:id', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  const { config, layout_type } = req.body || {}
  auth.updateLayout(Number(req.params.id), req.user.id, config, layout_type)
  return { success: true }
})
// Alerts
fastify.get('/api/alerts', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  return { success: true, alerts: auth.getUserAlerts(req.user.id) }
})
fastify.post('/api/alerts', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  const { type, symbol, condition, cooldown_sec } = req.body || {}
  if (!type) return reply.code(400).send({ error: 'Alert type required' })
  // Default cooldown: 5 minutes between repeated triggers
  const result = auth.createUserAlert(req.user.id, type, symbol, condition || {}, cooldown_sec || 300)
  return { success: true, id: result.lastInsertRowid }
})
fastify.delete('/api/alerts/:id', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  // Delete is scoped by (alert id, user id) so users can't remove others' alerts
  auth.stmts.deleteAlert.run(Number(req.params.id), req.user.id)
  return { success: true }
})
fastify.get('/api/alerts/triggers', async (req, reply) => {
  if (!auth.requireAuth(req, reply)) return
  const limit = Number(req.query.limit || 50)
  return { success: true, triggers: auth.getAlertTriggers(req.user.id, limit) }
})
// Signal stats (public)
fastify.get('/api/signals/stats', async () => {
  return { success: true, stats: auth.getSignalStats(), recent: auth.getRecentSignals(20) }
})
// Live signals (in-memory, real-time)
fastify.get('/api/signals/live', async (req) => {
  const { type, symbol, direction, minConfidence, limit, hours } = req.query
  const data = signals.getLiveSignals({ type, symbol, direction, minConfidence, limit, hours })
  return { success: true, count: data.length, data }
})
// Signal summary (counts, types)
fastify.get('/api/signals/summary', async () => {
  return { success: true, ...signals.getSignalSummary() }
})
// Outcome stats (WIN/LOSS by type)
fastify.get('/api/signals/outcomes', async () => {
  return { success: true, stats: signals.getOutcomeStats() }
})
// Signal history (from DB, capped at 200 rows per request)
fastify.get('/api/signals/history', async (req) => {
  const limit = Math.min(Number(req.query.limit || 50), 200)
  const recent = auth.getRecentSignals(limit)
  return { success: true, count: recent.length, data: recent }
})
// ---- API routes ----
fastify.get('/health', async () => {
  return { status: 'ok', service: process.env.SERVICE_NAME || 'futures-screener', users: auth.getUserCount() }
})
// All tradeable USDT-M perpetual symbols
fastify.get('/symbols', async () => {
  const info = await bgetWithRetry('/fapi/v1/exchangeInfo')
  const symbols = (info.symbols || [])
    .filter(s => s.contractType === 'PERPETUAL' && s.quoteAsset === 'USDT' && s.status === 'TRADING')
    .map(s => s.symbol)
  return { count: symbols.length, symbols }
})
// Raw order-book snapshot for one symbol (passthrough to Binance depth)
fastify.get('/depth/:symbol', async (req) => {
  const symbol = String(req.params.symbol || '').toUpperCase()
  const limit = Number(req.query.limit || 100)
  const ob = await bgetWithRetry(`/fapi/v1/depth?symbol=${encodeURIComponent(symbol)}&limit=${limit}`)
  return { symbol, lastUpdateId: ob.lastUpdateId, bids: ob.bids, asks: ob.asks }
})
// NEW: simple flat output for UI (scoring, sorting, cache)
// Two modes:
//  - full scan (no ?symbols): served entirely from densityCache (filled by warmup)
//  - specific symbols (?symbols=A,B): live scan with on-demand WS subscription
fastify.get('/densities/simple', async (req, reply) => {
  const minNotional = Number(req.query.minNotional || 0)
  const depthLimit = Number(req.query.depthLimit || 100)
  const mmMode = req.query.mmMode === 'true'
  const windowPct = Number(req.query.windowPct || 5.0) // default 5%
  const mmMultiplier = Number(req.query.mmMultiplier || 4) // default 4x
  const xFilter = Number(req.query.xFilter || 0) // x-multiplier filter (0 = no filter)
  const natrFilter = Number(req.query.natrFilter || 0) // NATR filter (0 = no filter)
  const minScore = Number(req.query.minScore || 0) // Score filter
  const concurrency = Number(req.query.concurrency || 3) // parallel Binance requests (3 to stay under rate limit)
  const isSpecificSymbols = !!req.query.symbols
  let symbols
  if (isSpecificSymbols) {
    symbols = String(req.query.symbols).split(',').map(s => s.trim().toUpperCase()).filter(s => s)
  } else {
    // Full scan — always return from cache (warmup populates it)
    if (densityCache.data) {
      // Filters are applied on a copy; cache keeps the unfiltered data
      let finalData = [...densityCache.data]
      if (xFilter > 0) finalData = finalData.filter(d => d.xMult >= xFilter)
      if (natrFilter > 0) finalData = finalData.filter(d => d.natr !== null && d.natr >= natrFilter)
      if (minScore > 0) finalData = finalData.filter(d => d.score >= minScore)
      const ageSec = ((Date.now() - densityCache.ts) / 1000).toFixed(0)
      return { ...densityCache.meta, xFilter, natrFilter, data: finalData, cached: true, cacheAgeSec: Number(ageSec) }
    }
    // No cache at all — return empty (warmup will fill it)
    reply.code(503)
    return { count: 0, data: [], cached: false, message: 'Warming up, try again in 30s' }
  }
  // Optional limit (0 = no limit, scan all)
  const limitSymbols = Number(req.query.limitSymbols || 0)
  if (limitSymbols > 0 && symbols.length > limitSymbols) {
    symbols = symbols.slice(0, limitSymbols)
  }
  // Mark prices for all symbols in one request
  const marks = await bgetWithRetry('/fapi/v1/premiumIndex')
  const markMap = new Map(marks.map(m => [m.symbol, Number(m.markPrice)]))
  // Delay between items to stay under Binance rate limit (2400/min)
  // Full scan: 500 symbols × ~3 calls each = 1500 calls, concurrency 3, delay 500ms
  // → ~6 calls/sec → ~360/min (safe margin)
  const itemDelay = isSpecificSymbols ? 0 : 500
  const rowsArr = await mapLimit(symbols, concurrency, async (sym) => {
    const price = markMap.get(sym)
    if (!price) return []
    // 1. If not yet WS-subscribed: for full scans, skip (no data yet).
    // For specific symbol queries (charts), fetch depth on demand.
    if (!wsManager.callbacks.has(sym)) {
      if (!isSpecificSymbols) {
        return []; // Full scan: skip unsubscribed symbols, they'll warm up via chart views
      }
      try {
        // Seed the local book with a full snapshot, then track via WS deltas
        const ob = await bgetWithRetry(`/fapi/v1/depth?symbol=${encodeURIComponent(sym)}&limit=1000`);
        stateManager.initBook(sym, ob.bids, ob.asks);
        wsManager.subscribe(sym, (payload) => { stateManager.processDelta(sym, payload); });
      } catch (err) {
        console.log(`[density] Skip ${sym}: ${err.message.slice(0, 80)}`);
        return [];
      }
    }
    // 2. Get local state from memory (from WS deltas)
    const bidLevelsRaw = stateManager.getTopLevels(sym, 'bid', price, minNotional, depthLimit, windowPct);
    const askLevelsRaw = stateManager.getTopLevels(sym, 'ask', price, minNotional, depthLimit, windowPct);
    // Fetch K-lines for volumes and ATR
    const klinesStats = await getKlinesWithStats(sym)
    if (!klinesStats) return [] // skip symbol if klines unavailable
    // 3. Binning & Density Analysis v2 (x-multiplier based)
    const avg5mVol = (klinesStats.vol1 + klinesStats.vol2 + klinesStats.vol3 + klinesStats.vol4 + klinesStats.vol5) / 5;
    const processSide = (levels, sideKey) => {
      const BIN_SIZE_PCT = 0.1;
      const rawBins = binLevels(levels, BIN_SIZE_PCT);
      const validBins = rawBins.filter(b => b.notional >= minNotional);
      const trackedBins = stateManager.trackAndEnrichBins(sym, sideKey, validBins, price);
      const scoredBins = trackedBins.map(bin => {
        const behavior = analyzeBehavior(bin, price, klinesStats.natr, avg5mVol);
        // x-multiplier filter: only walls >= xFilter (default x4)
        const minX = xFilter > 0 ? xFilter : 4;
        if (behavior.xMult < minX) return null;
        // Time-to-eat: minutes of average traded volume needed to consume the wall
        let tte = Infinity;
        if (avg5mVol > 0) {
          tte = bin.notional / (avg5mVol / 5);
        }
        return {
          symbol: sym,
          sideKey,
          price: Math.round(bin.anchorPrice * 10000) / 10000,
          notional: bin.notional,
          distancePct: Math.round(behavior.distancePct * 100) / 100,
          lifetimeMins: Math.round(behavior.lifetimeMins * 10) / 10,
          score: behavior.trustScore,
          xMult: Math.round(behavior.xMult * 10) / 10,
          severity: behavior.severity,
          tags: behavior.tags,
          levelsCount: bin.levelsCount,
          natr: klinesStats.natr,
          avg5mVol: Math.round(avg5mVol),
          vol1: klinesStats.vol1,
          vol2: klinesStats.vol2,
          vol3: klinesStats.vol3,
          vol4: klinesStats.vol4,
          vol5: klinesStats.vol5,
          timeToEatMinutes: tte
        };
      }).filter(Boolean);
      scoredBins.sort((a, b) => {
        if (b.score !== a.score) return b.score - a.score;
        return a.distancePct - b.distancePct;
      });
      // Top 2 per side (best bid wall + best ask wall)
      return scoredBins.slice(0, 2);
    };
    const bidResult = processSide(bidLevelsRaw, 'bid');
    const askResult = processSide(askLevelsRaw, 'ask');
    return [...bidResult, ...askResult];
  }, itemDelay);
  // All levels flat, sorted by score desc
  const allLevels = rowsArr.flat().sort((a, b) => b.score - a.score);
  // Top 3 per symbol (best bid + best ask + next best)
  const perSymbol = {};
  for (const entry of allLevels) {
    if (!perSymbol[entry.symbol]) perSymbol[entry.symbol] = [];
    if (perSymbol[entry.symbol].length < 3) {
      perSymbol[entry.symbol].push(entry);
    }
  }
  let finalData = Object.values(perSymbol).flat();
  finalData.sort((a, b) => b.score - a.score);
  // NATR filter (when natrFilter > 0, keep only levels with natr >= natrFilter%)
  if (natrFilter > 0) {
    finalData = finalData.filter(d => d.natr !== null && d.natr >= natrFilter)
  }
  // Score filter
  if (minScore > 0) {
    finalData = finalData.filter(d => d.score >= minScore)
  }
  // Cache full unfiltered data for subsequent requests
  if (!isSpecificSymbols) {
    // Store unfiltered data (before xFilter/natrFilter/minScore applied by params)
    // allLevels already has all scored walls, perSymbol top 3 = finalData before natr/score filters
    const unfilteredData = Object.values(perSymbol).flat()
    unfilteredData.sort((a, b) => b.score - a.score)
    const meta = { count: unfilteredData.length, minNotional, depthLimit, concurrency, mmMode, windowPct, mmMultiplier }
    densityCache = { data: unfilteredData, meta, ts: Date.now() }
    // Persist to disk so data survives PM2 restarts
    saveDensityToDisk(unfilteredData, meta)
  }
  const result = {
    count: finalData.length,
    minNotional,
    depthLimit,
    concurrency,
    mmMode,
    windowPct,
    mmMultiplier,
    xFilter,
    natrFilter,
    data: finalData
  }
  return result
})
// ---- Density V2: Statistical Walls + Imbalance ----
// Per-wall observation history (how long each wall has been seen). Keys/values
// are produced and pruned by the densityV2 module; snapshot is saved to disk
// below so wall lifetimes survive PM2 restarts.
const densityV2PersistenceMap = new Map()
// Last full-scan result for GET /densities/v2: { data, ts }. ts=0 means empty.
let densityV2Cache = { data: null, ts: 0 }
const DENSITY_V2_CACHE_TTL = 15000 // 15 seconds — full scans are served from cache within this window
// Persistence map disk save/load (survive PM2 restarts)
const PERSISTENCE_MAP_FILE = require('path').join(__dirname, '..', 'data', 'density-persistence.json')
// Snapshot the in-memory wall-persistence map to disk as JSON so wall
// lifetimes can be restored after a PM2 restart. The write itself is async
// and fire-and-forget; all errors are logged and swallowed (best-effort).
function savePersistenceMapToDisk() {
  try {
    const fs = require('fs')
    const path = require('path')
    const dir = path.dirname(PERSISTENCE_MAP_FILE)
    if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true })
    // Serialize the Map as a plain object plus a snapshot timestamp.
    const snapshot = Object.fromEntries(densityV2PersistenceMap.entries())
    const payload = JSON.stringify({ map: snapshot, ts: Date.now() })
    fs.promises.writeFile(PERSISTENCE_MAP_FILE, payload)
      .catch(e => console.log('[persistence-map] disk save error:', e.message))
  } catch (e) { console.log('[persistence-map] disk save error:', e.message) }
}
// Restore the wall-persistence map from disk on startup. A snapshot older
// than 30 minutes is discarded wholesale; individual records not seen within
// the last 5 minutes are dropped (mirrors cleanupPersistence). Best-effort:
// errors are logged and ignored.
function loadPersistenceMapFromDisk() {
  try {
    const fs = require('fs')
    if (!fs.existsSync(PERSISTENCE_MAP_FILE)) return
    const snapshot = JSON.parse(fs.readFileSync(PERSISTENCE_MAP_FILE, 'utf8'))
    // Accept up to 30 minutes old (walls older than that are stale anyway)
    if (!snapshot || !snapshot.map || (Date.now() - snapshot.ts) > 1800000) return
    const now = Date.now()
    let restored = 0
    for (const [wallKey, record] of Object.entries(snapshot.map)) {
      // Skip entries not seen in last 5 min (same as cleanupPersistence)
      if (record.lastSeen && (now - record.lastSeen) > 300000) continue
      densityV2PersistenceMap.set(wallKey, record)
      restored++
    }
    if (restored > 0) console.log(`[persistence-map] Loaded ${restored} wall records from disk (age: ${((Date.now() - snapshot.ts) / 1000).toFixed(0)}s)`)
  } catch (e) { console.log('[persistence-map] disk load error:', e.message) }
}
// Load on startup
loadPersistenceMapFromDisk()
// Save every 30s (lightweight — typically a few hundred entries).
// Both intervals are pushed into _intervals so gracefulShutdown can clear them.
_intervals.push(setInterval(savePersistenceMapToDisk, 30000))
// Cleanup persistence every 60s
_intervals.push(setInterval(() => densityV2.cleanupPersistence(densityV2PersistenceMap), 60000))
// GET /densities/v2 — statistical wall detection + order-book imbalance.
// Query params:
//   windowPct    — ± price window in % for wall filtering (default 2)
//   nSigma       — sigma threshold forwarded to densityV2.analyzeSymbol (default 2)
//   minVolume24h — 24h quote-volume floor for full scans (default $50M)
//   minImbalance — keep only results with |imbalance| >= this (0 = show all)
//   symbols      — comma-separated list; switches to on-demand "specific" mode
//   force=true   — bypass the 15s full-scan cache
// Full scans read only WS-subscribed symbols; specific mode may subscribe on demand.
fastify.get('/densities/v2', async (req) => {
  const windowPct = Number(req.query.windowPct || 2)
  const nSigma = Number(req.query.nSigma || 2)
  const minVolume24h = Number(req.query.minVolume24h || 50000000) // $50M default
  const minImbalance = Number(req.query.minImbalance || 0) // 0 = show all
  const isSpecific = !!req.query.symbols
  const forceRefresh = req.query.force === 'true'
  // Return cached data if fresh enough (full scan only)
  if (!isSpecific && !forceRefresh && densityV2Cache.data && (Date.now() - densityV2Cache.ts) < DENSITY_V2_CACHE_TTL) {
    // Copy before filtering so the cached array is never mutated.
    let filtered = [...densityV2Cache.data]
    if (minImbalance > 0) filtered = filtered.filter(d => Math.abs(d.imbalance) >= minImbalance)
    return { count: filtered.length, data: filtered, cached: true, cacheAgeSec: Math.round((Date.now() - densityV2Cache.ts) / 1000) }
  }
  // Get 24h ticker for volume filter
  let ticker24h
  try {
    ticker24h = await bgetWithRetry('/fapi/v1/ticker/24hr')
  } catch (err) {
    // Degrade gracefully: an empty (but well-formed) payload instead of a 5xx.
    return { count: 0, data: [], error: 'Ticker data temporarily unavailable' }
  }
  const volumeMap = new Map(ticker24h.map(t => [t.symbol, Number(t.quoteVolume)]))
  // Get mark prices
  const marks = await bgetWithRetry('/fapi/v1/premiumIndex')
  const markMap = new Map(marks.map(m => [m.symbol, Number(m.markPrice)]))
  // Determine symbols to analyze
  let symbols
  if (isSpecific) {
    symbols = String(req.query.symbols).split(',').map(s => s.trim().toUpperCase()).filter(Boolean)
  } else {
    // All subscribed symbols filtered by volume
    symbols = [...wsManager.callbacks.keys()].filter(sym => {
      const vol = volumeMap.get(sym) || 0
      return vol >= minVolume24h && markMap.has(sym)
    })
  }
  const results = []
  for (const sym of symbols) {
    const price = markMap.get(sym)
    if (!price) continue
    // If not yet WS-subscribed and specific symbol requested: fetch depth on demand
    if (!wsManager.callbacks.has(sym)) {
      if (!isSpecific) continue // Full scan: skip unsubscribed
      try {
        const ob = await bgetWithRetry(`/fapi/v1/depth?symbol=${encodeURIComponent(sym)}&limit=1000`)
        stateManager.initBook(sym, ob.bids, ob.asks)
        wsManager.subscribe(sym, (payload) => { stateManager.processDelta(sym, payload) })
      } catch (err) {
        // Depth snapshot failed — skip this symbol for this request.
        continue
      }
    }
    // Get raw levels from WS state (wider window for analysis, windowPct for filtering)
    const bidLevels = stateManager.getTopLevels(sym, 'bid', price, 0, 500, windowPct + 1)
    const askLevels = stateManager.getTopLevels(sym, 'ask', price, 0, 500, windowPct + 1)
    if (bidLevels.length < 3 && askLevels.length < 3) continue // not enough data
    try {
      const analysis = densityV2.analyzeSymbol({
        symbol: sym,
        markPrice: price,
        bidLevels,
        askLevels,
        persistenceMap: densityV2PersistenceMap,
        windowPct,
        nSigma
      })
      // Only include if has at least one wall
      if (analysis.wallCount > 0) {
        // Add volume info
        analysis.volume24h = volumeMap.get(sym) || 0
        // Calculate erosion time for each wall (avg 5m volume from last 14 candles)
        let avgVol5m = 0
        try {
          const candles5m = klinesCache.getCandles(sym, '5m', 14)
          if (candles5m && candles5m.length >= 3) {
            avgVol5m = candles5m.reduce((sum, c) => sum + c.volume, 0) / candles5m.length
          }
        } catch (_) {}
        // erosionMins ≈ minutes to eat the wall at recent volume pace.
        // NOTE(review): formula assumes wall.notional and candle volume are in
        // the same unit — confirm against densityV2/klines-cache.
        const addErosion = (wall) => {
          if (!wall) return wall
          wall.erosionMins = avgVol5m > 0
            ? Math.round(wall.notional * 5 / avgVol5m * 10) / 10
            : null
          return wall
        }
        addErosion(analysis.support)
        addErosion(analysis.resistance)
        if (analysis.bidWalls) analysis.bidWalls.forEach(addErosion)
        if (analysis.askWalls) analysis.askWalls.forEach(addErosion)
        results.push(analysis)
      }
    } catch (err) {
      // Skip problematic symbols silently
      continue
    }
  }
  // Sort by strongest wall score (support or resistance, whichever is bigger)
  results.sort((a, b) => {
    const scoreA = Math.max(a.support?.score || 0, a.resistance?.score || 0)
    const scoreB = Math.max(b.support?.score || 0, b.resistance?.score || 0)
    return scoreB - scoreA
  })
  // Cache for full scans
  if (!isSpecific) {
    densityV2Cache = { data: results, ts: Date.now() }
  }
  // minImbalance is applied after caching so the cache stays unfiltered.
  let finalData = results
  if (minImbalance > 0) finalData = finalData.filter(d => Math.abs(d.imbalance) >= minImbalance)
  return { count: finalData.length, data: finalData, cached: false }
})
// Cache stats endpoint — reports the generic cache's entry count and keys.
fastify.get('/_cache/stats', async () => {
  const keys = Array.from(cache.keys())
  return { size: cache.size, keys }
})
// ---- Binance Proxy for Mini-Charts (cached) ----
// Generic TTL-less store: entries carry only a timestamp; each reader passes
// its own TTL to getProxyCached(), so one entry can serve different TTLs.
const proxyCache = new Map()
const PROXY_MAX_TTL_MS = 600000 // 10 min hard cap for the sweeper — must exceed every per-read TTL used below (longest visible read TTL is 5 min; TODO confirm no reader uses more)
// Cleanup expired proxy cache entries every 30 seconds
// (registered in _intervals so gracefulShutdown can clear it)
_intervals.push(setInterval(() => {
  const now = Date.now()
  for (const [key, entry] of proxyCache.entries()) {
    if (now - entry.ts > PROXY_MAX_TTL_MS) proxyCache.delete(key)
  }
}, 30000))
// Read a proxy-cache entry. Returns the cached payload only while it is
// younger than ttlMs; otherwise null (expired entries are left for the sweeper).
function getProxyCached(key, ttlMs) {
  const hit = proxyCache.get(key)
  if (!hit) return null
  const isFresh = Date.now() - hit.ts < ttlMs
  return isFresh ? hit.data : null
}
// Store a payload in the proxy cache, stamped with the current time.
function setProxyCached(key, data) {
  const entry = { data, ts: Date.now() }
  proxyCache.set(key, entry)
}
// 24hr ticker — cached 60s (all pairs, heavy endpoint)
fastify.get('/api/ticker24hr', async () => {
  // Serve from proxy cache while the last fetch is under 60s old.
  const cached = getProxyCached('ticker24hr', 60000)
  if (cached) return cached
  const data = await bgetWithRetry('/fapi/v1/ticker/24hr')
  setProxyCached('ticker24hr', data)
  return data
})
// Klines — SQLite cache first, Binance fallback.
// Three modes, checked in order:
//   1. startTime — delta sync: candles strictly after startTime (object payload)
//   2. endTime   — historical pagination: candles before endTime (array payload)
//   3. neither   — latest `limit` candles (array payload)
// Array payloads use the 6-field [ms, open, high, low, close, volume] subset of
// Binance's kline row (enough for client-side parseKlines()).
fastify.get('/api/klines', async (req) => {
  const symbol = String(req.query.symbol || '').toUpperCase()
  const interval = String(req.query.interval || '15m')
  const limit = Math.min(Number(req.query.limit || 200), 1500)
  const endTime = req.query.endTime ? Number(req.query.endTime) : null
  const startTime = req.query.startTime ? Number(req.query.startTime) : null
  if (!symbol) return { error: 'symbol required' }
  // Delta request: return only candles after startTime (for client cache sync)
  if (startTime) {
    // Cache stores times in seconds; query params are in milliseconds.
    const delta = klinesCache.getCandlesAfter(symbol, interval, Math.floor(startTime / 1000))
    if (delta.length > 0) return { cached: true, data: delta }
    // Fallback: fetch from Binance
    const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=${limit}&startTime=${startTime}`)
    if (Array.isArray(data)) klinesCache.storeCandles(symbol, interval, data)
    return data
  }
  // Historical pagination (endTime) — check SQLite first
  if (endTime) {
    const beforeSec = Math.floor(endTime / 1000)
    const cached = klinesCache.getCandlesBefore(symbol, interval, beforeSec, limit)
    // Accept a partially-filled page (>=80% of limit) rather than refetching.
    if (cached.length >= limit * 0.8) {
      // Return in Binance-compatible format for client parseKlines()
      return cached.map(c => [c.time * 1000, String(c.open), String(c.high), String(c.low), String(c.close), String(c.volume)])
    }
    // Not enough in cache — fetch from Binance and cache
    const key = `klines:${symbol}:${interval}:${limit}:end${endTime}`
    const proxyCached = getProxyCached(key, 300000)
    if (proxyCached) return proxyCached
    const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=${limit}&endTime=${endTime}`)
    if (Array.isArray(data)) {
      setProxyCached(key, data)
      klinesCache.storeCandles(symbol, interval, data)
    }
    return data
  }
  // Latest candles — try SQLite cache (fresh or stale with background refresh)
  const cachedCount = klinesCache.getCount(symbol, interval)
  if (cachedCount >= limit) {
    const latestTime = klinesCache.getLatestTime(symbol, interval)
    const age = Date.now() / 1000 - (latestTime || 0) // seconds since newest cached candle
    if (age < 300) {
      const rows = klinesCache.getCandles(symbol, interval, limit)
      const result = rows.map(c => [c.time * 1000, String(c.open), String(c.high), String(c.low), String(c.close), String(c.volume)])
      // Stale cache (>60s) — return immediately but trigger background refresh
      // (fire-and-forget; only the last 3 candles are refetched)
      if (age >= 60) {
        bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=3`)
          .then(data => { if (Array.isArray(data)) klinesCache.storeCandles(symbol, interval, data) })
          .catch(() => {})
      }
      return result
    }
  }
  // Cache stale or miss — fetch from Binance, update SQLite
  const key = `klines:${symbol}:${interval}:${limit}`
  const proxyCached = getProxyCached(key, 10000)
  if (proxyCached) return proxyCached
  const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=${limit}`)
  if (Array.isArray(data)) {
    setProxyCached(key, data)
    klinesCache.storeCandles(symbol, interval, data)
  }
  return data
})
// ---- Web Push API ----
// (The test-signal injector lives further down, in the "---- Test Signal ----" section.)
// Get VAPID public key (client needs this to subscribe)
fastify.get('/api/push/vapid-key', async () => {
  const vapidKey = push.getVapidPublicKey()
  return vapidKey
    ? { success: true, key: vapidKey }
    : { success: false, error: 'Push not configured' }
})
// Subscribe to push notifications — upserts the browser's PushSubscription
// (endpoint URL + p256dh/auth crypto keys) together with the caller's signal
// filters, then reports the total subscriber count.
fastify.post('/api/push/subscribe', async (req) => {
  const body = req.body || {}
  const sub = body.subscription
  const isComplete = !!(sub?.endpoint && sub?.keys?.p256dh && sub?.keys?.auth)
  if (!isComplete) {
    return { success: false, error: 'Invalid subscription' }
  }
  const filtersJson = JSON.stringify(body.filters || {})
  auth.stmts.upsertPushSub.run(sub.endpoint, sub.keys.p256dh, sub.keys.auth, filtersJson)
  const total = auth.stmts.countPushSubs.get()?.count || 0
  console.log(`[Push] New subscription registered (total: ${total})`)
  return { success: true, total }
})
// Unsubscribe from push notifications — deletes the stored subscription
// identified by its endpoint URL. Idempotent: deleting a missing endpoint
// still returns success.
fastify.post('/api/push/unsubscribe', async (req) => {
  const endpoint = req.body?.endpoint
  if (!endpoint) return { success: false, error: 'Missing endpoint' }
  auth.stmts.deletePushSub.run(endpoint)
  return { success: true }
})
// ---- Test Signal ----
// Injects a fake signal into signals.liveSignals so clients can test their
// notification UI. Auth-gated; auto-expires after 60s; never triggers push.
fastify.get('/api/signals/test', async (req, reply) => {
  // Require authenticated user to prevent public abuse
  if (!req.user) return reply.code(401).send({ error: 'Auth required' })
  const sig = {
    id: `test-${Date.now()}`,
    type: 'volume_spike',
    symbol: 'BTCUSDT',
    direction: 'LONG',
    price: 94500,
    confidence: 85,
    description: 'Test signal — Volume 5.2x above SMA20 average',
    metadata: { ratio: 5.2, currentVol: 12000000, avgVol: 2300000 },
    created_at: new Date().toISOString(),
  }
  // Remove any old test signals first (splice to keep array reference — don't reassign!)
  // Other modules hold a reference to signals.liveSignals, so the array must be
  // mutated in place; reassigning would orphan their view of it.
  for (let i = signals.liveSignals.length - 1; i >= 0; i--) {
    if (String(signals.liveSignals[i].id).startsWith('test-')) signals.liveSignals.splice(i, 1)
  }
  signals.liveSignals.unshift(sig)
  // Auto-remove after 60s (splice to keep reference)
  setTimeout(() => {
    const idx = signals.liveSignals.indexOf(sig)
    if (idx >= 0) signals.liveSignals.splice(idx, 1)
  }, 60_000)
  // Test signals do NOT trigger push — only real signals do
  return { success: true, signal: sig, pushEnabled: push.isEnabled() }
})
// OI history — proxied from Binance /futures/data/openInterestHist.
// Query params: symbol (required), period (default '5m'), limit (1..500, default 500).
// `period` used to be interpolated into the upstream URL unvalidated, which let
// a caller smuggle extra query parameters into the Binance request; it is now
// whitelisted against the periods Binance documents for this endpoint.
const OI_HIST_PERIODS = new Set(['5m', '15m', '30m', '1h', '2h', '4h', '6h', '12h', '1d'])
fastify.get('/api/oi-history', async (req, reply) => {
  const symbol = String(req.query.symbol || '').toUpperCase()
  const period = String(req.query.period || '5m')
  // Clamp limit to [1, 500] and fall back to 500 if it isn't a finite number.
  const limitRaw = Number(req.query.limit || 500)
  const limit = Number.isFinite(limitRaw) ? Math.min(Math.max(1, Math.floor(limitRaw)), 500) : 500
  if (!symbol) return { error: 'symbol required' }
  if (!OI_HIST_PERIODS.has(period)) {
    reply.code(400)
    return { error: 'invalid period', allowed: [...OI_HIST_PERIODS] }
  }
  const key = `oiHist:${symbol}:${period}:${limit}`
  const cached = getProxyCached(key, 60000) // cache 1 min
  if (cached) return cached
  try {
    // period is whitelisted above; symbol is still URL-encoded defensively
    const url = `/futures/data/openInterestHist?symbol=${encodeURIComponent(symbol)}&period=${period}&limit=${limit}`
    const data = await bgetWithRetry(url)
    setProxyCached(key, data)
    return data
  } catch (e) {
    reply.code(503)
    return { error: 'Failed to fetch OI history', message: e.message }
  }
})
// Batch klines — SQLite cache first, Binance fallback.
// POST body: { symbols: string[], interval?, limit? }. At most 30 symbols are
// processed. Response: { [symbol]: klineRows } where each row is the 6-field
// [ms, open, high, low, close, volume] format. Per-symbol policy:
//   age <= 60s  — serve cache as-is
//   60s..300s   — serve cache, refresh last 3 candles in background
//   > 300s/miss — fetch fresh from Binance
fastify.post('/api/klines-batch', async (req) => {
  const symbols = req.body?.symbols
  const interval = String(req.body?.interval || '15m')
  const limit = Math.min(Number(req.body?.limit || 200), 1500)
  if (!Array.isArray(symbols) || symbols.length === 0) return { error: 'symbols[] required' }
  const syms = symbols.slice(0, 30).map(s => String(s).toUpperCase())
  const result = {}
  const needFetch = []
  // Try SQLite cache first for each symbol (fresh <60s instant, stale <300s with bg refresh)
  const nowSec = Math.floor(Date.now() / 1000)
  const bgRefresh = [] // symbols with stale cache — refresh in background
  for (const symbol of syms) {
    const cachedCount = klinesCache.getCount(symbol, interval)
    if (cachedCount >= limit) {
      const latestTime = klinesCache.getLatestTime(symbol, interval)
      const age = latestTime ? nowSec - latestTime : Infinity
      if (age <= 300) {
        const rows = klinesCache.getCandles(symbol, interval, limit)
        result[symbol] = rows.map(c => [c.time * 1000, String(c.open), String(c.high), String(c.low), String(c.close), String(c.volume)])
        if (age > 60) bgRefresh.push(symbol) // schedule background update
      } else {
        needFetch.push(symbol) // too stale (>5min) — must refetch
      }
    } else {
      needFetch.push(symbol)
    }
  }
  // Background refresh for stale-but-usable cache (non-blocking)
  if (bgRefresh.length > 0) {
    // allSettled never rejects; the trailing .catch is belt-and-braces only.
    Promise.allSettled(bgRefresh.map(symbol =>
      bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=3`)
        .then(data => { if (Array.isArray(data)) klinesCache.storeCandles(symbol, interval, data) })
    )).catch(() => {})
  }
  // Fetch uncached from Binance in parallel
  if (needFetch.length > 0) {
    const promises = needFetch.map(async (symbol) => {
      try {
        const key = `klines:${symbol}:${interval}:${limit}`
        let data = getProxyCached(key, 10000)
        if (!data) {
          data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=${limit}`)
          if (data) {
            setProxyCached(key, data)
            klinesCache.storeCandles(symbol, interval, data)
          }
        }
        if (Array.isArray(data)) result[symbol] = data
      } catch(e) { /* skip — a failed symbol is simply absent from the response */ }
    })
    await Promise.all(promises).catch(e => console.error('[klines-batch] Error:', e.message))
  }
  return result
})
// NATR(14) for all USDT pairs — cached 5min.
// NATR = ATR(14) / lastClose * 100, computed over the last 14 true ranges of
// `interval` candles. Response: { [symbol]: natrPercent } rounded to 2 decimals,
// covering up to the top 200 USDT perpetuals by 24h quote volume (>$10M only).
fastify.get('/api/natr', async (req) => {
  const interval = String(req.query.interval || '15m')
  const cached = getProxyCached(`natr:${interval}`, 300000) // 5 min cache
  if (cached) return cached
  // Get all USDT pairs from ticker
  const tickerCached = getProxyCached('ticker24hr', 30000)
  const ticker = tickerCached || await bgetWithRetry('/fapi/v1/ticker/24hr')
  if (!tickerCached) setProxyCached('ticker24hr', ticker)
  const usdtPairs = ticker
    .filter(t => t.symbol.endsWith('USDT') && !t.symbol.includes('_')) // exclude quarterly contracts (SYMBOL_YYMMDD)
    .filter(t => parseFloat(t.quoteVolume) > 10000000) // >$10M vol only
    .sort((a, b) => parseFloat(b.quoteVolume) - parseFloat(a.quoteVolume))
    .slice(0, 200) // top 200 by volume
  // Fetch klines in parallel batches of 20
  const result = {}
  const batchSize = 20
  for (let i = 0; i < usdtPairs.length; i += batchSize) {
    const batch = usdtPairs.slice(i, i + batchSize)
    const promises = batch.map(async (t) => {
      try {
        const key = `klines:${t.symbol}:${interval}:50`
        let klines = getProxyCached(key, 10000)
        if (!klines) {
          klines = await bgetWithRetry(`/fapi/v1/klines?symbol=${t.symbol}&interval=${interval}&limit=50`)
          setProxyCached(key, klines)
        }
        // Need >= 15 candles: the TR loop below reads the previous close.
        if (!Array.isArray(klines) || klines.length < 15) return
        // Calculate ATR(14)
        const candles = klines.map(k => ({
          high: parseFloat(k[2]),
          low: parseFloat(k[3]),
          close: parseFloat(k[4]),
        }))
        let trSum = 0
        // True range over the last 14 candles: max(H-L, |H-prevC|, |L-prevC|)
        for (let j = candles.length - 14; j < candles.length; j++) {
          const h = candles[j].high
          const l = candles[j].low
          const pc = candles[j - 1].close
          trSum += Math.max(h - l, Math.abs(h - pc), Math.abs(l - pc))
        }
        const atr = trSum / 14
        const lastClose = candles[candles.length - 1].close
        if (lastClose > 0) result[t.symbol] = parseFloat(((atr / lastClose) * 100).toFixed(2))
      } catch(e) { /* skip pair — absent from result */ }
    })
    await Promise.all(promises).catch(e => console.error('[natr] Unexpected error:', e.message))
    // Small delay between batches to avoid rate limits
    if (i + batchSize < usdtPairs.length) await new Promise(r => setTimeout(r, 200))
  }
  setProxyCached(`natr:${interval}`, result)
  return result
})
// Klines cache stats — surfaces klinesCache.getStats(); SQLite errors are
// reported in the payload instead of failing the request.
fastify.get('/api/klines-cache/stats', async () => {
  try {
    return klinesCache.getStats()
  } catch (err) {
    return { error: err.message }
  }
})
// Background klines updater — refreshes cached symbols every 30s.
// For every interval that has cached symbols, refetches only the last 3
// candles per symbol, in batches of 10 with a 2s pause between batches.
// Handle is kept in _klinesUpdaterInterval so gracefulShutdown can clear it.
let _klinesUpdaterInterval = null
function startKlinesUpdater() {
  const UPDATE_INTERVAL = 30000 // 30s
  const BATCH_SIZE = 10 // symbols per batch
  const BATCH_DELAY = 2000 // 2s between batches (rate-limit safe)
  // Re-entrancy guard: with many symbols × 5 intervals × 2s batch delays a
  // cycle can easily exceed 30s, and overlapping cycles would pile up
  // Binance requests. Skip the tick if the previous cycle is still running.
  let running = false
  _klinesUpdaterInterval = setInterval(async () => {
    if (running) return
    running = true
    try {
      // Get the current TF from most common cached interval
      const intervals = ['1m', '5m', '15m', '1h', '4h'] // all used TFs
      for (const interval of intervals) {
        const symbols = klinesCache.getCachedSymbols(interval)
        if (symbols.length === 0) continue
        // Process in batches
        for (let i = 0; i < symbols.length; i += BATCH_SIZE) {
          const batch = symbols.slice(i, i + BATCH_SIZE)
          const promises = batch.map(async (symbol) => {
            try {
              // Fetch only last 3 candles (latest + 2 for safety)
              const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(symbol)}&interval=${interval}&limit=3`)
              if (Array.isArray(data) && data.length > 0) {
                klinesCache.storeCandles(symbol, interval, data)
              }
            } catch(e) { /* skip individual failures */ }
          })
          await Promise.all(promises)
          if (i + BATCH_SIZE < symbols.length) {
            await new Promise(r => setTimeout(r, BATCH_DELAY))
          }
        }
      }
    } catch(e) {
      console.error('[KlinesUpdater] Error:', e.message)
    } finally {
      running = false
    }
  }, UPDATE_INTERVAL)
  console.log('[KlinesUpdater] Started (30s interval)')
}
// ---- rate limiter status endpoint ----
// Exposes the global Binance weight-limiter state for monitoring: current
// used weight, pause window, thresholds, and a derived status string.
fastify.get('/api/rate-limiter', async () => {
  const now = Date.now()
  let status = 'OK'
  if (rateLimiter.pauseUntil > now) {
    status = 'PAUSED'
  } else if (rateLimiter.usedWeight >= rateLimiter.WEIGHT_HARD_LIMIT) {
    status = 'HARD_THROTTLE'
  } else if (rateLimiter.usedWeight >= rateLimiter.WEIGHT_SOFT_LIMIT) {
    status = 'SOFT_THROTTLE'
  }
  return {
    usedWeight: rateLimiter.usedWeight,
    weightUpdatedAt: rateLimiter.weightUpdatedAt,
    weightAge: now - rateLimiter.weightUpdatedAt,
    pauseUntil: rateLimiter.pauseUntil,
    pauseRemaining: Math.max(0, rateLimiter.pauseUntil - now),
    softLimit: rateLimiter.WEIGHT_SOFT_LIMIT,
    hardLimit: rateLimiter.WEIGHT_HARD_LIMIT,
    binanceLimit: 2400,
    status
  }
})
// ---- start ----
// Boot sequence: listen first, then wire subsystems in dependency order
// (push → klines DB → signals → updater → warmups). Order matters — see
// the inline comments.
const port = Number(process.env.PORT || 3200)
const start = async () => {
  try {
    await fastify.listen({ port, host: '0.0.0.0' })
    fastify.log.info(`listening on 0.0.0.0:${port}`)
    // Init signals scanner (after server up so proxyCache is available)
    push.init({ stmts: auth.stmts })
    // Init klines SQLite cache (before signals so liq_sweep can use it)
    klinesCache.initDB()
    signals.init({ getProxyCached, setProxyCached, bgetWithRetry, auth, push, klinesCache, stateManager, densityV2, persistenceMap: densityV2PersistenceMap })
    // Start background klines updater (every 30s, updates cached symbols)
    startKlinesUpdater()
    // Pre-warm NATR cache so signals scanner has data from first scan.
    // fastify.inject() is an in-process request — no network round-trip.
    setTimeout(async () => {
      try {
        console.log('[startup] Pre-warming NATR cache (15m)...')
        await fastify.inject({ method: 'GET', url: '/api/natr?interval=15m' })
        console.log('[startup] NATR cache warmed ✓')
      } catch (e) { console.warn('[startup] NATR warmup failed:', e.message) }
    }, 5_000) // 5s after start — let ticker cache populate first
    // Periodic NATR refresh every 5 min (cache TTL is 5min, so re-compute before expiry)
    _intervals.push(setInterval(async () => {
      try {
        await fastify.inject({ method: 'GET', url: '/api/natr?interval=15m' })
      } catch (e) { console.warn('[NATR-refresh] failed:', e.message) }
    }, 270_000)) // 4.5 min (slightly before 5min TTL expiry)
    // Background warmup: subscribe top symbols to WS gradually (rate-limit safe)
    warmupDensitySubscriptions()
  } catch (err) {
    fastify.log.error(err)
    process.exit(1)
  }
}
// Graceful shutdown — clean up resources on PM2 restart / kill.
// Order: stop timers → stop scanners → close HTTP → close WS → persist caches.
// Guarded so a second signal (SIGTERM then SIGINT, or repeated Ctrl-C) cannot
// re-run cleanup while the first pass is still flushing.
let _shutdownStarted = false
async function gracefulShutdown(signal) {
  if (_shutdownStarted) {
    console.log(`[shutdown] ${signal} received, shutdown already in progress — ignoring`)
    return
  }
  _shutdownStarted = true
  console.log(`[shutdown] ${signal} received, closing gracefully...`)
  try {
    // Stop all intervals
    for (const id of _intervals) clearInterval(id)
    if (_klinesUpdaterInterval) clearInterval(_klinesUpdaterInterval)
    // Count the updater interval only if it was actually started.
    const clearedCount = _intervals.length + (_klinesUpdaterInterval ? 1 : 0)
    // Stop signal scanners
    try { signals.stop() } catch (_) {}
    console.log(`[shutdown] Cleared ${clearedCount} interval(s) + signal scanners`)
    // Close Fastify (stop accepting new requests, finish in-flight)
    await fastify.close()
    // Close WebSocket connections
    if (wsManager.connections && wsManager.connections.length > 0) {
      console.log(`[shutdown] Closing ${wsManager.connections.length} WS connection(s)...`)
      for (const conn of wsManager.connections) {
        conn.destroy()
      }
      wsManager.connections = []
      wsManager.streamToConn.clear()
      wsManager.callbacks.clear()
    }
    // Save density cache + persistence map to disk before exit
    if (densityCache.data) {
      saveDensityToDisk(densityCache.data, densityCache.meta)
    }
    if (densityV2PersistenceMap.size > 0) {
      savePersistenceMapToDisk()
    }
    // Give async writes a moment to flush
    await new Promise(r => setTimeout(r, 300))
    console.log(`[shutdown] Clean exit.`)
    process.exit(0)
  } catch (err) {
    console.error('[shutdown] Error during cleanup:', err.message)
    process.exit(1)
  }
}
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))
process.on('SIGINT', () => gracefulShutdown('SIGINT'))
// Gradually subscribe symbols to depth WS and build density cache.
// Walks every trading USDT perpetual in batches of 10 (20s pause between
// batches, 300ms between symbols), seeding each book with a REST depth
// snapshot before attaching the WS delta feed. On an error the whole batch
// is retried up to 3 times after a 60s pause. Every 5 batches the density
// cache is rebuilt from whatever is subscribed so far.
async function warmupDensitySubscriptions() {
  try {
    const info = await bgetWithRetry('/fapi/v1/exchangeInfo')
    const allSymbols = (info.symbols || [])
      .filter(s => s.contractType === 'PERPETUAL' && s.quoteAsset === 'USDT' && s.status === 'TRADING')
      .map(s => s.symbol)
    const BATCH = 10
    const BATCH_PAUSE = 20000 // 20s between batches
    const ITEM_DELAY = 300 // 300ms between items
    console.log(`[warmup] ${allSymbols.length} symbols, ${BATCH}/batch, ${BATCH_PAUSE/1000}s pause`)
    // Track subscribed symbols in a Set: a retried batch revisits symbols that
    // already succeeded, and the previous plain counter double-counted them
    // (inflating the progress logs).
    const subscribedSet = new Set()
    let batchRetries = 0
    const MAX_BATCH_RETRIES = 3
    for (let i = 0; i < allSymbols.length; i += BATCH) {
      const batch = allSymbols.slice(i, i + BATCH)
      let batchFailed = false
      for (const sym of batch) {
        if (wsManager.callbacks.has(sym)) { subscribedSet.add(sym); continue }
        try {
          const ob = await bgetWithRetry(`/fapi/v1/depth?symbol=${encodeURIComponent(sym)}&limit=1000`)
          stateManager.initBook(sym, ob.bids, ob.asks)
          wsManager.subscribe(sym, (payload) => { stateManager.processDelta(sym, payload) })
          subscribedSet.add(sym)
        } catch (err) {
          console.log(`[warmup] Error ${sym}: ${err.message.slice(0, 60)}, pausing 60s... (retry ${batchRetries + 1}/${MAX_BATCH_RETRIES})`)
          await new Promise(r => setTimeout(r, 60000))
          if (batchRetries < MAX_BATCH_RETRIES) {
            batchRetries++
            i -= BATCH // retry this batch on the next outer iteration
          } else {
            console.log(`[warmup] Skipping batch after ${MAX_BATCH_RETRIES} retries`)
            batchRetries = 0
          }
          batchFailed = true
          break
        }
        await new Promise(r => setTimeout(r, ITEM_DELAY))
      }
      if (!batchFailed) batchRetries = 0
      // After every 5 batches (50 symbols), rebuild density cache
      const batchNum = Math.floor(i / BATCH) + 1
      if (batchNum % 5 === 0 || i + BATCH >= allSymbols.length) {
        try { await rebuildDensityCache(allSymbols) } catch (_) {}
        console.log(`[warmup] ${subscribedSet.size}/${allSymbols.length} subscribed, cache: ${densityCache.data ? densityCache.data.length : 0} walls`)
      }
      if (i + BATCH < allSymbols.length) {
        await new Promise(r => setTimeout(r, BATCH_PAUSE))
      }
    }
    console.log(`[warmup] Done: ${subscribedSet.size} symbols`)
    // After density warmup, pre-warm klines cache for fast chart loads
    warmupKlinesCache()
  } catch (err) {
    console.log(`[warmup] Failed: ${err.message.slice(0, 100)}`)
  }
}
// Pre-warm klines cache: top 200 by volume × main TFs → instant chart opens.
// Strictly sequential (concurrency 1) with 150ms spacing; a failed request is
// retried up to 3 times with a 30s pause, then the symbol/TF pair is skipped.
// Results go to both the proxy cache and SQLite (survives restarts).
async function warmupKlinesCache() {
  try {
    // Get top 200 symbols by 24h volume
    const ticker = await bgetWithRetry('/fapi/v1/ticker/24hr')
    const sorted = ticker
      .filter(t => t.symbol.endsWith('USDT'))
      .sort((a, b) => Number(b.quoteVolume) - Number(a.quoteVolume))
      .slice(0, 200)
      .map(t => t.symbol)
    const TFS = ['15m', '1h', '4h']
    const LIMITS = { '15m': 1500, '1h': 1500, '4h': 1500 }
    const total = sorted.length * TFS.length
    let done = 0, cached = 0
    console.log(`[klines-warmup] Starting: ${sorted.length} symbols × ${TFS.length} TFs = ${total} requests`)
    for (const tf of TFS) {
      for (let i = 0; i < sorted.length; i++) {
        const sym = sorted[i]
        const limit = LIMITS[tf]
        const key = `klines:${sym}:${tf}:${limit}`
        // Skip if already cached
        if (getProxyCached(key, 300000)) { done++; cached++; continue }
        // Retry loop: 1 initial attempt + up to MAX_SYMBOL_RETRIES retries.
        const MAX_SYMBOL_RETRIES = 3
        let symbolRetries = 0
        let success = false
        while (!success && symbolRetries <= MAX_SYMBOL_RETRIES) {
          try {
            const data = await bgetWithRetry(`/fapi/v1/klines?symbol=${encodeURIComponent(sym)}&interval=${tf}&limit=${limit}`)
            setProxyCached(key, data)
            // Persist to SQLite so data survives proxy cache expiry and PM2 restarts
            if (klinesCache && Array.isArray(data)) {
              try { klinesCache.storeCandles(sym, tf, data) } catch (_) {}
            }
            done++
            success = true
          } catch (err) {
            symbolRetries++
            if (symbolRetries > MAX_SYMBOL_RETRIES) {
              console.log(`[klines-warmup] Skipping ${sym}:${tf} after ${MAX_SYMBOL_RETRIES} retries`)
              done++
              break
            }
            console.log(`[klines-warmup] Rate limited at ${done}/${total} (${sym} retry ${symbolRetries}/${MAX_SYMBOL_RETRIES}), pausing 30s...`)
            await new Promise(r => setTimeout(r, 30000))
          }
        }
        // Throttle: 150ms between requests (concurrency 1, ~400 req/min safe)
        await new Promise(r => setTimeout(r, 150))
        // Progress every 100 requests
        if (done % 100 === 0) {
          console.log(`[klines-warmup] ${done}/${total} (${cached} from cache)`)
        }
      }
    }
    console.log(`[klines-warmup] Done: ${done}/${total} cached (${cached} already had)`)
  } catch (err) {
    console.log(`[klines-warmup] Failed: ${err.message.slice(0, 100)}`)
  }
}
// Rebuild density cache from currently subscribed symbols (no Binance depth calls).
// Scores order-book walls per symbol from WS-maintained book state: bins levels
// in 0.1% buckets within ±5% of mark price, scores them via analyzeBehavior,
// keeps the top 2 per side, then the top 3 per symbol overall, and persists the
// result as the shared densityCache.
async function rebuildDensityCache(allSymbols) {
  const marks = await bgetWithRetry('/fapi/v1/premiumIndex')
  const markMap = new Map(marks.map(m => [m.symbol, Number(m.markPrice)]))
  const subscribedSyms = allSymbols.filter(s => wsManager.callbacks.has(s))
  const allWalls = []
  let skipped = 0
  for (const sym of subscribedSyms) {
    const price = markMap.get(sym)
    if (!price) continue
    const bidLevelsRaw = stateManager.getTopLevels(sym, 'bid', price, 0, 100, 5.0)
    const askLevelsRaw = stateManager.getTopLevels(sym, 'ask', price, 0, 100, 5.0)
    // getKlinesWithStats is defined elsewhere in this file; awaited per symbol
    // (sequential — presumably deliberate throttling; confirm before parallelizing).
    const klinesStats = await getKlinesWithStats(sym)
    if (!klinesStats) { skipped++; continue }
    // Average volume of the last five 5m candles.
    const avg5mVol = (klinesStats.vol1 + klinesStats.vol2 + klinesStats.vol3 + klinesStats.vol4 + klinesStats.vol5) / 5
    const processSide = (levels, sideKey) => {
      const BIN_SIZE_PCT = 0.1
      const rawBins = binLevels(levels, BIN_SIZE_PCT)
      const validBins = rawBins.filter(b => b.notional >= 0)
      const trackedBins = stateManager.trackAndEnrichBins(sym, sideKey, validBins, price)
      return trackedBins.map(bin => {
        const behavior = analyzeBehavior(bin, price, klinesStats.natr, avg5mVol)
        // Fixed x4 multiplier threshold (the interactive route parameterizes this).
        if (behavior.xMult < 4) return null
        // Time-to-eat: minutes to consume the wall at avg per-minute volume.
        let tte = avg5mVol > 0 ? bin.notional / (avg5mVol / 5) : Infinity
        return {
          symbol: sym, sideKey, price: Math.round(bin.anchorPrice * 10000) / 10000,
          notional: bin.notional, distancePct: Math.round(behavior.distancePct * 100) / 100,
          lifetimeMins: Math.round(behavior.lifetimeMins * 10) / 10,
          score: behavior.trustScore, xMult: Math.round(behavior.xMult * 10) / 10,
          severity: behavior.severity, tags: behavior.tags, levelsCount: bin.levelsCount,
          natr: klinesStats.natr, avg5mVol: Math.round(avg5mVol),
          vol1: klinesStats.vol1, vol2: klinesStats.vol2, vol3: klinesStats.vol3,
          vol4: klinesStats.vol4, vol5: klinesStats.vol5, timeToEatMinutes: tte
        }
      }).filter(Boolean).sort((a, b) => b.score - a.score).slice(0, 2)
    }
    allWalls.push(...processSide(bidLevelsRaw, 'bid'), ...processSide(askLevelsRaw, 'ask'))
  }
  allWalls.sort((a, b) => b.score - a.score)
  // Top 3 per symbol
  const perSym = {}
  for (const w of allWalls) {
    if (!perSym[w.symbol]) perSym[w.symbol] = []
    if (perSym[w.symbol].length < 3) perSym[w.symbol].push(w)
  }
  const finalData = Object.values(perSym).flat().sort((a, b) => b.score - a.score)
  const meta = { count: finalData.length, minNotional: 0, depthLimit: 100, concurrency: 0, mmMode: false, windowPct: 5, mmMultiplier: 4 }
  densityCache = { data: finalData, meta, ts: Date.now() }
  saveDensityToDisk(finalData, meta)
  if (skipped) console.warn(`[density] Rebuild: ${skipped}/${subscribedSyms.length} symbols skipped (klines unavailable)`)
}
start()