/** MULTI-USER copy daemon (Friend-beta A1). A per-user fork of copy-loop.mjs:
* one shared leader-trade detection stream → fan-out a copy into EACH subscribed
* user's OWN deposit wallet, with their own budget / caps / ledger.
*
* The single-user copy-loop.mjs is left BYTE-IDENTICAL and keeps running the live
* edge measurement. This file powers the isolated `copy-trader-mu` instance only.
* Do NOT point the live instance at this until the edge gate passes.
*
* Built in chunks:
* 4a (this commit) — per-user context loader + DB ledger load. NO trading yet.
* Run `node copy-loop-mu.mjs --selftest` to dump resolved context.
* 4b — per-user executor + budget. 4c-1 — detection + fan-out (DRY decision).
* 4c-2 — per-user BUY reserve→execute→finalize + SELL mirror-exit close.
* 4d-1 — per-user reconcile (pending promote/cancel, leader-exit mirror,
* on-chain-absent close, mark refresh; write-through to copy_positions).
* 4d-2 (this commit) — per-user PnL: daemon posts per-leader realized+unrealized keyed
* by userAddress; Rust copy_pnl is now a per-user map (?user= with global fallback).
* Client switch to ?user= lands in 4d-2b.
*
* Env: SUBS_URL=https://poly-dev.szhub.space/api/copy/active SERVICE_TOKEN=...
* POSITIONS_URL (defaults to SUBS_URL with /active→/positions) FOLLOWER=did,did
* LIVE=1 (default DRY) POLL=1 (default WS)
* Node>=20 or --experimental-global-webcrypto (CLOB L2 HMAC needs crypto.subtle). */
import fs from 'node:fs';
import { createPublicClient, http, erc20Abi } from 'viem';
import { polygon } from 'viem/chains';
/**
 * Read a numeric environment variable, falling back to `fallback` when it is unset or empty.
 * Exits the process on a non-numeric value: a NaN limit makes every `>=` / `Math.min`
 * comparison against it evaluate false/NaN, which would silently disable the cap.
 */
function envNumberOrExit(name, fallback) {
  const raw = process.env[name];
  const parsed = Number(raw || fallback);
  if (!Number.isFinite(parsed)) {
    console.log(`invalid ${name}=${JSON.stringify(raw)} (expected a finite number)`);
    process.exit(1);
  }
  return parsed;
}
const DATA_API = 'https://data-api.polymarket.com'; // base URL for the /positions lookups below
const PUSD = '0xC011a7E12a19f7B1f670d46F03B03f3342E82DFB'; // ERC-20 whose balanceOf is read by refreshBudget
const pub = createPublicClient({ chain: polygon, transport: http(process.env.RPC || 'https://polygon.drpc.org') });
const LIVE = process.env.LIVE === '1'; // default DRY for safety
const POLL = process.env.POLL === '1'; // default = websocket
const SUBS_URL = process.env.SUBS_URL || '';
const SERVICE_TOKEN = process.env.SERVICE_TOKEN || '';
// Sibling endpoints are derived from SUBS_URL by swapping its trailing /active segment.
const POSITIONS_URL = process.env.POSITIONS_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/positions') : '');
const RESOLUTION_URL = process.env.RESOLUTION_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/market-resolution') : '');
// Optional allow-list of user addresses (comma-separated, lower-cased); empty = all users.
const FOLLOWERS = new Set((process.env.FOLLOWER || '').split(',').map((s) => s.trim().toLowerCase()).filter(Boolean));
const SUBS_POLL_MS = envNumberOrExit('SUBS_POLL_MS', '20000');
// Hard safety ceilings — a per-user UI config can only go BELOW these (defense in depth).
const MIN_NOTIONAL = envNumberOrExit('MIN_NOTIONAL', '1');
const MAX_NOTIONAL = envNumberOrExit('MAX_NOTIONAL', '3');
const MAX_OPEN = envNumberOrExit('MAX_OPEN', '10');
const MAX_PER_EVENT = envNumberOrExit('MAX_PER_EVENT', '1');
const MAX_OPEN_PER_LEADER = envNumberOrExit('MAX_OPEN_PER_LEADER', '2');
if (!SUBS_URL) { console.log('set SUBS_URL (DB-driven multi-user daemon requires it)'); process.exit(1); }
/**
 * Map a UI CopyConfig to sizing params, clamped to the env ceilings.
 *
 * Every numeric field is guarded against NaN: a NaN cap or price bound makes the
 * comparisons that consume it evaluate false, which would disable that check.
 *
 * @param {object|null|undefined} c  raw per-subscription config (may be missing)
 * @returns {{mode: string, allocValue: number, maxNotional: number, maxOpen: number,
 *            priceMin: number, priceMax: number, maxPerEvent: number, maxOpenPerLeader: number}}
 */
function deriveCfg(c) {
  const cfg = c || {};
  // Finite number or the given fallback (covers 'abc', objects, Infinity).
  const finiteOr = (value, fallback) => {
    const n = Number(value);
    return Number.isFinite(n) ? n : fallback;
  };
  const mode = cfg.allocMode || 'proportional';
  const allocValue = Number(cfg.allocValue) || 0;
  const maxNotional = Math.max(MIN_NOTIONAL, Math.min(Number(cfg.maxPerTrade) || MAX_NOTIONAL, MAX_NOTIONAL));
  // Slots = exposure / per-trade size; an unusable ratio falls back to the ceiling
  // instead of propagating NaN through Math.min/Math.max.
  let maxOpen = MAX_OPEN;
  if (cfg.maxExposure && cfg.maxPerTrade) {
    const slots = Math.floor(Number(cfg.maxExposure) / Number(cfg.maxPerTrade));
    if (Number.isFinite(slots)) maxOpen = Math.max(1, Math.min(slots, MAX_OPEN));
  }
  // UI sends the price band in cents (0-100); a non-numeric bound means "no bound".
  const priceMin = cfg.priceMin != null ? finiteOr(Number(cfg.priceMin) / 100, 0) : 0;
  const priceMax = cfg.priceMax != null ? finiteOr(Number(cfg.priceMax) / 100, 1) : 1;
  const maxPerEvent = Math.max(1, Math.min(Number(cfg.maxPerEvent) || MAX_PER_EVENT, MAX_PER_EVENT));
  const maxOpenPerLeader = Math.max(1, Math.min(Number(cfg.maxOpenPerLeader) || MAX_OPEN_PER_LEADER, MAX_OPEN_PER_LEADER));
  return { mode, allocValue, maxNotional, maxOpen, priceMin, priceMax, maxPerEvent, maxOpenPerLeader };
}
// ---- Per-user context: userAddress -> { addr, dw, eoa, subs: Map(leader -> cfg), budget, executor, ledger } ----
// ledger: tokenID -> position record (in-memory mirror of copy_positions, written through to the DB API).
const users = new Map();
const leaderIndex = new Map(); // leader -> Set(userAddress), so a detected trade fans out in O(1)
/**
 * Fetch the context for `addr`, creating and registering an empty one on first use.
 * @param {string} addr  user address key
 * @returns {object} the (possibly new) per-user context stored in `users`
 */
function getUser(addr) {
  const existing = users.get(addr);
  if (existing) return existing;
  const created = {
    addr,
    dw: null,
    eoa: null,
    subs: new Map(),
    budget: 0,
    executor: null,
    ledger: new Map(),
  };
  users.set(addr, created);
  return created;
}
/**
 * Refresh the active-subscription roster + per-user wallets from the screener DB.
 *
 * Rebuilds `leaderIndex` from the response and prunes subscriptions that disappeared.
 * A user with no remaining subscriptions is dropped ONLY when their in-memory ledger has
 * no non-closed position: deleting the context also deletes the ledger, after which
 * reconcileAll would never visit their still-held positions and a later re-subscribe
 * would start with empty caps. Such users are kept with an empty `subs` map (so nothing
 * new fans out to them) until everything in their ledger is closed.
 * Errors are logged and leave the previous roster in place.
 */
async function pollSubs() {
  try {
    const r = await fetch(SUBS_URL, { headers: { 'x-service-token': SERVICE_TOKEN }, signal: AbortSignal.timeout(12000) });
    if (!r.ok) { console.log('[subs] http', r.status); return; }
    const d = await r.json();
    const seenUsers = new Set();
    const nextLeaderIndex = new Map();
    for (const s of (d.data || [])) {
      const ua = String(s.userAddress || '').toLowerCase();
      if (!ua) continue; // a row without a user cannot be keyed — don't create a '' context
      if (FOLLOWERS.size && !FOLLOWERS.has(ua)) continue; // allow-list (isolation from live instance)
      const dw = s.depositWallet ? String(s.depositWallet).toLowerCase() : null;
      const eoa = s.ownerEoa ? String(s.ownerEoa).toLowerCase() : null;
      if (!dw || !eoa) { console.log(`[subs] skip ${ua.slice(0, 16)}β¦ β missing ${!dw ? 'depositWallet' : 'ownerEoa'}`); continue; }
      const leader = String(s.leaderAddress || '').toLowerCase();
      if (leader.length !== 42) continue; // not a 0x + 40-hex address
      const u = getUser(ua);
      u.dw = dw; u.eoa = eoa;
      u.subs.set(leader, deriveCfg(s.config));
      seenUsers.add(ua);
      if (!nextLeaderIndex.has(leader)) nextLeaderIndex.set(leader, new Set());
      nextLeaderIndex.get(leader).add(ua);
    }
    // Drop users/leaders no longer active (but keep users that still hold live positions).
    for (const [ua, u] of users) {
      if (!seenUsers.has(ua)) {
        const hasLivePosition = [...u.ledger.values()].some((p) => p.status !== 'closed');
        if (hasLivePosition) u.subs.clear();
        else users.delete(ua);
        continue;
      }
      for (const leader of [...u.subs.keys()]) if (!(nextLeaderIndex.get(leader)?.has(ua))) u.subs.delete(leader);
    }
    leaderIndex.clear(); for (const [l, set] of nextLeaderIndex) leaderIndex.set(l, set);
    console.log(`[subs] users=${users.size} leaders=${leaderIndex.size} (active subs across all users)`);
  } catch (e) { console.log('[subs] err:', e.message); }
}
/**
 * Startup load of the persisted ledger (copy_positions): every returned row that passes
 * the FOLLOWERS allow-list and carries a tokenId is stored in that user's in-memory
 * ledger, keyed by the stringified tokenId. No-op when POSITIONS_URL is empty; HTTP and
 * network failures are logged and swallowed.
 */
async function loadLedger() {
  if (!POSITIONS_URL) return;
  try {
    const resp = await fetch(POSITIONS_URL, {
      headers: { 'x-service-token': SERVICE_TOKEN },
      signal: AbortSignal.timeout(12000),
    });
    if (!resp.ok) {
      console.log('[ledger] http', resp.status);
      return;
    }
    const payload = await resp.json();
    let loaded = 0;
    for (const row of (payload.data || [])) {
      const owner = String(row.userAddress || '').toLowerCase();
      const allowed = !FOLLOWERS.size || FOLLOWERS.has(owner);
      if (allowed && row.tokenId) {
        getUser(owner).ledger.set(String(row.tokenId), row);
        loaded += 1;
      }
    }
    console.log(`[ledger] loaded ${loaded} positions across ${users.size} users`);
  } catch (err) {
    console.log('[ledger] err:', err.message);
  }
}
/**
 * Write one position record through to the DB ledger by POSTing `{ userAddress, ...pos }`
 * to POSITIONS_URL. No-op when POSITIONS_URL is empty. Never throws: a non-2xx response
 * or a network/serialization error is logged only.
 * @param {string} userAddress  owner of the position
 * @param {object} pos          position record to persist
 */
async function saveLedger(userAddress, pos) {
  if (!POSITIONS_URL) return;
  try {
    const headers = { 'content-type': 'application/json', 'x-service-token': SERVICE_TOKEN };
    const body = JSON.stringify({ userAddress, ...pos });
    const resp = await fetch(POSITIONS_URL, { method: 'POST', headers, body, signal: AbortSignal.timeout(12000) });
    if (resp.ok) return;
    console.log('[ledger] save http', resp.status, userAddress.slice(0, 12));
  } catch (err) {
    console.log('[ledger] save err:', err.message);
  }
}
/**
 * Refresh one user's pUSD budget: read the PUSD balanceOf their deposit wallet and store
 * it on `u.budget` scaled down by 1e6 (used by the percent-of-budget allocation mode).
 * On a read error the previous budget is kept and the error is logged.
 */
async function refreshBudget(u) {
  try {
    const rawBalance = await pub.readContract({
      address: PUSD,
      abi: erc20Abi,
      functionName: 'balanceOf',
      args: [u.dw],
    });
    u.budget = Number(rawBalance) / 1e6;
  } catch (err) {
    console.log(`[budget] ${u.addr.slice(0, 12)} read err:`, err.message);
  }
}
/**
 * Lazily build and cache this user's CLOB executor (created from ./copy-exec-mu.mjs with
 * the user's own deposit wallet + EOA). Outside LIVE mode nothing is built and the current
 * `u.executor` is returned as-is. A build failure is logged and leaves `u.executor` null.
 * @returns {Promise<object|null>} the cached executor, or null when unavailable
 */
async function ensureExecutor(u) {
  if (!LIVE || u.executor) return u.executor;
  try {
    const mod = await import('./copy-exec-mu.mjs');
    u.executor = await mod.makeExecutor('./.env.privy', { dw: u.dw, eoa: u.eoa });
    console.log(`[exec] ready ${u.addr.slice(0, 14)}β¦ dw=${u.dw.slice(0, 10)} eoa=${u.eoa.slice(0, 10)}`);
  } catch (err) {
    console.log(`[exec] FAIL ${u.addr.slice(0, 14)}:`, err.message);
    u.executor = null;
  }
  return u.executor;
}
// ---- Realized PnL on close: resolution truth, not stale markPnl --------------------
// A resolved-onchain-absent close used to bank `markPnl` (often a stale 0 after redemption),
// so winning copies banked $0. The resolution service is asked for the market outcome and the
// payout truth is banked instead: won -> size - cost, lost -> -cost. When the market is NOT
// resolved (token absent because we SOLD pre-resolution) the caller falls back to markPnl.
const _resCache = new Map();
/**
 * Look up the resolution of market `cond`.
 * Only resolved outcomes are cached (a resolved outcome is immutable); unresolved answers,
 * HTTP failures and network errors all yield null and are retried on the next call.
 * @param {string} cond  market / condition id
 * @returns {Promise<{winningOutcome: *}|null>}
 */
async function marketResolution(cond) {
  if (!cond || !RESOLUTION_URL) return null;
  if (_resCache.has(cond)) return _resCache.get(cond);
  try {
    const url = `${RESOLUTION_URL}?cond=${encodeURIComponent(cond)}`;
    const resp = await fetch(url, { headers: { 'x-service-token': SERVICE_TOKEN }, signal: AbortSignal.timeout(8000) });
    if (!resp.ok) return null;
    const payload = await resp.json();
    const data = payload?.data;
    if (data && data.resolved && data.winningOutcome != null) {
      const resolved = { winningOutcome: data.winningOutcome };
      _resCache.set(cond, resolved);
      return resolved;
    }
    _resCache.delete(cond); // nothing is cached for an unresolved market
    return null;
  } catch {
    return null;
  }
}
/**
 * Realized PnL to bank when position `p` closes.
 * Resolved market: a winning outcome (case-insensitive match) pays size - cost, a losing
 * one loses the full cost. Unresolved market (sold before resolution): the last mark.
 * @param {object} p  ledger position (reads marketId, outcome, size, cost, markPnl)
 * @returns {Promise<number>}
 */
async function realizedOnClose(p) {
  const resolution = await marketResolution(p.marketId);
  if (!resolution) return Number(p.markPnl) || 0; // not resolved: last mark stands in for proceeds
  const cost = Number(p.cost) || 0;
  const ours = String(p.outcome).toLowerCase();
  const winner = String(resolution.winningOutcome).toLowerCase();
  if (ours === winner) return (Number(p.size) || 0) - cost;
  return -cost;
}
// ---- Per-user reconcile (4d-1): promote/cancel pending, mirror leader-exits, bank resolved ----
const CTF = '0x4D97DCd97eC945f40cF65F87097ACe5EA0476045'; // Conditional Tokens (ERC1155) — held-token source of truth
// Minimal ABI: only balanceOf(account, id), the single call reconcile makes against CTF.
const CTF_BAL_ABI = [{ name: 'balanceOf', type: 'function', stateMutability: 'view', inputs: [{ name: 'account', type: 'address' }, { name: 'id', type: 'uint256' }], outputs: [{ type: 'uint256' }] }];
// Second client on a different RPC endpoint, used to double-check a zero balance before closing.
const pub2 = createPublicClient({ chain: polygon, transport: http(process.env.RPC2 || 'https://polygon-bor-rpc.publicnode.com') });
const PENDING_TIMEOUT_S = Number(process.env.PENDING_TIMEOUT_S || '180'); // seconds a 'pending' record may show a zero balance before it is cancelled
const RECONCILE_GRACE_S = Number(process.env.RECONCILE_GRACE_S || '600'); // 10 min — age before an open position absent from the Data API gets an on-chain check
const MAX_OPEN_AGE_S = Number(process.env.MAX_OPEN_AGE_S || '86400'); // 24h — age after which an absent-but-still-held token is closed as worthless
const MIRROR_EXIT_LIVE = process.env.MIRROR_EXIT_LIVE === '1'; // default off = detect+log only (safe observe)
const LEADER_EXIT_CONFIRM = Number(process.env.LEADER_EXIT_CONFIRM || '2'); // cycles leader absent before trusting exit (Data API flake guard)
const EXIT_SLIPPAGE = Number(process.env.EXIT_SLIPPAGE || '0.02'); // fraction shaved off the mark price when pricing an exit sell
/** Reconcile ONE user's open/pending/closing positions against their DW holdings + on-chain balance.
 * Mirrors single-user reconcileOpenPositions 1:1 but on the in-memory ledger Map (write-through
 * to copy_positions).
 *   Phase 1 = slow reads on a snapshot, collecting patches (`changes`) and sells (`toClose`).
 *   Phase 2 = patch the CURRENT record (re-read per tid, status-guarded) so a copy/close that
 *             landed mid-read is never clobbered.
 *   Phase B = execute the sells in `toClose`; a position is banked closed only on a real fill,
 *             an executor "skipped" answer, or an untradeable (resolved) market.
 * The early exit between Phase 1 and Phase 2 must also look at `toClose`: a manual close whose
 * shares are on-chain produces ONLY a `toClose` entry, so returning on an empty `changes` map
 * alone would skip Phase B and leave that close unexecuted whenever nothing else changed.
 * All errors are caught and logged; the function never throws. */
async function reconcileUser(u) {
  try {
    const res = await fetch(`${DATA_API}/positions?user=${u.dw}`, { signal: AbortSignal.timeout(15000) });
    const held = await res.json();
    if (!Array.isArray(held)) return;
    const heldPnl = new Map(held.map((p) => [String(p.asset), Number(p.cashPnl) || 0])); // asset -> live mark
    const curPrice = new Map(held.map((h) => [String(h.asset), (Number(h.size) > 0 ? Number(h.currentValue) / Number(h.size) : 0)]));
    const now = Math.floor(Date.now() / 1000);
    const tag = `[${u.addr.slice(8, 18)}]`;
    // Phase 1 (no lock): inspect a snapshot + slow on-chain / leader-holdings reads, collect changes.
    const snapshot = [...u.ledger.values()];
    const changes = new Map(); // tid -> patch
    const toClose = []; // sells to execute in Phase B (manual closes + confirmed leader-exits)
    const openLeaders = [...new Set(snapshot.filter((p) => p.status === 'open' && p.leader).map((p) => String(p.leader).toLowerCase()))];
    const leaderHeld = new Map(); // leader -> Set(asset); absent entry = unreliable this cycle, skip exit check
    await Promise.allSettled(openLeaders.map(async (ldr) => {
      try {
        const r = await fetch(`${DATA_API}/positions?user=${ldr}`, { signal: AbortSignal.timeout(15000) });
        const arr = await r.json();
        // An empty array is treated as unreliable too, so it never reads as "leader exited everything".
        if (Array.isArray(arr) && arr.length) leaderHeld.set(ldr, new Set(arr.map((x) => String(x.asset))));
      } catch { /* leave unset: don't trust absence this cycle */ }
    }));
    for (const p of snapshot) {
      const tid = String(p.tokenId);
      if (p.status === 'closing') {
        // Manual close requested by the user (server flipped status to 'closing'). Decide the action
        // by on-chain truth: have shares -> sell at current mark; only a resting BUY -> cancel it;
        // nothing on-chain -> close flat. Retries each cycle until done (no rollback: user asked to exit).
        let bal;
        try { bal = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [u.dw, BigInt(tid)] }); }
        catch { continue; } // RPC error: retry next cycle
        if (bal > 0n) toClose.push({ tid, price: curPrice.get(tid) || Number(p.leaderPrice) || Number(p.entryPrice) || 0, manual: true });
        else if (p.lastOrderId) changes.set(tid, { cancelManual: p.lastOrderId });
        else changes.set(tid, { manualFlat: true });
        continue;
      }
      if (p.status === 'pending') {
        // On-chain balance is truth: appeared -> promote; still zero past timeout -> cancel + free slot.
        let bal;
        try { bal = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [u.dw, BigInt(tid)] }); }
        catch { continue; } // RPC error: leave pending, retry next cycle
        if (bal > 0n) changes.set(tid, { promote: true });
        else if ((now - (p.openedAt || 0)) > PENDING_TIMEOUT_S) changes.set(tid, { cancelPending: p.lastOrderId || null });
        continue;
      }
      if (p.status !== 'open') continue;
      if (heldPnl.has(tid)) {
        const ch = { markPnl: heldPnl.get(tid) };
        const lh = leaderHeld.get(String(p.leader || '').toLowerCase());
        if (lh) { // reliable leader holdings this cycle
          if (!lh.has(tid)) { ch.leaderExit = true; ch.exitPrice = curPrice.get(tid) || 0; }
          else ch.leaderStillIn = true;
        }
        changes.set(tid, ch);
      } else if ((now - (p.openedAt || 0)) > RECONCILE_GRACE_S) {
        // Absent from Data API (which flakes): verify real ERC1155 balance before banking, and
        // confirm a zero on a SECOND independent RPC (the first can serve a stale false-0).
        let onchain;
        try { onchain = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [u.dw, BigInt(tid)] }); }
        catch (e) { console.log(`${tag} [reconcile] CTF read err ${tid.slice(0, 10)}:`, e.message); continue; }
        if (onchain === 0n) {
          let onchain2;
          try { onchain2 = await pub2.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [u.dw, BigInt(tid)] }); }
          catch (e) { console.log(`${tag} [reconcile] CTF confirm-read err ${tid.slice(0, 10)}:`, e.message); continue; }
          if (onchain2 === 0n) changes.set(tid, { close: 'resolved-onchain-absent' });
          else console.log(`${tag} [reconcile] false-zero guarded ${tid.slice(0, 10)}: rpc1=0 rpc2=${onchain2} β keep open`);
        } else if ((now - (p.openedAt || 0)) > MAX_OPEN_AGE_S) {
          changes.set(tid, { close: 'resolved-worthless-aged' }); // on-chain leftover of a resolved $0 loser
        }
      }
    }
    // Nothing to patch AND nothing to sell. (Checking `changes` alone would drop manual closes.)
    if (!changes.size && !toClose.length) return;
    // Phase 2: apply each patch to the CURRENT record (status-guarded), write-through per change.
    let freed = 0;
    for (const [tid, patch] of changes) {
      const p = u.ledger.get(tid);
      if (!p) continue;
      if (patch.promote) { if (p.status === 'pending') { p.status = 'open'; await saveLedger(u.addr, p); } continue; }
      if (patch.cancelPending !== undefined) {
        if (p.status === 'pending') {
          if (patch.cancelPending) { const ex = await ensureExecutor(u); ex?.cancel(patch.cancelPending).catch(() => {}); }
          p.status = 'closed'; p.closedReason = 'pending-timeout'; p.closedAt = now; p.realizedPnl = 0;
          u.ledger.delete(tid); await saveLedger(u.addr, p); // no DELETE endpoint: mark closed; drop frees the slot
        }
        continue;
      }
      if (patch.cancelManual !== undefined) { // manual close, only a resting BUY on-chain: cancel it
        if (p.status === 'closing') {
          if (patch.cancelManual) { const ex = await ensureExecutor(u); ex?.cancel(patch.cancelManual).catch(() => {}); }
          p.status = 'closed'; p.closedReason = 'manual-cancel-pending'; p.closedAt = now; p.realizedPnl = 0;
          u.ledger.delete(tid); await saveLedger(u.addr, p);
        }
        continue;
      }
      if (patch.manualFlat) { // manual close, nothing on-chain to sell: close flat
        if (p.status === 'closing') {
          p.status = 'closed'; p.closedReason = 'manual-user'; p.closedAt = now; p.realizedPnl = Number(p.markPnl) || 0;
          u.ledger.delete(tid); await saveLedger(u.addr, p);
        }
        continue;
      }
      if (p.status !== 'open') continue; // a concurrent handler already changed it
      if (patch.markPnl != null) p.markPnl = patch.markPnl;
      if (patch.leaderStillIn) { if (p.leaderExitSeen) p.leaderExitSeen = 0; }
      else if (patch.leaderExit) {
        p.leaderExitSeen = (p.leaderExitSeen || 0) + 1;
        if (p.leaderExitSeen >= LEADER_EXIT_CONFIRM) {
          console.log(`${tag} π» leader-exit confirmed (${p.leaderExitSeen}x) ${tid.slice(0, 10)} "${String(p.title).slice(0, 38)}" markβ$${(Number(p.markPnl) || 0).toFixed(2)} β ${MIRROR_EXIT_LIVE ? 'closing' : 'OBSERVE-ONLY'}`);
          if (MIRROR_EXIT_LIVE) toClose.push({ tid, price: patch.exitPrice });
        } else {
          console.log(`${tag} β¦ leader-exit seen ${p.leaderExitSeen}/${LEADER_EXIT_CONFIRM} ${tid.slice(0, 10)} (awaiting confirm)`);
        }
      }
      if (patch.close) {
        p.status = 'closed'; p.closedReason = patch.close; p.closedAt = now;
        // worthless-aged = resolved $0 loser: realized is full cost lost. onchain-absent = sold/redeemed:
        // resolution truth when available, else last mark.
        p.realizedPnl = patch.close === 'resolved-worthless-aged' ? -(Number(p.cost) || 0) : await realizedOnClose(p);
        freed += 1;
      }
      await saveLedger(u.addr, p);
    }
    if (freed) console.log(`${tag} [reconcile] freed ${freed} slot(s); open now ${[...u.ledger.values()].filter((x) => x.status === 'open').length}`);
    // Phase B (network): execute the collected sells. Bank closed ONLY on a real fill.
    // Manual closes always execute (toClose carries them even when MIRROR_EXIT_LIVE is off);
    // leader-exits were only pushed above when MIRROR_EXIT_LIVE.
    if (toClose.length) {
      const ex = await ensureExecutor(u);
      if (ex) for (const { tid, price, manual } of toClose) {
        // Price the sell slightly under the mark, clamped to the 0.01-0.99 range.
        const sellPx = Math.max(0.01, Math.min(0.99, (Number(price) || 0) * (1 - EXIT_SLIPPAGE)));
        let resp;
        try { resp = await ex.sellAll({ tokenID: tid, price: sellPx }); }
        catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
        const h = u.ledger.get(tid);
        if (!h || (h.status !== 'open' && h.status !== 'closing')) continue; // 'closing' = manual-close intent
        const what = manual ? 'manual-close' : 'mirror-close';
        const soldNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
        if (resp?.skipped) {
          h.status = 'closed'; h.closedReason = `${manual ? 'manual' : 'leader-exit'}-${String(resp.skipped)}`; h.closedAt = now; h.realizedPnl = Number(h.markPnl) || 0;
          await saveLedger(u.addr, h); console.log(`${tag} ${what} skipped (${resp.skipped}) β closed ${tid.slice(0, 10)}`);
        } else if (soldNow) {
          h.status = 'closed'; h.closedReason = manual ? 'manual-user' : 'mirror-leader-exit'; h.closeOrderId = resp.orderID; h.closedAt = now; h.realizedPnl = Number(h.markPnl) || 0;
          await saveLedger(u.addr, h); console.log(`${tag} β
 ${manual ? 'MANUAL' : 'MIRROR'}-CLOSED ${tid.slice(0, 10)} pnlβ$${(Number(h.markPnl) || 0).toFixed(2)}`);
        } else if (resp?.success && resp.orderID) {
          ex.cancel(resp.orderID).catch(() => {}); h.leaderExitSeen = 0; await saveLedger(u.addr, h); // status stays open/closing: retry next cycle
          console.log(`${tag} β³ ${what} not filled; retry ${tid.slice(0, 10)}`);
        } else {
          const reason = resp?.errorMsg || resp?.error || resp?.status || 'unknown';
          // A resolved market delists its token, so the sell fails with "invalid token id"
          // (or not found). Can't sell a settled market; close as resolved so the slot
          // frees instead of retrying the sell forever.
          if (/invalid token id|token.*not found|market.*(resolved|closed)/i.test(String(reason))) {
            h.status = 'closed'; h.closedReason = manual ? 'manual-resolved' : 'leader-exit-resolved'; h.closedAt = now; h.realizedPnl = await realizedOnClose(h);
            await saveLedger(u.addr, h);
            console.log(`${tag} β
 ${what} closed (resolved/untradeable) ${tid.slice(0, 10)} pnlβ$${(Number(h.markPnl) || 0).toFixed(2)}`);
          } else {
            h.leaderExitSeen = 0; await saveLedger(u.addr, h); // status unchanged: retry next cycle
            console.log(`${tag} β ${what} rejected, retry ${tid.slice(0, 10)} β ${reason}`);
          }
        }
      }
    }
  } catch (e) { console.log(`[reconcile] ${u.addr.slice(0, 12)} err:`, e.message); }
}
/**
 * Pull user-requested manual closes from the DB into the in-memory ledger.
 * The ledger is only bulk-loaded at startup, so a status flipped to 'closing' afterwards
 * would otherwise never be seen. Each cycle this fetches just the `?status=closing` set and,
 * for users already known to the daemon: adopts a record missing from memory, or flips an
 * existing record that is neither 'closing' nor 'closed' to 'closing'.
 * No-op without POSITIONS_URL; non-2xx responses are ignored and errors are logged.
 */
async function pullCloseIntents() {
  if (!POSITIONS_URL) return;
  try {
    const resp = await fetch(`${POSITIONS_URL}?status=closing`, {
      headers: { 'x-service-token': SERVICE_TOKEN },
      signal: AbortSignal.timeout(12000),
    });
    if (!resp.ok) return;
    const payload = await resp.json();
    for (const row of (payload.data || [])) {
      const owner = String(row.userAddress || '').toLowerCase();
      if (FOLLOWERS.size && !FOLLOWERS.has(owner)) continue;
      if (!row.tokenId) continue;
      const user = users.get(owner);
      if (!user) continue;
      const key = String(row.tokenId);
      const existing = user.ledger.get(key);
      if (!existing) {
        user.ledger.set(key, row);
        console.log(`[close-intent] ${owner.slice(0,12)} ${key.slice(0,10)} (loaded)`);
        continue;
      }
      const alreadyHandled = existing.status === 'closing' || existing.status === 'closed';
      if (!alreadyHandled) {
        existing.status = 'closing';
        console.log(`[close-intent] ${owner.slice(0,12)} ${key.slice(0,10)} β closing`);
      }
    }
  } catch (err) {
    console.log('[close-intent] err:', err.message);
  }
}
/**
 * One reconcile cycle: first pull pending manual-close requests, then reconcile each user
 * that has a deposit wallet, one after another. A failure for one user is logged and does
 * not stop the remaining users.
 */
async function reconcileAll() {
  await pullCloseIntents(); // manual-close requests set since the last ledger load
  for (const user of users.values()) {
    if (user.dw) {
      try {
        await reconcileUser(user);
      } catch (err) {
        console.log(`[reconcile] ${user.addr.slice(0, 12)} fatal:`, err.message);
      }
    }
  }
}
// ---- Per-user PnL (4d-2): per-leader realized+unrealized, posted keyed by userAddress ----
// Endpoint the per-user snapshots are POSTed to; derived from SUBS_URL (/active -> /pnl) unless PNL_URL is set.
const PNL_URL = process.env.PNL_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/pnl') : '');
// Interval in ms (default 2 min). NOTE(review): presumably the PnL posting cadence — the timer is not in this view.
const PNL_POLL_MS = Number(process.env.PNL_POLL_MS || '120000');
/**
 * Compute one user's per-leader P&L and POST it to PNL_URL keyed by userAddress.
 *   realized   = sum of realizedPnl over the user's closed ledger records
 *   unrealized = live cashPnl of the copies still held in their deposit wallet
 * 'pending' records are skipped (slot reserved, not a P&L line yet). 'closing' records are
 * counted as HELD, like 'open': a manual close that has not executed yet still owns the
 * shares, so it must not be reported as a closed copy with its live mark dropped.
 * Returns early, before any network call, when PNL_URL is empty. Never throws; errors are logged.
 * @param {object} u  per-user context (reads addr, dw, ledger, executor)
 */
async function computeAndPostPnLForUser(u) {
  if (!PNL_URL) return; // nothing to post to: skip the Data API round-trip as well
  try {
    const res = await fetch(`${DATA_API}/positions?user=${u.dw}`, { signal: AbortSignal.timeout(15000) });
    const held = await res.json();
    const heldMap = new Map(Array.isArray(held) ? held.map((h) => [String(h.asset), h]) : []);
    const agg = new Map();
    const get = (ldr) => {
      let a = agg.get(ldr);
      if (!a) { a = { leader: ldr, realized: 0, unrealized: 0, value: 0, openCount: 0, closedCount: 0 }; agg.set(ldr, a); }
      return a;
    };
    for (const p of u.ledger.values()) {
      if (p.status === 'pending') continue; // slot reserved, not a real P&L line yet
      const a = get(String(p.leader || 'unknown').toLowerCase());
      if (p.status === 'open' || p.status === 'closing') {
        // Still held: only counted when the Data API currently reports the asset.
        const h = heldMap.get(String(p.tokenId));
        if (h) { a.unrealized += Number(h.cashPnl) || 0; a.value += Number(h.currentValue) || 0; a.openCount += 1; }
      } else {
        a.realized += Number(p.realizedPnl) || 0;
        a.closedCount += 1;
      }
    }
    // Round to 4 decimals for the wire format.
    const leaders = [...agg.values()].map((a) => ({
      leader: a.leader,
      realized: +a.realized.toFixed(4),
      unrealized: +a.unrealized.toFixed(4),
      pnl: +(a.realized + a.unrealized).toFixed(4),
      value: +a.value.toFixed(4),
      openCount: a.openCount,
      closedCount: a.closedCount,
    }));
    // credsReady = executor initialized = L2 CLOB creds cached. The client uses this to know it's
    // safe to attach the orders-only Privy policy (attaching before creds are cached would DENY
    // the ClobAuth derivation and break copying).
    const r = await fetch(PNL_URL, { method: 'POST', headers: { 'content-type': 'application/json', 'x-service-token': SERVICE_TOKEN }, body: JSON.stringify({ userAddress: u.addr, leaders, credsReady: !!u.executor }), signal: AbortSignal.timeout(12000) });
    if (!r.ok) console.log(`[pnl] ${u.addr.slice(0, 12)} post http`, r.status);
  } catch (e) { console.log(`[pnl] ${u.addr.slice(0, 12)} err:`, e.message); }
}
/**
 * One P&L cycle: compute and post the snapshot for each user that has a deposit wallet,
 * sequentially. A failure for one user is logged and the loop moves on to the next.
 */
async function computeAndPostPnLAll() {
  for (const user of users.values()) {
    if (user.dw) {
      try {
        await computeAndPostPnLForUser(user);
      } catch (err) {
        console.log(`[pnl] ${user.addr.slice(0, 12)} fatal:`, err.message);
      }
    }
  }
}
// ---- Detection + per-user copy decision (4c: BUY/SELL fan-out; 4d: reconcile+pnl above) ----
const RTDS_WS = 'wss://ws-live-data.polymarket.com';
const COPY_FRACTION = Number(process.env.COPY_FRACTION || '0.1'); // share of the leader's notional copied in the default sizing mode
const MIN_SHARES = Number(process.env.MIN_SHARES || '5'); // smallest order size, in shares
const LEADER_KILL_USD = Number(process.env.LEADER_KILL_USD || '8'); // realized loss per leader at which copying that leader stops (0 disables)
const STATE_FILE = process.env.STATE || './copy-mu-state.json';
const POLL_MS = Number(process.env.POLL_MS || '15000');
/** True for a ledger record that occupies a slot: status 'open' or 'pending'. */
const isActive = (p) => Boolean(p) && ['open', 'pending'].includes(p.status);
/** Normalize a timestamp to seconds: values above 2e10 are taken as milliseconds. */
const toSec = (ts) => {
  const raw = Number(ts) || 0;
  if (raw > 2e10) return Math.floor(raw / 1000);
  return raw;
};
/** Identity key of a trade: tx hash, asset, side, timestamp and size joined by ':'. */
const tradeKey = (t) => [t.transactionHash, t.asset, t.side, t.timestamp, t.size].map((part) => part || '').join(':');
/**
 * Turn a leader trade into the order to place for one user (their cfg + their budget).
 * Target notional by mode: 'fixed' = allocValue; 'percent' = allocValue % of budget;
 * anything else = COPY_FRACTION of the leader's notional. The share count is then raised
 * to MIN_SHARES / MIN_NOTIONAL and finally cut down to cfg.maxNotional.
 * @returns {object|null} the order, or null when the trade is incomplete or the capped
 *   size would fall under the minimums
 */
function decideCopy(t, cfg, budget) {
  const leaderShares = Number(t.size || 0);
  const px = Number(t.price || 0);
  if (!t.asset || !leaderShares || !px) return null;
  let targetUsd;
  switch (cfg.mode) {
    case 'fixed':
      targetUsd = cfg.allocValue;
      break;
    case 'percent':
      targetUsd = (cfg.allocValue / 100) * budget;
      break;
    default:
      targetUsd = leaderShares * px * COPY_FRACTION;
  }
  let shares = Math.round((targetUsd || 0) / px);
  if (shares < MIN_SHARES) shares = MIN_SHARES;
  if (shares * px < MIN_NOTIONAL) shares = Math.ceil(MIN_NOTIONAL / px);
  const overCap = shares * px > cfg.maxNotional;
  if (overCap) {
    shares = Math.floor(cfg.maxNotional / px);
    // The cap wins over the minimums: if both cannot hold, there is no order.
    if (shares < MIN_SHARES || shares * px < MIN_NOTIONAL) return null;
  }
  const notional = +(shares * px).toFixed(2);
  const side = (t.side || '').toUpperCase();
  return { tokenID: t.asset, side, price: px, size: shares, notional, outcome: t.outcome, title: t.title };
}
/** Decide (and in 4c-2: execute) a copy of leader-trade `t` for one user `u`. Cap checks run
* against the user's OWN in-memory ledger. 4c-1 logs the decision only (no reserve/execute). */
async function handleTradeForUser(t, u, cfg) {
const leader = String(t.proxyWallet || '').toLowerCase();
const side = String(t.side || '').toUpperCase();
const tag = `[${u.addr.slice(8, 18)}]`;
const ledger = [...u.ledger.values()];
if (side === 'BUY') {
const px = Number(t.price || 0);
if (px < cfg.priceMin || px > cfg.priceMax) { console.log(`${tag} skip (price ${px} outside band ${cfg.priceMin}-${cfg.priceMax})`); return; }
const order = decideCopy(t, cfg, u.budget);
if (!order) { console.log(`${tag} skip (incomplete/over-cap)`); return; }
const label = `${order.side} ${order.size} "${order.outcome}" @ ${order.price.toFixed(3)} ($${order.notional}) | ${String(order.title).slice(0, 45)}`;
const evt = t.eventSlug || t.slug || '';
const leaderRealized = ledger.filter((p) => p.status === 'closed' && String(p.leader || '').toLowerCase() === leader).reduce((s, p) => s + (Number(p.realizedPnl) || 0), 0);
if (LEADER_KILL_USD > 0 && leaderRealized <= -LEADER_KILL_USD) { console.log(`${tag} β skip (leader killed: realized $${leaderRealized.toFixed(2)}): ${label}`); return; }
if (isActive(u.ledger.get(order.tokenID))) { console.log(`${tag} βοΈ skip (already holding/pending): ${label}`); return; }
if (evt) {
const sameEvt = ledger.filter((p) => isActive(p) && p.leader === leader && p.eventSlug === evt).length;
if (sameEvt >= cfg.maxPerEvent) { console.log(`${tag} βοΈ skip (leader ${sameEvt} on event, max ${cfg.maxPerEvent}): ${label}`); return; }
}
const leaderOpen = ledger.filter((p) => isActive(p) && p.leader === leader).length;
if (leaderOpen >= cfg.maxOpenPerLeader) { console.log(`${tag} βοΈ skip (leader ${leaderOpen}, max per-leader ${cfg.maxOpenPerLeader}): ${label}`); return; }
const openCount = ledger.filter(isActive).length;
if (openCount >= cfg.maxOpen) { console.log(`${tag} βοΈ skip (max ${cfg.maxOpen} open): ${label}`); return; }
if (!LIVE) { console.log(`${tag} π’ WOULD COPY: ${label} tokenID=${String(order.tokenID).slice(0, 14)}β¦ budget=$${u.budget.toFixed(2)}`); return; }
// Reserve the slot in THIS user's ledger as 'pending' (write-through to DB) BEFORE the slow
// CLOB POST β caps count pending, so a concurrent fan-out of the same token can't double-open.
const rec = { tokenId: order.tokenID, leader, marketId: t.conditionId, outcome: t.outcome, title: t.title, eventSlug: evt, side: 'BUY', size: order.size, leaderPrice: order.price, cost: order.notional, openedAt: toSec(t.timestamp) || Math.floor(Date.now() / 1000), buys: 0, status: 'pending' };
u.ledger.set(order.tokenID, rec);
await saveLedger(u.addr, rec);
const exec = await ensureExecutor(u);
if (!exec) { rec.status = 'closed'; rec.closedReason = 'no-executor'; u.ledger.delete(order.tokenID); await saveLedger(u.addr, rec); console.log(`${tag} β no executor, slot released: ${label}`); return; }
// Network call OUTSIDE any lock; finalize against the in-memory ledger (single-threaded fan-out).
let resp;
try { resp = await exec.buy(order); }
catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
const p = u.ledger.get(order.tokenID);
if (!p || p.status !== 'pending') return; // reconcile/another handler already resolved it
const filledNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
if (filledNow) {
p.status = 'open'; p.buys = 1; p.lastOrderId = resp.orderID; p.openedAt = toSec(t.timestamp) || p.openedAt;
await saveLedger(u.addr, p);
console.log(`${tag} β
COPIED: ${label} orderID=${resp.orderID} status=${resp.status}`);
} else if (resp?.success && resp.orderID) {
p.lastOrderId = resp.orderID; await saveLedger(u.addr, p);
console.log(`${tag} β posted, awaiting fill (status ${resp.status}); reconcile will confirm/cancel: ${label}`);
} else {
p.status = 'closed'; p.closedReason = 'rejected'; p.realizedPnl = 0;
u.ledger.delete(order.tokenID);
await saveLedger(u.addr, p); // no DELETE endpoint β mark closed; in-memory drop frees the slot now
console.log(`${tag} β copy rejected, slot released: ${label} β ${resp?.errorMsg || resp?.error || resp?.status || 'unknown'}\n ${JSON.stringify(resp).slice(0, 160)}`);
}
return;
}
if (side === 'SELL') {
const held = u.ledger.get(t.asset);
if (!held || held.status !== 'open') return; // not held (pending/closed/absent) β ignore
if (String(held.leader).toLowerCase() !== leader) return; // only the entry leader's exit mirrors a close
const label = `SELL "${t.outcome}" @ ${Number(t.price).toFixed(3)} | ${String(t.title).slice(0, 45)}`;
if (!LIVE) { console.log(`${tag} π» WOULD CLOSE (entry leader exited): ${label} tokenID=${String(t.asset).slice(0, 14)}β¦`); return; }
const exec = await ensureExecutor(u);
if (!exec) { console.log(`${tag} β οΈ no executor, cannot close (still holding): ${label}`); return; }
let resp;
try { resp = await exec.sellAll({ tokenID: t.asset, price: Number(t.price) }); }
catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
const p = u.ledger.get(t.asset);
if (!p || p.status !== 'open') return; // already resolved by another handler
if (resp?.skipped) {
p.status = 'closed'; p.closedReason = String(resp.skipped); p.closedAt = toSec(t.timestamp); p.realizedPnl = Number(p.markPnl) || 0;
await saveLedger(u.addr, p);
console.log(`${tag} close skipped (${resp.skipped}) β marked closed: ${label}`); return;
}
const soldNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
if (soldNow) {
p.status = 'closed'; p.closeOrderId = resp.orderID; p.closedAt = toSec(t.timestamp); p.realizedPnl = Number(p.markPnl) || 0;
await saveLedger(u.addr, p);
console.log(`${tag} β
CLOSED: ${label} orderID=${resp.orderID} status=${resp.status}`);
} else if (resp?.success && resp.orderID) {
// Posted but didn't fill (rested) β we STILL HOLD. Cancel the resting close, keep open so the
// next leader-sell / reconcile retries β do NOT falsely bank it closed.
exec.cancel(resp.orderID).then(() => console.log(`${tag} β©οΈ cancelled unfilled close ${resp.orderID}`)).catch(() => {});
console.log(`${tag} β³ close not filled (status ${resp.status}); keeping open, cancelled resting: ${label}`);
} else {
// A resolved/delisted market can't be sold (token delisted) β the sell rejects forever and the
// slot stays frozen. Mirror reconcile Phase B: close as resolved (realize mark PnL) to free the
// slot instead of re-hitting sell on every leader-exit. Otherwise keep open + log the reason.
const reason = resp?.errorMsg || resp?.error || resp?.status || 'unknown';
if (/invalid token id|token.*not found|market.*(resolved|closed)/i.test(String(reason))) {
p.status = 'closed'; p.closedReason = 'leader-exit-resolved'; p.closedAt = toSec(t.timestamp); p.realizedPnl = await realizedOnClose(p);
await saveLedger(u.addr, p);
console.log(`${tag} β
closed (resolved/untradeable) ${String(t.asset).slice(0, 10)} pnlβ$${(Number(p.markPnl) || 0).toFixed(2)}`);
} else {
console.log(`${tag} β close rejected (still holding): ${label} β ${reason}`);
}
}
}
}
/**
 * Fan one detected leader trade out to every user subscribed to that leader.
 *
 * Subscribers are processed one at a time: each `handleTradeForUser` call is awaited before
 * the next subscriber starts. A failure for one user is logged and never blocks the others.
 *
 * @param {object} t - Detected trade; `t.proxyWallet` identifies the leader (matched lowercased).
 * @returns {Promise<void>} Resolves once every subscriber has been attempted.
 */
async function fanout(t) {
  const leader = String(t.proxyWallet || '').toLowerCase();
  const subscribers = leaderIndex.get(leader);
  if (!subscribers || !subscribers.size) return;
  for (const ua of subscribers) {
    const u = users.get(ua);
    const cfg = u?.subs.get(leader);
    if (!u || !cfg) continue; // no user record, or no config for this leader: nothing to copy
    try {
      await handleTradeForUser(t, u, cfg);
    } catch (e) {
      // The rejection value is not guaranteed to be an Error (it can even be null/undefined).
      // Reading `e.message` unguarded would throw from inside this handler and abort the
      // remaining subscribers, so fall back to String(e).
      console.log(`[fanout] ${ua.slice(0, 12)} err:`, e?.message || String(e));
    }
  }
}
// --- WebSocket detection (default): RTDS firehose, filter by any active leader ---
/**
 * Connect to the websocket at `RTDS_WS`, subscribe to the `activity` / `trades` topic and hand
 * every not-yet-seen trade made by an indexed leader to `fanout`.
 *
 * The socket reconnects after every close with exponential backoff (1s, doubling up to a 30s
 * cap, reset to 1s on a successful open). The returned promise resolves as soon as the first
 * connection attempt has been started; all further work happens in the socket event handlers.
 *
 * @returns {Promise<void>}
 */
async function runWebsocket() {
  const { default: WebSocket } = await import('ws');
  const seen = new Set();
  // Bounded dedupe memory: past 4000 keys, evict the oldest one (Sets iterate in insertion order).
  const remember = (k) => { seen.add(k); if (seen.size > 4000) seen.delete(seen.values().next().value); };
  let backoff = 1000;
  const connect = () => {
    const ws = new WebSocket(RTDS_WS);
    let ping;
    ws.on('open', () => {
      backoff = 1000;
      ws.send(JSON.stringify({ action: 'subscribe', subscriptions: [{ topic: 'activity', type: 'trades' }] }));
      // Application-level keepalive every 5s; a failed send is ignored on purpose (best effort).
      ping = setInterval(() => { try { ws.send('PING'); } catch {} }, 5000);
      console.log(`[ws] connected; watching ${leaderIndex.size} leader(s)`);
    });
    ws.on('message', async (d) => {
      const s = d.toString();
      if (s[0] !== '{' && s[0] !== '[') return; // only frames that look like a JSON object/array
      let msg; try { msg = JSON.parse(s); } catch { return; }
      for (const m of (Array.isArray(msg) ? msg : [msg])) {
        // The emitter ignores the promise returned by this async handler, so anything thrown
        // here would become an unhandled rejection. Contain failures per entry so one bad entry
        // neither kills the process nor drops the rest of the batch.
        try {
          const t = m?.payload || m; // `m` can be null inside an array frame
          if (!t || !t.proxyWallet || !t.asset) continue;
          if (!leaderIndex.has(String(t.proxyWallet).toLowerCase())) continue;
          const k = tradeKey(t);
          if (seen.has(k)) continue;
          remember(k); // mark before awaiting so a concurrent duplicate frame is skipped
          await fanout(t);
        } catch (e) {
          console.log('[ws] message err:', e?.message || String(e));
        }
      }
    });
    ws.on('error', (e) => console.log('[ws] error:', e.message));
    ws.on('close', () => {
      clearInterval(ping);
      console.log(`[ws] closed; reconnect in ${backoff}ms`);
      setTimeout(connect, backoff);
      backoff = Math.min(backoff * 2, 30000);
    });
  };
  connect();
}
// --- Polling detection (POLL=1): Data API /trades?user= per active leader, per-leader cursors ---
/**
 * Read the persisted poll state from `STATE_FILE`.
 * @returns {any} The parsed JSON content, or a fresh `{}` when the file cannot be read or parsed.
 */
const loadState = () => {
  try {
    const raw = fs.readFileSync(STATE_FILE, 'utf8');
    return JSON.parse(raw);
  } catch {
    // Unreadable or invalid file: start from an empty state.
    return {};
  }
};
/**
 * Persist the poll state to `STATE_FILE` as JSON (synchronous write).
 * @param {any} s - State to serialize.
 */
const saveState = (s) => {
  const serialized = JSON.stringify(s);
  return fs.writeFileSync(STATE_FILE, serialized);
};
/**
 * One polling pass (POLL=1 mode): fetch the latest trades of every indexed leader from the
 * Data API, pick the ones not handled yet, and fan them out oldest-first.
 *
 * Persisted state (via `loadState` / `saveState`):
 *  - `seen`   – trade keys already handled (trimmed to the last 3000 on save),
 *  - `lastTs` – per-leader timestamp cursor,
 *  - `primed` – per-leader flag; the first successful fetch for a leader only records a
 *               baseline (nothing is copied).
 *
 * A failed fetch for one leader is logged and does not affect the other leaders.
 *
 * @returns {Promise<void>}
 */
async function tick() {
  const leaders = [...leaderIndex.keys()];
  if (!leaders.length) { console.log('[poll] no active leaders'); return; }
  const state = loadState();
  const seen = new Set(Array.isArray(state.seen) ? state.seen : []);
  if (!state.lastTs || typeof state.lastTs !== 'object') state.lastTs = {};
  if (!state.primed || typeof state.primed !== 'object') state.primed = {};
  // All leaders are fetched in parallel; allSettled keeps one failure from hiding the rest.
  const results = await Promise.allSettled(leaders.map(async (leader) => {
    const r = await fetch(`${DATA_API}/trades?user=${leader}&limit=50`, { signal: AbortSignal.timeout(15000) });
    if (!r.ok) throw new Error(`${leader.slice(0, 8)}β¦ data-api ${r.status}`);
    return { leader, trades: await r.json() };
  }));
  const toCopy = [];
  for (const res of results) {
    if (res.status !== 'fulfilled') { console.log('[poll] err:', res.reason?.message); continue; }
    const { leader, trades } = res.value;
    if (!Array.isArray(trades)) continue;
    if (!state.primed[leader]) {
      // First sight of this leader: remember everything as already seen, copy nothing.
      for (const t of trades) seen.add(tradeKey(t));
      state.lastTs[leader] = trades.reduce((m, t) => Math.max(m, Number(t.timestamp || 0)), 0);
      state.primed[leader] = true;
      console.log(`[prime] ${leader.slice(0, 8)}β¦ baseline (${trades.length} trades)`);
      continue;
    }
    // Filter against the cursor as it stood BEFORE this batch. Advancing it while iterating
    // would make the result depend on response order: with a newest-first batch the first new
    // trade raises the cursor and every older new trade in the same batch is dropped for good.
    const cursor = state.lastTs[leader] || 0;
    let newest = cursor;
    for (const t of trades) {
      const key = tradeKey(t);
      if (seen.has(key)) continue;
      const ts = Number(t.timestamp || 0);
      if (ts < cursor) continue; // older than anything handled in previous passes
      seen.add(key);
      newest = Math.max(newest, ts);
      toCopy.push(t);
    }
    state.lastTs[leader] = newest;
  }
  // Replay in chronological order across all leaders, one trade at a time.
  toCopy.sort((a, b) => Number(a.timestamp) - Number(b.timestamp));
  for (const t of toCopy) await fanout(t);
  state.seen = Array.from(seen).slice(-3000);
  saveState(state);
  console.log(`[poll] ${leaders.length} leader(s), ${toCopy.length} new trade(s)`);
}
/**
 * Daemon entry point.
 *
 * Startup order: load subscriptions (`pollSubs`) and the ledger (`loadLedger`), refresh each
 * user's budget (and, in LIVE mode outside `--selftest`, set up their executor), then run one
 * reconcile and one PnL pass. `--selftest` stops after printing the per-user context.
 *
 * Unless `ONCE=1`, periodic jobs are scheduled for budgets (60s), subscriptions
 * (`SUBS_POLL_MS`), reconcile (`RECONCILE_MS`, default 120000) and PnL (`PNL_POLL_MS`).
 * Detection then starts: polling (`tick`, every `POLL_MS`) when `POLL` is set, else websocket.
 *
 * @returns {Promise<void>}
 */
async function main() {
  await pollSubs();
  await loadLedger();
  const selftest = process.argv.includes('--selftest');
  console.log(`[mu] mode=${LIVE ? 'LIVE' : 'DRY'} detect=${POLL ? 'POLL' : 'WS'} followers=${FOLLOWERS.size || 'ALL'}`);
  for (const [ua, u] of users) {
    await refreshBudget(u);
    if (LIVE && !selftest) await ensureExecutor(u);
    console.log(` user ${ua.slice(0, 20)}β¦ dw=${u.dw?.slice(0, 10)} subs=${u.subs.size} ledger=${u.ledger.size} budget=$${u.budget.toFixed(2)} exec=${u.executor ? 'ok' : '-'}`);
  }
  if (selftest) { console.log('[mu] selftest done β context + budgets resolved, detection + execution wired. exiting.'); return; }
  await reconcileAll();
  await computeAndPostPnLAll();

  const once = process.env.ONCE === '1';
  // Timers ignore the promise returned by their callback, so a rejected periodic job would
  // otherwise surface as an unhandled rejection. Log the failure and let the next run retry.
  const every = (label, job, ms) => setInterval(async () => {
    try { await job(); }
    catch (e) { console.log(`[mu] ${label} err:`, e?.message || String(e)); }
  }, ms);
  if (!once) {
    // A non-numeric or < 1 RECONCILE_MS would be coerced by setInterval to a 1ms delay and run
    // reconcile back-to-back; fall back to the default instead.
    const requested = Number(process.env.RECONCILE_MS || '120000');
    const reconcileMs = Number.isFinite(requested) && requested >= 1 ? requested : 120000;
    if (reconcileMs !== requested) console.log(`[mu] ignoring invalid RECONCILE_MS=${process.env.RECONCILE_MS}; using ${reconcileMs}ms`);
    // Call refreshBudget with the user only, like the startup loop above (Map#forEach would
    // also pass the key and the map as extra arguments).
    every('budget', () => Promise.all([...users.values()].map((u) => refreshBudget(u))), 60000);
    every('subs', pollSubs, SUBS_POLL_MS);
    every('reconcile', reconcileAll, reconcileMs);
    every('pnl', computeAndPostPnLAll, PNL_POLL_MS);
  }
  console.log(`[mu] ${LIVE ? 'LIVE (executes copies)' : 'DRY (decision-only)'} | mode=${POLL ? 'POLL' : 'WS'} | leaders=${leaderIndex.size}`);
  if (POLL) {
    await tick();
    if (!once) every('tick', tick, POLL_MS);
  } else {
    await runWebsocket();
  }
}
// Start the daemon. A startup failure is reported and ends the process with a non-zero exit
// code explicitly, rather than being left as a floating promise / unhandled rejection.
main().catch((e) => {
  console.error('[mu] fatal:', e);
  process.exit(1);
});