← Back
'use strict'
/**
 * Centralized Binance REST client with Bottleneck rate limiting.
 *
 * ALL Binance REST calls MUST go through bget() / bgetWithRetry() from this module.
 * Two layers of protection:
 *   1. Bottleneck — pre-flight: concurrency, spacing, reservoir (weight budget)
 *   2. Header tracking — post-flight: reads X-MBX-USED-WEIGHT-1M, syncs reservoir
 */
const Bottleneck = require('bottleneck')
const path = require('path')
const fs = require('fs')
const { createLogger } = require('./logger')

// Module-level logger, created under the name 'binance-client'.
const log = createLogger('binance-client')

// Base URL that bget() prepends to every apiPath.
const BINANCE_FAPI = 'https://fapi.binance.com'
// Milliseconds before bget() aborts an in-flight request.
const FETCH_TIMEOUT_MS = 15_000
// JSON file (in ../data relative to this module) holding the persisted global pause deadline.
const RATE_LIMIT_FILE = path.resolve(__dirname, '..', 'data', 'rate-limit-pause.json')

// ─── Weight estimator (matches Binance docs) ───────────────────────────
/**
 * Returns a coarse, pattern-based request-weight estimate for a REST path.
 *
 * The path (including its query string) is matched by substring, in this order:
 * depth, klines, 24hr ticker, premium index, open-interest history,
 * taker long/short ratio, exchange info. Anything else costs 5.
 *
 * @param {string} apiPath - Request path plus query string.
 * @returns {number} Estimated weight of the request.
 */
function estimateWeight(apiPath) {
  const has = (fragment) => apiPath.includes(fragment)

  // Reads the numeric `limit=` query value, or falls back when it is absent.
  const limitOr = (fallback) => {
    const found = /limit=(\d+)/.exec(apiPath)
    return found === null ? fallback : Number.parseInt(found[1], 10)
  }

  if (has('/depth')) {
    const depthLimit = limitOr(100)
    if (depthLimit > 500) return 20 // limit=1000
    if (depthLimit > 50) return 10
    return depthLimit > 5 ? 5 : 2
  }

  if (has('/klines')) {
    const klineLimit = limitOr(10)
    if (klineLimit >= 1000) return 20
    return klineLimit >= 500 ? 10 : 5
  }

  // All-symbol variants of these endpoints are far more expensive than single-symbol ones.
  if (has('/ticker/24hr')) return has('symbol=') ? 1 : 40
  if (has('/premiumIndex')) return has('symbol=') ? 1 : 10

  if (has('/openInterestHist') || has('/takerlongshortRatio')) return 30
  if (has('/exchangeInfo')) return 1

  return 5 // conservative default
}

// ─── Disk persistence for pause state ──────────────────────────────────
/**
 * Reads the persisted global pause deadline from RATE_LIMIT_FILE.
 *
 * Never throws. A missing file is treated as "no pause"; an unreadable or
 * corrupt file is logged and also treated as "no pause".
 *
 * @returns {number} The stored `pauseUntil` (epoch milliseconds) when it is a
 *   finite number that is still in the future, otherwise 0.
 */
function loadPauseFromDisk() {
  let raw
  try {
    raw = fs.readFileSync(RATE_LIMIT_FILE, 'utf8')
  } catch (err) {
    // ENOENT simply means nothing was persisted yet; other failures are worth surfacing.
    if (err.code !== 'ENOENT') {
      log.warn({ err: err.message }, 'Could not read persisted pause file')
    }
    return 0
  }

  let pauseUntil
  try {
    // `?.` covers valid JSON that is not an object (e.g. `null`).
    pauseUntil = JSON.parse(raw)?.pauseUntil
  } catch (err) {
    log.warn({ err: err.message }, 'Ignoring corrupt persisted pause file')
    return 0
  }

  // Only trust a real numeric deadline; anything else would poison later comparisons.
  if (typeof pauseUntil !== 'number' || !Number.isFinite(pauseUntil)) return 0

  const remainingMs = pauseUntil - Date.now()
  if (remainingMs <= 0) return 0

  log.info({ remaining: Math.ceil(remainingMs / 1000) }, 'Restored pause from disk')
  return pauseUntil
}

/**
 * Persists the global pause deadline to RATE_LIMIT_FILE as JSON
 * (`{ pauseUntil, savedAt }`).
 *
 * Best-effort: never throws. A failure is logged so that lost persistence is
 * visible instead of silent.
 *
 * @param {number} pauseUntil - Epoch-millisecond timestamp until which requests are paused.
 * @returns {void}
 */
function savePauseToDisk(pauseUntil) {
  try {
    const payload = JSON.stringify({ pauseUntil, savedAt: new Date().toISOString() })
    // Create the target directory first in case it has not been created yet.
    fs.mkdirSync(path.dirname(RATE_LIMIT_FILE), { recursive: true })
    fs.writeFileSync(RATE_LIMIT_FILE, payload)
  } catch (err) {
    log.warn({ err: err.message, file: RATE_LIMIT_FILE }, 'Failed to persist pause state')
  }
}

// ─── Bottleneck limiter ────────────────────────────────────────────────
// Pre-flight limiter used by bget(): each job is scheduled with a `weight`
// that is charged against the per-minute reservoir configured here.
const limiter = new Bottleneck({
  reservoir: 2400,                  // Binance weight budget
  reservoirRefreshAmount: 2400,     // value the reservoir is reset to...
  reservoirRefreshInterval: 60_000, // ...every minute
  minTime: 50,                      // min 50ms between requests
  maxConcurrent: 50,                // must be >= max single request weight (ticker24hr=40, openInterestHist=30)
})

// Make reservoir exhaustion visible in the logs.
limiter.on('depleted', function onReservoirDepleted() {
  log.warn('Bottleneck reservoir depleted — requests queued until refresh')
})

// ─── Header-based weight tracking (secondary safety) ───────────────────
/**
 * Post-flight weight bookkeeping plus the global pause switch.
 *
 * Fields:
 *   usedWeight        last weight reported by the `x-mbx-used-weight-1m` header
 *   weightUpdatedAt   Date.now() of the last header update
 *   pauseUntil        epoch ms until which checkPause() rejects (restored from disk at load)
 *   WEIGHT_LIMIT      per-minute budget used for reservoir sync and status()
 *   WEIGHT_SOFT_LIMIT / WEIGHT_HARD_LIMIT   thresholds; only the hard limit is read in this object
 */
const weightTracker = {
  usedWeight: 0,
  weightUpdatedAt: 0,
  pauseUntil: loadPauseFromDisk(),
  WEIGHT_LIMIT: 2400,
  WEIGHT_SOFT_LIMIT: 1800,
  WEIGHT_HARD_LIMIT: 2200,

  /**
   * Records the used weight reported by a response and syncs the limiter reservoir.
   * Responses without a positive weight header are ignored.
   *
   * @param {{ get(name: string): (string|null) }} headers - Response headers.
   */
  update(headers) {
    const w = Number.parseInt(headers.get('x-mbx-used-weight-1m') || '0', 10)
    if (!(w > 0)) return

    this.usedWeight = w
    this.weightUpdatedAt = Date.now()

    // Sync Bottleneck reservoir with actual Binance weight.
    // Only sync when weight is below hard limit — at/above limit the response
    // is likely a 429 and setting reservoir=0 would deadlock Bottleneck
    // (auto-refresh can't reliably recover after updateSettings).
    if (w >= this.WEIGHT_HARD_LIMIT) return

    const remaining = Math.max(this.WEIGHT_LIMIT - w, 0)
    // updateSettings() is asynchronous and nobody awaits it here; without a
    // handler a rejection would surface as an unhandled promise rejection.
    Promise.resolve(limiter.updateSettings({ reservoir: remaining })).catch((err) => {
      log.warn({ err: err?.message, remaining }, 'Failed to sync Bottleneck reservoir')
    })
  },

  /**
   * Starts (or extends) the global pause. A pause that would end earlier than
   * the current one is ignored. The new deadline is persisted to disk.
   *
   * @param {number} ms - Pause length in milliseconds from now.
   */
  setPause(ms) {
    const until = Date.now() + ms
    if (until > this.pauseUntil) {
      this.pauseUntil = until
      savePauseToDisk(until)
      log.warn({ pauseMs: ms, until: new Date(until).toISOString().slice(11, 19) }, 'Global pause set')
    }
  },

  /**
   * Throws while a pause is active; otherwise clears a stale weight reading.
   *
   * @throws {RateLimitError} With `retryAfterMs` set to the remaining pause time.
   */
  checkPause() {
    const now = Date.now()
    if (now < this.pauseUntil) {
      throw new RateLimitError('paused', this.pauseUntil - now, this.usedWeight)
    }
    // Weight is stale after 60s — Binance resets every minute
    if (now - this.weightUpdatedAt > 60_000) {
      this.usedWeight = 0
    }
  },

  /** @returns {string} One-line summary of used weight and pause state for log messages. */
  status() {
    const paused = this.pauseUntil > Date.now()
    return `weight=${this.usedWeight}/${this.WEIGHT_LIMIT}, pause=${paused ? (this.pauseUntil - Date.now()) + 'ms' : 'none'}`
  }
}

// ─── Error class ───────────────────────────────────────────────────────
/**
 * Error signalling that a request was refused or rejected for rate-limit reasons.
 *
 * The message prefix is fixed ("Binance 429 rate limited: ") whatever value is
 * passed as `apiPath`; the path itself is only kept inside the message.
 */
class RateLimitError extends Error {
  /**
   * @param {string} apiPath - Interpolated into the error message.
   * @param {number} retryAfterMs - Suggested wait before retrying, in milliseconds.
   * @param {number} usedWeight - Weight reading at the time the error was raised.
   */
  constructor(apiPath, retryAfterMs, usedWeight) {
    const message = `Binance 429 rate limited: ${apiPath}`
    super(message)
    Object.assign(this, { name: 'RateLimitError', retryAfterMs, usedWeight })
  }
}

// ─── Core fetch (single request, goes through Bottleneck) ──────────────
/**
 * Performs one rate-limited GET against the Binance futures REST API.
 *
 * The call is refused immediately while a global pause is active. Otherwise it
 * is scheduled on the Bottleneck limiter with its estimated weight, and the
 * pause is checked again once the job actually starts. The whole exchange,
 * including reading the response body, is covered by a FETCH_TIMEOUT_MS abort timer.
 *
 * @param {string} apiPath - Path plus query string, appended to BINANCE_FAPI.
 * @returns {Promise<any>} Parsed JSON body of a successful response.
 * @throws {RateLimitError} While paused, or on HTTP 429/418 (which also sets a global pause).
 * @throws {Error} On any other non-OK status. Fetch, abort and JSON errors propagate unchanged.
 */
async function bget(apiPath) {
  // Check global pause BEFORE entering Bottleneck queue
  weightTracker.checkPause()

  const weight = estimateWeight(apiPath)

  return limiter.schedule({ weight }, async () => {
    // Re-check pause (may have changed while queued)
    weightTracker.checkPause()

    const controller = new AbortController()
    const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS)
    try {
      const res = await fetch(BINANCE_FAPI + apiPath, { method: 'GET', signal: controller.signal })

      // Always track weight from response headers
      weightTracker.update(res.headers)

      if (res.status === 429 || res.status === 418) {
        const retryAfter = Number.parseInt(res.headers.get('retry-after') || '0', 10)
        // Without a usable Retry-After header, fall back to 120s for 418 and 30s for 429.
        const retryMs = retryAfter > 0 ? retryAfter * 1000 : (res.status === 418 ? 120_000 : 30_000)
        weightTracker.setPause(retryMs)
        throw new RateLimitError(apiPath, retryMs, weightTracker.usedWeight)
      }

      if (!res.ok) {
        const txt = await res.text().catch(() => '')
        throw new Error(`Binance GET ${apiPath} failed: ${res.status} ${txt}`)
      }

      // `await` here keeps the body read inside the try block; a bare
      // `return res.json()` would run `finally` (and disarm the timeout)
      // before the body has been received and parsed.
      return await res.json()
    } finally {
      clearTimeout(timeoutId)
    }
  })
}

// ─── Retry wrapper ─────────────────────────────────────────────────────
/**
 * Calls bget() with retries.
 *
 * Two independent budgets apply:
 *   - Ordinary failures: up to `maxRetries` attempts, sleeping
 *     `baseDelay * 2^(attempt - 1)` ms between them.
 *   - RateLimitError: up to 3 extra retries that do not consume an attempt,
 *     each waiting the error's `retryAfterMs` (30s if unset, never less than 1s).
 *
 * Errors thrown after giving up carry the last underlying error as `cause`.
 *
 * @param {string} apiPath - Path plus query string passed to bget().
 * @param {number} [maxRetries=3] - Maximum number of ordinary attempts.
 * @param {number} [baseDelay=500] - Base backoff delay in milliseconds.
 * @returns {Promise<any>} Whatever bget() resolves with.
 * @throws {Error} When either retry budget is exhausted, or `maxRetries` is not positive.
 */
async function bgetWithRetry(apiPath, maxRetries = 3, baseDelay = 500) {
  const MAX_RL_RETRIES = 3
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
  let attempt = 0
  let rlRetries = 0

  while (attempt < maxRetries) {
    try {
      return await bget(apiPath)
    } catch (err) {
      if (err instanceof RateLimitError) {
        rlRetries++
        if (rlRetries > MAX_RL_RETRIES) {
          throw new Error(
            `Binance GET ${apiPath} rate limited ${rlRetries}x, giving up (${weightTracker.status()})`,
            { cause: err },
          )
        }
        const waitMs = Math.max(err.retryAfterMs || 30_000, 1000)
        log.warn({ path: apiPath.slice(0, 60), retry: rlRetries, waitMs }, '429 — retrying')
        await sleep(waitMs)
        continue // does NOT count as a regular attempt
      }

      attempt++
      if (attempt >= maxRetries) {
        // Keep the original error reachable for callers that need its type or stack.
        throw new Error(
          `Binance GET ${apiPath} failed after ${maxRetries} attempts: ${err.message}`,
          { cause: err },
        )
      }
      await sleep(baseDelay * 2 ** (attempt - 1))
    }
  }
  // Only reachable when maxRetries is not positive, so the loop never ran.
  throw new Error(`Binance GET ${apiPath} failed: exhausted all retries`)
}

// ─── Stats for monitoring ──────────────────────────────────────────────
/**
 * Builds a monitoring snapshot of the rate-limit state.
 *
 * `paused` and `pauseRemaining` are derived from a single clock read so they
 * can never contradict each other.
 *
 * @returns {Promise<{weight: number, weightLimit: number, paused: boolean,
 *   pauseRemaining: number,
 *   bottleneck: {running: number, queued: number, reservoir: (number|string)}}>}
 *   `reservoir` is the string 'unknown' when the limiter cannot report it.
 */
async function getStats() {
  const counts = limiter.counts()

  let reservoir = 'unknown'
  try {
    reservoir = await limiter.currentReservoir()
  } catch {
    // Stats are best-effort: keep the 'unknown' placeholder.
  }

  // Read the clock once, after the await, so both pause fields describe the same instant.
  const pauseRemaining = Math.max(0, weightTracker.pauseUntil - Date.now())

  return {
    weight: weightTracker.usedWeight,
    weightLimit: 2400,
    paused: pauseRemaining > 0,
    pauseRemaining,
    bottleneck: {
      running: counts.RUNNING,
      queued: counts.QUEUED,
      reservoir,
    }
  }
}

// Public surface of this module.
module.exports = {
  bget,                       // single rate-limited GET
  bgetWithRetry,              // bget() wrapped in retry/backoff handling
  RateLimitError,             // error class thrown on pause / 429 / 418
  rateLimiter: weightTracker, // backward-compatible name
  limiter,                    // the Bottleneck instance itself
  BINANCE_FAPI,
  estimateWeight,
  getStats,
  savePauseToDisk,
}

📜 Git History

85e4ebd — fix: 16-bug audit — resync storm, memory leaks, API errors, data persistence (7 weeks ago)
59cbf69 — fix: eliminate crash loop + 29x faster page load (8 weeks ago)
88e03db — fix: Bottleneck maxConcurrent=10 blocked weight>10 endpoints (8 weeks ago)
23948fc — feat: centralized Binance rate limiter with Bottleneck (8 weeks ago)
Show last diff
Loading...