← Back
'use strict'
const path = require('path')
const Database = require('better-sqlite3')
const { createLogger } = require('./logger')
const log = createLogger('depth-store')

/**
 * Depth Store — SQLite-backed order book snapshot storage
 *
 * Replaces in-memory depth-heatmap with persistent 4-hour rolling history.
 * Auto-tracks top symbols by volume. Data survives PM2 restarts.
 *
 * Used by: Heatmap UI (bookmap overlay on modal chart)
 * Future:  Density persistence migration, density overlay on heatmap
 */

const DB_PATH = path.join(__dirname, '..', 'data', 'depth.db')
const SNAPSHOT_INTERVAL_MS = 10_000   // snapshot every 10s
const MAX_HISTORY_MS = 4 * 3600_000   // 4 hours rolling window
const CLEANUP_INTERVAL_MS = 600_000   // cleanup every 10 min
const TRACK_UPDATE_MS = 60_000        // refresh tracked symbols every 60s
const TOP_SYMBOLS = 50                // auto-track top N by volume
const WINDOW_PCT = 3                  // ±3% from mark price
const BUCKET_DIVISOR = 0.001          // 0.1% price bands

// Order-flow imbalance tuning (see brain/log.md — backtest 4 Jun)
const OF_WINDOW_PCT = 0.5             // imbalance: only count walls within 0.5% of price
const OF_PERSIST_SNAPS = 4            // anti-spoof: inspect last N snapshots (~30-40s)
const OF_PERSIST_FRAC = 0.5           // a wall counts only if >=50% of its volume survived in EVERY prior snapshot

let _db = null
let _stateManager = null
let _getProxyCached = null
let _snapshotTimer = null
let _cleanupTimer = null
let _trackTimer = null
let _trackedSymbols = new Set()
let _onDemandSymbols = new Map()  // symbol -> lastAccess timestamp
let _markPriceMap = new Map()
let _markPriceTs = 0
const ON_DEMAND_IDLE_MS = 300_000  // drop on-demand after 5 min idle

// Prepared statements (cached for performance)
let _stmtInsert = null
let _stmtSelect = null
let _stmtDelete = null
let _stmtStats = null

function init({ stateManager, getProxyCached }) {
  _stateManager = stateManager
  _getProxyCached = getProxyCached

  _db = new Database(DB_PATH)
  _db.pragma('journal_mode = WAL')
  _db.pragma('synchronous = NORMAL')

  _db.exec(`
    CREATE TABLE IF NOT EXISTS snapshots (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      symbol TEXT NOT NULL,
      ts INTEGER NOT NULL,
      mark_price REAL NOT NULL,
      data TEXT NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_snap_sym_ts ON snapshots(symbol, ts);
  `)

  // Prepare statements
  _stmtInsert = _db.prepare('INSERT INTO snapshots (symbol, ts, mark_price, data) VALUES (?, ?, ?, ?)')
  _stmtSelect = _db.prepare('SELECT ts, mark_price, data FROM snapshots WHERE symbol = ? AND ts > ? ORDER BY ts')
  _stmtDelete = _db.prepare('DELETE FROM snapshots WHERE ts < ?')
  _stmtStats = _db.prepare('SELECT symbol, COUNT(*) as cnt, MAX(ts) as lastTs FROM snapshots GROUP BY symbol ORDER BY cnt DESC LIMIT 30')

  // Auto-track top symbols
  updateTrackedSymbols()
  _trackTimer = setInterval(updateTrackedSymbols, TRACK_UPDATE_MS)

  // Start snapshot collection
  _snapshotTimer = setInterval(takeSnapshots, SNAPSHOT_INTERVAL_MS)

  // Start cleanup
  cleanup()
  _cleanupTimer = setInterval(cleanup, CLEANUP_INTERVAL_MS)

  const existing = _stmtStats.all()
  log.info({
    intervalSec: SNAPSHOT_INTERVAL_MS / 1000,
    maxHours: MAX_HISTORY_MS / 3600_000,
    existingSymbols: existing.length,
    existingSnapshots: existing.reduce((sum, r) => sum + r.cnt, 0)
  }, 'Depth store started (SQLite)')
}

function stop() {
  if (_snapshotTimer) { clearInterval(_snapshotTimer); _snapshotTimer = null }
  if (_cleanupTimer) { clearInterval(_cleanupTimer); _cleanupTimer = null }
  if (_trackTimer) { clearInterval(_trackTimer); _trackTimer = null }
  if (_db) { _db.close(); _db = null }
  log.info('Depth store stopped')
}

/**
 * Auto-track top symbols by 24h volume
 */
function updateTrackedSymbols() {
  if (!_getProxyCached) return
  const tickers = _getProxyCached('ticker24hr', 60_000)
  if (!Array.isArray(tickers)) return

  const sorted = tickers
    .filter(t => t.symbol && t.symbol.endsWith('USDT'))
    .map(t => ({ symbol: t.symbol, vol: parseFloat(t.quoteVolume) || 0 }))
    .sort((a, b) => b.vol - a.vol)
    .slice(0, TOP_SYMBOLS)

  _trackedSymbols = new Set(sorted.map(t => t.symbol))

  // Merge on-demand symbols (user-requested, not in top N)
  const now = Date.now()
  for (const [sym, lastAccess] of _onDemandSymbols) {
    if (now - lastAccess > ON_DEMAND_IDLE_MS) {
      _onDemandSymbols.delete(sym)
    } else {
      _trackedSymbols.add(sym)
    }
  }
}

/**
 * On-demand track a symbol (called from API when user opens modal)
 */
function track(symbol) {
  if (!symbol) return
  _onDemandSymbols.set(symbol, Date.now())
  _trackedSymbols.add(symbol)
}

/**
 * Take snapshots of all tracked symbols, write to SQLite
 */
function takeSnapshots() {
  if (!_stateManager || !_trackedSymbols.size || !_db) return

  // Refresh mark prices from ticker cache
  const now = Date.now()
  if (now - _markPriceTs > 10_000 && _getProxyCached) {
    const tickers = _getProxyCached('ticker24hr', 60_000)
    if (Array.isArray(tickers)) {
      for (const t of tickers) {
        const p = parseFloat(t.lastPrice)
        if (p > 0) _markPriceMap.set(t.symbol, p)
      }
      _markPriceTs = now
    }
  }

  const rows = []
  for (const symbol of _trackedSymbols) {
    const book = _stateManager.books.get(symbol)
    if (!book) continue

    const markPrice = _markPriceMap.get(symbol) || 0
    if (!markPrice) continue

    const bucketSize = markPrice * BUCKET_DIVISOR
    const minPrice = markPrice * (1 - WINDOW_PCT / 100)
    const maxPrice = markPrice * (1 + WINDOW_PCT / 100)

    const bids = bucketSide(book.bids, bucketSize, minPrice, maxPrice)
    const asks = bucketSide(book.asks, bucketSize, minPrice, maxPrice)

    if (Object.keys(bids).length > 0 || Object.keys(asks).length > 0) {
      rows.push([symbol, now, markPrice, JSON.stringify({ bids, asks })])
    }
  }

  if (rows.length > 0) {
    const insertMany = _db.transaction((items) => {
      for (const r of items) _stmtInsert.run(r[0], r[1], r[2], r[3])
    })
    insertMany(rows)
  }
}

/**
 * Bucket one side of the order book into price bands
 */
function bucketSide(sideMap, bucketSize, minPrice, maxPrice) {
  const buckets = {}
  if (!sideMap || !bucketSize) return buckets

  for (const [price, data] of sideMap) {
    if (price < minPrice || price > maxPrice) continue
    if (!data.notional || data.notional <= 0) continue

    const bucketKey = +(Math.round(price / bucketSize) * bucketSize).toFixed(8)
    buckets[bucketKey] = (buckets[bucketKey] || 0) + data.notional
  }

  return buckets
}

/**
 * Get heatmap snapshots from SQLite
 * Returns same format as old depth-heatmap for frontend compatibility
 */
function getSnapshots(symbol, hours = 4) {
  if (!_db) return null

  const since = Date.now() - hours * 3600_000
  const rows = _stmtSelect.all(symbol, since)

  if (!rows.length) return null

  const lastRow = rows[rows.length - 1]
  const markPrice = lastRow.mark_price
  const bucketSize = markPrice * BUCKET_DIVISOR

  return {
    symbol,
    markPrice,
    bucketSize: +bucketSize.toFixed(8),
    windowPct: WINDOW_PCT,
    count: rows.length,
    snapshots: rows.map(r => {
      const d = JSON.parse(r.data)
      return { ts: r.ts, bids: d.bids, asks: d.asks }
    })
  }
}

/**
 * Cleanup snapshots older than MAX_HISTORY_MS
 */
function cleanup() {
  if (!_db) return
  const cutoff = Date.now() - MAX_HISTORY_MS
  const result = _stmtDelete.run(cutoff)
  if (result.changes > 0) {
    log.info({ deleted: result.changes }, 'Cleaned up old snapshots')
  }
}

/**
 * Order-book imbalance from the latest snapshot of a symbol.
 * imbalance = (ΣbidVol - ΣaskVol) / (ΣbidVol + ΣaskVol), range [-1, +1].
 * Read-only helper for the order-flow signal scanner.
 */
/**
 * Order-book imbalance with a 0.5% window + anti-spoof persistence filter.
 * Walls are bucketed by 0.1% distance from price; a bucket is counted only if
 * it survived (>=50% volume) across the last OF_PERSIST_SNAPS snapshots — this
 * drops flash/spoof walls that appear for a single frame.
 */
function getImbalance(symbol) {
  if (!_db) return null
  const rows = _db.prepare('SELECT mark_price, data FROM snapshots WHERE symbol = ? ORDER BY ts DESC LIMIT ?').all(symbol, OF_PERSIST_SNAPS)
  if (!rows.length || !rows[0].data || !rows[0].mark_price) return null
  const mark = rows[0].mark_price

  // bucket one snapshot's levels by 0.1% distance band, within the OF window
  const bucketize = (dataStr, markPrice) => {
    let d
    try { d = JSON.parse(dataStr) } catch { return null }
    const out = { bid: new Map(), ask: new Map() }
    const add = (side, levels) => {
      for (const [ps, v] of Object.entries(levels || {})) {
        const p = Number(ps), vol = Number(v) || 0
        if (!p || vol <= 0) continue
        const distPct = Math.abs(p - markPrice) / markPrice * 100
        if (distPct > OF_WINDOW_PCT) continue
        const k = Math.floor(distPct / (BUCKET_DIVISOR * 100))  // 0.1% band index
        out[side].set(k, (out[side].get(k) || 0) + vol)
      }
    }
    add('bid', d.bids)
    add('ask', d.asks)
    return out
  }

  const cur = bucketize(rows[0].data, mark)
  if (!cur) return null
  const priors = []
  for (let i = 1; i < rows.length; i++) {
    if (!rows[i].data || !rows[i].mark_price) continue
    const b = bucketize(rows[i].data, rows[i].mark_price)
    if (b) priors.push(b)
  }
  if (!priors.length) return null  // need history to confirm persistence (no signal right after restart)

  // sum only buckets that persisted across EVERY prior snapshot
  const sumPersistent = (side) => {
    let total = 0
    for (const [k, vol] of cur[side]) {
      let persists = true
      for (const pb of priors) {
        if ((pb[side].get(k) || 0) < OF_PERSIST_FRAC * vol) { persists = false; break }
      }
      if (persists) total += vol
    }
    return total
  }
  const bidVol = sumPersistent('bid')
  const askVol = sumPersistent('ask')
  const tot = bidVol + askVol
  if (tot <= 0) return null
  // One-sided book (no persistent liquidity on a side within the window) is an
  // un-synced/frozen-book artifact, not a tradeable imbalance — it yields a bogus
  // ±1.0 (100%) reading. A real orderflow signal has liquidity on BOTH sides.
  if (bidVol <= 0 || askVol <= 0) return null
  return { imbalance: (bidVol - askVol) / tot, bidVol, askVol, markPrice: mark }
}

/**
 * Stats for monitoring
 */
function getStats() {
  if (!_db) return { trackedSymbols: 0, symbols: [] }
  const total = _db.prepare('SELECT COUNT(*) as cnt FROM snapshots').get()
  const symbols = _stmtStats.all()
  return {
    trackedSymbols: _trackedSymbols.size,
    totalSnapshots: total.cnt,
    maxHours: MAX_HISTORY_MS / 3600_000,
    intervalSec: SNAPSHOT_INTERVAL_MS / 1000,
    symbols
  }
}

module.exports = { init, stop, track, getSnapshots, getStats, getImbalance }

📜 Git History

72fcec5fix(orderflow): reject one-sided book imbalance (kill bogus 100% signals)4 weeks ago
181ff05feat(orderflow): getImbalance 0.5% window + anti-spoof persistence filter4 weeks ago
8970a3dfeat(signals): order-flow imbalance signal type (Phase 0 detect+log)4 weeks ago
dc48be8feat: depth heatmap v2 — SQLite persistence, visual polish, klines gap fix8 weeks ago
Show last diff
Loading...