← Back
const { createLogger } = require('./logger');
const log = createLogger('state');

class StateManager {
  constructor() {
    // symbol -> Map<price, {notional, firstSeen, lastUpdate, isMM}>
    this.books = new Map();

    // Hard cap: max symbols in books (prevents unbounded memory growth)
    this.MAX_BOOKS = 600;
    // Hard cap: max levels per side per symbol
    this.MAX_LEVELS_PER_SIDE = 2000;

    // Resync callback: called when order book goes out-of-sync
    // Set via setResyncHandler(fn(symbol))
    // Throttling is handled by the caller (index.js resync queue with cooldown/debounce/cap)
    this._resyncHandler = null;

    // Cache to track dynamically created bins over time to detect robots
    // cacheKey: "SYMBOL:SIDE:BIN_ANCHOR" -> { oldestSeen, maxNotional, lastUpdate, isMovingTowardPrice }
    this.binHistory = new Map();
  }

  setResyncHandler(fn) {
    this._resyncHandler = fn;
  }

  initBook(symbol, bids, asks, lastUpdateId) {
    // Evict oldest book if at capacity
    if (!this.books.has(symbol) && this.books.size >= this.MAX_BOOKS) {
      let oldestSym = null, oldestTime = Infinity;
      for (const [sym, st] of this.books.entries()) {
        const t = st._lastActivity || 0;
        if (t < oldestTime) { oldestTime = t; oldestSym = sym; }
      }
      if (oldestSym) this.books.delete(oldestSym);
    }

    if (!this.books.has(symbol)) {
      this.books.set(symbol, {
        bids: new Map(),
        asks: new Map(),
        lastUpdateId: 0,
        _lastActivity: Date.now()
      });
    }

    const state = this.books.get(symbol);
    const now = Date.now();
    state._lastActivity = now;

    // Set lastUpdateId from snapshot to prevent resync loops
    if (lastUpdateId) state.lastUpdateId = lastUpdateId;
    // Futures sync handshake: require a valid first event before pu-continuity checks
    state.synced = false;

    bids.forEach(([priceStr, qtyStr]) => {
      const price = parseFloat(priceStr);
      const notional = price * parseFloat(qtyStr);
      state.bids.set(price, { notional, firstSeen: now, lastUpdate: now });
    });

    asks.forEach(([priceStr, qtyStr]) => {
      const price = parseFloat(priceStr);
      const notional = price * parseFloat(qtyStr);
      state.asks.set(price, { notional, firstSeen: now, lastUpdate: now });
    });
  }

  removeBook(symbol) {
    this.books.delete(symbol);
  }

  processDelta(symbol, payload) {
    if (!this.books.has(symbol)) return;

    const state = this.books.get(symbol);

    // Binance sequence: payload.U = first updateId, payload.u = last updateId
    // Drop stale deltas
    if (payload.u <= state.lastUpdateId) return;

    // Binance FUTURES sync (fstream @depth): update IDs are NOT contiguous between
    // events, so the spot-style U==lastUpdateId+1 check produces a false gap on nearly
    // every delta. Per Binance futures spec, continuity is verified via `pu` (final
    // updateId of the previous event), which must equal our stored lastUpdateId.
    if (!state.synced) {
      // First event after a snapshot must straddle the snapshot id (U <= lastUpdateId).
      if (state.lastUpdateId > 0 && payload.U > state.lastUpdateId + 1) {
        if (this._resyncHandler) {
          // Throttle log: a frozen high-volume book can drop a delta every 100ms;
          // log at most once per 10s/symbol to avoid flooding disk + event loop.
          const t = Date.now();
          if (!state._lastGapLog || t - state._lastGapLog > 10000) {
            log.warn({ symbol, expected: state.lastUpdateId + 1, got: payload.U }, 'Gap detected, requesting resync');
            state._lastGapLog = t;
          }
          this._resyncHandler(symbol);
        }
        return; // Snapshot too old — stale until resync
      }
      state.synced = true;
    } else if (payload.pu !== undefined && payload.pu !== state.lastUpdateId) {
      // Real gap: previous event's final id doesn't match → missed deltas.
      if (this._resyncHandler) {
        const t = Date.now();
        if (!state._lastGapLog || t - state._lastGapLog > 10000) {
          log.warn({ symbol, expected: state.lastUpdateId, got: payload.pu }, 'Gap detected, requesting resync');
          state._lastGapLog = t;
        }
        this._resyncHandler(symbol);
      }
      state.synced = false;
      return; // Drop this delta — book is stale until resync
    }

    const now = Date.now();

    // Process Bids
    payload.b.forEach(([priceStr, qtyStr]) => {
      const price = parseFloat(priceStr);
      const qty = parseFloat(qtyStr);
      if (qty === 0) {
        state.bids.delete(price);
      } else {
        const notional = price * qty;
        const existing = state.bids.get(price);
        if (existing) {
          existing.notional = notional;
          existing.lastUpdate = now;
        } else {
          state.bids.set(price, { notional, firstSeen: now, lastUpdate: now });
        }
      }
    });

    // Process Asks
    payload.a.forEach(([priceStr, qtyStr]) => {
      const price = parseFloat(priceStr);
      const qty = parseFloat(qtyStr);
      if (qty === 0) {
        state.asks.delete(price);
      } else {
        const notional = price * qty;
        const existing = state.asks.get(price);
        if (existing) {
          existing.notional = notional;
          existing.lastUpdate = now;
        } else {
          state.asks.set(price, { notional, firstSeen: now, lastUpdate: now });
        }
      }
    });

    state.lastUpdateId = payload.u;

    state._lastActivity = now;

    // Cleanup stale price levels not updated in 2 minutes
    if (!state._lastCleanup || now - state._lastCleanup > 60000) {
      state._lastCleanup = now;
      const staleMs = 120000;
      for (const [price, data] of state.bids.entries()) {
        if (now - data.lastUpdate > staleMs) state.bids.delete(price);
      }
      for (const [price, data] of state.asks.entries()) {
        if (now - data.lastUpdate > staleMs) state.asks.delete(price);
      }
      // Hard cap: trim to MAX_LEVELS_PER_SIDE by removing smallest notional
      for (const side of [state.bids, state.asks]) {
        if (side.size > this.MAX_LEVELS_PER_SIDE) {
          const sorted = [...side.entries()].sort((a, b) => a[1].notional - b[1].notional);
          const toRemove = sorted.slice(0, side.size - this.MAX_LEVELS_PER_SIDE);
          for (const [price] of toRemove) side.delete(price);
        }
      }
    }
  }

  getTopLevels(symbol, side, markPrice, minNotional, limit, windowPct) {
    if (!this.books.has(symbol)) return [];
    
    const state = this.books.get(symbol);
    const bookSide = side === 'bid' ? state.bids : state.asks;

    const ArrayOfLevels = [];
    const minPrice = markPrice * (1 - windowPct / 100);
    const maxPrice = markPrice * (1 + windowPct / 100);

    for (const [price, data] of bookSide.entries()) {
      if (price >= minPrice && price <= maxPrice && data.notional >= minNotional) {
        const distancePct = Math.abs(price - markPrice) / markPrice * 100;
        ArrayOfLevels.push({
          price,
          notional: data.notional,
          firstSeen: data.firstSeen,
          lastUpdate: data.lastUpdate,
          distancePct
        });
      }
    }

    // Sort heavily heavily heavily
    ArrayOfLevels.sort((a, b) => b.notional - a.notional);
    return ArrayOfLevels.slice(0, limit);
  }

  // --- Historical Bin Tracking for Robot Aggressor Detection ---
  trackAndEnrichBins(symbol, side, currentBins, markPrice) {
    const now = Date.now();
    const enrichedBins = [];
    const sideLabel = side.toUpperCase();

    // Size tolerance to consider two bins as "the same order moving"
    const NOTIONAL_TOLERANCE = 0.8; // current bin must be at least 80% of old bin

    for (const bin of currentBins) {
      const cacheKey = `${symbol}:${sideLabel}:${bin.anchorPrice}`;
      let history = this.binHistory.get(cacheKey);
      let isMovingTowardPrice = false;

      if (!history) {
        // Did it move from a previously active bin slightly further away?
        for (const [k, v] of this.binHistory.entries()) {
          // Check if same symbol and side
          if (k.startsWith(`${symbol}:${sideLabel}:`)) {
            const oldPrice = parseFloat(k.split(':')[2]);
            const distBetweenPrices = Math.abs(bin.anchorPrice - oldPrice) / oldPrice * 100;

            // Only consider recent bins (updated in last 5 seconds) 
            // that are within a small distance (0.5%) and have similar notional
            if (distBetweenPrices > 0 && distBetweenPrices < 0.5 && (now - v.lastUpdate) < 5000) {
              const oldDistToMark = Math.abs(oldPrice - markPrice);
              const newDistToMark = Math.abs(bin.anchorPrice - markPrice);

              // It moved closer to the mark price
              if (newDistToMark < oldDistToMark && bin.notional >= v.maxNotional * NOTIONAL_TOLERANCE) {
                isMovingTowardPrice = true;
                // Inherit history, rename key
                history = { ...v, lastUpdate: now, isMovingTowardPrice: true };
                this.binHistory.delete(k);
                break;
              }
            }
          }
        }

        if (!history) {
          // Completely new bin
          history = {
            oldestSeen: bin.oldestSeen || now,
            maxNotional: bin.notional,
            lastUpdate: now,
            isMovingTowardPrice: false
          };
        }
      } else {
        // Bin exists at this exact price, update it
        history.lastUpdate = now;
        if (bin.notional > history.maxNotional) {
          history.maxNotional = bin.notional;
        }
      }

      this.binHistory.set(cacheKey, history);

      enrichedBins.push({
        ...bin,
        oldestSeen: history.oldestSeen, // Use the history oldest time
        isMovingTowardPrice: history.isMovingTowardPrice
      });
    }

    // Cleanup very old bins (not seen in 1 minute) — deterministic, every 30s
    if (!this._lastBinCleanup || now - this._lastBinCleanup > 30000) {
      this._lastBinCleanup = now;
      for (const [k, v] of this.binHistory.entries()) {
        if (now - v.lastUpdate > 60000) {
          this.binHistory.delete(k);
        }
      }
      // Hard cap: if map still too large, drop oldest entries
      if (this.binHistory.size > 5000) {
        const sorted = [...this.binHistory.entries()].sort((a, b) => a[1].lastUpdate - b[1].lastUpdate);
        const toRemove = sorted.slice(0, sorted.length - 4000);
        for (const [k] of toRemove) this.binHistory.delete(k);
      }
    }

    return enrichedBins;
  }
}

module.exports = new StateManager();

📜 Git History

024d09dfix(orderflow): throttle gap-detect log to 1/10s per symbol4 weeks ago
b37f83ffix(orderflow): futures depth sync via pu-continuity, kill false-gap resync storm4 weeks ago
85e4ebdfix: 16-bug audit — resync storm, memory leaks, API errors, data persistence7 weeks ago
6d27024feat: structured pino logging + revert resync queue to simple handler8 weeks ago
4492574fix+test: comprehensive code audit — 11 bugfixes + 148 new tests8 weeks ago
59232b4fix: 14-bug audit — null guards, WS cleanup, shutdown flush, input validation9 weeks ago
198472afeat: Phase 2 Multi-Chart layout + memory leak fixes3 months ago
878ec53feat: add modular server files (logic, scorer, state, ws)3 months ago
Show last diff
Loading...