← Back
const WebSocket = require('ws');
const config = require('../config');
const { cache, enrichWithGreeks } = require('../services/cache');
const logger = require('../utils/logger');

// Lookup from the lowercase (normalized) channel key a client may send to the
// canonical channel name used internally. Asset channels are upper-cased
// because they are matched against symbol prefixes; the rest are unchanged.
const CHANNEL_CANONICAL = {
  btc: 'BTC',
  eth: 'ETH',
  all: 'all',
  'top-movers': 'top-movers',
  'unusual-volume': 'unusual-volume',
};
// Set of accepted lowercase channel keys, derived from the map above so the
// two can never drift apart.
const VALID_CHANNELS = new Set(Object.keys(CHANNEL_CANONICAL));

/**
 * Build the `update` message for one subscription channel from `cache.options`.
 *
 * Channels:
 *  - `'top-movers'`: options with a numeric `priceChange`, sorted descending;
 *    returns the first ten as `gainers` and the last ten (worst first) as
 *    `losers`. With fewer than twenty ranked options the two lists overlap.
 *  - `'unusual-volume'`: up to ten options whose volume is at least twice the
 *    average volume of all options with a positive volume, each annotated
 *    with `volumeRatio` (volume / average, rounded to two decimals) and passed
 *    through `enrichWithGreeks`.
 *  - `'all'`: every cached option, passed through `enrichWithGreeks`.
 *  - anything else: options whose `symbol` starts with `<channel>-`, passed
 *    through `enrichWithGreeks`.
 *
 * @param {string} channel - Canonical channel name (e.g. `'BTC'`, `'all'`).
 * @returns {object|null} The message object, or `null` when `cache.options`
 *   is not populated.
 */
function getChannelData(channel) {
  if (!cache.options) return null;

  if (channel === 'top-movers') {
    // Parse first, then keep only finite numbers: this drops null/'' exactly
    // like before, and additionally drops non-numeric strings such as 'N/A'
    // whose NaN would make the sort comparator return NaN.
    const ranked = cache.options
      .map(o => ({ ...o, _change: parseFloat(o.priceChange) }))
      .filter(o => Number.isFinite(o._change))
      .sort((a, b) => b._change - a._change);
    const stripChange = ({ _change, ...o }) => o;
    const gainers = ranked.slice(0, 10).map(stripChange);
    const losers = ranked.slice(-10).reverse().map(stripChange);
    return { type: 'update', channel: 'top-movers', lastUpdate: cache.lastUpdate, gainers, losers };
  }

  if (channel === 'unusual-volume') {
    // Non-finite volumes are excluded so they cannot turn the average (and
    // therefore every volumeRatio) into Infinity/NaN.
    const traded = cache.options
      .map(o => ({ ...o, _vol: parseFloat(o.volume) }))
      .filter(o => Number.isFinite(o._vol) && o._vol > 0);
    if (traded.length === 0) {
      return { type: 'update', channel: 'unusual-volume', lastUpdate: cache.lastUpdate, count: 0, data: [] };
    }
    const avgVolume = traded.reduce((sum, o) => sum + o._vol, 0) / traded.length;
    const threshold = avgVolume * 2;
    const unusual = traded
      .filter(o => o._vol >= threshold)
      .sort((a, b) => b._vol - a._vol)
      .slice(0, 10)
      .map(({ _vol, ...o }) => ({ ...o, volumeRatio: parseFloat((_vol / avgVolume).toFixed(2)) }));
    return {
      type: 'update', channel: 'unusual-volume', lastUpdate: cache.lastUpdate,
      count: unusual.length, data: enrichWithGreeks(unusual),
    };
  }

  // Asset channels match on the symbol prefix. Entries without a string
  // symbol are skipped instead of throwing and aborting the whole update.
  const prefix = `${channel}-`;
  const filtered = channel === 'all'
    ? cache.options
    : cache.options.filter(o => typeof o.symbol === 'string' && o.symbol.startsWith(prefix));
  const enriched = enrichWithGreeks(filtered);
  return { type: 'update', channel, lastUpdate: cache.lastUpdate, count: enriched.length, data: enriched };
}

/**
 * Attach a WebSocket server to `server` and wire up channel subscriptions.
 *
 * Every client starts on the `all` channel and immediately receives that
 * channel's current data. Clients send JSON messages:
 *  - `{ "subscribe": "<channel>" }` (case-insensitive) switches the channel
 *    and replies with the channel's current data;
 *  - `{ "unsubscribe": <truthy> }` resets the client to `all`;
 *  - anything else gets a `{ type: 'error' }` reply.
 *
 * A heartbeat runs every `config.ws.heartbeatInterval` ms: clients that did
 * not answer the previous ping with a pong are terminated.
 *
 * @param {object} server - Passed to `WebSocket.Server` as its `server` option.
 * @returns {{ wss: object, broadcastUpdates: Function }} The WebSocket server
 *   and a function that pushes fresh channel data to every open client.
 */
function setupWebSocket(server) {
  const wss = new WebSocket.Server({ server });

  // Serialize and send only while the socket is open.
  function sendJson(ws, payload) {
    if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(payload));
  }

  function broadcastUpdates() {
    // Clients on the same channel receive identical data, so build and
    // serialize each channel's payload once per broadcast, not once per client.
    const serializedByChannel = new Map();
    wss.clients.forEach(ws => {
      if (ws.readyState !== WebSocket.OPEN) return;
      const channel = ws._channel || 'all';
      if (!serializedByChannel.has(channel)) {
        const payload = getChannelData(channel);
        serializedByChannel.set(channel, payload ? JSON.stringify(payload) : null);
      }
      const serialized = serializedByChannel.get(channel);
      if (serialized !== null) ws.send(serialized);
    });
  }

  // Apply one parsed client message to the socket's subscription state.
  function handleMessage(ws, ip, msg) {
    // `null` is valid JSON but has no properties; treat it like an empty request.
    const request = msg === null ? {} : msg;
    const key = typeof request.subscribe === 'string' ? request.subscribe.toLowerCase() : '';
    if (request.subscribe && VALID_CHANNELS.has(key)) {
      ws._channel = CHANNEL_CANONICAL[key] || key;
      logger.info(`[WS] ${ip} subscribed to: ${ws._channel}`);
      const payload = getChannelData(ws._channel);
      if (payload) sendJson(ws, payload);
    } else if (request.unsubscribe) {
      ws._channel = 'all';
      logger.info(`[WS] ${ip} unsubscribed, reset to: all`);
    } else {
      sendJson(ws, { type: 'error', message: 'Invalid channel. Use: BTC, ETH, all, top-movers, unusual-volume' });
    }
  }

  wss.on('connection', (ws, req) => {
    ws._channel = 'all';
    ws._alive = true;
    const ip = req.headers['x-forwarded-for'] || req.socket.remoteAddress;
    logger.info(`[WS] Client connected: ${ip} (default channel: all)`);

    ws.on('message', (raw) => {
      let msg;
      try {
        msg = JSON.parse(raw);
      } catch {
        sendJson(ws, { type: 'error', message: 'Invalid JSON' });
        return;
      }
      // Failures while handling a well-formed message are reported separately
      // so clients are not told their JSON was malformed when it was not.
      try {
        handleMessage(ws, ip, msg);
      } catch (err) {
        logger.info(`[WS] Failed to handle message from ${ip}: ${err && err.message}`);
        sendJson(ws, { type: 'error', message: 'Internal error' });
      }
    });

    ws.on('pong', () => { ws._alive = true; });

    // An 'error' event with no listener is thrown by the emitter; log it instead.
    ws.on('error', (err) => {
      logger.info(`[WS] Socket error from ${ip}: ${err && err.message}`);
    });

    ws.on('close', () => {
      logger.info(`[WS] Client disconnected: ${ip}`);
    });

    // Send the initial snapshot for the default channel.
    const payload = getChannelData(ws._channel);
    if (payload) sendJson(ws, payload);
  });

  const heartbeat = setInterval(() => {
    wss.clients.forEach(ws => {
      // No pong since the previous ping: drop the connection.
      if (!ws._alive) return ws.terminate();
      ws._alive = false;
      ws.ping();
    });
  }, config.ws.heartbeatInterval);

  wss.on('close', () => clearInterval(heartbeat));

  return { wss, broadcastUpdates };
}

// Public API: only setupWebSocket is exported; getChannelData and the channel
// tables above stay private to this module.
module.exports = { setupWebSocket };

📜 Git History

06783da fix: options-screener-v2 — 9 bug fixes, 5 strategy improvements, 2 infra enhancements (3 months ago)
163bb5d feat: migrate to options-screener-v2 folder to isolate deployment (4 months ago)
Show last diff
Loading...