'use strict'
const { createLogger } = require('./logger')
const log = createLogger('vpin')
/**
* VPIN — Volume-Synchronized Probability of Informed Trading
*
* Measures order flow toxicity by comparing buy vs sell volume
* in equal-sized volume buckets. High VPIN (>0.5) = informed trading,
* expect volatility. Leading indicator (precedes moves by minutes).
*
* Uses Binance taker buy/sell volume from klines (exact, not estimated).
* Buckets: 50 (standard), each = totalVolume / 50.
* VPIN = Σ|V_buy - V_sell| / (n × V_bucket)
*
* Scan runs periodically for top symbols by volume.
*/
const NUM_BUCKETS = 50
const SCAN_INTERVAL_MS = 60_000 // scan every 60s
const SCAN_KLINE_LIMIT = 100 // 100 candles for computation
const SCAN_INTERVAL_TF = '5m' // 5-minute candles
const MAX_SYMBOLS = 60 // top 60 by volume
const CACHE_TTL_MS = 55_000 // cache results for 55s
let _bgetWithRetry = null
let _getProxyCached = null
let _scanInterval = null
// symbol -> { vpin, buyPct, sellPct, totalVol, ts }
const vpinCache = new Map()
function init({ bgetWithRetry, getProxyCached }) {
_bgetWithRetry = bgetWithRetry
_getProxyCached = getProxyCached
// First scan after 30s (let server warm up)
setTimeout(() => {
scanAll().catch(() => {})
_scanInterval = setInterval(() => scanAll().catch(() => {}), SCAN_INTERVAL_MS)
}, 30_000)
log.info({ intervalSec: SCAN_INTERVAL_MS / 1000, buckets: NUM_BUCKETS, tf: SCAN_INTERVAL_TF }, 'VPIN scanner started')
}
function stop() {
if (_scanInterval) { clearInterval(_scanInterval); _scanInterval = null }
vpinCache.clear()
log.info('VPIN scanner stopped')
}
/**
* Compute VPIN for a single symbol from kline data
* Kline format: [time, open, high, low, close, vol, closeTime, quoteVol, trades, takerBuyBaseVol, takerBuyQuoteVol, ...]
*/
function computeVPIN(klines) {
if (!Array.isArray(klines) || klines.length < 20) return null
// Extract buy/sell quote volumes per candle
const candles = klines.map(k => {
const quoteVol = parseFloat(k[7]) // total quote volume
const buyQuoteVol = parseFloat(k[10]) // taker buy quote volume
const sellQuoteVol = quoteVol - buyQuoteVol
return { quoteVol, buyQuoteVol, sellQuoteVol }
})
const totalVolume = candles.reduce((s, c) => s + c.quoteVol, 0)
if (totalVolume <= 0) return null
const bucketSize = totalVolume / NUM_BUCKETS
// Fill volume buckets
let bucketBuy = 0
let bucketSell = 0
let bucketFilled = 0
let sumAbsDiff = 0
let bucketsComplete = 0
let totalBuy = 0
let totalSell = 0
for (const candle of candles) {
let remainBuy = candle.buyQuoteVol
let remainSell = candle.sellQuoteVol
totalBuy += candle.buyQuoteVol
totalSell += candle.sellQuoteVol
while (remainBuy + remainSell > 0) {
const remaining = bucketSize - bucketFilled
const candleRemaining = remainBuy + remainSell
if (candleRemaining >= remaining) {
// This candle fills the bucket
const ratio = remaining / candleRemaining
bucketBuy += remainBuy * ratio
bucketSell += remainSell * ratio
remainBuy -= remainBuy * ratio
remainSell -= remainSell * ratio
sumAbsDiff += Math.abs(bucketBuy - bucketSell)
bucketsComplete++
bucketBuy = 0
bucketSell = 0
bucketFilled = 0
} else {
// Candle partially fills bucket
bucketBuy += remainBuy
bucketSell += remainSell
bucketFilled += candleRemaining
remainBuy = 0
remainSell = 0
}
}
}
if (bucketsComplete < 10) return null // not enough data
const vpin = sumAbsDiff / (bucketsComplete * bucketSize)
const buyPct = totalBuy / totalVolume
const sellPct = totalSell / totalVolume
return {
vpin: Math.min(1, Math.max(0, vpin)),
buyPct,
sellPct,
totalVol: totalVolume,
buckets: bucketsComplete,
}
}
/**
* Get VPIN for a single symbol (from cache or compute)
*/
async function getVPIN(symbol) {
const cached = vpinCache.get(symbol)
if (cached && Date.now() - cached.ts < CACHE_TTL_MS) return cached
try {
const klines = await _bgetWithRetry(`/fapi/v1/klines?symbol=${symbol}&interval=${SCAN_INTERVAL_TF}&limit=${SCAN_KLINE_LIMIT}`)
if (!Array.isArray(klines)) return null
const result = computeVPIN(klines)
if (!result) return null
const entry = { symbol, ...result, ts: Date.now() }
vpinCache.set(symbol, entry)
return entry
} catch (err) {
return vpinCache.get(symbol) || null // return stale on error
}
}
/**
* Scan top symbols by volume, compute VPIN for each
*/
async function scanAll() {
if (!_getProxyCached || !_bgetWithRetry) return
const tickers = _getProxyCached('ticker24hr', 60_000)
if (!Array.isArray(tickers)) return
// Sort by quote volume, take top N
const sorted = tickers
.filter(t => t.symbol.endsWith('USDT') && parseFloat(t.quoteVolume) > 10_000_000)
.sort((a, b) => parseFloat(b.quoteVolume) - parseFloat(a.quoteVolume))
.slice(0, MAX_SYMBOLS)
let computed = 0
for (const t of sorted) {
try {
const result = await getVPIN(t.symbol)
if (result) computed++
} catch (_) {}
}
log.debug({ computed, total: sorted.length }, 'VPIN scan complete')
}
/**
* Get all cached VPIN values, sorted by VPIN descending (most toxic first)
*/
function getAll() {
const results = []
const now = Date.now()
for (const [, entry] of vpinCache) {
if (now - entry.ts < CACHE_TTL_MS * 2) { // allow slightly stale
results.push(entry)
}
}
results.sort((a, b) => b.vpin - a.vpin)
return results
}
/**
* Get stats for monitoring
*/
function getStats() {
const all = getAll()
const avg = all.length ? all.reduce((s, e) => s + e.vpin, 0) / all.length : 0
const high = all.filter(e => e.vpin > 0.5)
return {
cached: vpinCache.size,
avgVpin: +avg.toFixed(4),
highToxicity: high.length,
top5: all.slice(0, 5).map(e => ({ symbol: e.symbol, vpin: +e.vpin.toFixed(4), buyPct: +(e.buyPct * 100).toFixed(1) })),
}
}
module.exports = { init, stop, getVPIN, getAll, getStats, computeVPIN }
📜 Git History
b1f6e80feat: VPIN scanner (order flow toxicity indicator)8 weeks ago
Show last diff
Loading...