← Back
β˜†
/** Detection loop: copy a leader's Polymarket trades onto the follower's deposit wallet.
 *
 *  Default mode = WEBSOCKET (near-real-time, enter ASAP after the whale):
 *    subscribe to RTDS activity/trades firehose, filter by leader address, copy instantly.
 *  Fallback mode = POLL=1 (Data API /trades?user=, ~5min lag).
 *
 *  Usage: LEADER=0x..[,0x..] [LIVE=1] [COPY_FRACTION=0.1] [MIN_SHARES=5] [MAX_NOTIONAL=2]
 *         [POLL=1] [STATE=./copy-state.json] node [--experimental-global-webcrypto] copy-loop.mjs
 *
 *  Detection (Data API + RTDS WS) is public/read-only (not geoblocked) β†’ runs anywhere.
 *  The copy POST (LIVE) must run from the Malaysia executor (non-geoblocked egress). */
import fs from 'node:fs';
import { createPublicClient, http, erc20Abi } from 'viem';
import { polygon } from 'viem/chains';

const DATA_API = 'https://data-api.polymarket.com';
const RTDS_WS = 'wss://ws-live-data.polymarket.com';
const PUSD = '0xC011a7E12a19f7B1f670d46F03B03f3342E82DFB';
const CTF = '0x4D97DCd97eC945f40cF65F87097ACe5EA0476045';   // Conditional Tokens (ERC1155) β€” source of truth for held outcome tokens
const CTF_BAL_ABI = [{ name: 'balanceOf', type: 'function', stateMutability: 'view', inputs: [{ name: 'account', type: 'address' }, { name: 'id', type: 'uint256' }], outputs: [{ type: 'uint256' }] }];
const DEPOSIT_WALLET = process.env.DEPOSIT_WALLET || '0x50A8061e9448EB1e5d5e7aF07BE4E4F63C6F24Ff';
const pub = createPublicClient({ chain: polygon, transport: http(process.env.RPC || 'https://polygon.drpc.org') });
// Second, INDEPENDENT RPC used only to confirm a balanceOf=0 before closing a position.
// drpc round-robins across nodes and a lagging node can return a FALSE 0 β†’ without a
// second opinion that false zero would close a live position and strand its value.
const pub2 = createPublicClient({ chain: polygon, transport: http(process.env.RPC2 || 'https://polygon-bor-rpc.publicnode.com') });
let budget = 0;   // follower wallet pUSD balance, for "% Π±ΡŽΠ΄ΠΆΠ΅Ρ‚Π°" allocation
async function refreshBudget() {
  try { budget = Number(await pub.readContract({ address: PUSD, abi: erc20Abi, functionName: 'balanceOf', args: [DEPOSIT_WALLET] })) / 1e6; }
  catch (e) { console.log('[budget] read err:', e.message); }
}
const LIVE = process.env.LIVE === '1';   // default DRY for safety
const POLL = process.env.POLL === '1';   // default = websocket
const LEADERS = new Set((process.env.LEADER || '').split(',').map((s) => s.trim().toLowerCase()).filter(Boolean));
const FOLLOWERS = new Set((process.env.FOLLOWER || '').split(',').map((s) => s.trim().toLowerCase()).filter(Boolean));  // allow-list of sub owners we copy
const COPY_FRACTION = Number(process.env.COPY_FRACTION || '0.1');   // copy 10% of leader size
const MIN_SHARES = Number(process.env.MIN_SHARES || '5');           // CLOB min order
const MAX_NOTIONAL = Number(process.env.MAX_NOTIONAL || '2');       // cap $ per copy (safety)
const MIN_NOTIONAL = Number(process.env.MIN_NOTIONAL || '1');       // Polymarket marketable-order minimum ($1)
const MAX_OPEN = Number(process.env.MAX_OPEN || '5');              // cap open positions β†’ total exposure ≀ MAX_OPEN*MAX_NOTIONAL
const MAX_PER_EVENT = Number(process.env.MAX_PER_EVENT || '1');    // cap open positions per (leader, eventSlug) β†’ no multi-leg pile-up on one game
const MAX_OPEN_PER_LEADER = Number(process.env.MAX_OPEN_PER_LEADER || '1'); // cap open positions per leader β†’ one active whale can't squat all MAX_OPEN slots
const LEADER_KILL_USD = Number(process.env.LEADER_KILL_USD || '8'); // per-whale stop-loss: once our realized PnL from a leader drops to -$X, stop copying its new trades (existing opens ride to resolution)
const MIRROR_EXIT_LIVE = process.env.MIRROR_EXIT_LIVE === '1';       // ACT on a detected leader-exit (sell our copy). Default off = detect+log only (safe observe phase)
const LEADER_EXIT_CONFIRM = Number(process.env.LEADER_EXIT_CONFIRM || '2'); // consecutive reconcile cycles a leader must be absent from a held token before we trust the exit (guards Data API flake)
const EXIT_SLIPPAGE = Number(process.env.EXIT_SLIPPAGE || '0.02');   // when mirror-selling, undercut current mark by this to cross the spread and fill
const STATE_FILE = process.env.STATE || './copy-state.json';
const POLL_MS = Number(process.env.POLL_MS || '15000');
const SUBS_URL = process.env.SUBS_URL || '';            // e.g. https://poly-dev.szhub.space/api/copy/active
const SERVICE_TOKEN = process.env.SERVICE_TOKEN || '';
const SUBS_POLL_MS = Number(process.env.SUBS_POLL_MS || '20000');
const PNL_URL = process.env.PNL_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/pnl') : '');  // POST per-leader PnL here
const PNL_POLL_MS = Number(process.env.PNL_POLL_MS || '120000');  // every 2 min

if (![...LEADERS].every((a) => a.length === 42)) { console.log('set LEADER=0x... (42 chars, comma-separated for many)'); process.exit(1); }
if (!LEADERS.size && !SUBS_URL) { console.log('set LEADER=0x... or SUBS_URL (DB-driven)'); process.exit(1); }

// Map a UI CopyConfig β†’ our sizing params. The env values are HARD safety ceilings
// the UI can only go below (defense in depth on a money path).
function deriveCfg(c) {
  c = c || {};
  const mode = c.allocMode || 'proportional';                 // 'fixed' | 'percent' | 'proportional'
  const allocValue = Number(c.allocValue) || 0;
  const maxNotional = Math.max(MIN_NOTIONAL, Math.min(Number(c.maxPerTrade) || MAX_NOTIONAL, MAX_NOTIONAL));
  const maxOpen = (c.maxExposure && c.maxPerTrade) ? Math.max(1, Math.min(Math.floor(c.maxExposure / c.maxPerTrade), MAX_OPEN)) : MAX_OPEN;
  const priceMin = c.priceMin != null ? Number(c.priceMin) / 100 : 0;
  const priceMax = c.priceMax != null ? Number(c.priceMax) / 100 : 1;
  const maxPerEvent = Math.max(1, Math.min(Number(c.maxPerEvent) || MAX_PER_EVENT, MAX_PER_EVENT));
  const maxOpenPerLeader = Math.max(1, Math.min(Number(c.maxOpenPerLeader) || MAX_OPEN_PER_LEADER, MAX_OPEN_PER_LEADER));
  return { mode, allocValue, maxNotional, maxOpen, priceMin, priceMax, maxPerEvent, maxOpenPerLeader };
}
const defaultCfg = deriveCfg(null);              // env-based defaults (env LEADER mode)
const subsConfig = new Map();                    // leader β†’ derived cfg (DB-driven)

// Active leaders = env LEADERS βˆͺ subscriptions from the screener DB (refreshed live).
const activeLeaders = new Set(LEADERS);
async function pollSubs() {
  if (!SUBS_URL) return;
  try {
    const r = await fetch(SUBS_URL, { headers: { 'x-service-token': SERVICE_TOKEN }, signal: AbortSignal.timeout(12000) });
    if (!r.ok) { console.log('[subs] http', r.status); return; }
    const d = await r.json();
    subsConfig.clear();
    for (const s of (d.data || [])) {
      // Only copy subscriptions owned by allow-listed followers (this daemon trades
      // ONE deposit wallet). Blocks foreign subs from steering our wallet while the
      // subscription endpoint still trusts a client-asserted address (IDOR β€” the real
      // fix is server-side Privy token verification before multi-user/prod).
      if (FOLLOWERS.size && !FOLLOWERS.has(String(s.userAddress || '').toLowerCase())) continue;
      const a = String(s.leaderAddress || '').toLowerCase();
      if (a.length === 42) subsConfig.set(a, deriveCfg(s.config));
    }
    const next = new Set([...LEADERS, ...subsConfig.keys()]);
    if (next.size !== activeLeaders.size || [...next].some((a) => !activeLeaders.has(a))) {
      console.log(`[subs] active leaders updated: ${next.size} (${subsConfig.size} from DB)`);
    }
    activeLeaders.clear(); for (const a of next) activeLeaders.add(a);
  } catch (e) { console.log('[subs] err:', e.message); }
}

const tradeKey = (t) => `${t.transactionHash || ''}:${t.asset || ''}:${t.side || ''}:${t.timestamp || ''}:${t.size || ''}`;

/** map a leader trade β†’ the order WE would place for the follower (per-leader cfg).
 *  Target $ per copy depends on the allocation mode:
 *    fixed       β†’ allocValue $ per trade
 *    percent     β†’ allocValue% of the follower's wallet budget
 *    proportional→ COPY_FRACTION of the leader's own trade notional ("как кит") */
function decideCopy(t, cfg) {
  const leaderSize = Number(t.size || 0);
  const price = Number(t.price || 0);
  if (!t.asset || !leaderSize || !price) return null;
  let target;   // target notional ($) for this copy
  if (cfg.mode === 'fixed') target = cfg.allocValue;
  else if (cfg.mode === 'percent') target = (cfg.allocValue / 100) * budget;
  else target = leaderSize * price * COPY_FRACTION;
  let size = Math.round((target || 0) / price);
  if (size < MIN_SHARES) size = MIN_SHARES;            // bump to CLOB share minimum
  if (size * price < MIN_NOTIONAL) size = Math.ceil(MIN_NOTIONAL / price);  // meet $ minimum (marketable orders)
  if (size * price > cfg.maxNotional) {               // per-leader $ cap (≀ env ceiling)
    size = Math.floor(cfg.maxNotional / price);
    if (size < MIN_SHARES || size * price < MIN_NOTIONAL) return null;  // can't satisfy min$ and cap$ at this price β†’ skip
  }
  return { tokenID: t.asset, side: (t.side || '').toUpperCase(), price, size,
           notional: +(size * price).toFixed(2), outcome: t.outcome, title: t.title };
}

let executor = null;

// --- positions ledger (local JSON): one open record per held tokenID ---
const POS_FILE = process.env.POSITIONS || './positions.json';
const loadPos = () => { try { return JSON.parse(fs.readFileSync(POS_FILE, 'utf8')); } catch { return {}; } };
// Atomic write: a crash mid-write would corrupt positions.json β†’ loadPos() falls back
// to {} β†’ all position tracking lost (re-buy storm + orphaned on-chain holdings). Write
// to a temp file then rename (atomic on the same filesystem) so readers always see a
// complete file β€” either the old one or the new one, never a half-written one.
const savePos = (p) => { const tmp = `${POS_FILE}.tmp`; fs.writeFileSync(tmp, JSON.stringify(p, null, 1)); fs.renameSync(tmp, POS_FILE); };

// Serialize every read-modify-write of the ledger. WS message handlers run async and are
// NOT awaited by `ws`, so two trades (or a buy + a slow reconcile) can interleave at await
// points: both pass the MAX_OPEN / per-leader / dup-token checks before either saves (TOCTOU
// β†’ cap breach / double-buy), or a reconcile holding a stale `pos` across on-chain reads
// overwrites a buy that landed meanwhile (lost position β†’ can't close). Chain all mutations
// through one promise so they run strictly one at a time.
let posLock = Promise.resolve();
const withPos = (fn) => { const run = posLock.then(fn, fn); posLock = run.catch(() => {}); return run; };

// A position is 'pending' from the moment we reserve its slot until the CLOB order is
// confirmed filled (then 'open') or rolled back. Caps count pending too, so two concurrent
// trades can't both pass MAX_OPEN while their orders are in flight outside the lock.
const isActive = (p) => !!p && (p.status === 'open' || p.status === 'pending');
// A reservation whose order never fills within this window (rejected / rested / never matched)
// is cancelled + released by reconcile so it doesn't squat a slot forever.
const PENDING_TIMEOUT_S = Number(process.env.PENDING_TIMEOUT_S || '180');

// Normalize a trade timestamp to Unix SECONDS (reconcile age math assumes seconds). RTDS
// currently sends seconds; guard in case a payload ever arrives in milliseconds.
const toSec = (ts) => { const n = Number(ts) || 0; return n > 2e10 ? Math.floor(n / 1000) : n; };

// Reconcile local open ledger against on-chain reality. Markets that resolve
// WITHOUT the leader selling (e.g. sports held to settlement) get redeemed by
// the DE cron on-chain but leave a stale 'open' record here β†’ MAX_OPEN saturates
// forever and no new copies happen. Mark any open position the DW no longer
// holds as closed, freeing the slot. Grace period guards against Data API lag on
// freshly-filled positions.
const RECONCILE_GRACE_S = Number(process.env.RECONCILE_GRACE_S || '600'); // 10 min
// A position absent from the Data API for this long while STILL on-chain = the market
// resolved and these are worthless losing tokens nobody redeems ($0 payout). They'd
// occupy a MAX_OPEN slot forever (can't sell dust, redeem-cron only redeems winners)
// β†’ free the slot. Live long-held positions keep value, stay in the Data API, never
// reach this branch. Well beyond the flake window so transient drops don't trip it.
const MAX_OPEN_AGE_S = Number(process.env.MAX_OPEN_AGE_S || '86400'); // 24h
async function reconcileOpenPositions() {
  try {
    const res = await fetch(`${DATA_API}/positions?user=${DEPOSIT_WALLET}`, { signal: AbortSignal.timeout(15000) });
    const held = await res.json();
    if (!Array.isArray(held)) return;
    const heldPnl = new Map(held.map((p) => [String(p.asset), Number(p.cashPnl) || 0]));  // asset β†’ live mark-to-market
    const now = Math.floor(Date.now() / 1000);
    // Phase 1 (NO lock): inspect a snapshot + do the slow on-chain reads, collect intended
    // changes. Holding the lock across these reads would block copies for seconds.
    const snapshot = loadPos();
    const changes = new Map();   // tid β†’ { markPnl?, close?: reason }
    const toClose = [];          // [{ tid, price }] confirmed leader-exits to mirror-sell (Phase B)
    // Proactive leader-exit: pull current holdings of the leaders behind our OPEN
    // positions. If the entry-leader no longer holds a token we still hold, they've
    // exited β†’ mirror the close (confirmed over LEADER_EXIT_CONFIRM cycles vs flake).
    const openLeaders = [...new Set(Object.values(snapshot)
      .filter((p) => p.status === 'open' && p.leader).map((p) => String(p.leader).toLowerCase()))];
    const leaderHeld = new Map();   // leader β†’ Set(asset). Absent entry = unreliable this cycle β†’ skip exit check.
    await Promise.allSettled(openLeaders.map(async (ldr) => {
      try {
        const r = await fetch(`${DATA_API}/positions?user=${ldr}`, { signal: AbortSignal.timeout(15000) });
        const arr = await r.json();
        if (Array.isArray(arr) && arr.length) leaderHeld.set(ldr, new Set(arr.map((x) => String(x.asset))));
      } catch { /* leave unset β†’ don't trust absence this cycle */ }
    }));
    const curPrice = new Map(held.map((h) => [String(h.asset), (Number(h.size) > 0 ? Number(h.currentValue) / Number(h.size) : 0)]));
    for (const [tid, p] of Object.entries(snapshot)) {
      if (p.status === 'pending') {
        // Reservation awaiting fill. On-chain balance is the source of truth: if it appeared
        // the order filled β†’ promote to open; if still zero past the timeout the order never
        // filled (rejected/rested) β†’ cancel its resting order + release the slot.
        let bal;
        try { bal = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [DEPOSIT_WALLET, BigInt(tid)] }); }
        catch { continue; }   // RPC error β†’ leave pending, retry next cycle
        if (bal > 0n) changes.set(tid, { promote: true });
        else if ((now - (p.openedAt || 0)) > PENDING_TIMEOUT_S) changes.set(tid, { cancelPending: p.lastOrderId || null });
        continue;
      }
      if (p.status !== 'open') continue;
      if (heldPnl.has(String(tid))) {
        const ch = { markPnl: heldPnl.get(String(tid)) };          // refresh last-known P&L each cycle
        const lh = leaderHeld.get(String(p.leader || '').toLowerCase());
        if (lh) {                                                  // reliable leader holdings this cycle
          if (!lh.has(String(tid))) { ch.leaderExit = true; ch.exitPrice = curPrice.get(String(tid)) || 0; }
          else ch.leaderStillIn = true;                            // still in β†’ reset any pending exit-confirm
        }
        changes.set(tid, ch);
      } else if ((now - (p.openedAt || 0)) > RECONCILE_GRACE_S) {
        // Absent from the Data API. That API flakes (drops held positions transiently),
        // so DON'T trust absence β€” verify the real ERC1155 balance on CTF before banking.
        // Only a true on-chain zero balance means the position is gone (sold/resolved+redeemed).
        let onchain;
        try { onchain = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [DEPOSIT_WALLET, BigInt(tid)] }); }
        catch (e) { console.log(`[reconcile] CTF read err ${String(tid).slice(0, 10)}:`, e.message); continue; }  // RPC error β†’ leave open, retry next cycle
        if (onchain === 0n) {
          // Confirm the zero on a second independent RPC before banking the close β€”
          // drpc can serve a stale node returning a false 0 (would strand live value).
          let onchain2;
          try { onchain2 = await pub2.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [DEPOSIT_WALLET, BigInt(tid)] }); }
          catch (e) { console.log(`[reconcile] CTF confirm-read err ${String(tid).slice(0, 10)}:`, e.message); continue; }  // can't confirm β†’ keep open, retry next cycle
          if (onchain2 === 0n) {
            changes.set(tid, { close: 'resolved-onchain-absent' });
          } else {
            console.log(`[reconcile] false-zero guarded ${String(tid).slice(0, 10)}: rpc1=0 rpc2=${onchain2} β†’ keep open`);
          }
        } else if ((now - (p.openedAt || 0)) > MAX_OPEN_AGE_S) {
          // On-chain balance persists but the position vanished from the Data API
          // long ago β†’ resolved-worthless leftover. Free the slot.
          changes.set(tid, { close: 'resolved-worthless-aged' });
        }  // onchain > 0 and recent β†’ still held but missing from Data API β†’ keep open
      }
    }
    if (!changes.size) return;
    // Phase 2 (LOCKED): re-load the FRESH ledger and apply patches only to still-open
    // positions, so we never overwrite a buy/close that landed during the on-chain reads.
    await withPos(() => {
      const pos = loadPos();
      let freed = 0, changed = false;
      for (const [tid, patch] of changes) {
        const p = pos[tid];
        if (!p) continue;
        if (patch.promote) { if (p.status === 'pending') { p.status = 'open'; changed = true; } continue; }
        if (patch.cancelPending !== undefined) {   // pending timed out β†’ cancel resting order + drop the reservation
          if (p.status === 'pending') { if (patch.cancelPending && executor) executor.cancel(patch.cancelPending).catch(() => {}); delete pos[tid]; changed = true; }
          continue;
        }
        if (p.status !== 'open') continue;   // a concurrent handler already changed it
        if (patch.markPnl != null) { p.markPnl = patch.markPnl; changed = true; }
        if (patch.leaderStillIn) { if (p.leaderExitSeen) { p.leaderExitSeen = 0; changed = true; } }
        else if (patch.leaderExit) {
          p.leaderExitSeen = (p.leaderExitSeen || 0) + 1; changed = true;
          if (p.leaderExitSeen >= LEADER_EXIT_CONFIRM) {
            console.log(`πŸ”» leader-exit confirmed (${p.leaderExitSeen}x) ${String(tid).slice(0, 10)} leader=${String(p.leader).slice(0, 8)} "${String(p.title).slice(0, 38)}" markβ‰ˆ$${(Number(p.markPnl) || 0).toFixed(2)} β€” ${MIRROR_EXIT_LIVE ? 'closing' : 'OBSERVE-ONLY'}`);
            if (MIRROR_EXIT_LIVE) toClose.push({ tid, price: patch.exitPrice });
          } else {
            console.log(`… leader-exit seen ${p.leaderExitSeen}/${LEADER_EXIT_CONFIRM} ${String(tid).slice(0, 10)} (awaiting confirm)`);
          }
        }
        if (patch.close) {
          p.status = 'closed'; p.closedReason = patch.close;
          // worthless-aged = resolved $0 loser β†’ realized is the full cost basis lost.
          // onchain-absent = sold/redeemed β†’ last mark-to-market is the best estimate we have.
          p.realizedPnl = patch.close === 'resolved-worthless-aged' ? -(Number(p.cost) || 0) : (Number(p.markPnl) || 0);
          freed += 1; changed = true;
        }
      }
      if (changed) savePos(pos);
      if (freed) console.log(`[reconcile] freed ${freed} resolved slot(s); open now ${Object.values(pos).filter((x) => x.status === 'open').length}`);
    });
    // Phase B (UNLOCKED network): mirror-sell positions whose entry-leader has exited.
    // Banks closed only on a real fill; otherwise cancels the resting order, resets the
    // confirm counter and keeps the position open for next cycle (never falsely banks).
    if (MIRROR_EXIT_LIVE && toClose.length && executor) {
      for (const { tid, price } of toClose) {
        const sellPx = Math.max(0.01, Math.min(0.99, (Number(price) || 0) * (1 - EXIT_SLIPPAGE)));
        let resp;
        try { resp = await executor.sellAll({ tokenID: tid, price: sellPx }); }
        catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
        await withPos(() => {
          const pos = loadPos();
          const h = pos[tid];
          if (!h || h.status !== 'open') return;
          const soldNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
          if (resp?.skipped) {
            h.status = 'closed'; h.closedReason = `leader-exit-${String(resp.skipped)}`; h.closedAt = now; h.realizedPnl = Number(h.markPnl) || 0; savePos(pos);
            console.log(`   mirror-close skipped (${resp.skipped}) β†’ closed ${String(tid).slice(0, 10)}`);
          } else if (soldNow) {
            h.status = 'closed'; h.closedReason = 'mirror-leader-exit'; h.closeOrderId = resp.orderID; h.closedAt = now; h.realizedPnl = Number(h.markPnl) || 0; savePos(pos);
            console.log(`βœ… MIRROR-CLOSED (leader exited) ${String(tid).slice(0, 10)} pnlβ‰ˆ$${(Number(h.markPnl) || 0).toFixed(2)}`);
          } else if (resp?.success && resp.orderID) {
            executor.cancel(resp.orderID).catch(() => {}); h.leaderExitSeen = 0; savePos(pos);
            console.log(`⏳ mirror-close not filled; keep open, retry next cycle ${String(tid).slice(0, 10)}`);
          } else {
            h.leaderExitSeen = 0; savePos(pos);
            console.log(`❌ mirror-close rejected, keep open ${String(tid).slice(0, 10)}: ${JSON.stringify(resp).slice(0, 120)}`);
          }
        });
      }
    }
  } catch (e) { console.log('[reconcile] err:', e.message); }
}

// Per-leader PnL for the UI: realized (banked from CLOSED copies of this leader) +
// unrealized (live cashPnl of currently-held copies). One entry per leader we have
// ANY record for, so the card can always show a line and we can see who bleeds.
async function computeAndPostPnL() {
  try {
    const res = await fetch(`${DATA_API}/positions?user=${DEPOSIT_WALLET}`, { signal: AbortSignal.timeout(15000) });
    const held = await res.json();
    if (!Array.isArray(held)) return;
    const heldMap = new Map(held.map((h) => [String(h.asset), h]));   // asset β†’ live position
    const pos = loadPos();
    const agg = new Map();   // leader β†’ { realized, unrealized, value, openCount, closedCount }
    const get = (ldr) => {
      let a = agg.get(ldr);
      if (!a) { a = { leader: ldr, realized: 0, unrealized: 0, value: 0, openCount: 0, closedCount: 0 }; agg.set(ldr, a); }
      return a;
    };
    for (const [tid, p] of Object.entries(pos)) {
      if (p.status === 'pending') continue;   // slot reserved, order not yet confirmed β€” not a real P&L line
      const a = get(String(p.leader || 'unknown').toLowerCase());
      if (p.status === 'open') {
        const h = heldMap.get(String(tid));
        if (h) { a.unrealized += Number(h.cashPnl) || 0; a.value += Number(h.currentValue) || 0; a.openCount += 1; }
      } else {
        a.realized += Number(p.realizedPnl) || 0; a.closedCount += 1;
      }
    }
    const leaders = [...agg.values()].map((a) => ({
      leader: a.leader,
      realized: +a.realized.toFixed(4),
      unrealized: +a.unrealized.toFixed(4),
      pnl: +(a.realized + a.unrealized).toFixed(4),   // total result (back-compat field)
      value: +a.value.toFixed(4),
      openCount: a.openCount,
      closedCount: a.closedCount,
    }));
    console.log(`[pnl] ${leaders.map((l) => `${l.leader.slice(0, 8)}:tot${l.pnl >= 0 ? '+' : ''}${l.pnl.toFixed(2)}(o${l.openCount}/c${l.closedCount})`).join(' ') || '(no records)'}`);
    if (!PNL_URL) return;
    const r = await fetch(PNL_URL, { method: 'POST', headers: { 'content-type': 'application/json', 'x-service-token': SERVICE_TOKEN }, body: JSON.stringify({ leaders }), signal: AbortSignal.timeout(12000) });
    if (!r.ok) console.log('[pnl] post http', r.status);
  } catch (e) { console.log('[pnl] err:', e.message); }
}

async function handleTrade(t) {
  const side = String(t.side || '').toUpperCase();
  const leader = String(t.proxyWallet || '').toLowerCase();
  const cfg = subsConfig.get(leader) || defaultCfg;
  if (side === 'BUY') {
    const px = Number(t.price || 0);
    if (px < cfg.priceMin || px > cfg.priceMax) { console.log(`  skip (price ${px} outside band ${cfg.priceMin}-${cfg.priceMax})`); return; }
    const order = decideCopy(t, cfg);
    if (!order) { console.log('  skip (incomplete/over-cap)'); return; }
    const label = `${order.side} ${order.size} "${order.outcome}" @ ${order.price.toFixed(3)} ($${order.notional}) | ${String(order.title).slice(0, 45)}`;
    const evt = t.eventSlug || t.slug || '';   // group all legs of one game (moneyline/spreads/O-U)
    // Phase A (LOCKED): cap/dup checks + RESERVE the slot as 'pending', then release the lock so
    // the slow CLOB POST runs OUTSIDE the mutex (it no longer blocks other leaders' trades).
    // Caps count pending too, so two concurrent reservations can't both breach MAX_OPEN (TOCTOU).
    const reserved = await withPos(() => {
      const pos0 = loadPos();
      // Per-whale stop-loss: if our banked realized PnL from this leader is already
      // at/below -$LEADER_KILL_USD, stop copying its new trades (it's bleeding us).
      const leaderRealized = Object.values(pos0)
        .filter((p) => p.status === 'closed' && String(p.leader || '').toLowerCase() === leader)
        .reduce((s, p) => s + (Number(p.realizedPnl) || 0), 0);
      if (LEADER_KILL_USD > 0 && leaderRealized <= -LEADER_KILL_USD) { console.log(`β›” skip (leader killed: realized $${leaderRealized.toFixed(2)} ≀ -$${LEADER_KILL_USD}): ${label}`); return false; }
      if (isActive(pos0[order.tokenID])) { console.log(`⏭️ skip (already holding/pending this token): ${label}`); return false; }
      if (evt) {
        const sameEvt = Object.values(pos0).filter((p) => isActive(p) && p.leader === leader && p.eventSlug === evt).length;
        if (sameEvt >= cfg.maxPerEvent) { console.log(`⏭️ skip (leader already ${sameEvt} on event ${evt}, max ${cfg.maxPerEvent}): ${label}`); return false; }
      }
      const leaderOpen = Object.values(pos0).filter((p) => isActive(p) && p.leader === leader).length;
      if (leaderOpen >= cfg.maxOpenPerLeader) { console.log(`⏭️ skip (leader already ${leaderOpen}, max per-leader ${cfg.maxOpenPerLeader}): ${label}`); return false; }
      const openCount = Object.values(pos0).filter(isActive).length;
      if (openCount >= cfg.maxOpen) { console.log(`⏭️ skip (max ${cfg.maxOpen} open positions reached): ${label}`); return false; }
      if (!LIVE) { console.log(`🟒 WOULD COPY: ${label}\n     tokenID=${order.tokenID}`); return false; }
      pos0[order.tokenID] = { tokenID: order.tokenID, conditionId: t.conditionId, eventSlug: evt, outcome: t.outcome, title: t.title, leader, buys: 0, cost: order.notional, openedAt: Math.floor(Date.now() / 1000), status: 'pending' };
      savePos(pos0);
      return true;
    });
    if (!reserved) return;
    // Phase B (UNLOCKED network): place the order, then finalize or roll back the reservation.
    let resp;
    try { resp = await executor.buy(order); }
    catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
    await withPos(() => {
      const pos = loadPos();
      const p = pos[order.tokenID];
      if (!p || p.status !== 'pending') return;   // reconcile/another handler already resolved it
      // A copy BUY priced at the leader's fill is marketable; 'matched' or non-zero filled
      // amounts mean we actually hold tokens now β†’ confirm open. A 'live'/'delayed' status
      // with nothing filled means it's resting/queued β†’ keep pending; reconcile confirms via
      // on-chain balance (promote) or cancels it on timeout. Outright rejection β†’ release slot.
      const filledNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
      if (filledNow) {
        p.status = 'open'; p.buys = 1; p.lastOrderId = resp.orderID; p.openedAt = toSec(t.timestamp) || p.openedAt; savePos(pos);
        console.log(`βœ… COPIED: ${label}\n     orderID=${resp.orderID} status=${resp.status}`);
      } else if (resp?.success && resp.orderID) {
        p.lastOrderId = resp.orderID; savePos(pos);
        console.log(`βŒ› posted, awaiting fill (status ${resp.status}); reconcile will confirm/cancel: ${label}`);
      } else {
        delete pos[order.tokenID]; savePos(pos);
        console.log(`❌ copy rejected, slot released: ${label}\n     ${JSON.stringify(resp).slice(0, 160)}`);
      }
    });
    return;
  }
  if (side === 'SELL') {
    const label = `SELL "${t.outcome}" @ ${Number(t.price).toFixed(3)} | ${String(t.title).slice(0, 45)}`;
    // Phase A (LOCKED): only the ENTRY leader's exit mirrors a close. A different followed leader
    // selling the same token is NOT our exit signal β€” ignore it.
    const go = await withPos(() => {
      const pos = loadPos();
      const held = pos[t.asset];
      if (!held || held.status !== 'open') return false;     // not held (pending/closed/absent) β†’ ignore
      if (held.leader !== leader) { console.log(`   ignore SELL from non-entry leader ${leader.slice(0, 8)} (entry ${String(held.leader).slice(0, 8)}): ${label}`); return false; }
      return true;
    });
    if (!go) return;
    if (!LIVE) { console.log(`πŸ”» WOULD CLOSE (leader exited): ${label}\n     tokenID=${t.asset}`); return; }
    // Phase B (UNLOCKED network): place the close, then bank it ONLY if it actually filled.
    let resp;
    try { resp = await executor.sellAll({ tokenID: t.asset, price: Number(t.price) }); }
    catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
    await withPos(() => {
      const pos = loadPos();
      const held = pos[t.asset];
      if (!held || held.status !== 'open') return;            // already resolved by another handler
      if (resp?.skipped) { held.status = 'closed'; held.closedReason = String(resp.skipped); held.closedAt = toSec(t.timestamp); held.realizedPnl = Number(held.markPnl) || 0; savePos(pos); console.log(`   close skipped (${resp.skipped}) β†’ marked closed: ${label}`); return; }
      const soldNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
      if (soldNow) {
        held.status = 'closed'; held.closeOrderId = resp.orderID; held.closedAt = toSec(t.timestamp); held.realizedPnl = Number(held.markPnl) || 0; savePos(pos);
        console.log(`βœ… CLOSED: ${label}\n     orderID=${resp.orderID} status=${resp.status}`);
      } else if (resp?.success && resp.orderID) {
        // Posted but didn't fill (rested) β†’ we STILL HOLD. Cancel the resting close and keep the
        // position open so the next leader-sell / reconcile retries β€” do NOT falsely bank it closed.
        executor.cancel(resp.orderID).then(() => console.log(`↩️ cancelled unfilled close ${resp.orderID}`)).catch(() => {});
        console.log(`⏳ close not filled (status ${resp.status}); keeping open, cancelled resting: ${label}`);
      } else {
        console.log(`❌ close rejected (still holding): ${label}\n     ${JSON.stringify(resp).slice(0, 160)}`);
      }
    });
  }
}

// --- WebSocket mode (default): near-real-time firehose, filter by leader ---
async function runWebsocket() {
  const { default: WebSocket } = await import('ws');
  const seen = new Set();
  const remember = (k) => { seen.add(k); if (seen.size > 4000) seen.delete(seen.values().next().value); };
  let backoff = 1000;
  const connect = () => {
    const ws = new WebSocket(RTDS_WS);
    let ping;
    ws.on('open', () => {
      backoff = 1000;
      ws.send(JSON.stringify({ action: 'subscribe', subscriptions: [{ topic: 'activity', type: 'trades' }] }));
      ping = setInterval(() => { try { ws.send('PING'); } catch {} }, 5000);
      console.log(`[ws] connected; watching ${activeLeaders.size} leader(s)`);
    });
    ws.on('message', async (d) => {
      const s = d.toString();
      if (s[0] !== '{' && s[0] !== '[') return;          // skip PONG/control frames
      let msg; try { msg = JSON.parse(s); } catch { return; }
      for (const m of (Array.isArray(msg) ? msg : [msg])) {
        const t = m.payload || m;
        if (!t || !t.proxyWallet || !t.asset) continue;
        if (!activeLeaders.has(String(t.proxyWallet).toLowerCase())) continue;
        const k = tradeKey(t);
        if (seen.has(k)) continue;
        remember(k);
        await handleTrade(t);
      }
    });
    ws.on('error', (e) => console.log('[ws] error:', e.message));
    ws.on('close', () => {
      clearInterval(ping);
      console.log(`[ws] closed; reconnecting in ${backoff}ms`);
      setTimeout(connect, backoff);
      backoff = Math.min(backoff * 2, 30000);
    });
  };
  connect();
}

// --- Polling mode (POLL=1): Data API /trades?user= for EVERY active leader (~POLL_MS lag).
// Used when the RTDS websocket is unavailable (Cloudflare 429s datacenter IPs on WS upgrade).
// Polls activeLeaders (env LEADER βˆͺ DB subs), not just LEADERS[0]; per-leader cursors.
const loadState = () => { try { return JSON.parse(fs.readFileSync(STATE_FILE, 'utf8')); } catch { return {}; } };
const saveState = (s) => fs.writeFileSync(STATE_FILE, JSON.stringify(s));
async function tick() {
  const leaders = [...activeLeaders];
  if (!leaders.length) { console.log('[poll] no active leaders'); return; }
  const state = loadState();
  const seen = new Set(Array.isArray(state.seen) ? state.seen : []);
  // Per-leader cursors; migrate any legacy scalar shape to objects.
  if (!state.lastTs || typeof state.lastTs !== 'object') state.lastTs = {};
  if (!state.primed || typeof state.primed !== 'object') state.primed = {};
  const results = await Promise.allSettled(leaders.map(async (leader) => {
    const r = await fetch(`${DATA_API}/trades?user=${leader}&limit=50`, { signal: AbortSignal.timeout(15000) });
    if (!r.ok) throw new Error(`${leader.slice(0, 8)}… data-api ${r.status}`);
    return { leader, trades: await r.json() };
  }));
  const toCopy = [];
  for (const res of results) {
    if (res.status !== 'fulfilled') { console.log('[poll] err:', res.reason?.message); continue; }
    const { leader, trades } = res.value;
    if (!Array.isArray(trades)) continue;
    if (!state.primed[leader]) {
      // First sight of this leader: baseline current trades as seen (don't copy history).
      for (const t of trades) seen.add(tradeKey(t));
      state.lastTs[leader] = trades.reduce((m, t) => Math.max(m, Number(t.timestamp || 0)), 0);
      state.primed[leader] = true;
      console.log(`[prime] ${leader.slice(0, 8)}… baseline (${trades.length} trades)`);
      continue;
    }
    for (const t of trades) {
      if (seen.has(tradeKey(t))) continue;
      if (Number(t.timestamp || 0) < (state.lastTs[leader] || 0)) continue;
      seen.add(tradeKey(t));
      state.lastTs[leader] = Math.max(state.lastTs[leader] || 0, Number(t.timestamp || 0));
      toCopy.push(t);
    }
  }
  toCopy.sort((a, b) => Number(a.timestamp) - Number(b.timestamp));
  for (const t of toCopy) await handleTrade(t);
  state.seen = Array.from(seen).slice(-3000);
  saveState(state);
  console.log(`[poll] ${leaders.length} leader(s), ${toCopy.length} new trade(s)`);
}

if (LIVE) { const { makeExecutor } = await import('./copy-exec.mjs'); executor = await makeExecutor(); }
await refreshBudget(); setInterval(refreshBudget, 60000);
await reconcileOpenPositions(); setInterval(reconcileOpenPositions, 120000);
await computeAndPostPnL(); setInterval(computeAndPostPnL, PNL_POLL_MS);
if (SUBS_URL) { await pollSubs(); setInterval(pollSubs, SUBS_POLL_MS); }
console.log(`detection ${LIVE ? 'LIVE' : 'DRY-RUN'} | mode=${POLL ? 'POLL' : 'WS'} | leaders=${activeLeaders.size}${SUBS_URL ? ' (DB-driven)' : ''} budget=$${budget.toFixed(2)} maxNotional=$${MAX_NOTIONAL}`);
if (POLL) { await tick(); if (process.env.ONCE !== '1') setInterval(tick, POLL_MS); }
else { await runWebsocket(); }