const WebSocket = require('ws');
const { createLogger } = require('./logger');
// Logger for this module, obtained from createLogger('ws').
const log = createLogger('ws');
// URL every BinanceWSConnection opens its socket against.
const BINANCE_WS_URL = 'wss://fstream.binance.com/ws';
// Delay, in milliseconds, between client-initiated pings on an open socket.
const PING_INTERVAL = 3 * 60 * 1000; // 3 min (Binance closes idle after 5 min)
// A connection holding this many streams is treated as full when a new stream is placed.
const MAX_STREAMS_PER_CONN = 190; // Binance limit 200, leave margin
/**
 * One WebSocket connection to the Binance stream endpoint.
 *
 * Keeps the set of stream names assigned to it, subscribes to all of them
 * every time the socket opens, pings periodically while open, and schedules
 * a reconnect 5 s after the socket closes for as long as at least one stream
 * is still registered.
 */
class BinanceWSConnection {
  /**
   * @param {number} id - Identifier used in log output.
   * @param {function(Object): void} onMessage - Called with every parsed
   *   message whose `e` field is `'depthUpdate'`.
   * @param {function(string[]): void} [onReconnect] - Called with the current
   *   stream names after every successful open except the first one.
   */
  constructor(id, onMessage, onReconnect) {
    this.id = id;
    this.ws = null;
    this.streams = new Set();
    this.onMessage = onMessage;
    this.onReconnect = onReconnect;
    this._pingTimer = null;
    this._reconnectTimer = null;
    this._hasConnectedBefore = false;
    // Monotonic counter so every request sent by this connection has its own id.
    this._requestId = 0;
  }

  /**
   * Opens a fresh socket and wires its event handlers.
   * Does nothing when no streams are registered or a connect is already in flight.
   */
  connect() {
    if (this.streams.size === 0) return;
    // Already connecting — the pending 'open' event will subscribe everything.
    if (this.ws && this.ws.readyState === WebSocket.CONNECTING) return;

    // Drop any previous socket and pending timers before replacing them.
    this._cleanup();
    log.info({ connId: this.id, streams: this.streams.size }, 'Connecting');
    this.ws = new WebSocket(BINANCE_WS_URL);

    this.ws.on('open', () => {
      const isReconnect = this._hasConnectedBefore;
      this._hasConnectedBefore = true;
      log.info({ connId: this.id, reconnect: isReconnect }, 'Connected');
      this._resubscribe();
      this._startPing();
      if (isReconnect && this.onReconnect) {
        // Pass a copy so the consumer cannot mutate the live set.
        this.onReconnect([...this.streams]);
      }
    });

    this.ws.on('message', (data) => this._handleMessage(data));

    this.ws.on('close', () => {
      this._stopPing();
      if (this.streams.size > 0) {
        log.warn({ connId: this.id }, 'Disconnected, reconnecting in 5s');
        this._reconnectTimer = setTimeout(() => this.connect(), 5000);
      } else {
        log.info({ connId: this.id }, 'Disconnected, no streams, staying offline');
      }
    });

    this.ws.on('error', (err) => {
      log.error({ connId: this.id, err: err.message }, 'Connection error');
    });

    this.ws.on('pong', () => {});
  }

  /**
   * Parses one raw frame and forwards depth updates to `onMessage`.
   * Parse failures and consumer failures are logged separately so that a bug
   * in the consumer is not reported as malformed input.
   *
   * @param {*} data - Raw message as delivered by the socket's 'message' event.
   */
  _handleMessage(data) {
    let payload;
    try {
      payload = JSON.parse(data);
    } catch (err) {
      log.error({ connId: this.id, err: err.message }, 'Parse error');
      return;
    }
    // Anything that is not a depthUpdate event is ignored.
    if (!payload || payload.e !== 'depthUpdate') return;
    try {
      this.onMessage(payload);
    } catch (err) {
      log.error({ connId: this.id, err: err.message }, 'Message handler error');
    }
  }

  /**
   * Registers a stream. Subscribes immediately on an open socket, or starts a
   * connection when there is no socket or the existing one is closed. In any
   * other state the stream is only recorded; it is subscribed on the next open.
   *
   * @param {string} streamName
   */
  addStream(streamName) {
    this.streams.add(streamName);
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this._send('SUBSCRIBE', [streamName]);
    } else if (!this.ws || this.ws.readyState === WebSocket.CLOSED) {
      this.connect();
    }
  }

  /**
   * Unregisters a stream, unsubscribing on an open socket. When the last
   * stream is removed the socket is torn down.
   *
   * @param {string} streamName
   */
  removeStream(streamName) {
    this.streams.delete(streamName);
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this._send('UNSUBSCRIBE', [streamName]);
    }
    if (this.streams.size === 0 && this.ws) {
      // Terminate immediately regardless of state to avoid lingering connections.
      this._cleanup();
    }
  }

  /** Sends SUBSCRIBE requests for every registered stream, in batches. */
  _resubscribe() {
    if (this.streams.size === 0) return;
    // Binance: max 200 params per message, send in batches.
    const batchSize = 200;
    const all = [...this.streams];
    for (let i = 0; i < all.length; i += batchSize) {
      this._send('SUBSCRIBE', all.slice(i, i + batchSize));
    }
  }

  /**
   * Sends one JSON request if the socket is open; otherwise does nothing.
   *
   * @param {string} method - Request method, e.g. 'SUBSCRIBE' or 'UNSUBSCRIBE'.
   * @param {string[]} params - Stream names the request applies to.
   */
  _send(method, params) {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
    // A counter (rather than a timestamp) keeps ids distinct even when several
    // requests are issued within the same millisecond.
    this._requestId += 1;
    this.ws.send(JSON.stringify({ method, params, id: this._requestId }));
  }

  /** (Re)starts the periodic ping; any previous ping timer is cleared first. */
  _startPing() {
    this._stopPing();
    this._pingTimer = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.ws.ping();
      }
    }, PING_INTERVAL);
  }

  /** Stops the periodic ping if one is running. */
  _stopPing() {
    if (this._pingTimer) {
      clearInterval(this._pingTimer);
      this._pingTimer = null;
    }
  }

  /**
   * Cancels the ping and reconnect timers and discards the current socket.
   * Listeners are removed before terminating so the discarded socket cannot
   * trigger the reconnect logic.
   */
  _cleanup() {
    this._stopPing();
    if (this._reconnectTimer) {
      clearTimeout(this._reconnectTimer);
      this._reconnectTimer = null;
    }
    if (this.ws) {
      const oldWs = this.ws;
      this.ws = null;
      oldWs.removeAllListeners();
      oldWs.on('error', () => {}); // swallow async errors after cleanup
      try {
        // terminate() works in any state (OPEN, CONNECTING, CLOSING) — forces immediate close
        if (oldWs.readyState !== WebSocket.CLOSED) {
          oldWs.terminate();
        }
      } catch (_) { /* safe — socket may already be dead */ }
    }
  }

  /** Forgets every stream and tears the socket down; no reconnect is scheduled. */
  destroy() {
    this.streams.clear();
    this._cleanup();
  }
}
/**
 * Multiplexes per-symbol depth subscriptions over a pool of
 * BinanceWSConnection instances, creating a new connection whenever every
 * existing one already holds MAX_STREAMS_PER_CONN streams.
 */
class BinanceWS {
  constructor() {
    this.connections = []; // BinanceWSConnection[]
    this.callbacks = new Map(); // upper-case symbol -> callback
    this.streamToConn = new Map(); // streamName -> connection
    this._reconnectHandler = null; // (symbols[]) => void
    // Monotonic id source: ids stay unique even after connections are removed.
    this._nextConnId = 0;
  }

  /**
   * Registers the function to call after a connection re-opens.
   *
   * @param {function(string[]): void} fn - Receives the upper-case symbols
   *   carried by the connection that reconnected.
   */
  setReconnectHandler(fn) {
    this._reconnectHandler = fn;
  }

  /**
   * Subscribes to depth updates for a symbol. Calling it again for a symbol
   * that is already subscribed only replaces the callback.
   *
   * @param {string} symbol - Symbol in any letter case.
   * @param {function(Object): void} callback - Receives each depth payload
   *   whose `s` field equals the upper-cased symbol.
   */
  subscribe(symbol, callback) {
    const streamName = this._streamName(symbol);
    this.callbacks.set(symbol.toUpperCase(), callback);
    // Already subscribed?
    if (this.streamToConn.has(streamName)) return;

    // Find connection with capacity or create new one.
    const conn = this.connections.find((c) => c.streams.size < MAX_STREAMS_PER_CONN)
      || this._createConnection();
    this.streamToConn.set(streamName, conn);
    conn.addStream(streamName);
  }

  /**
   * Removes the subscription for a symbol and discards its connection once
   * that connection carries no streams.
   *
   * @param {string} symbol - Symbol in any letter case.
   */
  unsubscribe(symbol) {
    const streamName = this._streamName(symbol);
    this.callbacks.delete(symbol.toUpperCase());
    const conn = this.streamToConn.get(streamName);
    if (!conn) return;

    conn.removeStream(streamName);
    this.streamToConn.delete(streamName);
    // Clean up empty connections.
    if (conn.streams.size === 0) {
      conn.destroy();
      this.connections = this.connections.filter((c) => c !== conn);
      log.info({ connId: conn.id, total: this.connections.length }, 'Removed empty connection');
    }
  }

  /** Builds the stream name used for a symbol's depth feed. */
  _streamName(symbol) {
    return `${symbol.toLowerCase()}@depth@100ms`;
  }

  /** Creates a connection wired to this instance's callbacks and adds it to the pool. */
  _createConnection() {
    const id = this._nextConnId;
    this._nextConnId += 1;

    const conn = new BinanceWSConnection(id, (payload) => {
      // Route by the payload's symbol; symbols without a callback are skipped.
      const callback = this.callbacks.get(payload.s);
      if (callback) callback(payload);
    }, (streamNames) => {
      // On reconnect: extract symbols from stream names and notify.
      if (!this._reconnectHandler) return;
      const symbols = streamNames.map((s) => s.split('@')[0].toUpperCase());
      log.info({ connId: id, symbols: symbols.length }, 'Reconnect: notifying handler');
      this._reconnectHandler(symbols);
    });

    this.connections.push(conn);
    log.info({ connId: id, total: this.connections.length }, 'Created connection');
    return conn;
  }
}
// Export one BinanceWS instance, created when this module is first loaded.
module.exports = new BinanceWS();
// Git History
// 85e4ebd fix: 16-bug audit — resync storm, memory leaks, API errors, data persistence (7 weeks ago)
// 6d27024 feat: structured pino logging + revert resync queue to simple handler (8 weeks ago)
// 59232b4 fix: 14-bug audit — null guards, WS cleanup, shutdown flush, input validation (9 weeks ago)
// 4446d3a fix: 12 silent failure audit + multi-select signal filter + sync fix (9 weeks ago)
// 6c58d95 feat: OI Divergence + Funding Squeeze signals + OI ROC + funding gate (9 weeks ago)
// 4381cba fix: migrate WS to /market/stream endpoint + TF switch reconnect (10 weeks ago)
// ab5b9ec feat: auto S/R levels + multi-connection WS + sidebar sort (2 months ago)
// ccd2c26 perf: 3-tier klines cache (Memory→IndexedDB→Server) + infinite scroll (2 months ago)
// c391b6f fix: WS flap + JWT_SECRET + separate push/in-tab notification toggles (2 months ago)
// 198472a feat: Phase 2 Multi-Chart layout + memory leak fixes (3 months ago)
// Show last diff
// Loading...