'use strict'
const path = require('path')
const Database = require('better-sqlite3')
const { createLogger } = require('./logger')
const log = createLogger('depth-store')
/**
* Depth Store — SQLite-backed order book snapshot storage
*
* Replaces in-memory depth-heatmap with persistent 4-hour rolling history.
* Auto-tracks top symbols by volume. Data survives PM2 restarts.
*
* Used by: Heatmap UI (bookmap overlay on modal chart)
* Future: Density persistence migration, density overlay on heatmap
*/
const DB_PATH = path.join(__dirname, '..', 'data', 'depth.db')
const SNAPSHOT_INTERVAL_MS = 10_000 // snapshot every 10s
const MAX_HISTORY_MS = 4 * 3600_000 // 4 hours rolling window
const CLEANUP_INTERVAL_MS = 600_000 // cleanup every 10 min
const TRACK_UPDATE_MS = 60_000 // refresh tracked symbols every 60s
const TOP_SYMBOLS = 50 // auto-track top N by volume
const WINDOW_PCT = 3 // ±3% from mark price
const BUCKET_DIVISOR = 0.001 // 0.1% price bands
// Order-flow imbalance tuning (see brain/log.md — backtest 4 Jun)
const OF_WINDOW_PCT = 0.5 // imbalance: only count walls within 0.5% of price
const OF_PERSIST_SNAPS = 4 // anti-spoof: inspect last N snapshots (~30-40s)
const OF_PERSIST_FRAC = 0.5 // a wall counts only if >=50% of its volume survived in EVERY prior snapshot
let _db = null
let _stateManager = null
let _getProxyCached = null
let _snapshotTimer = null
let _cleanupTimer = null
let _trackTimer = null
let _trackedSymbols = new Set()
let _onDemandSymbols = new Map() // symbol -> lastAccess timestamp
let _markPriceMap = new Map()
let _markPriceTs = 0
const ON_DEMAND_IDLE_MS = 300_000 // drop on-demand after 5 min idle
// Prepared statements (cached for performance)
let _stmtInsert = null
let _stmtSelect = null
let _stmtDelete = null
let _stmtStats = null
function init({ stateManager, getProxyCached }) {
_stateManager = stateManager
_getProxyCached = getProxyCached
_db = new Database(DB_PATH)
_db.pragma('journal_mode = WAL')
_db.pragma('synchronous = NORMAL')
_db.exec(`
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
ts INTEGER NOT NULL,
mark_price REAL NOT NULL,
data TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_snap_sym_ts ON snapshots(symbol, ts);
`)
// Prepare statements
_stmtInsert = _db.prepare('INSERT INTO snapshots (symbol, ts, mark_price, data) VALUES (?, ?, ?, ?)')
_stmtSelect = _db.prepare('SELECT ts, mark_price, data FROM snapshots WHERE symbol = ? AND ts > ? ORDER BY ts')
_stmtDelete = _db.prepare('DELETE FROM snapshots WHERE ts < ?')
_stmtStats = _db.prepare('SELECT symbol, COUNT(*) as cnt, MAX(ts) as lastTs FROM snapshots GROUP BY symbol ORDER BY cnt DESC LIMIT 30')
// Auto-track top symbols
updateTrackedSymbols()
_trackTimer = setInterval(updateTrackedSymbols, TRACK_UPDATE_MS)
// Start snapshot collection
_snapshotTimer = setInterval(takeSnapshots, SNAPSHOT_INTERVAL_MS)
// Start cleanup
cleanup()
_cleanupTimer = setInterval(cleanup, CLEANUP_INTERVAL_MS)
const existing = _stmtStats.all()
log.info({
intervalSec: SNAPSHOT_INTERVAL_MS / 1000,
maxHours: MAX_HISTORY_MS / 3600_000,
existingSymbols: existing.length,
existingSnapshots: existing.reduce((sum, r) => sum + r.cnt, 0)
}, 'Depth store started (SQLite)')
}
function stop() {
if (_snapshotTimer) { clearInterval(_snapshotTimer); _snapshotTimer = null }
if (_cleanupTimer) { clearInterval(_cleanupTimer); _cleanupTimer = null }
if (_trackTimer) { clearInterval(_trackTimer); _trackTimer = null }
if (_db) { _db.close(); _db = null }
log.info('Depth store stopped')
}
/**
* Auto-track top symbols by 24h volume
*/
function updateTrackedSymbols() {
if (!_getProxyCached) return
const tickers = _getProxyCached('ticker24hr', 60_000)
if (!Array.isArray(tickers)) return
const sorted = tickers
.filter(t => t.symbol && t.symbol.endsWith('USDT'))
.map(t => ({ symbol: t.symbol, vol: parseFloat(t.quoteVolume) || 0 }))
.sort((a, b) => b.vol - a.vol)
.slice(0, TOP_SYMBOLS)
_trackedSymbols = new Set(sorted.map(t => t.symbol))
// Merge on-demand symbols (user-requested, not in top N)
const now = Date.now()
for (const [sym, lastAccess] of _onDemandSymbols) {
if (now - lastAccess > ON_DEMAND_IDLE_MS) {
_onDemandSymbols.delete(sym)
} else {
_trackedSymbols.add(sym)
}
}
}
/**
* On-demand track a symbol (called from API when user opens modal)
*/
function track(symbol) {
if (!symbol) return
_onDemandSymbols.set(symbol, Date.now())
_trackedSymbols.add(symbol)
}
/**
* Take snapshots of all tracked symbols, write to SQLite
*/
function takeSnapshots() {
if (!_stateManager || !_trackedSymbols.size || !_db) return
// Refresh mark prices from ticker cache
const now = Date.now()
if (now - _markPriceTs > 10_000 && _getProxyCached) {
const tickers = _getProxyCached('ticker24hr', 60_000)
if (Array.isArray(tickers)) {
for (const t of tickers) {
const p = parseFloat(t.lastPrice)
if (p > 0) _markPriceMap.set(t.symbol, p)
}
_markPriceTs = now
}
}
const rows = []
for (const symbol of _trackedSymbols) {
const book = _stateManager.books.get(symbol)
if (!book) continue
const markPrice = _markPriceMap.get(symbol) || 0
if (!markPrice) continue
const bucketSize = markPrice * BUCKET_DIVISOR
const minPrice = markPrice * (1 - WINDOW_PCT / 100)
const maxPrice = markPrice * (1 + WINDOW_PCT / 100)
const bids = bucketSide(book.bids, bucketSize, minPrice, maxPrice)
const asks = bucketSide(book.asks, bucketSize, minPrice, maxPrice)
if (Object.keys(bids).length > 0 || Object.keys(asks).length > 0) {
rows.push([symbol, now, markPrice, JSON.stringify({ bids, asks })])
}
}
if (rows.length > 0) {
const insertMany = _db.transaction((items) => {
for (const r of items) _stmtInsert.run(r[0], r[1], r[2], r[3])
})
insertMany(rows)
}
}
/**
* Bucket one side of the order book into price bands
*/
function bucketSide(sideMap, bucketSize, minPrice, maxPrice) {
const buckets = {}
if (!sideMap || !bucketSize) return buckets
for (const [price, data] of sideMap) {
if (price < minPrice || price > maxPrice) continue
if (!data.notional || data.notional <= 0) continue
const bucketKey = +(Math.round(price / bucketSize) * bucketSize).toFixed(8)
buckets[bucketKey] = (buckets[bucketKey] || 0) + data.notional
}
return buckets
}
/**
* Get heatmap snapshots from SQLite
* Returns same format as old depth-heatmap for frontend compatibility
*/
function getSnapshots(symbol, hours = 4) {
if (!_db) return null
const since = Date.now() - hours * 3600_000
const rows = _stmtSelect.all(symbol, since)
if (!rows.length) return null
const lastRow = rows[rows.length - 1]
const markPrice = lastRow.mark_price
const bucketSize = markPrice * BUCKET_DIVISOR
return {
symbol,
markPrice,
bucketSize: +bucketSize.toFixed(8),
windowPct: WINDOW_PCT,
count: rows.length,
snapshots: rows.map(r => {
const d = JSON.parse(r.data)
return { ts: r.ts, bids: d.bids, asks: d.asks }
})
}
}
/**
* Cleanup snapshots older than MAX_HISTORY_MS
*/
function cleanup() {
if (!_db) return
const cutoff = Date.now() - MAX_HISTORY_MS
const result = _stmtDelete.run(cutoff)
if (result.changes > 0) {
log.info({ deleted: result.changes }, 'Cleaned up old snapshots')
}
}
/**
* Order-book imbalance from the latest snapshot of a symbol.
* imbalance = (ΣbidVol - ΣaskVol) / (ΣbidVol + ΣaskVol), range [-1, +1].
* Read-only helper for the order-flow signal scanner.
*/
/**
* Order-book imbalance with a 0.5% window + anti-spoof persistence filter.
* Walls are bucketed by 0.1% distance from price; a bucket is counted only if
* it survived (>=50% volume) across the last OF_PERSIST_SNAPS snapshots — this
* drops flash/spoof walls that appear for a single frame.
*/
function getImbalance(symbol) {
if (!_db) return null
const rows = _db.prepare('SELECT mark_price, data FROM snapshots WHERE symbol = ? ORDER BY ts DESC LIMIT ?').all(symbol, OF_PERSIST_SNAPS)
if (!rows.length || !rows[0].data || !rows[0].mark_price) return null
const mark = rows[0].mark_price
// bucket one snapshot's levels by 0.1% distance band, within the OF window
const bucketize = (dataStr, markPrice) => {
let d
try { d = JSON.parse(dataStr) } catch { return null }
const out = { bid: new Map(), ask: new Map() }
const add = (side, levels) => {
for (const [ps, v] of Object.entries(levels || {})) {
const p = Number(ps), vol = Number(v) || 0
if (!p || vol <= 0) continue
const distPct = Math.abs(p - markPrice) / markPrice * 100
if (distPct > OF_WINDOW_PCT) continue
const k = Math.floor(distPct / (BUCKET_DIVISOR * 100)) // 0.1% band index
out[side].set(k, (out[side].get(k) || 0) + vol)
}
}
add('bid', d.bids)
add('ask', d.asks)
return out
}
const cur = bucketize(rows[0].data, mark)
if (!cur) return null
const priors = []
for (let i = 1; i < rows.length; i++) {
if (!rows[i].data || !rows[i].mark_price) continue
const b = bucketize(rows[i].data, rows[i].mark_price)
if (b) priors.push(b)
}
if (!priors.length) return null // need history to confirm persistence (no signal right after restart)
// sum only buckets that persisted across EVERY prior snapshot
const sumPersistent = (side) => {
let total = 0
for (const [k, vol] of cur[side]) {
let persists = true
for (const pb of priors) {
if ((pb[side].get(k) || 0) < OF_PERSIST_FRAC * vol) { persists = false; break }
}
if (persists) total += vol
}
return total
}
const bidVol = sumPersistent('bid')
const askVol = sumPersistent('ask')
const tot = bidVol + askVol
if (tot <= 0) return null
// One-sided book (no persistent liquidity on a side within the window) is an
// un-synced/frozen-book artifact, not a tradeable imbalance — it yields a bogus
// ±1.0 (100%) reading. A real orderflow signal has liquidity on BOTH sides.
if (bidVol <= 0 || askVol <= 0) return null
return { imbalance: (bidVol - askVol) / tot, bidVol, askVol, markPrice: mark }
}
/**
* Stats for monitoring
*/
function getStats() {
if (!_db) return { trackedSymbols: 0, symbols: [] }
const total = _db.prepare('SELECT COUNT(*) as cnt FROM snapshots').get()
const symbols = _stmtStats.all()
return {
trackedSymbols: _trackedSymbols.size,
totalSnapshots: total.cnt,
maxHours: MAX_HISTORY_MS / 3600_000,
intervalSec: SNAPSHOT_INTERVAL_MS / 1000,
symbols
}
}
module.exports = { init, stop, track, getSnapshots, getStats, getImbalance }
📜 Git History
72fcec5fix(orderflow): reject one-sided book imbalance (kill bogus 100% signals)4 weeks ago
181ff05feat(orderflow): getImbalance 0.5% window + anti-spoof persistence filter4 weeks ago
8970a3dfeat(signals): order-flow imbalance signal type (Phase 0 detect+log)4 weeks ago
dc48be8feat: depth heatmap v2 — SQLite persistence, visual polish, klines gap fix8 weeks ago