← Назадconst WebSocket = require('ws');
const config = require('../config');
const { cache, enrichWithGreeks } = require('../services/cache');
const logger = require('../utils/logger');
// Store channels in lowercase for normalized comparison
const VALID_CHANNELS = new Set(['btc', 'eth', 'all', 'top-movers', 'unusual-volume']);
// Map lowercase to canonical form used in symbols
const CHANNEL_CANONICAL = { btc: 'BTC', eth: 'ETH', all: 'all', 'top-movers': 'top-movers', 'unusual-volume': 'unusual-volume' };
function getChannelData(channel) {
if (!cache.options) return null;
if (channel === 'top-movers') {
let items = cache.options
.filter(o => o.priceChange != null && o.priceChange !== '')
.map(o => ({ ...o, _change: parseFloat(o.priceChange) }));
items.sort((a, b) => b._change - a._change);
const gainers = items.slice(0, 10).map(({ _change, ...o }) => o);
const losers = items.slice(-10).reverse().map(({ _change, ...o }) => o);
return { type: 'update', channel: 'top-movers', lastUpdate: cache.lastUpdate, gainers, losers };
}
if (channel === 'unusual-volume') {
let items = cache.options
.filter(o => parseFloat(o.volume || 0) > 0)
.map(o => ({ ...o, _vol: parseFloat(o.volume) }));
if (items.length === 0) {
return { type: 'update', channel: 'unusual-volume', lastUpdate: cache.lastUpdate, count: 0, data: [] };
}
const avgVolume = items.reduce((s, o) => s + o._vol, 0) / items.length;
const threshold = avgVolume * 2;
const unusual = items
.filter(o => o._vol >= threshold)
.sort((a, b) => b._vol - a._vol)
.slice(0, 10)
.map(({ _vol, ...o }) => ({ ...o, volumeRatio: parseFloat((_vol / avgVolume).toFixed(2)) }));
return {
type: 'update', channel: 'unusual-volume', lastUpdate: cache.lastUpdate,
count: unusual.length, data: enrichWithGreeks(unusual),
};
}
let filtered = cache.options;
if (channel !== 'all') {
filtered = filtered.filter(o => o.symbol.startsWith(channel + '-'));
}
const enriched = enrichWithGreeks(filtered);
return { type: 'update', channel, lastUpdate: cache.lastUpdate, count: enriched.length, data: enriched };
}
function setupWebSocket(server) {
const wss = new WebSocket.Server({ server });
function broadcastUpdates() {
wss.clients.forEach(ws => {
if (ws.readyState !== WebSocket.OPEN) return;
const channel = ws._channel || 'all';
const payload = getChannelData(channel);
if (payload) {
ws.send(JSON.stringify(payload));
}
});
}
wss.on('connection', (ws, req) => {
ws._channel = 'all';
ws._alive = true;
const ip = req.headers['x-forwarded-for'] || req.socket.remoteAddress;
logger.info(`[WS] Client connected: ${ip} (default channel: all)`);
ws.on('message', (raw) => {
try {
const msg = JSON.parse(raw);
const key = typeof msg.subscribe === 'string' ? msg.subscribe.toLowerCase() : '';
if (msg.subscribe && VALID_CHANNELS.has(key)) {
ws._channel = CHANNEL_CANONICAL[key] || key;
logger.info(`[WS] ${ip} subscribed to: ${ws._channel}`);
const payload = getChannelData(ws._channel);
if (payload) ws.send(JSON.stringify(payload));
} else if (msg.unsubscribe) {
ws._channel = 'all';
logger.info(`[WS] ${ip} unsubscribed, reset to: all`);
} else {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid channel. Use: BTC, ETH, all, top-movers, unusual-volume' }));
}
} catch {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
}
});
ws.on('pong', () => { ws._alive = true; });
ws.on('close', () => {
logger.info(`[WS] Client disconnected: ${ip}`);
});
const payload = getChannelData(ws._channel);
if (payload) ws.send(JSON.stringify(payload));
});
const heartbeat = setInterval(() => {
wss.clients.forEach(ws => {
if (!ws._alive) return ws.terminate();
ws._alive = false;
ws.ping();
});
}, config.ws.heartbeatInterval);
wss.on('close', () => clearInterval(heartbeat));
return { wss, broadcastUpdates };
}
module.exports = { setupWebSocket };