/** Detection loop: copy a leader's Polymarket trades onto the follower's deposit wallet.
*
* Default mode = WEBSOCKET (near-real-time, enter ASAP after the whale):
* subscribe to RTDS activity/trades firehose, filter by leader address, copy instantly.
* Fallback mode = POLL=1 (Data API /trades?user=, ~5min lag).
*
* Usage: LEADER=0x..[,0x..] [LIVE=1] [COPY_FRACTION=0.1] [MIN_SHARES=5] [MAX_NOTIONAL=2]
* [POLL=1] [STATE=./copy-state.json] node [--experimental-global-webcrypto] copy-loop.mjs
*
* Detection (Data API + RTDS WS) is public/read-only (not geoblocked) - runs anywhere.
* The copy POST (LIVE) must run from the Malaysia executor (non-geoblocked egress). */
import fs from 'node:fs';
import { createPublicClient, http, erc20Abi } from 'viem';
import { polygon } from 'viem/chains';
// Polymarket Data API base URL (used below for /positions and /trades lookups).
const DATA_API = 'https://data-api.polymarket.com';
// RTDS websocket endpoint used by the default (WebSocket) detection mode.
const RTDS_WS = 'wss://ws-live-data.polymarket.com';
// pUSD token contract; its ERC-20 balanceOf for the deposit wallet is read as the follower budget.
const PUSD = '0xC011a7E12a19f7B1f670d46F03B03f3342E82DFB';
const CTF = '0x4D97DCd97eC945f40cF65F87097ACe5EA0476045'; // Conditional Tokens (ERC1155) - source of truth for held outcome tokens
// Minimal ABI: only the ERC1155 balanceOf(account, id) view is needed.
const CTF_BAL_ABI = [{ name: 'balanceOf', type: 'function', stateMutability: 'view', inputs: [{ name: 'account', type: 'address' }, { name: 'id', type: 'uint256' }], outputs: [{ type: 'uint256' }] }];
// Follower wallet whose balance and positions are tracked; overridable via DEPOSIT_WALLET.
const DEPOSIT_WALLET = process.env.DEPOSIT_WALLET || '0x50A8061e9448EB1e5d5e7aF07BE4E4F63C6F24Ff';
// Primary Polygon RPC client (RPC env var overrides the default endpoint).
const pub = createPublicClient({ chain: polygon, transport: http(process.env.RPC || 'https://polygon.drpc.org') });
// Second, INDEPENDENT RPC used only to confirm a balanceOf=0 before closing a position.
// drpc round-robins across nodes and a lagging node can return a FALSE 0 - without a
// second opinion that false zero would close a live position and strand its value.
const pub2 = createPublicClient({ chain: polygon, transport: http(process.env.RPC2 || 'https://polygon-bor-rpc.publicnode.com') });
// Follower wallet pUSD balance in whole dollars; feeds the "percent of budget" allocation mode.
let budget = 0;
/**
 * Re-read the deposit wallet's pUSD balance and cache it in `budget`.
 * The raw token amount is divided by 1e6. A failed read is logged and the
 * previously cached value is kept.
 */
async function refreshBudget() {
  try {
    const rawBalance = await pub.readContract({
      address: PUSD,
      abi: erc20Abi,
      functionName: 'balanceOf',
      args: [DEPOSIT_WALLET],
    });
    budget = Number(rawBalance) / 1e6;
  } catch (err) {
    console.log('[budget] read err:', err.message);
  }
}
// --- Runtime configuration, all read from environment variables ---
const LIVE = process.env.LIVE === '1'; // default DRY for safety
const POLL = process.env.POLL === '1'; // default = websocket
// Comma-separated leader addresses, trimmed and lower-cased; empty entries dropped.
const LEADERS = new Set((process.env.LEADER || '').split(',').map((s) => s.trim().toLowerCase()).filter(Boolean));
const FOLLOWERS = new Set((process.env.FOLLOWER || '').split(',').map((s) => s.trim().toLowerCase()).filter(Boolean)); // allow-list of sub owners we copy
const COPY_FRACTION = Number(process.env.COPY_FRACTION || '0.1'); // copy 10% of leader size
const MIN_SHARES = Number(process.env.MIN_SHARES || '5'); // CLOB min order
const MAX_NOTIONAL = Number(process.env.MAX_NOTIONAL || '2'); // cap $ per copy (safety)
const MIN_NOTIONAL = Number(process.env.MIN_NOTIONAL || '1'); // Polymarket marketable-order minimum ($1)
const MAX_OPEN = Number(process.env.MAX_OPEN || '5'); // cap open positions -> total exposure <= MAX_OPEN*MAX_NOTIONAL
const MAX_PER_EVENT = Number(process.env.MAX_PER_EVENT || '1'); // cap open positions per (leader, eventSlug) - no multi-leg pile-up on one game
const MAX_OPEN_PER_LEADER = Number(process.env.MAX_OPEN_PER_LEADER || '1'); // cap open positions per leader - one active whale can't squat all MAX_OPEN slots
const LEADER_KILL_USD = Number(process.env.LEADER_KILL_USD || '8'); // per-whale stop-loss: once our realized PnL from a leader drops to -$X, stop copying its new trades (existing opens ride to resolution)
const MIRROR_EXIT_LIVE = process.env.MIRROR_EXIT_LIVE === '1'; // ACT on a detected leader-exit (sell our copy). Default off = detect+log only (safe observe phase)
const LEADER_EXIT_CONFIRM = Number(process.env.LEADER_EXIT_CONFIRM || '2'); // consecutive reconcile cycles a leader must be absent from a held token before we trust the exit (guards Data API flake)
const EXIT_SLIPPAGE = Number(process.env.EXIT_SLIPPAGE || '0.02'); // when mirror-selling, undercut current mark by this to cross the spread and fill
const STATE_FILE = process.env.STATE || './copy-state.json';
const POLL_MS = Number(process.env.POLL_MS || '15000');
const SUBS_URL = process.env.SUBS_URL || ''; // e.g. https://poly-dev.szhub.space/api/copy/active
const SERVICE_TOKEN = process.env.SERVICE_TOKEN || '';
const SUBS_POLL_MS = Number(process.env.SUBS_POLL_MS || '20000');
// Defaults to SUBS_URL with a trailing "/active" swapped for "/pnl"; empty when neither is set.
const PNL_URL = process.env.PNL_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/pnl') : ''); // POST per-leader PnL here
const PNL_POLL_MS = Number(process.env.PNL_POLL_MS || '120000'); // every 2 min
// Startup validation: every env leader must be 42 characters long (length check only),
// and at least one leader source (env LEADER or SUBS_URL) must be configured.
if (![...LEADERS].every((a) => a.length === 42)) { console.log('set LEADER=0x... (42 chars, comma-separated for many)'); process.exit(1); }
if (!LEADERS.size && !SUBS_URL) { console.log('set LEADER=0x... or SUBS_URL (DB-driven)'); process.exit(1); }
// Map a UI CopyConfig -> our sizing params. The env values are HARD safety ceilings
// the UI can only go below (defense in depth on a money path).
/**
 * Derive the effective per-leader sizing config from a (possibly missing) UI config.
 *
 * Every numeric field is sanitised so that garbage input can never produce NaN:
 * a NaN cap compares false against everything and would silently DISABLE the cap
 * (`openCount >= NaN` is never true), and a NaN price bound would disable the band.
 *
 * @param {object|null|undefined} c UI CopyConfig (allocMode, allocValue, maxPerTrade,
 *   maxExposure, priceMin, priceMax, maxPerEvent, maxOpenPerLeader); price bounds are percents.
 * @returns {{mode: string, allocValue: number, maxNotional: number, maxOpen: number,
 *   priceMin: number, priceMax: number, maxPerEvent: number, maxOpenPerLeader: number}}
 */
function deriveCfg(c) {
  const ui = c || {};
  const mode = ui.allocMode || 'proportional'; // 'fixed' | 'percent' | 'proportional'
  const allocValue = Number(ui.allocValue) || 0;
  // Per-trade $ cap: UI value clamped to [MIN_NOTIONAL, MAX_NOTIONAL]; missing/invalid -> env ceiling.
  const maxNotional = Math.max(MIN_NOTIONAL, Math.min(Number(ui.maxPerTrade) || MAX_NOTIONAL, MAX_NOTIONAL));
  // Open-position cap derived from exposure / per-trade size, never above MAX_OPEN.
  // A non-numeric exposure or per-trade value falls back to the env ceiling instead of NaN.
  const slots = Math.floor(Number(ui.maxExposure) / Number(ui.maxPerTrade));
  const maxOpen = (ui.maxExposure && ui.maxPerTrade && !Number.isNaN(slots))
    ? Math.max(1, Math.min(slots, MAX_OPEN))
    : MAX_OPEN;
  // Price band arrives as percents; a missing or non-numeric bound means "no bound" on that side.
  const toFraction = (v, fallback) => {
    if (v == null) return fallback;
    const n = Number(v) / 100;
    return Number.isFinite(n) ? n : fallback;
  };
  const priceMin = toFraction(ui.priceMin, 0);
  const priceMax = toFraction(ui.priceMax, 1);
  const maxPerEvent = Math.max(1, Math.min(Number(ui.maxPerEvent) || MAX_PER_EVENT, MAX_PER_EVENT));
  const maxOpenPerLeader = Math.max(1, Math.min(Number(ui.maxOpenPerLeader) || MAX_OPEN_PER_LEADER, MAX_OPEN_PER_LEADER));
  return { mode, allocValue, maxNotional, maxOpen, priceMin, priceMax, maxPerEvent, maxOpenPerLeader };
}
const defaultCfg = deriveCfg(null); // env-based defaults (env LEADER mode)
const subsConfig = new Map(); // leader -> derived cfg (DB-driven)
// Active leaders = env LEADERS plus subscriptions from the screener DB (refreshed live).
const activeLeaders = new Set(LEADERS);
/**
 * Refresh DB-driven subscriptions from SUBS_URL and update `subsConfig` / `activeLeaders`.
 *
 * The new subscription map is built completely BEFORE the live maps are touched.
 * A malformed payload (non-array `data`, null row) therefore leaves the previous
 * configuration intact instead of wiping per-leader configs while the leaders stay
 * active, which would make them trade with the looser env defaults.
 * No-op when SUBS_URL is unset. Errors are logged, never thrown.
 */
async function pollSubs() {
  if (!SUBS_URL) return;
  try {
    const r = await fetch(SUBS_URL, { headers: { 'x-service-token': SERVICE_TOKEN }, signal: AbortSignal.timeout(12000) });
    if (!r.ok) { console.log('[subs] http', r.status); return; }
    const d = await r.json();
    const rows = d.data || [];
    if (!Array.isArray(rows)) { console.log('[subs] unexpected payload (data is not an array); keeping previous subscriptions'); return; }
    const fresh = new Map();
    for (const s of rows) {
      if (!s) continue; // tolerate null/empty rows
      // Only copy subscriptions owned by allow-listed followers (this daemon trades
      // ONE deposit wallet). Blocks foreign subs from steering our wallet while the
      // subscription endpoint still trusts a client-asserted address (IDOR - the real
      // fix is server-side Privy token verification before multi-user/prod).
      if (FOLLOWERS.size && !FOLLOWERS.has(String(s.userAddress || '').toLowerCase())) continue;
      const a = String(s.leaderAddress || '').toLowerCase();
      if (a.length === 42) fresh.set(a, deriveCfg(s.config));
    }
    // Payload fully parsed: swap in the new configs (synchronous, so no handler sees a half-built map).
    subsConfig.clear();
    for (const [a, cfg] of fresh) subsConfig.set(a, cfg);
    const next = new Set([...LEADERS, ...subsConfig.keys()]);
    if (next.size !== activeLeaders.size || [...next].some((a) => !activeLeaders.has(a))) {
      console.log(`[subs] active leaders updated: ${next.size} (${subsConfig.size} from DB)`);
    }
    activeLeaders.clear();
    for (const a of next) activeLeaders.add(a);
  } catch (e) { console.log('[subs] err:', e.message); }
}
/** Dedup key for a trade: tx hash, asset, side, timestamp and size joined by ':' (falsy fields become ''). */
const tradeKey = (t) => [t.transactionHash, t.asset, t.side, t.timestamp, t.size]
  .map((field) => field || '')
  .join(':');
/**
 * Map a leader trade to the order WE would place for the follower (per-leader cfg).
 *
 * Target $ per copy depends on the allocation mode:
 *   fixed        - allocValue $ per trade
 *   percent      - allocValue% of the follower's wallet budget
 *   proportional - COPY_FRACTION of the leader's own trade notional ("like the whale")
 *
 * The share count is then bumped up to the CLOB share minimum and the $ minimum,
 * and finally cut down to the per-leader $ cap.
 *
 * @param {object} t   Leader trade (asset, size, price, side, outcome, title).
 * @param {object} cfg Derived config (mode, allocValue, maxNotional).
 * @returns {object|null} Order { tokenID, side, price, size, notional, outcome, title },
 *   or null when the trade is unusable or the minimums and the cap cannot both be met.
 */
function decideCopy(t, cfg) {
  const leaderSize = Number(t.size || 0);
  const price = Number(t.price || 0);
  if (!t.asset) return null;
  // The feed is external input: a negative or non-finite size/price would otherwise
  // slip past a plain falsy check and could yield a negative-size order.
  if (!Number.isFinite(leaderSize) || leaderSize <= 0) return null;
  if (!Number.isFinite(price) || price <= 0) return null;
  let target; // target notional ($) for this copy
  if (cfg.mode === 'fixed') target = cfg.allocValue;
  else if (cfg.mode === 'percent') target = (cfg.allocValue / 100) * budget;
  else target = leaderSize * price * COPY_FRACTION;
  let size = Math.round((target || 0) / price);
  if (size < MIN_SHARES) size = MIN_SHARES; // bump to CLOB share minimum
  if (size * price < MIN_NOTIONAL) size = Math.ceil(MIN_NOTIONAL / price); // meet $ minimum (marketable orders)
  if (size * price > cfg.maxNotional) { // per-leader $ cap (never above the env ceiling)
    size = Math.floor(cfg.maxNotional / price);
    if (size < MIN_SHARES || size * price < MIN_NOTIONAL) return null; // can't satisfy min$ and cap$ at this price - skip
  }
  return {
    tokenID: t.asset,
    side: (t.side || '').toUpperCase(),
    price,
    size,
    notional: +(size * price).toFixed(2),
    outcome: t.outcome,
    title: t.title,
  };
}
// Order executor; stays null until the startup code assigns one (LIVE mode only).
let executor = null;
// --- positions ledger (local JSON): one open record per held tokenID ---
const POS_FILE = process.env.POSITIONS || './positions.json';
/** Read the ledger from disk; any read or parse failure yields an empty ledger. */
const loadPos = () => {
  try {
    const raw = fs.readFileSync(POS_FILE, 'utf8');
    return JSON.parse(raw);
  } catch {
    return {};
  }
};
/**
 * Persist the ledger atomically. A crash in the middle of an in-place write would
 * corrupt positions.json, loadPos() would then fall back to {} and all position
 * tracking would be lost (re-buy storm + orphaned on-chain holdings). Writing a
 * temp file and renaming it (atomic on the same filesystem) means readers always
 * see a complete file: either the old one or the new one.
 */
const savePos = (p) => {
  const scratchPath = `${POS_FILE}.tmp`;
  const serialized = JSON.stringify(p, null, 1);
  fs.writeFileSync(scratchPath, serialized);
  fs.renameSync(scratchPath, POS_FILE);
};
// Mutex for the ledger. WS message handlers are async and `ws` does not await them,
// so two trades (or a buy and a slow reconcile) can interleave at await points:
// both pass the MAX_OPEN / per-leader / dup-token checks before either saves
// (TOCTOU: cap breach / double-buy), or a reconcile holding a stale ledger across
// on-chain reads overwrites a buy that landed meanwhile (lost position, can't close).
// Every read-modify-write is chained through one promise so they run strictly one at a time.
let posLock = Promise.resolve();
/**
 * Run `fn` after every previously queued ledger mutation has settled.
 * @param {Function} fn Work to run while holding the lock (may be async).
 * @returns {Promise} Settles with fn's result or rejection; a rejection does not block later callers.
 */
const withPos = (fn) => {
  const queued = posLock.then(fn, fn);
  // The lock itself never rejects, so one failing mutation cannot wedge the queue.
  posLock = queued.then(undefined, () => {});
  return queued;
};
// 'pending' = slot reserved, CLOB order not yet confirmed filled; it later becomes 'open'
// or is rolled back. Caps count pending records as well, so two concurrent trades cannot
// both pass MAX_OPEN while their orders are in flight outside the lock.
const isActive = (p) => Boolean(p) && ['open', 'pending'].includes(p.status);
// Seconds a reservation may stay unfilled (rejected / rested / never matched) before
// reconcile cancels and releases it, so it doesn't squat a slot forever.
const PENDING_TIMEOUT_S = Number(process.env.PENDING_TIMEOUT_S || '180');
// Normalize a trade timestamp to Unix SECONDS (reconcile age math assumes seconds).
// Values above 2e10 are treated as milliseconds; unparsable input becomes 0.
const toSec = (ts) => {
  const parsed = Number(ts);
  if (!parsed) return 0;
  if (parsed > 2e10) return Math.floor(parsed / 1000);
  return parsed;
};
// Reconcile local open ledger against on-chain reality. Markets that resolve
// WITHOUT the leader selling (e.g. sports held to settlement) get redeemed by
// the DE cron on-chain but leave a stale 'open' record here - MAX_OPEN saturates
// forever and no new copies happen. Mark any open position the DW no longer
// holds as closed, freeing the slot. Grace period guards against Data API lag on
// freshly-filled positions.
const RECONCILE_GRACE_S = Number(process.env.RECONCILE_GRACE_S || '600'); // 10 min
// A position absent from the Data API for this long while STILL on-chain = the market
// resolved and these are worthless losing tokens nobody redeems ($0 payout). They'd
// occupy a MAX_OPEN slot forever (can't sell dust, redeem-cron only redeems winners)
// - free the slot. Live long-held positions keep value, stay in the Data API, never
// reach this branch. Well beyond the flake window so transient drops don't trip it.
const MAX_OPEN_AGE_S = Number(process.env.MAX_OPEN_AGE_S || '86400'); // 24h
/**
 * Reconcile the local positions ledger against the Data API and on-chain balances.
 *
 * Phase 1 (no lock): snapshot the ledger, fetch our and the entry-leaders' holdings,
 *   read CTF balances where needed, and collect intended patches per token.
 * Phase 2 (locked): re-load the ledger and apply each patch only if the record is
 *   still in the state the patch was computed for.
 * Phase B (no lock for the network call): when MIRROR_EXIT_LIVE is on, sell positions
 *   whose entry-leader exit was confirmed, then record the outcome under the lock.
 *
 * All errors are caught and logged; the function never throws.
 */
async function reconcileOpenPositions() {
try {
const res = await fetch(`${DATA_API}/positions?user=${DEPOSIT_WALLET}`, { signal: AbortSignal.timeout(15000) });
const held = await res.json();
if (!Array.isArray(held)) return;
const heldPnl = new Map(held.map((p) => [String(p.asset), Number(p.cashPnl) || 0])); // asset -> live mark-to-market
const now = Math.floor(Date.now() / 1000);
// Phase 1 (NO lock): inspect a snapshot + do the slow on-chain reads, collect intended
// changes. Holding the lock across these reads would block copies for seconds.
const snapshot = loadPos();
const changes = new Map(); // tid -> { markPnl?, close?: reason }
const toClose = []; // [{ tid, price }] confirmed leader-exits to mirror-sell (Phase B)
// Proactive leader-exit: pull current holdings of the leaders behind our OPEN
// positions. If the entry-leader no longer holds a token we still hold, they've
// exited - mirror the close (confirmed over LEADER_EXIT_CONFIRM cycles vs flake).
const openLeaders = [...new Set(Object.values(snapshot)
.filter((p) => p.status === 'open' && p.leader).map((p) => String(p.leader).toLowerCase()))];
const leaderHeld = new Map(); // leader -> Set(asset). Absent entry = unreliable this cycle -> skip exit check.
await Promise.allSettled(openLeaders.map(async (ldr) => {
try {
const r = await fetch(`${DATA_API}/positions?user=${ldr}`, { signal: AbortSignal.timeout(15000) });
const arr = await r.json();
// An empty array is treated like a failed fetch: no entry is recorded for this leader.
if (Array.isArray(arr) && arr.length) leaderHeld.set(ldr, new Set(arr.map((x) => String(x.asset))));
} catch { /* leave unset - don't trust absence this cycle */ }
}));
// Per-share mark price derived from the Data API row (currentValue / size); 0 when size is not positive.
const curPrice = new Map(held.map((h) => [String(h.asset), (Number(h.size) > 0 ? Number(h.currentValue) / Number(h.size) : 0)]));
for (const [tid, p] of Object.entries(snapshot)) {
if (p.status === 'pending') {
// Reservation awaiting fill. On-chain balance is the source of truth: if it appeared
// the order filled -> promote to open; if still zero past the timeout the order never
// filled (rejected/rested) -> cancel its resting order + release the slot.
let bal;
try { bal = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [DEPOSIT_WALLET, BigInt(tid)] }); }
catch { continue; } // RPC error -> leave pending, retry next cycle
if (bal > 0n) changes.set(tid, { promote: true });
else if ((now - (p.openedAt || 0)) > PENDING_TIMEOUT_S) changes.set(tid, { cancelPending: p.lastOrderId || null });
continue;
}
if (p.status !== 'open') continue;
if (heldPnl.has(String(tid))) {
const ch = { markPnl: heldPnl.get(String(tid)) }; // refresh last-known P&L each cycle
const lh = leaderHeld.get(String(p.leader || '').toLowerCase());
if (lh) { // reliable leader holdings this cycle
if (!lh.has(String(tid))) { ch.leaderExit = true; ch.exitPrice = curPrice.get(String(tid)) || 0; }
else ch.leaderStillIn = true; // still in -> reset any pending exit-confirm
}
changes.set(tid, ch);
} else if ((now - (p.openedAt || 0)) > RECONCILE_GRACE_S) {
// Absent from the Data API. That API flakes (drops held positions transiently),
// so DON'T trust absence - verify the real ERC1155 balance on CTF before banking.
// Only a true on-chain zero balance means the position is gone (sold/resolved+redeemed).
let onchain;
try { onchain = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [DEPOSIT_WALLET, BigInt(tid)] }); }
catch (e) { console.log(`[reconcile] CTF read err ${String(tid).slice(0, 10)}:`, e.message); continue; } // RPC error -> leave open, retry next cycle
if (onchain === 0n) {
// Confirm the zero on a second independent RPC before banking the close -
// drpc can serve a stale node returning a false 0 (would strand live value).
let onchain2;
try { onchain2 = await pub2.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [DEPOSIT_WALLET, BigInt(tid)] }); }
catch (e) { console.log(`[reconcile] CTF confirm-read err ${String(tid).slice(0, 10)}:`, e.message); continue; } // can't confirm -> keep open, retry next cycle
if (onchain2 === 0n) {
changes.set(tid, { close: 'resolved-onchain-absent' });
} else {
console.log(`[reconcile] false-zero guarded ${String(tid).slice(0, 10)}: rpc1=0 rpc2=${onchain2} β keep open`);
}
} else if ((now - (p.openedAt || 0)) > MAX_OPEN_AGE_S) {
// On-chain balance persists but the position vanished from the Data API
// long ago -> resolved-worthless leftover. Free the slot.
changes.set(tid, { close: 'resolved-worthless-aged' });
} // onchain > 0 and recent -> still held but missing from Data API -> keep open
}
}
if (!changes.size) return;
// Phase 2 (LOCKED): re-load the FRESH ledger and apply patches only to still-open
// positions, so we never overwrite a buy/close that landed during the on-chain reads.
await withPos(() => {
const pos = loadPos();
let freed = 0, changed = false;
for (const [tid, patch] of changes) {
const p = pos[tid];
if (!p) continue;
if (patch.promote) { if (p.status === 'pending') { p.status = 'open'; changed = true; } continue; }
if (patch.cancelPending !== undefined) { // pending timed out -> cancel resting order + drop the reservation
if (p.status === 'pending') { if (patch.cancelPending && executor) executor.cancel(patch.cancelPending).catch(() => {}); delete pos[tid]; changed = true; }
continue;
}
if (p.status !== 'open') continue; // a concurrent handler already changed it
if (patch.markPnl != null) { p.markPnl = patch.markPnl; changed = true; }
if (patch.leaderStillIn) { if (p.leaderExitSeen) { p.leaderExitSeen = 0; changed = true; } }
else if (patch.leaderExit) {
// Count consecutive cycles in which the leader was absent; act only once the count reaches the threshold.
p.leaderExitSeen = (p.leaderExitSeen || 0) + 1; changed = true;
if (p.leaderExitSeen >= LEADER_EXIT_CONFIRM) {
console.log(`π» leader-exit confirmed (${p.leaderExitSeen}x) ${String(tid).slice(0, 10)} leader=${String(p.leader).slice(0, 8)} "${String(p.title).slice(0, 38)}" markβ$${(Number(p.markPnl) || 0).toFixed(2)} β ${MIRROR_EXIT_LIVE ? 'closing' : 'OBSERVE-ONLY'}`);
if (MIRROR_EXIT_LIVE) toClose.push({ tid, price: patch.exitPrice });
} else {
console.log(`β¦ leader-exit seen ${p.leaderExitSeen}/${LEADER_EXIT_CONFIRM} ${String(tid).slice(0, 10)} (awaiting confirm)`);
}
}
if (patch.close) {
p.status = 'closed'; p.closedReason = patch.close;
// worthless-aged = resolved $0 loser -> realized is the full cost basis lost.
// onchain-absent = sold/redeemed -> last mark-to-market is the best estimate we have.
p.realizedPnl = patch.close === 'resolved-worthless-aged' ? -(Number(p.cost) || 0) : (Number(p.markPnl) || 0);
freed += 1; changed = true;
}
}
if (changed) savePos(pos);
if (freed) console.log(`[reconcile] freed ${freed} resolved slot(s); open now ${Object.values(pos).filter((x) => x.status === 'open').length}`);
});
// Phase B (UNLOCKED network): mirror-sell positions whose entry-leader has exited.
// Banks closed only on a real fill; otherwise cancels the resting order, resets the
// confirm counter and keeps the position open for next cycle (never falsely banks).
if (MIRROR_EXIT_LIVE && toClose.length && executor) {
for (const { tid, price } of toClose) {
// Limit price = mark price reduced by EXIT_SLIPPAGE, clamped to [0.01, 0.99].
const sellPx = Math.max(0.01, Math.min(0.99, (Number(price) || 0) * (1 - EXIT_SLIPPAGE)));
let resp;
try { resp = await executor.sellAll({ tokenID: tid, price: sellPx }); }
catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
await withPos(() => {
const pos = loadPos();
const h = pos[tid];
if (!h || h.status !== 'open') return;
const soldNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
if (resp?.skipped) {
h.status = 'closed'; h.closedReason = `leader-exit-${String(resp.skipped)}`; h.closedAt = now; h.realizedPnl = Number(h.markPnl) || 0; savePos(pos);
console.log(` mirror-close skipped (${resp.skipped}) β closed ${String(tid).slice(0, 10)}`);
} else if (soldNow) {
h.status = 'closed'; h.closedReason = 'mirror-leader-exit'; h.closeOrderId = resp.orderID; h.closedAt = now; h.realizedPnl = Number(h.markPnl) || 0; savePos(pos);
console.log(`β
MIRROR-CLOSED (leader exited) ${String(tid).slice(0, 10)} pnlβ$${(Number(h.markPnl) || 0).toFixed(2)}`);
} else if (resp?.success && resp.orderID) {
executor.cancel(resp.orderID).catch(() => {}); h.leaderExitSeen = 0; savePos(pos);
console.log(`β³ mirror-close not filled; keep open, retry next cycle ${String(tid).slice(0, 10)}`);
} else {
h.leaderExitSeen = 0; savePos(pos);
console.log(`β mirror-close rejected, keep open ${String(tid).slice(0, 10)}: ${JSON.stringify(resp).slice(0, 120)}`);
}
});
}
}
} catch (e) { console.log('[reconcile] err:', e.message); }
}
/**
 * Per-leader PnL for the UI.
 *
 * realized   = sum of realizedPnl over ledger records that are neither open nor pending
 * unrealized = live cashPnl of open copies the Data API currently reports as held
 *
 * One entry is produced per leader that has any non-pending record, so the card can
 * always show a line. The summary is logged and, when PNL_URL is set, POSTed there.
 * Errors are logged, never thrown.
 */
async function computeAndPostPnL() {
  try {
    const res = await fetch(`${DATA_API}/positions?user=${DEPOSIT_WALLET}`, { signal: AbortSignal.timeout(15000) });
    const held = await res.json();
    if (!Array.isArray(held)) return;

    // asset -> live position row
    const liveByAsset = new Map();
    for (const row of held) liveByAsset.set(String(row.asset), row);

    const ledger = loadPos();
    const totals = new Map(); // leader -> running totals
    const bucketFor = (ldr) => {
      if (!totals.has(ldr)) {
        totals.set(ldr, { leader: ldr, realized: 0, unrealized: 0, value: 0, openCount: 0, closedCount: 0 });
      }
      return totals.get(ldr);
    };

    for (const [tid, rec] of Object.entries(ledger)) {
      // slot reserved, order not yet confirmed: not a real P&L line
      if (rec.status === 'pending') continue;
      const bucket = bucketFor(String(rec.leader || 'unknown').toLowerCase());
      if (rec.status !== 'open') {
        bucket.realized += Number(rec.realizedPnl) || 0;
        bucket.closedCount += 1;
        continue;
      }
      const live = liveByAsset.get(String(tid));
      if (!live) continue;
      bucket.unrealized += Number(live.cashPnl) || 0;
      bucket.value += Number(live.currentValue) || 0;
      bucket.openCount += 1;
    }

    const leaders = [];
    for (const b of totals.values()) {
      leaders.push({
        leader: b.leader,
        realized: +b.realized.toFixed(4),
        unrealized: +b.unrealized.toFixed(4),
        pnl: +(b.realized + b.unrealized).toFixed(4), // total result (back-compat field)
        value: +b.value.toFixed(4),
        openCount: b.openCount,
        closedCount: b.closedCount,
      });
    }

    const summary = leaders
      .map((l) => `${l.leader.slice(0, 8)}:tot${l.pnl >= 0 ? '+' : ''}${l.pnl.toFixed(2)}(o${l.openCount}/c${l.closedCount})`)
      .join(' ');
    console.log(`[pnl] ${summary || '(no records)'}`);

    if (!PNL_URL) return;
    const r = await fetch(PNL_URL, {
      method: 'POST',
      headers: { 'content-type': 'application/json', 'x-service-token': SERVICE_TOKEN },
      body: JSON.stringify({ leaders }),
      signal: AbortSignal.timeout(12000),
    });
    if (!r.ok) console.log('[pnl] post http', r.status);
  } catch (e) {
    console.log('[pnl] err:', e.message);
  }
}
/**
 * React to one leader trade.
 *
 * BUY:  check the price band, size the copy, then under the ledger lock run the
 *       kill-switch / duplicate / per-event / per-leader / global caps and reserve a
 *       'pending' slot. The order is placed outside the lock and the reservation is
 *       then finalized ('open'), left pending, or released.
 * SELL: only a sell by the leader recorded on our open position triggers a close.
 *       The position is marked closed only when the close filled or was skipped.
 *
 * In DRY mode (LIVE off) nothing is written or sent; the intended action is logged.
 * Trades whose side is neither BUY nor SELL are ignored.
 *
 * @param {object} t Trade payload (side, proxyWallet, asset, price, size, timestamp, ...).
 */
async function handleTrade(t) {
const side = String(t.side || '').toUpperCase();
const leader = String(t.proxyWallet || '').toLowerCase();
// Per-leader config from the DB subscriptions, else the env-derived defaults.
const cfg = subsConfig.get(leader) || defaultCfg;
if (side === 'BUY') {
const px = Number(t.price || 0);
if (px < cfg.priceMin || px > cfg.priceMax) { console.log(` skip (price ${px} outside band ${cfg.priceMin}-${cfg.priceMax})`); return; }
const order = decideCopy(t, cfg);
if (!order) { console.log(' skip (incomplete/over-cap)'); return; }
const label = `${order.side} ${order.size} "${order.outcome}" @ ${order.price.toFixed(3)} ($${order.notional}) | ${String(order.title).slice(0, 45)}`;
const evt = t.eventSlug || t.slug || ''; // group all legs of one game (moneyline/spreads/O-U)
// Phase A (LOCKED): cap/dup checks + RESERVE the slot as 'pending', then release the lock so
// the slow CLOB POST runs OUTSIDE the mutex (it no longer blocks other leaders' trades).
// Caps count pending too, so two concurrent reservations can't both breach MAX_OPEN (TOCTOU).
const reserved = await withPos(() => {
const pos0 = loadPos();
// Per-whale stop-loss: if our banked realized PnL from this leader is already
// at/below -$LEADER_KILL_USD, stop copying its new trades (it's bleeding us).
const leaderRealized = Object.values(pos0)
.filter((p) => p.status === 'closed' && String(p.leader || '').toLowerCase() === leader)
.reduce((s, p) => s + (Number(p.realizedPnl) || 0), 0);
if (LEADER_KILL_USD > 0 && leaderRealized <= -LEADER_KILL_USD) { console.log(`β skip (leader killed: realized $${leaderRealized.toFixed(2)} β€ -$${LEADER_KILL_USD}): ${label}`); return false; }
if (isActive(pos0[order.tokenID])) { console.log(`βοΈ skip (already holding/pending this token): ${label}`); return false; }
if (evt) {
const sameEvt = Object.values(pos0).filter((p) => isActive(p) && p.leader === leader && p.eventSlug === evt).length;
if (sameEvt >= cfg.maxPerEvent) { console.log(`βοΈ skip (leader already ${sameEvt} on event ${evt}, max ${cfg.maxPerEvent}): ${label}`); return false; }
}
const leaderOpen = Object.values(pos0).filter((p) => isActive(p) && p.leader === leader).length;
if (leaderOpen >= cfg.maxOpenPerLeader) { console.log(`βοΈ skip (leader already ${leaderOpen}, max per-leader ${cfg.maxOpenPerLeader}): ${label}`); return false; }
const openCount = Object.values(pos0).filter(isActive).length;
if (openCount >= cfg.maxOpen) { console.log(`βοΈ skip (max ${cfg.maxOpen} open positions reached): ${label}`); return false; }
// DRY mode stops here: every check passed, but nothing is reserved or sent.
if (!LIVE) { console.log(`π’ WOULD COPY: ${label}\n tokenID=${order.tokenID}`); return false; }
pos0[order.tokenID] = { tokenID: order.tokenID, conditionId: t.conditionId, eventSlug: evt, outcome: t.outcome, title: t.title, leader, buys: 0, cost: order.notional, openedAt: Math.floor(Date.now() / 1000), status: 'pending' };
savePos(pos0);
return true;
});
if (!reserved) return;
// Phase B (UNLOCKED network): place the order, then finalize or roll back the reservation.
let resp;
try { resp = await executor.buy(order); }
catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
await withPos(() => {
const pos = loadPos();
const p = pos[order.tokenID];
if (!p || p.status !== 'pending') return; // reconcile/another handler already resolved it
// A copy BUY priced at the leader's fill is marketable; 'matched' or non-zero filled
// amounts mean we actually hold tokens now -> confirm open. A 'live'/'delayed' status
// with nothing filled means it's resting/queued -> keep pending; reconcile confirms via
// on-chain balance (promote) or cancels it on timeout. Outright rejection -> release slot.
const filledNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
if (filledNow) {
p.status = 'open'; p.buys = 1; p.lastOrderId = resp.orderID; p.openedAt = toSec(t.timestamp) || p.openedAt; savePos(pos);
console.log(`β
COPIED: ${label}\n orderID=${resp.orderID} status=${resp.status}`);
} else if (resp?.success && resp.orderID) {
p.lastOrderId = resp.orderID; savePos(pos);
console.log(`β posted, awaiting fill (status ${resp.status}); reconcile will confirm/cancel: ${label}`);
} else {
delete pos[order.tokenID]; savePos(pos);
console.log(`β copy rejected, slot released: ${label}\n ${JSON.stringify(resp).slice(0, 160)}`);
}
});
return;
}
if (side === 'SELL') {
const label = `SELL "${t.outcome}" @ ${Number(t.price).toFixed(3)} | ${String(t.title).slice(0, 45)}`;
// Phase A (LOCKED): only the ENTRY leader's exit mirrors a close. A different followed leader
// selling the same token is NOT our exit signal - ignore it.
const go = await withPos(() => {
const pos = loadPos();
const held = pos[t.asset];
if (!held || held.status !== 'open') return false; // not held (pending/closed/absent) -> ignore
if (held.leader !== leader) { console.log(` ignore SELL from non-entry leader ${leader.slice(0, 8)} (entry ${String(held.leader).slice(0, 8)}): ${label}`); return false; }
return true;
});
if (!go) return;
if (!LIVE) { console.log(`π» WOULD CLOSE (leader exited): ${label}\n tokenID=${t.asset}`); return; }
// Phase B (UNLOCKED network): place the close, then bank it ONLY if it actually filled.
let resp;
try { resp = await executor.sellAll({ tokenID: t.asset, price: Number(t.price) }); }
catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
await withPos(() => {
const pos = loadPos();
const held = pos[t.asset];
if (!held || held.status !== 'open') return; // already resolved by another handler
if (resp?.skipped) { held.status = 'closed'; held.closedReason = String(resp.skipped); held.closedAt = toSec(t.timestamp); held.realizedPnl = Number(held.markPnl) || 0; savePos(pos); console.log(` close skipped (${resp.skipped}) β marked closed: ${label}`); return; }
const soldNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
if (soldNow) {
held.status = 'closed'; held.closeOrderId = resp.orderID; held.closedAt = toSec(t.timestamp); held.realizedPnl = Number(held.markPnl) || 0; savePos(pos);
console.log(`β
CLOSED: ${label}\n orderID=${resp.orderID} status=${resp.status}`);
} else if (resp?.success && resp.orderID) {
// Posted but didn't fill (rested) -> we STILL HOLD. Cancel the resting close and keep the
// position open so the next leader-sell / reconcile retries - do NOT falsely bank it closed.
executor.cancel(resp.orderID).then(() => console.log(`β©οΈ cancelled unfilled close ${resp.orderID}`)).catch(() => {});
console.log(`β³ close not filled (status ${resp.status}); keeping open, cancelled resting: ${label}`);
} else {
console.log(`β close rejected (still holding): ${label}\n ${JSON.stringify(resp).slice(0, 160)}`);
}
});
}
}
// --- WebSocket mode (default): near-real-time firehose, filter by leader ---
/**
 * Connect to the RTDS websocket, subscribe to the activity/trades topic and hand every
 * not-yet-seen trade from an active leader to handleTrade(). Reconnects on close with
 * exponential backoff (1s doubling up to 30s, reset on a successful open).
 *
 * handleTrade() is wrapped in try/catch: `ws` does not await async listeners, so an
 * exception escaping the handler would surface as an unhandled promise rejection
 * (fatal by default on current Node) and would also drop the remaining trades in the
 * same frame.
 */
async function runWebsocket() {
  const { default: WebSocket } = await import('ws');
  // Bounded dedup set: beyond 4000 keys the oldest inserted key is evicted.
  const seen = new Set();
  const remember = (k) => {
    seen.add(k);
    if (seen.size > 4000) seen.delete(seen.values().next().value);
  };
  let backoff = 1000;
  const connect = () => {
    const ws = new WebSocket(RTDS_WS);
    let ping;
    ws.on('open', () => {
      backoff = 1000;
      ws.send(JSON.stringify({ action: 'subscribe', subscriptions: [{ topic: 'activity', type: 'trades' }] }));
      // Keep-alive; a failed send is ignored on purpose (the close handler reconnects).
      ping = setInterval(() => { try { ws.send('PING'); } catch {} }, 5000);
      console.log(`[ws] connected; watching ${activeLeaders.size} leader(s)`);
    });
    ws.on('message', async (d) => {
      const s = d.toString();
      if (s[0] !== '{' && s[0] !== '[') return; // skip PONG/control frames
      let msg;
      try { msg = JSON.parse(s); } catch { return; }
      for (const m of (Array.isArray(msg) ? msg : [msg])) {
        const t = m.payload || m;
        if (!t || !t.proxyWallet || !t.asset) continue;
        if (!activeLeaders.has(String(t.proxyWallet).toLowerCase())) continue;
        const k = tradeKey(t);
        if (seen.has(k)) continue;
        remember(k);
        try {
          await handleTrade(t);
        } catch (e) {
          // One bad trade must not kill the daemon or the rest of this batch.
          console.log('[ws] handleTrade err:', e?.message || String(e));
        }
      }
    });
    ws.on('error', (e) => console.log('[ws] error:', e.message));
    ws.on('close', () => {
      clearInterval(ping);
      console.log(`[ws] closed; reconnecting in ${backoff}ms`);
      setTimeout(connect, backoff);
      backoff = Math.min(backoff * 2, 30000);
    });
  };
  connect();
}
// --- Polling mode (POLL=1): Data API /trades?user= for EVERY active leader (~POLL_MS lag).
// Used when the RTDS websocket is unavailable (Cloudflare 429s datacenter IPs on WS upgrade).
// Polls activeLeaders (env LEADER plus DB subs), not just LEADERS[0]; per-leader cursors.
/** Read the poll-state file; any read or parse failure yields an empty state. */
const loadState = () => {
  try {
    return JSON.parse(fs.readFileSync(STATE_FILE, 'utf8'));
  } catch {
    return {};
  }
};
/**
 * Persist the poll state atomically (temp file + rename), the same way the positions
 * ledger is saved: a crash mid-write can then never leave a truncated state file that
 * loadState() would silently read back as {}.
 */
const saveState = (s) => {
  const tmp = `${STATE_FILE}.tmp`;
  fs.writeFileSync(tmp, JSON.stringify(s));
  fs.renameSync(tmp, STATE_FILE);
};
/**
 * One polling cycle: fetch the latest trades of every active leader, copy the ones
 * not seen before, then persist the dedup set and the per-leader cursors.
 *
 * - A leader seen for the first time is only baselined (its history is never copied).
 * - New trades are filtered against the cursor value from BEFORE this batch. Advancing
 *   the cursor while iterating would drop every older-but-new trade whenever the batch
 *   is ordered newest-first.
 * - A failing handleTrade() is logged and does not abort the cycle, so the state is
 *   always saved and the remaining trades are still processed.
 */
async function tick() {
  const leaders = [...activeLeaders];
  if (!leaders.length) { console.log('[poll] no active leaders'); return; }
  const state = loadState();
  const seen = new Set(Array.isArray(state.seen) ? state.seen : []);
  // Per-leader cursors; migrate any legacy scalar shape to objects.
  if (!state.lastTs || typeof state.lastTs !== 'object') state.lastTs = {};
  if (!state.primed || typeof state.primed !== 'object') state.primed = {};
  const results = await Promise.allSettled(leaders.map(async (leader) => {
    const r = await fetch(`${DATA_API}/trades?user=${leader}&limit=50`, { signal: AbortSignal.timeout(15000) });
    if (!r.ok) throw new Error(`${leader.slice(0, 8)}… data-api ${r.status}`);
    return { leader, trades: await r.json() };
  }));
  const toCopy = [];
  for (const res of results) {
    if (res.status !== 'fulfilled') { console.log('[poll] err:', res.reason?.message); continue; }
    const { leader, trades } = res.value;
    if (!Array.isArray(trades)) continue;
    if (!state.primed[leader]) {
      // First sight of this leader: baseline current trades as seen (don't copy history).
      for (const t of trades) seen.add(tradeKey(t));
      state.lastTs[leader] = trades.reduce((m, t) => Math.max(m, Number(t.timestamp || 0)), 0);
      state.primed[leader] = true;
      console.log(`[prime] ${leader.slice(0, 8)}… baseline (${trades.length} trades)`);
      continue;
    }
    const cursor = state.lastTs[leader] || 0; // fixed for the whole batch
    let newest = cursor;
    for (const t of trades) {
      const key = tradeKey(t);
      if (seen.has(key)) continue;
      const ts = Number(t.timestamp || 0);
      if (ts < cursor) continue;
      seen.add(key);
      newest = Math.max(newest, ts);
      toCopy.push(t);
    }
    if (newest > cursor) state.lastTs[leader] = newest;
  }
  // Replay in chronological order across all leaders.
  toCopy.sort((a, b) => Number(a.timestamp) - Number(b.timestamp));
  for (const t of toCopy) {
    try {
      await handleTrade(t);
    } catch (e) {
      console.log('[poll] handleTrade err:', e?.message || String(e));
    }
  }
  state.seen = Array.from(seen).slice(-3000);
  saveState(state);
  console.log(`[poll] ${leaders.length} leader(s), ${toCopy.length} new trade(s)`);
}
// --- Startup: wire the executor (LIVE only), run each periodic job once, then schedule it ---
if (LIVE) {
  const execModule = await import('./copy-exec.mjs');
  executor = await execModule.makeExecutor();
}

await refreshBudget();
setInterval(refreshBudget, 60000);

await reconcileOpenPositions();
setInterval(reconcileOpenPositions, 120000);

await computeAndPostPnL();
setInterval(computeAndPostPnL, PNL_POLL_MS);

if (SUBS_URL) {
  await pollSubs();
  setInterval(pollSubs, SUBS_POLL_MS);
}

console.log(`detection ${LIVE ? 'LIVE' : 'DRY-RUN'} | mode=${POLL ? 'POLL' : 'WS'} | leaders=${activeLeaders.size}${SUBS_URL ? ' (DB-driven)' : ''} budget=$${budget.toFixed(2)} maxNotional=$${MAX_NOTIONAL}`);

// Detection loop: polling (one immediate tick, then every POLL_MS unless ONCE=1) or websocket.
if (POLL) {
  await tick();
  if (process.env.ONCE !== '1') setInterval(tick, POLL_MS);
} else {
  await runWebsocket();
}