โ† Back
โ˜†
const WebSocket = require('ws');
const { createLogger } = require('./logger');
const log = createLogger('ws');

const BINANCE_WS_URL = 'wss://fstream.binance.com/ws';
const PING_INTERVAL = 3 * 60 * 1000; // 3 min (Binance closes idle after 5 min)
const MAX_STREAMS_PER_CONN = 190; // Binance limit 200, leave margin

class BinanceWSConnection {
  constructor(id, onMessage, onReconnect) {
    this.id = id;
    this.ws = null;
    this.streams = new Set();
    this.onMessage = onMessage;
    this.onReconnect = onReconnect;
    this._pingTimer = null;
    this._reconnectTimer = null;
    this._hasConnectedBefore = false;
  }

  connect() {
    if (this.streams.size === 0) return;
    // Already connecting โ€” just wait for 'open' to resubscribe
    if (this.ws && this.ws.readyState === WebSocket.CONNECTING) return;
    this._cleanup();

    log.info({ connId: this.id, streams: this.streams.size }, 'Connecting');
    this.ws = new WebSocket(BINANCE_WS_URL);

    this.ws.on('open', () => {
      const isReconnect = this._hasConnectedBefore;
      this._hasConnectedBefore = true;
      log.info({ connId: this.id, reconnect: isReconnect }, 'Connected');
      this._resubscribe();
      this._startPing();
      if (isReconnect && this.onReconnect) {
        this.onReconnect([...this.streams]);
      }
    });

    this.ws.on('message', (data) => {
      try {
        const payload = JSON.parse(data);
        if (payload.e === 'depthUpdate') {
          this.onMessage(payload);
        }
      } catch (err) {
        log.error({ connId: this.id, err: err.message }, 'Parse error');
      }
    });

    this.ws.on('close', () => {
      this._stopPing();
      if (this.streams.size > 0) {
        log.warn({ connId: this.id }, 'Disconnected, reconnecting in 5s');
        this._reconnectTimer = setTimeout(() => this.connect(), 5000);
      } else {
        log.info({ connId: this.id }, 'Disconnected, no streams, staying offline');
      }
    });

    this.ws.on('error', (err) => {
      log.error({ connId: this.id, err: err.message }, 'Connection error');
    });

    this.ws.on('pong', () => {});
  }

  addStream(streamName) {
    this.streams.add(streamName);
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this._send('SUBSCRIBE', [streamName]);
    } else if (!this.ws || this.ws.readyState === WebSocket.CLOSED) {
      this.connect();
    }
  }

  removeStream(streamName) {
    this.streams.delete(streamName);
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this._send('UNSUBSCRIBE', [streamName]);
    }
    if (this.streams.size === 0 && this.ws) {
      // Terminate immediately regardless of state to avoid lingering connections
      this._cleanup();
    }
  }

  _resubscribe() {
    if (this.streams.size === 0) return;
    // Binance: max 200 params per message, send in batches
    const all = [...this.streams];
    for (let i = 0; i < all.length; i += 200) {
      const batch = all.slice(i, i + 200);
      this._send('SUBSCRIBE', batch);
    }
  }

  _send(method, params) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ method, params, id: Date.now() }));
    }
  }

  _startPing() {
    this._stopPing();
    this._pingTimer = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.ws.ping();
      }
    }, PING_INTERVAL);
  }

  _stopPing() {
    if (this._pingTimer) {
      clearInterval(this._pingTimer);
      this._pingTimer = null;
    }
  }

  _cleanup() {
    this._stopPing();
    if (this._reconnectTimer) {
      clearTimeout(this._reconnectTimer);
      this._reconnectTimer = null;
    }
    if (this.ws) {
      const oldWs = this.ws;
      this.ws = null;
      oldWs.removeAllListeners();
      oldWs.on('error', () => {}); // swallow async errors after cleanup
      try {
        // terminate() works in any state (OPEN, CONNECTING, CLOSING) โ€” forces immediate close
        if (oldWs.readyState !== WebSocket.CLOSED) {
          oldWs.terminate();
        }
      } catch (_) { /* safe โ€” socket may already be dead */ }
    }
  }

  destroy() {
    this.streams.clear();
    this._cleanup();
  }
}

class BinanceWS {
  constructor() {
    this.connections = []; // BinanceWSConnection[]
    this.callbacks = new Map(); // symbol -> callback
    this.streamToConn = new Map(); // streamName -> connection
    this._reconnectHandler = null; // (symbols[]) => void
  }

  setReconnectHandler(fn) {
    this._reconnectHandler = fn;
  }

  subscribe(symbol, callback) {
    const streamName = `${symbol.toLowerCase()}@depth@100ms`;
    this.callbacks.set(symbol.toUpperCase(), callback);

    // Already subscribed?
    if (this.streamToConn.has(streamName)) return;

    // Find connection with capacity or create new one
    let conn = this.connections.find(c => c.streams.size < MAX_STREAMS_PER_CONN);
    if (!conn) {
      const id = this.connections.length;
      conn = new BinanceWSConnection(id, (payload) => {
        const sym = payload.s;
        if (this.callbacks.has(sym)) {
          this.callbacks.get(sym)(payload);
        }
      }, (streamNames) => {
        // On reconnect: extract symbols from stream names and notify
        if (this._reconnectHandler) {
          const symbols = streamNames.map(s => s.split('@')[0].toUpperCase());
          log.info({ connId: id, symbols: symbols.length }, 'Reconnect: notifying handler');
          this._reconnectHandler(symbols);
        }
      });
      this.connections.push(conn);
      log.info({ connId: id, total: this.connections.length }, 'Created connection');
    }

    this.streamToConn.set(streamName, conn);
    conn.addStream(streamName);
  }

  unsubscribe(symbol) {
    const streamName = `${symbol.toLowerCase()}@depth@100ms`;
    this.callbacks.delete(symbol.toUpperCase());

    const conn = this.streamToConn.get(streamName);
    if (conn) {
      conn.removeStream(streamName);
      this.streamToConn.delete(streamName);
      // Clean up empty connections
      if (conn.streams.size === 0) {
        conn.destroy();
        this.connections = this.connections.filter(c => c !== conn);
        log.info({ total: this.connections.length }, 'Removed empty connection');
      }
    }
  }
}

module.exports = new BinanceWS();

๐Ÿ“œ Git History

85e4ebdfix: 16-bug audit โ€” resync storm, memory leaks, API errors, data persistence7 weeks ago
6d27024feat: structured pino logging + revert resync queue to simple handler8 weeks ago
59232b4fix: 14-bug audit โ€” null guards, WS cleanup, shutdown flush, input validation9 weeks ago
4446d3afix: 12 silent failure audit + multi-select signal filter + sync fix9 weeks ago
6c58d95feat: OI Divergence + Funding Squeeze signals + OI ROC + funding gate9 weeks ago
4381cbafix: migrate WS to /market/stream endpoint + TF switch reconnect10 weeks ago
ab5b9ecfeat: auto S/R levels + multi-connection WS + sidebar sort2 months ago
ccd2c26perf: 3-tier klines cache (Memoryโ†’IndexedDBโ†’Server) + infinite scroll2 months ago
c391b6ffix: WS flap + JWT_SECRET + separate push/in-tab notification toggles2 months ago
198472afeat: Phase 2 Multi-Chart layout + memory leak fixes3 months ago
Show last diff
Loading...