← Back
β˜†
/** MULTI-USER copy daemon (Friend-beta A1). A per-user fork of copy-loop.mjs:
 *  one shared leader-trade detection stream → fan-out a copy into EACH subscribed
 *  user's OWN deposit wallet, with their own budget / caps / ledger.
 *
 *  The single-user copy-loop.mjs is left BYTE-IDENTICAL and keeps running the live
 *  edge measurement. This file powers the isolated `copy-trader-mu` instance only.
 *  Do NOT point the live instance at this until the edge gate passes.
 *
 *  Built in chunks:
 *    4a (this commit) — per-user context loader + DB ledger load. NO trading yet.
 *                       Run `node copy-loop-mu.mjs --selftest` to dump resolved context.
 *    4b — per-user executor + budget.  4c-1 — detection + fan-out (DRY decision).
 *    4c-2 — per-user BUY reserve→execute→finalize + SELL mirror-exit close.
 *    4d-1 — per-user reconcile (pending promote/cancel, leader-exit mirror,
 *           on-chain-absent close, mark refresh; write-through to copy_positions).
 *    4d-2 (this commit) — per-user PnL: daemon posts per-leader realized+unrealized keyed
 *           by userAddress; Rust copy_pnl is now a per-user map (?user= with global fallback).
 *           Client switch to ?user= lands in 4d-2b.
 *
 *  Env: SUBS_URL=https://poly-dev.szhub.space/api/copy/active  SERVICE_TOKEN=...
 *       POSITIONS_URL (defaults to SUBS_URL with /active→/positions)  FOLLOWER=did,did
 *       LIVE=1 (default DRY)  POLL=1 (default WS)
 *  Node>=20 or --experimental-global-webcrypto (CLOB L2 HMAC needs crypto.subtle). */
import fs from 'node:fs';
import { createPublicClient, http, erc20Abi } from 'viem';
import { polygon } from 'viem/chains';

// Base URL of the Polymarket Data API; this file queries `${DATA_API}/positions?user=…`.
const DATA_API = 'https://data-api.polymarket.com';
// ERC-20 contract whose balanceOf(deposit wallet) is read as the user's budget (see refreshBudget).
const PUSD = '0xC011a7E12a19f7B1f670d46F03B03f3342E82DFB';
// Primary Polygon RPC client; the endpoint can be overridden with the RPC env var.
const pub = createPublicClient({ chain: polygon, transport: http(process.env.RPC || 'https://polygon.drpc.org') });

const LIVE = process.env.LIVE === '1';            // default DRY for safety
const POLL = process.env.POLL === '1';            // default = websocket
// Service endpoints. POSITIONS_URL / RESOLUTION_URL default to SUBS_URL with its trailing
// "/active" segment swapped for "/positions" / "/market-resolution".
const SUBS_URL = process.env.SUBS_URL || '';
const SERVICE_TOKEN = process.env.SERVICE_TOKEN || '';   // sent as the x-service-token header
const POSITIONS_URL = process.env.POSITIONS_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/positions') : '');
const RESOLUTION_URL = process.env.RESOLUTION_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/market-resolution') : '');
// Optional allow-list of user addresses (comma-separated, lower-cased). Empty set = no filtering.
const FOLLOWERS = new Set((process.env.FOLLOWER || '').split(',').map((s) => s.trim().toLowerCase()).filter(Boolean));
const SUBS_POLL_MS = Number(process.env.SUBS_POLL_MS || '20000');

// Hard safety ceilings — a per-user UI config can only go BELOW these (defense in depth).
// NOTE(review): in the visible part of this file only MIN_NOTIONAL / MAX_NOTIONAL act as clamps;
// MAX_OPEN_PER_LEADER is used as a default in deriveCfg, and MAX_OPEN / MAX_PER_EVENT are not
// referenced in the portion reviewed — confirm their use further down.
const MIN_NOTIONAL = Number(process.env.MIN_NOTIONAL || '1');
const MAX_NOTIONAL = Number(process.env.MAX_NOTIONAL || '3');
const MAX_OPEN = Number(process.env.MAX_OPEN || '10');
const MAX_PER_EVENT = Number(process.env.MAX_PER_EVENT || '1');
const MAX_OPEN_PER_LEADER = Number(process.env.MAX_OPEN_PER_LEADER || '2');
const BUY_SLIPPAGE = Number(process.env.BUY_SLIPPAGE || '0.02'); // chunk6: mirror executor's buy slippage to bound effective price
// Manual-orders endpoint + poll interval; defaults to SUBS_URL with "/active" → "/manual-orders".
const MANUAL_URL = process.env.MANUAL_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/manual-orders') : '');
const MANUAL_POLL_MS = Number(process.env.MANUAL_POLL_MS || '8000');
// NOTE(review): presumably a re-entrancy flag for the manual-orders poller — its use is not visible here.
let manualBusy = false;

// The daemon is DB-driven: without SUBS_URL there is no roster to poll, so bail out at startup.
if (!SUBS_URL) { console.log('set SUBS_URL (DB-driven multi-user daemon requires it)'); process.exit(1); }

/**
 * Map a UI CopyConfig to the sizing / filter parameters used by the copy decision.
 *
 * @param {object|null|undefined} c  Raw per-subscription config; any falsy value is treated as `{}`.
 * @returns {{mode: string, allocValue: number, maxNotional: number, priceMin: number,
 *            priceMax: number, maxPerEvent: number, maxOpenPerLeader: number,
 *            direction: 'copy'|'fade', fadeMin: number, fadeMax: number}}
 *
 * Notes:
 *  - `maxNotional` is clamped into [MIN_NOTIONAL, MAX_NOTIONAL] (env ceilings).
 *  - `priceMin` / `priceMax` arrive as percentages and are returned as fractions (÷100).
 *  - Band fields that are present but not finite numbers fall back to their defaults. Without
 *    this they would become NaN, and every `px < NaN` / `px > NaN` check is false, which
 *    silently disables the band filter.
 */
function deriveCfg(c) {
  const src = c || {};
  // Parse an optional numeric field: null/undefined or a non-finite value → fallback.
  const band = (raw, fallback, divisor = 1) => {
    if (raw == null) return fallback;
    const n = Number(raw);
    return Number.isFinite(n) ? n / divisor : fallback;
  };
  const mode = src.allocMode || 'proportional';
  const allocValue = Number(src.allocValue) || 0;
  const maxNotional = Math.max(MIN_NOTIONAL, Math.min(Number(src.maxPerTrade) || MAX_NOTIONAL, MAX_NOTIONAL));
  const priceMin = band(src.priceMin, 0, 100);
  const priceMax = band(src.priceMax, 1, 100);
  // Per-whale cap on positions in the SAME event (user sets it per subscription, editable).
  // Not clamped by the global env — default 3 when unset, never below 1.
  const maxPerEvent = Math.max(1, Number(src.maxPerEvent) || 3);
  // Positions-per-whale cap (user sets it per subscription, editable). The env value
  // MAX_OPEN_PER_LEADER is only the DEFAULT when unset, not a ceiling; never below 1.
  const maxOpenPerLeader = Math.max(1, Number(src.maxOpenPerLeader) || MAX_OPEN_PER_LEADER);
  // Counter-trading (chunk B): direction 'fade' inverts the side (buy the complementary
  // outcome). fadeMin/fadeMax = leader-price risk band — don't fade extreme favorites.
  // NOTE: `mode` above (from c.allocMode) is the ALLOCATION mode (fixed/percent/proportional),
  // NOT the direction.
  const direction = src.direction === 'fade' ? 'fade' : 'copy';
  const fadeMin = band(src.fadeMin, 0.20);
  const fadeMax = band(src.fadeMax, 0.80);
  return { mode, allocValue, maxNotional, priceMin, priceMax, maxPerEvent, maxOpenPerLeader, direction, fadeMin, fadeMax };
}

// Resolve the COMPLEMENTARY clobTokenId of a binary market (the id != the leader's asset).
// Used only by FADE: leader buys YES@p → we buy the other token NO@~(1-p). The complement id
// is a separate ERC1155 id (not derivable arithmetically) → look it up via the chunk-A endpoint.
// Endpoint: MARKET_TOKENS_URL env, else SUBS_URL with the literal substring '/api/copy/active'
// replaced by '/api/copy/market-tokens' (a plain string replace, unlike the /\/active$/ regex
// used for the other derived URLs); empty string when neither is available.
const MARKET_TOKENS_URL = process.env.MARKET_TOKENS_URL
  || (SUBS_URL ? SUBS_URL.replace('/api/copy/active', '/api/copy/market-tokens') : '');
/**
 * Look up the token id that is the complement of `leaderAsset` within market `conditionId`.
 *
 * @param {string} conditionId  Market condition id, sent as the `cond` query parameter.
 * @param {string|number|bigint} leaderAsset  The leader's token id (compared as a string).
 * @returns {Promise<string|null>} The other token id, or null when the endpoint is not
 *   configured, the request fails, the response is not OK, or the returned token list does
 *   not contain the leader's asset (in that case "any token != leaderAsset" is not a
 *   complement — it could even be the leader's own side in another id format).
 *
 * Best-effort by design: every failure resolves to null so the caller can skip the fade.
 */
async function resolveComplement(conditionId, leaderAsset) {
  if (!MARKET_TOKENS_URL || !conditionId) return null;
  const leaderTok = String(leaderAsset);
  try {
    // Encode the query value, consistent with the market-resolution lookup in this file.
    const r = await fetch(`${MARKET_TOKENS_URL}?cond=${encodeURIComponent(conditionId)}`, {
      headers: { 'x-service-token': SERVICE_TOKEN }, signal: AbortSignal.timeout(12000),
    });
    if (!r.ok) return null;
    const d = await r.json();
    const tokens = (d?.data?.tokens || []).map(String);
    // Only trust the list when it actually contains the leader's token.
    if (!tokens.includes(leaderTok)) return null;
    return tokens.find((id) => id !== leaderTok) || null;
  } catch { return null; }
}

// ---- Per-user context registry -------------------------------------------------------
// users:       userAddress → { addr, dw, eoa, subs: Map(leader → cfg), budget, executor,
//              ledger: Map(tokenID → position record), fadeInflight: Set }
//              The ledger is the in-memory mirror of copy_positions (write-through to the DB API).
// leaderIndex: leader → Set(userAddress), for O(1) fan-out on a detected trade.
const users = new Map();
const leaderIndex = new Map();

/**
 * Return the context object for `addr`, creating and registering an empty one on first use.
 * @param {string} addr  User address (used verbatim as the map key).
 * @returns {object} The (possibly new) per-user context stored in `users`.
 */
function getUser(addr) {
  const known = users.get(addr);
  if (known) return known;
  const created = {
    addr,
    dw: null,
    eoa: null,
    subs: new Map(),
    budget: 0,
    executor: null,
    ledger: new Map(),
    fadeInflight: new Set(),
  };
  users.set(addr, created);
  return created;
}

/**
 * Refresh the active-subscription roster + per-user wallets from the screener DB.
 *
 * Steps (all against a freshly built `nextLeaderIndex`, swapped into `leaderIndex` at the end):
 *  1. Active subscriptions from SUBS_URL → per-user cfg (deriveCfg) + wallets.
 *  2. Exit-only roster: leaders of a user's still-active (open/pending) positions that are NOT
 *     active subscriptions this cycle get an `{ exitOnly: true }` cfg, so their exits are
 *     still mirrored.
 *  3. Prune users / leaders that ended up in neither set.
 *
 * Step 2 checks membership against THIS cycle's index, not `u.subs`. `u.subs` still holds the
 * previous cycle's entry, so testing it skipped a just-paused leader (and every exit-only
 * leader on alternate polls); step 3 then pruned it and could delete the user together with
 * their in-memory ledger.
 *
 * NOTE(review): a user whose only remaining positions are 'closing' (not open/pending) and who
 * has no active subscription is still pruned in step 3 — confirm that is intended.
 *
 * Errors are logged and swallowed; the previous roster stays in place on failure.
 * @returns {Promise<void>}
 */
async function pollSubs() {
  try {
    const r = await fetch(SUBS_URL, { headers: { 'x-service-token': SERVICE_TOKEN }, signal: AbortSignal.timeout(12000) });
    if (!r.ok) { console.log('[subs] http', r.status); return; }
    const d = await r.json();
    const seenUsers = new Set();
    const nextLeaderIndex = new Map();
    // Register (leader, user) in this cycle's fan-out index.
    const link = (leader, ua) => {
      if (!nextLeaderIndex.has(leader)) nextLeaderIndex.set(leader, new Set());
      nextLeaderIndex.get(leader).add(ua);
    };
    for (const s of (d.data || [])) {
      const ua = String(s.userAddress || '').toLowerCase();
      if (FOLLOWERS.size && !FOLLOWERS.has(ua)) continue;     // allow-list (isolation from live instance)
      const dw = s.depositWallet ? String(s.depositWallet).toLowerCase() : null;
      const eoa = s.ownerEoa ? String(s.ownerEoa).toLowerCase() : null;
      if (!dw || !eoa) { console.log(`[subs] skip ${ua.slice(0, 16)}… β€” missing ${!dw ? 'depositWallet' : 'ownerEoa'}`); continue; }
      const leader = String(s.leaderAddress || '').toLowerCase();
      if (leader.length !== 42) continue;
      const u = getUser(ua);
      u.dw = dw; u.eoa = eoa;
      u.subs.set(leader, deriveCfg(s.config));
      seenUsers.add(ua);
      link(leader, ua);
    }
    // Exit-only roster: leaders of the user's still-OPEN positions that are NOT active subs
    // (paused/archived). Poll them so we MIRROR THEIR EXIT; BUY is ignored (handleTradeForUser).
    for (const [ua, u] of users) {
      for (const p of u.ledger.values()) {
        if (!isActive(p)) continue;
        const lead = String(p.leader || '').toLowerCase();
        if (lead.length !== 42) continue;
        // Already covered THIS cycle (active sub, or exit-only registered by an earlier position).
        if (nextLeaderIndex.get(lead)?.has(ua)) continue;
        u.subs.set(lead, { ...deriveCfg({}), exitOnly: true });   // replaces any stale active cfg
        link(lead, ua);
        seenUsers.add(ua);
      }
    }
    // Drop users/leaders no longer active.
    for (const [ua, u] of users) {
      if (!seenUsers.has(ua)) { users.delete(ua); continue; }
      for (const leader of [...u.subs.keys()]) if (!(nextLeaderIndex.get(leader)?.has(ua))) u.subs.delete(leader);
    }
    leaderIndex.clear(); for (const [l, set] of nextLeaderIndex) leaderIndex.set(l, set);
    console.log(`[subs] users=${users.size} leaders=${leaderIndex.size} (active subs across all users)`);
  } catch (e) { console.log('[subs] err:', e.message); }
}

/**
 * Startup hydration: fetch the persisted ledger (copy_positions) from POSITIONS_URL and copy
 * each row into the owning user's in-memory ledger, keyed by the stringified tokenId.
 *
 * Rows are skipped when the owner is outside a non-empty FOLLOWERS allow-list or when the row
 * has no tokenId. No-op when POSITIONS_URL is empty. Failures are logged, never thrown.
 * @returns {Promise<void>}
 */
async function loadLedger() {
  if (!POSITIONS_URL) return;
  try {
    const resp = await fetch(POSITIONS_URL, {
      headers: { 'x-service-token': SERVICE_TOKEN },
      signal: AbortSignal.timeout(12000),
    });
    if (!resp.ok) {
      console.log('[ledger] http', resp.status);
      return;
    }
    const body = await resp.json();
    let loaded = 0;
    for (const row of (body.data || [])) {
      const owner = String(row.userAddress || '').toLowerCase();
      const allowed = !FOLLOWERS.size || FOLLOWERS.has(owner);
      if (allowed && row.tokenId) {
        getUser(owner).ledger.set(String(row.tokenId), row);
        loaded += 1;
      }
    }
    console.log(`[ledger] loaded ${loaded} positions across ${users.size} users`);
  } catch (err) {
    console.log('[ledger] err:', err.message);
  }
}

/**
 * Write one position record through to the DB ledger by POSTing `{ userAddress, ...pos }`
 * as JSON to POSITIONS_URL (the original note describes this as an upsert by user+token).
 *
 * @param {string} userAddress  Owner of the position.
 * @param {object} pos          Position record; its fields are spread into the request body.
 * @returns {Promise<boolean>}  true only when the store answered with an OK status, so a caller
 *   that reserves a slot before placing an order can abort when the write fails. When no store
 *   is configured (dev) this is treated as success so it does not block.
 */
async function saveLedger(userAddress, pos) {
  if (!POSITIONS_URL) return true;
  try {
    const payload = JSON.stringify({ userAddress, ...pos });
    const resp = await fetch(POSITIONS_URL, {
      method: 'POST',
      headers: { 'content-type': 'application/json', 'x-service-token': SERVICE_TOKEN },
      body: payload,
      signal: AbortSignal.timeout(12000),
    });
    if (resp.ok) return true;
    console.log('[ledger] save http', resp.status, userAddress.slice(0, 12));
    return false;
  } catch (err) {
    console.log('[ledger] save err:', err.message);
    return false;
  }
}

/**
 * Refresh one user's budget: read the PUSD `balanceOf` of their deposit wallet (`u.dw`) and
 * store it, divided by 1e6, in `u.budget` (used by the "% of budget" allocation mode).
 * On a read error the previous budget is kept and the error is logged.
 * @param {object} u  Per-user context (reads `dw`, `addr`; writes `budget`).
 * @returns {Promise<void>}
 */
async function refreshBudget(u) {
  try {
    const rawBalance = await pub.readContract({
      address: PUSD,
      abi: erc20Abi,
      functionName: 'balanceOf',
      args: [u.dw],
    });
    u.budget = Number(rawBalance) / 1e6;
  } catch (err) {
    console.log(`[budget] ${u.addr.slice(0, 12)} read err:`, err.message);
  }
}

/**
 * Lazily build and cache this user's CLOB executor (per the original note: it signs with THEIR
 * embedded EOA and trades THEIR deposit wallet).
 *
 * - Returns the cached `u.executor` when present, and also whenever LIVE is off (no build in DRY).
 * - Returns null without retrying once the user was flagged `execDisabled`.
 * - Otherwise dynamically imports ./copy-exec-mu.mjs and calls makeExecutor('./.env.privy', …).
 *   A failure whose message matches /policy violation/i sets `u.execDisabled` (silenced until
 *   the daemon restarts); any other failure is logged and retried on the next call.
 *
 * Original operational note: LIVE needs .env.privy next to the scripts + non-geoblocked CLOB
 * egress (Malaysia).
 * @param {object} u  Per-user context (reads `addr`, `dw`, `eoa`; writes `executor`, `execDisabled`).
 * @returns {Promise<object|null>} The executor, or null when unavailable.
 */
async function ensureExecutor(u) {
  if (u.executor) return u.executor;
  if (!LIVE) return u.executor;
  if (u.execDisabled) return null;   // policy-denied earlier — skip silently (retries on daemon restart)
  try {
    const execModule = await import('./copy-exec-mu.mjs');
    u.executor = await execModule.makeExecutor('./.env.privy', { dw: u.dw, eoa: u.eoa });
    console.log(`[exec] ready ${u.addr.slice(0, 14)}… dw=${u.dw.slice(0, 10)} eoa=${u.eoa.slice(0, 10)}`);
  } catch (err) {
    const policyDenied = /policy violation/i.test(err.message);
    if (policyDenied) {
      u.execDisabled = true;
      console.log(`[exec] DISABLED ${u.addr.slice(0, 16)}… β€” policy denied (silenced until daemon restart)`);
    } else {
      console.log(`[exec] FAIL ${u.addr.slice(0, 14)}:`, err.message);
    }
    u.executor = null;
  }
  return u.executor;
}

// ---- Realized PnL on close: resolution truth, not stale markPnl --------------------
// A resolved-onchain-absent close used to bank `markPnl` (often a stale 0 after redemption),
// so winning copies banked $0. Look up the market resolution (our service, cached — a resolved
// outcome is immutable) and bank the payout truth: won → size-cost, lost → -cost. If the market
// is NOT resolved (token absent because we SOLD pre-resolution), fall back to markPnl (proceeds).
// cond → { winningOutcome }. Only RESOLVED outcomes are stored (they are immutable).
const _resCache = new Map();

/**
 * Look up the resolution of market `cond` via RESOLUTION_URL, caching resolved outcomes.
 *
 * @param {string} cond  Market condition id (URL-encoded into the `cond` query parameter).
 * @returns {Promise<{winningOutcome: *}|null>} The resolved outcome, or null when `cond` /
 *   RESOLUTION_URL is missing, the request fails, or the market is not resolved yet.
 *
 * Unresolved / failed lookups never touch the cache, so they are re-fetched next time. The
 * cache is deliberately left alone in that case rather than written-then-deleted: two
 * overlapping calls for the same market could otherwise let a late "unresolved" answer evict
 * a resolved outcome the other call had just cached.
 */
async function marketResolution(cond) {
  if (!cond || !RESOLUTION_URL) return null;
  if (_resCache.has(cond)) return _resCache.get(cond);
  try {
    const r = await fetch(`${RESOLUTION_URL}?cond=${encodeURIComponent(cond)}`,
      { headers: { 'x-service-token': SERVICE_TOKEN }, signal: AbortSignal.timeout(8000) });
    if (!r.ok) return null;
    const d = (await r.json())?.data;
    if (!d || !d.resolved || d.winningOutcome == null) return null;   // transient: re-fetch next time
    const out = { winningOutcome: d.winningOutcome };
    _resCache.set(cond, out);
    return out;
  } catch { return null; }
}
/**
 * Realized PnL to bank when position `p` is closed.
 *
 * Uses marketResolution(p.marketId): when resolved, compares `p.outcome` with the winning
 * outcome case-insensitively — won → size − cost, lost → −cost. When unresolved
 * (sold pre-resolution) it falls back to the last mark, `p.markPnl`. Non-numeric fields count as 0.
 * @param {object} p  Position record (reads marketId, outcome, size, cost, markPnl).
 * @returns {Promise<number>}
 */
async function realizedOnClose(p) {
  const resolution = await marketResolution(p.marketId);
  if (!resolution) {
    return Number(p.markPnl) || 0;   // not resolved (sold pre-resolution) → last mark (proceeds)
  }
  const cost = Number(p.cost) || 0;
  const ours = String(p.outcome).toLowerCase();
  const winner = String(resolution.winningOutcome).toLowerCase();
  if (ours !== winner) return -cost;
  return (Number(p.size) || 0) - cost;
}

// ---- Per-user reconcile (4d-1): promote/cancel pending, mirror leader-exits, bank resolved ----
const CTF = '0x4D97DCd97eC945f40cF65F87097ACe5EA0476045';   // Conditional Tokens (ERC1155) — held-token source of truth
// Minimal ABI: balanceOf(account, id) → uint256, the only CTF call made by the reconcile loop.
const CTF_BAL_ABI = [{ name: 'balanceOf', type: 'function', stateMutability: 'view', inputs: [{ name: 'account', type: 'address' }, { name: 'id', type: 'uint256' }], outputs: [{ type: 'uint256' }] }];
// Second, independent RPC client (RPC2 env) used to confirm a zero balance read from `pub`.
const pub2 = createPublicClient({ chain: polygon, transport: http(process.env.RPC2 || 'https://polygon-bor-rpc.publicnode.com') });
// Seconds a 'pending' position may stay at zero on-chain balance before it is cancelled.
const PENDING_TIMEOUT_S = Number(process.env.PENDING_TIMEOUT_S || '180');
// Seconds after openedAt before an 'open' position missing from the Data API is checked on-chain.
const RECONCILE_GRACE_S = Number(process.env.RECONCILE_GRACE_S || '600');   // 10 min
// Age after which an 'open' position absent from the Data API but still held on-chain is closed as worthless.
const MAX_OPEN_AGE_S = Number(process.env.MAX_OPEN_AGE_S || '86400');       // 24h
const MIRROR_EXIT_LIVE = process.env.MIRROR_EXIT_LIVE === '1';              // default off = detect+log only (safe observe)
const LEADER_EXIT_CONFIRM = Number(process.env.LEADER_EXIT_CONFIRM || '2'); // cycles leader absent before trusting exit (Data API flake guard)
// Fraction shaved off the reference price when placing an exit sell.
const EXIT_SLIPPAGE = Number(process.env.EXIT_SLIPPAGE || '0.02');

/** Reconcile ONE user's open/pending/closing positions against their DW holdings + on-chain balance.
 *  Per the original author's note this mirrors single-user reconcileOpenPositions 1:1, but works
 *  on the in-memory ledger Map (write-through to copy_positions via saveLedger).
 *
 *  Phase 1 = slow reads on a snapshot: Data API holdings of the user and of each leader with an
 *            open copy, plus CTF balanceOf reads. Collects `changes` (tid → patch) and `toClose`
 *            (sells to execute).
 *  Phase 2 = patch the CURRENT record (re-read per tid, status-guarded) so a copy/close that
 *            landed mid-read is never clobbered; each change is written through.
 *  Phase B = execute the sells in `toClose` (manual closes + confirmed leader-exits) and bank
 *            the position as closed only on a fill / skip / untradeable market.
 *
 *  The early exit before Phase 2 requires BOTH `changes` and `toClose` to be empty. A manual
 *  close with shares on-chain is pushed to `toClose` without a `changes` entry, so returning on
 *  `changes` alone left a user whose only actionable position is 'closing' stuck forever.
 *
 *  All errors are caught and logged; the function never throws.
 *  @param {object} u  Per-user context (reads addr, dw, ledger).
 *  @returns {Promise<void>} */
async function reconcileUser(u) {
  try {
    const res = await fetch(`${DATA_API}/positions?user=${u.dw}`, { signal: AbortSignal.timeout(15000) });
    const held = await res.json();
    if (!Array.isArray(held)) return;
    const heldPnl = new Map(held.map((p) => [String(p.asset), Number(p.cashPnl) || 0]));        // asset → live mark
    const curPrice = new Map(held.map((h) => [String(h.asset), (Number(h.size) > 0 ? Number(h.currentValue) / Number(h.size) : 0)]));
    const now = Math.floor(Date.now() / 1000);
    const tag = `[${u.addr.slice(8, 18)}]`;
    // Phase 1 (no lock): inspect a snapshot + slow on-chain / leader-holdings reads, collect changes.
    const snapshot = [...u.ledger.values()];
    const changes = new Map();   // tid → patch
    const toClose = [];          // sells to execute in Phase B (manual closes + confirmed leader-exits)
    const openLeaders = [...new Set(snapshot.filter((p) => p.status === 'open' && p.leader).map((p) => String(p.leader).toLowerCase()))];
    const leaderHeld = new Map();   // leader → Set(asset); absent entry = unreliable this cycle → skip exit check
    await Promise.allSettled(openLeaders.map(async (ldr) => {
      try {
        const r = await fetch(`${DATA_API}/positions?user=${ldr}`, { signal: AbortSignal.timeout(15000) });
        const arr = await r.json();
        // An empty list is treated like a failed read (no entry), not as "leader holds nothing".
        if (Array.isArray(arr) && arr.length) leaderHeld.set(ldr, new Set(arr.map((x) => String(x.asset))));
      } catch { /* leave unset → don't trust absence this cycle */ }
    }));
    for (const p of snapshot) {
      const tid = String(p.tokenId);
      if (p.status === 'closing') {
        // Manual close requested by the user (server flipped status→'closing'). Decide the action
        // by on-chain truth: have shares → mirror-sell at current bid; only a resting BUY → cancel it;
        // nothing on-chain → close flat. Retries each cycle until done (no rollback — user asked to exit).
        let bal;
        try { bal = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [u.dw, BigInt(tid)] }); }
        catch { continue; }   // RPC error → retry next cycle
        if (bal > 0n) toClose.push({ tid, price: curPrice.get(tid) || Number(p.leaderPrice) || Number(p.entryPrice) || 0, manual: true });
        else if (p.lastOrderId) changes.set(tid, { cancelManual: p.lastOrderId });
        else changes.set(tid, { manualFlat: true });
        continue;
      }
      if (p.status === 'pending') {
        // On-chain balance is truth: appeared → promote; still zero past timeout → cancel + free slot.
        let bal;
        try { bal = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [u.dw, BigInt(tid)] }); }
        catch { continue; }   // RPC error → leave pending, retry next cycle
        if (bal > 0n) changes.set(tid, { promote: true });
        // Timeout from when WE reserved the slot (wall-clock), NOT the leader's trade time.
        // openedAt carries the leader timestamp (for PnL/ordering) and can be minutes old at
        // detection (POLL/Data-API lag) → using it here would cancel a freshly-posted order
        // before it can fill, freeing the slot while the CLOB order rests on → orphan fill.
        else if ((now - (p.reservedAt || p.openedAt || 0)) > PENDING_TIMEOUT_S) changes.set(tid, { cancelPending: p.lastOrderId || null });
        continue;
      }
      if (p.status !== 'open') continue;
      if (heldPnl.has(tid)) {
        const ch = { markPnl: heldPnl.get(tid) };
        const lh = leaderHeld.get(String(p.leader || '').toLowerCase());
        if (lh) {                                            // reliable leader holdings this cycle
          // Fade positions hold the COMPLEMENT, which the leader never holds — compare against the
          // leader's ORIGINAL asset (leader_asset), else reconcile would false-exit every cycle.
          const leaderTok = p.leaderAsset ? String(p.leaderAsset) : tid;
          if (!lh.has(leaderTok)) { ch.leaderExit = true; ch.exitPrice = curPrice.get(tid) || 0; }
          else ch.leaderStillIn = true;
        }
        changes.set(tid, ch);
      } else if ((now - (p.openedAt || 0)) > RECONCILE_GRACE_S) {
        // Absent from Data API (which flakes) → verify real ERC1155 balance before banking, and
        // confirm a zero on a SECOND independent RPC (drpc can serve a stale false-0).
        let onchain;
        try { onchain = await pub.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [u.dw, BigInt(tid)] }); }
        catch (e) { console.log(`${tag} [reconcile] CTF read err ${tid.slice(0, 10)}:`, e.message); continue; }
        if (onchain === 0n) {
          let onchain2;
          try { onchain2 = await pub2.readContract({ address: CTF, abi: CTF_BAL_ABI, functionName: 'balanceOf', args: [u.dw, BigInt(tid)] }); }
          catch (e) { console.log(`${tag} [reconcile] CTF confirm-read err ${tid.slice(0, 10)}:`, e.message); continue; }
          if (onchain2 === 0n) changes.set(tid, { close: 'resolved-onchain-absent' });
          else console.log(`${tag} [reconcile] false-zero guarded ${tid.slice(0, 10)}: rpc1=0 rpc2=${onchain2} β†’ keep open`);
        } else if ((now - (p.openedAt || 0)) > MAX_OPEN_AGE_S) {
          changes.set(tid, { close: 'resolved-worthless-aged' });   // on-chain leftover of a resolved $0 loser
        }
      }
    }
    // Nothing to patch AND nothing to sell → done. (toClose must be checked too: manual closes
    // with shares on-chain live only in toClose and would otherwise never reach Phase B.)
    if (!changes.size && !toClose.length) return;
    // Phase 2: apply each patch to the CURRENT record (status-guarded), write-through per change.
    let freed = 0;
    for (const [tid, patch] of changes) {
      const p = u.ledger.get(tid);
      if (!p) continue;
      if (patch.promote) { if (p.status === 'pending') { p.status = 'open'; await saveLedger(u.addr, p); } continue; }
      if (patch.cancelPending !== undefined) {
        if (p.status === 'pending') {
          // Fire-and-forget cancel of the resting order; cancel errors are ignored on purpose.
          if (patch.cancelPending) { const ex = await ensureExecutor(u); ex?.cancel(patch.cancelPending).catch(() => {}); }
          p.status = 'closed'; p.closedReason = 'pending-timeout'; p.closedAt = now; p.realizedPnl = 0;
          u.ledger.delete(tid); await saveLedger(u.addr, p);   // no DELETE endpoint → mark closed; drop frees the slot
        }
        continue;
      }
      if (patch.cancelManual !== undefined) {   // manual close, only a resting BUY on-chain → cancel it
        if (p.status === 'closing') {
          if (patch.cancelManual) { const ex = await ensureExecutor(u); ex?.cancel(patch.cancelManual).catch(() => {}); }
          p.status = 'closed'; p.closedReason = 'manual-cancel-pending'; p.closedAt = now; p.realizedPnl = 0;
          u.ledger.delete(tid); await saveLedger(u.addr, p);
        }
        continue;
      }
      if (patch.manualFlat) {   // manual close, nothing on-chain to sell → close flat
        if (p.status === 'closing') {
          p.status = 'closed'; p.closedReason = 'manual-user'; p.closedAt = now; p.realizedPnl = Number(p.markPnl) || 0;
          u.ledger.delete(tid); await saveLedger(u.addr, p);
        }
        continue;
      }
      if (p.status !== 'open') continue;   // a concurrent handler already changed it
      if (patch.markPnl != null) p.markPnl = patch.markPnl;
      if (patch.leaderStillIn) { if (p.leaderExitSeen) p.leaderExitSeen = 0; }
      else if (patch.leaderExit) {
        // Require LEADER_EXIT_CONFIRM consecutive sightings before trusting the exit.
        p.leaderExitSeen = (p.leaderExitSeen || 0) + 1;
        if (p.leaderExitSeen >= LEADER_EXIT_CONFIRM) {
          console.log(`${tag} πŸ”» leader-exit confirmed (${p.leaderExitSeen}x) ${tid.slice(0, 10)} "${String(p.title).slice(0, 38)}" markβ‰ˆ$${(Number(p.markPnl) || 0).toFixed(2)} β€” ${MIRROR_EXIT_LIVE ? 'closing' : 'OBSERVE-ONLY'}`);
          if (MIRROR_EXIT_LIVE) toClose.push({ tid, price: patch.exitPrice });
        } else {
          console.log(`${tag} … leader-exit seen ${p.leaderExitSeen}/${LEADER_EXIT_CONFIRM} ${tid.slice(0, 10)} (awaiting confirm)`);
        }
      }
      if (patch.close) {
        p.status = 'closed'; p.closedReason = patch.close; p.closedAt = now;
        // worthless-aged = resolved $0 loser → realized is full cost lost.
        // onchain-absent = sold/redeemed → resolution truth via realizedOnClose (falls back to last mark).
        p.realizedPnl = patch.close === 'resolved-worthless-aged' ? -(Number(p.cost) || 0) : await realizedOnClose(p);
        freed += 1;
      }
      await saveLedger(u.addr, p);
    }
    if (freed) console.log(`${tag} [reconcile] freed ${freed} slot(s); open now ${[...u.ledger.values()].filter((x) => x.status === 'open').length}`);
    // Phase B (network): mirror-sell confirmed leader-exits. Bank closed ONLY on a real fill.
    // Manual closes always execute (toClose carries them even when MIRROR_EXIT_LIVE is off);
    // leader-exits were only pushed above when MIRROR_EXIT_LIVE.
    if (toClose.length) {
      const ex = await ensureExecutor(u);
      if (ex) for (const { tid, price, manual } of toClose) {
        // Sell slightly below the reference price, clamped into the tradable [0.01, 0.99] range.
        const sellPx = Math.max(0.01, Math.min(0.99, (Number(price) || 0) * (1 - EXIT_SLIPPAGE)));
        let resp;
        try { resp = await ex.sellAll({ tokenID: tid, price: sellPx }); }
        catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
        const h = u.ledger.get(tid);
        if (!h || (h.status !== 'open' && h.status !== 'closing')) continue;   // 'closing' = manual-close intent
        const what = manual ? 'manual-close' : 'mirror-close';
        const soldNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
        if (resp?.skipped) {
          h.status = 'closed'; h.closedReason = `${manual ? 'manual' : 'leader-exit'}-${String(resp.skipped)}`; h.closedAt = now; h.realizedPnl = Number(h.markPnl) || 0;
          await saveLedger(u.addr, h); console.log(`${tag} ${what} skipped (${resp.skipped}) β†’ closed ${tid.slice(0, 10)}`);
        } else if (soldNow) {
          h.status = 'closed'; h.closedReason = manual ? 'manual-user' : 'mirror-leader-exit'; h.closeOrderId = resp.orderID; h.closedAt = now; h.realizedPnl = Number(h.markPnl) || 0;
          await saveLedger(u.addr, h); console.log(`${tag} βœ… ${manual ? 'MANUAL' : 'MIRROR'}-CLOSED ${tid.slice(0, 10)} pnlβ‰ˆ$${(Number(h.markPnl) || 0).toFixed(2)}`);
        } else if (resp?.success && resp.orderID) {
          ex.cancel(resp.orderID).catch(() => {}); h.leaderExitSeen = 0; await saveLedger(u.addr, h);   // status stays open/closing → retry next cycle
          console.log(`${tag} ⏳ ${what} not filled; retry ${tid.slice(0, 10)}`);
        } else {
          const reason = resp?.errorMsg || resp?.error || resp?.status || 'unknown';
          // A resolved market delists its token → sell fails with "invalid token id"
          // (or not found). Can't sell a settled market; close as resolved (realize the
          // mark PnL) so the slot frees instead of retrying the sell forever.
          if (/invalid token id|token.*not found|market.*(resolved|closed)/i.test(String(reason))) {
            h.status = 'closed'; h.closedReason = manual ? 'manual-resolved' : 'leader-exit-resolved'; h.closedAt = now; h.realizedPnl = await realizedOnClose(h);
            await saveLedger(u.addr, h);
            console.log(`${tag} βœ… ${what} closed (resolved/untradeable) ${tid.slice(0, 10)} pnlβ‰ˆ$${(Number(h.markPnl) || 0).toFixed(2)}`);
          } else {
            h.leaderExitSeen = 0; await saveLedger(u.addr, h);   // status unchanged → retry next cycle
            console.log(`${tag} ❌ ${what} rejected, retry ${tid.slice(0, 10)} β€” ${reason}`);
          }
        }
      }
    }
  } catch (e) { console.log(`[reconcile] ${u.addr.slice(0, 12)} err:`, e.message); }
}

/**
 * Pull user-requested manual closes from the DB into the in-memory ledger.
 *
 * Why: the server flips status→'closing' on the close endpoint, but the ledger is only bulk-loaded
 * at startup — without this, a 'closing' set after boot would never be seen. Each cycle fetches
 * only `${POSITIONS_URL}?status=closing` and, for users already known to this daemon:
 *   - row not in memory                → the fetched row is stored as-is;
 *   - row in memory, not closing/closed → its status is flipped to 'closing'.
 * Rows outside a non-empty FOLLOWERS allow-list, without tokenId, or for unknown users are ignored.
 * No-op without POSITIONS_URL; a non-OK response is ignored silently; errors are logged.
 * @returns {Promise<void>}
 */
async function pullCloseIntents() {
  if (!POSITIONS_URL) return;
  try {
    const resp = await fetch(`${POSITIONS_URL}?status=closing`, {
      headers: { 'x-service-token': SERVICE_TOKEN },
      signal: AbortSignal.timeout(12000),
    });
    if (!resp.ok) return;
    const body = await resp.json();
    for (const row of (body.data || [])) {
      const owner = String(row.userAddress || '').toLowerCase();
      const allowed = !FOLLOWERS.size || FOLLOWERS.has(owner);
      if (!allowed || !row.tokenId) continue;
      const user = users.get(owner);
      if (!user) continue;
      const key = String(row.tokenId);
      const label = `${owner.slice(0,12)} ${key.slice(0,10)}`;
      const existing = user.ledger.get(key);
      if (!existing) {
        user.ledger.set(key, row);
        console.log(`[close-intent] ${label} (loaded)`);
      } else if (existing.status !== 'closing' && existing.status !== 'closed') {
        existing.status = 'closing';
        console.log(`[close-intent] ${label} β†’ closing`);
      }
    }
  } catch (err) {
    console.log('[close-intent] err:', err.message);
  }
}

/**
 * One reconcile cycle over every known user: first pull manual-close requests set since the
 * last ledger load, then reconcile each user that has a deposit wallet, one after another.
 * A failure for one user is logged and does not stop the others.
 * @returns {Promise<void>}
 */
async function reconcileAll() {
  await pullCloseIntents();
  for (const user of users.values()) {
    if (user.dw) {
      try {
        await reconcileUser(user);
      } catch (err) {
        console.log(`[reconcile] ${user.addr.slice(0, 12)} fatal:`, err.message);
      }
    }
  }
}

// ---- Per-user PnL (4d-2): per-leader realized+unrealized, posted keyed by userAddress ----
// PnL endpoint: PNL_URL env, else SUBS_URL with its trailing "/active" replaced by "/pnl";
// empty string disables posting (see computeAndPostPnLForUser).
const PNL_URL = process.env.PNL_URL || (SUBS_URL ? SUBS_URL.replace(/\/active$/, '/pnl') : '');
// PnL interval in milliseconds (default 120 s). NOTE(review): the timer that consumes it is not
// in the reviewed portion of the file.
const PNL_POLL_MS = Number(process.env.PNL_POLL_MS || '120000');

/**
 * Compute one user's per-leader P&L and POST it to PNL_URL keyed by userAddress, so each
 * user's "My copies" view reads only their own snapshot.
 *
 * Per leader:
 *  - realized   = sum of `realizedPnl` over the user's closed ledger entries;
 *  - unrealized = sum of live `cashPnl` (Data API) over currently-held copies; `value` sums
 *                 their `currentValue`.
 * Status handling:
 *  - 'pending' → skipped (slot reserved, not a real P&L line yet);
 *  - 'open' and 'closing' → held. A 'closing' position is a manual close that has not been
 *    closed yet, so it is reported with the open copies (when the Data API lists it) instead of
 *    being counted as a closed copy with its not-yet-set realized PnL;
 *  - anything else → realized / closedCount.
 * Held positions missing from the Data API response contribute nothing this cycle.
 *
 * `credsReady` = executor initialized = L2 CLOB creds cached. Per the original note, the client
 * uses it to know when attaching the orders-only Privy policy is safe (attaching before creds
 * are cached would DENY the ClobAuth derivation and break copying).
 *
 * Nothing is fetched or posted when PNL_URL is empty. Errors are logged, never thrown.
 * @param {object} u  Per-user context (reads addr, dw, ledger, executor).
 * @returns {Promise<void>}
 */
async function computeAndPostPnLForUser(u) {
  if (!PNL_URL) return;   // no sink configured → skip the Data API round-trip as well
  try {
    const res = await fetch(`${DATA_API}/positions?user=${u.dw}`, { signal: AbortSignal.timeout(15000) });
    const held = await res.json();
    const heldMap = new Map(Array.isArray(held) ? held.map((h) => [String(h.asset), h]) : []);
    const agg = new Map();
    // Get-or-create the accumulator for one leader.
    const get = (ldr) => {
      let a = agg.get(ldr);
      if (!a) { a = { leader: ldr, realized: 0, unrealized: 0, value: 0, openCount: 0, closedCount: 0 }; agg.set(ldr, a); }
      return a;
    };
    for (const p of u.ledger.values()) {
      if (p.status === 'pending') continue;   // slot reserved, not a real P&L line yet
      const a = get(String(p.leader || 'unknown').toLowerCase());
      if (p.status === 'open' || p.status === 'closing') {
        const h = heldMap.get(String(p.tokenId));
        if (h) { a.unrealized += Number(h.cashPnl) || 0; a.value += Number(h.currentValue) || 0; a.openCount += 1; }
      } else { a.realized += Number(p.realizedPnl) || 0; a.closedCount += 1; }
    }
    const leaders = [...agg.values()].map((a) => ({
      leader: a.leader,
      realized: +a.realized.toFixed(4),
      unrealized: +a.unrealized.toFixed(4),
      pnl: +(a.realized + a.unrealized).toFixed(4),
      value: +a.value.toFixed(4),
      openCount: a.openCount,
      closedCount: a.closedCount,
    }));
    const r = await fetch(PNL_URL, {
      method: 'POST',
      headers: { 'content-type': 'application/json', 'x-service-token': SERVICE_TOKEN },
      body: JSON.stringify({ userAddress: u.addr, leaders, credsReady: !!u.executor }),
      signal: AbortSignal.timeout(12000),
    });
    if (!r.ok) console.log(`[pnl] ${u.addr.slice(0, 12)} post http`, r.status);
  } catch (e) { console.log(`[pnl] ${u.addr.slice(0, 12)} err:`, e.message); }
}

/**
 * One P&L cycle: compute and post the snapshot for every known user that has a deposit wallet,
 * sequentially. A failure for one user is logged and does not affect the others.
 * @returns {Promise<void>}
 */
async function computeAndPostPnLAll() {
  for (const user of users.values()) {
    if (user.dw) {
      try {
        await computeAndPostPnLForUser(user);
      } catch (err) {
        console.log(`[pnl] ${user.addr.slice(0, 12)} fatal:`, err.message);
      }
    }
  }
}

// ---- Detection + per-user copy decision (4c: BUY/SELL fan-out; 4d: reconcile+pnl above) ----
// Realtime feed endpoint. NOTE(review): the websocket consumer is not in the reviewed portion.
const RTDS_WS = 'wss://ws-live-data.polymarket.com';
// 'proportional' allocation: target notional = leader notional × COPY_FRACTION (see decideCopy).
const COPY_FRACTION = Number(process.env.COPY_FRACTION || '0.1');
// Minimum order size in shares enforced by decideCopy.
const MIN_SHARES = Number(process.env.MIN_SHARES || '5');
// NOTE(review): presumably a per-leader loss threshold in USD — its use is not visible here.
const LEADER_KILL_USD = Number(process.env.LEADER_KILL_USD || '8');
// Path of the daemon's local state file (STATE env). NOTE(review): reader/writer not visible here.
const STATE_FILE = process.env.STATE || './copy-mu-state.json';
// Poll interval in milliseconds. NOTE(review): presumably for POLL mode — consumer not visible here.
const POLL_MS = Number(process.env.POLL_MS || '15000');

// True when `p` is a position that still occupies a slot: status 'open' or 'pending'.
const isActive = (p) => Boolean(p) && ['open', 'pending'].includes(p.status);

// Normalize a timestamp to seconds: values above 2e10 are taken as milliseconds and floored
// to whole seconds; anything non-numeric becomes 0.
const toSec = (ts) => {
  const raw = Number(ts) || 0;
  if (raw > 2e10) return Math.floor(raw / 1000);
  return raw;
};

// Identity key of a leader trade: "txHash:asset:side:timestamp:size", with every missing or
// falsy field rendered as an empty string.
const tradeKey = (t) => [t.transactionHash, t.asset, t.side, t.timestamp, t.size]
  .map((field) => field || '')
  .join(':');

/**
 * Leader trade → the order WE'd place for this user (their cfg + their budget).
 *
 * Target notional by `cfg.mode`: 'fixed' uses `cfg.allocValue`; 'percent' uses that percentage
 * of `budget`; any other mode uses COPY_FRACTION of the leader's notional. The share count is
 * then raised to MIN_SHARES and to MIN_NOTIONAL, and finally capped at `cfg.maxNotional`.
 *
 * @param {object} t - leader trade (reads asset, size, price, side, outcome, title)
 * @param {object} cfg - subscription config (reads mode, allocValue, maxNotional)
 * @param {number} budget - the user's budget, used by 'percent' mode
 * @returns {object|null} order descriptor, or null when the trade is incomplete or the capped
 *   size falls below the minimums
 */
function decideCopy(t, cfg, budget) {
  const px = Number(t.price || 0);
  const leaderShares = Number(t.size || 0);
  if (!t.asset || !leaderShares || !px) return null;

  let targetUsd;
  switch (cfg.mode) {
    case 'fixed':
      targetUsd = cfg.allocValue;
      break;
    case 'percent':
      targetUsd = (cfg.allocValue / 100) * budget;
      break;
    default:
      targetUsd = leaderShares * px * COPY_FRACTION;
  }

  let shares = Math.round((targetUsd || 0) / px);
  if (shares < MIN_SHARES) shares = MIN_SHARES;
  if (shares * px < MIN_NOTIONAL) shares = Math.ceil(MIN_NOTIONAL / px);

  const overCap = shares * px > cfg.maxNotional;
  if (overCap) {
    shares = Math.floor(cfg.maxNotional / px);
    // The cap must still leave room for a minimum-sized order; otherwise there is nothing to place.
    const tooSmall = shares < MIN_SHARES || shares * px < MIN_NOTIONAL;
    if (tooSmall) return null;
  }

  return {
    tokenID: t.asset,
    side: (t.side || '').toUpperCase(),
    price: px,
    size: shares,
    notional: +(shares * px).toFixed(2),
    outcome: t.outcome,
    title: t.title,
  };
}

/**
 * Decide and, when LIVE, execute a copy of leader trade `t` for one user `u`.
 *
 * BUY: applies the price band, optional FADE inversion (trade the complementary outcome), sizing
 * via decideCopy, the per-leader kill switch and the per-event / per-leader caps against the
 * user's OWN in-memory ledger. In DRY mode it only logs the decision. In LIVE mode it reserves a
 * 'pending' ledger row (durably, before the order), places the buy and finalizes the row.
 * SELL: mirrors the entry leader's exit by closing the position this user holds for that trade
 * (for FADE positions, the complement found via `leaderAsset`).
 *
 * @param {object} t - leader trade (reads proxyWallet, side, price, asset, outcome, conditionId,
 *   eventSlug/slug, timestamp, title)
 * @param {object} u - user context (reads addr, budget, ledger Map keyed by token id,
 *   fadeInflight Set)
 * @param {object} cfg - this user's subscription config for the leader (reads exitOnly, priceMin,
 *   priceMax, direction, fadeMin, fadeMax, maxPerEvent, maxOpenPerLeader; also passed to decideCopy)
 * @returns {Promise<void>}
 */
async function handleTradeForUser(t, u, cfg) {
  const leader = String(t.proxyWallet || '').toLowerCase();
  const side = String(t.side || '').toUpperCase();
  const tag = `[${u.addr.slice(8, 18)}]`;
  // `leader` is lowercased above; a ledger row's `leader` is not guaranteed to be. The kill switch
  // and the SELL path already compare case-insensitively, so the caps must do the same or a
  // mixed-case row would not count against them.
  const sameLeader = (p) => String(p.leader || '').toLowerCase() === leader;
  if (side === 'BUY') {
    if (cfg.exitOnly) return;   // paused/archived leader: mirror exits only, never open new
    const px = Number(t.price || 0);
    if (px < cfg.priceMin || px > cfg.priceMax) { console.log(`${tag} skip (price ${px} outside band ${cfg.priceMin}-${cfg.priceMax})`); return; }
    // FADE: invert to the complementary outcome before all sizing/caps run on the faded trade.
    let leaderAsset = null;
    if (cfg.direction === 'fade') {
      if (px < cfg.fadeMin || px > cfg.fadeMax) { console.log(`${tag} skip fade (leader px ${px} outside band ${cfg.fadeMin}-${cfg.fadeMax})`); return; }
      const oc = String(t.outcome || '').toLowerCase();
      if (oc !== 'yes' && oc !== 'no') { console.log(`${tag} skip fade (non-binary outcome "${t.outcome}")`); return; }
      // Guard the only interleavable window (the resolveComplement await): two near-simultaneous
      // fade trades on the same market would otherwise both pass the not-yet-held check and
      // double-open the complement. Hold a per-user in-flight flag keyed by conditionId across the
      // await; the rest of the path to the ledger reserve runs synchronously (no further yield).
      const fcond = String(t.conditionId || '');
      if (u.fadeInflight.has(fcond)) { console.log(`${tag} ⏭️ skip fade (resolution in-flight ${fcond.slice(0, 10)})`); return; }
      let comp;
      u.fadeInflight.add(fcond);
      try { comp = await resolveComplement(t.conditionId, t.asset); }
      finally { u.fadeInflight.delete(fcond); }
      if (!comp) { console.log(`${tag} skip fade (complement unresolved cond=${String(t.conditionId).slice(0, 10)})`); return; }
      leaderAsset = String(t.asset);
      t = { ...t, asset: comp, price: +(1 - px).toFixed(4), outcome: oc === 'yes' ? 'No' : 'Yes' };
    }
    const order = decideCopy(t, cfg, u.budget);
    if (!order) { console.log(`${tag} skip (incomplete/over-cap)`); return; }
    const label = `${order.side} ${order.size} "${order.outcome}" @ ${order.price.toFixed(3)} ($${order.notional}) | ${String(order.title).slice(0, 45)}`;
    const evt = t.eventSlug || t.slug || '';
    // Snapshot the ledger only AFTER the resolveComplement await: a copy taken before it would
    // miss rows reserved by another handler while we were suspended, undercounting the caps.
    const ledger = [...u.ledger.values()];
    const leaderRealized = ledger.filter((p) => p.status === 'closed' && sameLeader(p)).reduce((s, p) => s + (Number(p.realizedPnl) || 0), 0);
    if (LEADER_KILL_USD > 0 && leaderRealized <= -LEADER_KILL_USD) { console.log(`${tag} β›” skip (leader killed: realized $${leaderRealized.toFixed(2)}): ${label}`); return; }
    if (isActive(u.ledger.get(order.tokenID))) { console.log(`${tag} ⏭️ skip (already holding/pending): ${label}`); return; }
    if (evt) {
      const sameEvt = ledger.filter((p) => isActive(p) && sameLeader(p) && p.eventSlug === evt).length;
      if (sameEvt >= cfg.maxPerEvent) { console.log(`${tag} ⏭️ skip (leader ${sameEvt} on event, max ${cfg.maxPerEvent}): ${label}`); return; }
    }
    const leaderOpen = ledger.filter((p) => isActive(p) && sameLeader(p)).length;
    if (leaderOpen >= cfg.maxOpenPerLeader) { console.log(`${tag} ⏭️ skip (leader ${leaderOpen}, max per-leader ${cfg.maxOpenPerLeader}): ${label}`); return; }
    // Global total-open cap removed — positions-per-whale (above) + deposit balance are the limits now.
    // chunk6: slippage cap — our effective buy price (px * (1+BUY_SLIPPAGE)) must not exceed the configured priceMax band.
    if (cfg.priceMax < 1 && order.price * (1 + BUY_SLIPPAGE) > cfg.priceMax) { console.log(`${tag} ⏭️ skip (slip-adj ${(order.price * (1 + BUY_SLIPPAGE)).toFixed(3)} > priceMax ${cfg.priceMax}): ${label}`); return; }
    if (!LIVE) { console.log(`${tag} 🟒 WOULD COPY: ${label}  tokenID=${String(order.tokenID).slice(0, 14)}… budget=$${u.budget.toFixed(2)}`); return; }
    // Reserve the slot in THIS user's ledger as 'pending' (write-through to DB) BEFORE the slow
    // CLOB POST — caps count pending, so a concurrent fan-out of the same token can't double-open.
    // cost = slippage-adjusted upper bound (executor buys at price*(1+BUY_SLIPPAGE), capped 0.99),
    // NOT the leader-priced notional — else realizedOnClose (size-cost) OVERSTATES wins by the slip.
    // reservedAt = our wall-clock reserve time, drives the pending-timeout in reconcile.
    const effCost = +(order.size * Math.min(0.99, order.price * (1 + BUY_SLIPPAGE))).toFixed(2);
    const nowSec = Math.floor(Date.now() / 1000);
    const rec = { tokenId: order.tokenID, leader, marketId: t.conditionId, outcome: t.outcome, title: t.title, eventSlug: evt, side: 'BUY', size: order.size, leaderPrice: order.price, cost: effCost, openedAt: toSec(t.timestamp) || nowSec, reservedAt: nowSec, buys: 0, status: 'pending', direction: cfg.direction, leaderAsset: leaderAsset || order.tokenID };
    u.ledger.set(order.tokenID, rec);
    // The durable reserve MUST land before we place an order. Otherwise a crash
    // leaves a live order with no 'pending' record, and a restart re-detects the
    // leader trade and copies it again (double buy). Failed write → abort, no order.
    if (!(await saveLedger(u.addr, rec))) {
      u.ledger.delete(order.tokenID);
      console.log(`${tag} ❌ reserve write failed β€” aborting copy, no order placed: ${label}`);
      return;
    }
    const exec = await ensureExecutor(u);
    if (!exec) { rec.status = 'closed'; rec.closedReason = 'no-executor'; u.ledger.delete(order.tokenID); await saveLedger(u.addr, rec); console.log(`${tag} ❌ no executor, slot released: ${label}`); return; }
    // Network call OUTSIDE any lock; finalize against the in-memory ledger (single-threaded fan-out).
    let resp;
    try { resp = await exec.buy(order); }
    catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
    const p = u.ledger.get(order.tokenID);
    if (!p || p.status !== 'pending') return;   // reconcile/another handler already resolved it
    const filledNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
    if (filledNow) {
      p.status = 'open'; p.buys = 1; p.lastOrderId = resp.orderID; p.openedAt = toSec(t.timestamp) || p.openedAt;
      await saveLedger(u.addr, p);
      console.log(`${tag} βœ… COPIED: ${label}  orderID=${resp.orderID} status=${resp.status}`);
    } else if (resp?.success && resp.orderID) {
      p.lastOrderId = resp.orderID; await saveLedger(u.addr, p);
      console.log(`${tag} βŒ› posted, awaiting fill (status ${resp.status}); reconcile will confirm/cancel: ${label}`);
    } else {
      p.status = 'closed'; p.closedReason = 'rejected'; p.realizedPnl = 0;
      u.ledger.delete(order.tokenID);
      await saveLedger(u.addr, p);   // no DELETE endpoint → mark closed; in-memory drop frees the slot now
      // String(...) because JSON.stringify(undefined) is undefined and would throw on .slice.
      console.log(`${tag} ❌ copy rejected, slot released: ${label} β€” ${resp?.errorMsg || resp?.error || resp?.status || 'unknown'}\n     ${String(JSON.stringify(resp)).slice(0, 160)}`);
    }
    return;
  }
  if (side === 'SELL') {
    // COPY: we hold the same token the leader sells. FADE: we hold the COMPLEMENT (keyed by our
    // tokenId, leaderAsset === the token the leader sells) → find the fade position by leaderAsset.
    let held = u.ledger.get(t.asset);
    if (!held) {
      held = [...u.ledger.values()].find((p) => p.direction === 'fade' && p.status === 'open'
        && String(p.leaderAsset) === String(t.asset) && String(p.leader).toLowerCase() === leader);
    }
    if (!held || held.status !== 'open') return;                 // not held (pending/closed/absent) → ignore
    if (String(held.leader).toLowerCase() !== leader) return;    // only the entry leader's exit mirrors a close
    const ourTok = String(held.tokenId);                         // the token WE actually hold & close
    const closePx = held.direction === 'fade' ? +(1 - Number(t.price)).toFixed(4) : Number(t.price);
    // Same fallback the BUY path uses for openedAt: a trade without a usable timestamp must not
    // stamp the close at 0.
    const closedAtSec = toSec(t.timestamp) || Math.floor(Date.now() / 1000);
    const label = `SELL "${t.outcome}" @ ${Number(t.price).toFixed(3)}${held.direction === 'fade' ? ' (fade→close)' : ''} | ${String(t.title).slice(0, 45)}`;
    if (!LIVE) { console.log(`${tag} πŸ”» WOULD CLOSE (entry leader exited): ${label}  tokenID=${ourTok.slice(0, 14)}…`); return; }
    const exec = await ensureExecutor(u);
    if (!exec) { console.log(`${tag} ⚠️ no executor, cannot close (still holding): ${label}`); return; }
    let resp;
    try { resp = await exec.sellAll({ tokenID: ourTok, price: closePx }); }
    catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
    const p = u.ledger.get(ourTok);
    if (!p || p.status !== 'open') return;                       // already resolved by another handler
    if (resp?.skipped) {
      p.status = 'closed'; p.closedReason = String(resp.skipped); p.closedAt = closedAtSec; p.realizedPnl = Number(p.markPnl) || 0;
      await saveLedger(u.addr, p);
      console.log(`${tag} close skipped (${resp.skipped}) β†’ marked closed: ${label}`); return;
    }
    const soldNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
    if (soldNow) {
      p.status = 'closed'; p.closeOrderId = resp.orderID; p.closedAt = closedAtSec; p.realizedPnl = Number(p.markPnl) || 0;
      await saveLedger(u.addr, p);
      console.log(`${tag} βœ… CLOSED: ${label}  orderID=${resp.orderID} status=${resp.status}`);
    } else if (resp?.success && resp.orderID) {
      // Posted but didn't fill (rested) → we STILL HOLD. Cancel the resting close, keep open so the
      // next leader-sell / reconcile retries — do NOT falsely bank it closed.
      // Best-effort and deliberately not awaited, but a failed cancel is logged: it means a
      // resting sell may still be live on the book.
      exec.cancel(resp.orderID)
        .then(() => console.log(`${tag} ↩️ cancelled unfilled close ${resp.orderID}`))
        .catch((e) => console.log(`${tag} cancel of unfilled close ${resp.orderID} failed:`, e?.message || e));
      console.log(`${tag} ⏳ close not filled (status ${resp.status}); keeping open, cancelled resting: ${label}`);
    } else {
      // A resolved/delisted market can't be sold (token delisted) → the sell rejects forever and the
      // slot stays frozen. Mirror reconcile Phase B: close as resolved (realize PnL) to free the
      // slot instead of re-hitting sell on every leader-exit. Otherwise keep open + log the reason.
      const reason = resp?.errorMsg || resp?.error || resp?.status || 'unknown';
      if (/invalid token id|token.*not found|market.*(resolved|closed)/i.test(String(reason))) {
        p.status = 'closed'; p.closedReason = 'leader-exit-resolved'; p.closedAt = closedAtSec; p.realizedPnl = await realizedOnClose(p);
        await saveLedger(u.addr, p);
        // Log the figure that was actually booked (realizedPnl), not the last mark.
        console.log(`${tag} βœ… closed (resolved/untradeable) ${String(t.asset).slice(0, 10)} pnlβ‰ˆ$${(Number(p.realizedPnl) || 0).toFixed(2)}`);
      } else {
        console.log(`${tag} ❌ close rejected (still holding): ${label} β€” ${reason}`);
      }
    }
  }
}

/**
 * Delivers one detected leader trade to every user subscribed to that leader, one user after
 * another. Subscribers that are unknown in `users` or have no config for this leader are skipped.
 * An error thrown for one user is logged and never blocks the remaining users.
 */
async function fanout(t) {
  const leaderAddr = String(t.proxyWallet || '').toLowerCase();
  const subscriberAddrs = leaderIndex.get(leaderAddr);
  if (!subscriberAddrs?.size) return;
  for (const userAddr of subscriberAddrs) {
    const user = users.get(userAddr);
    const leaderCfg = user?.subs.get(leaderAddr);
    if (user && leaderCfg) {
      try {
        await handleTradeForUser(t, user, leaderCfg);
      } catch (err) {
        console.log(`[fanout] ${userAddr.slice(0, 12)} err:`, err.message);
      }
    }
  }
}

// --- WebSocket detection (default): RTDS firehose, filter by any active leader ---
/**
 * Websocket detection loop (the default when POLL is not set): subscribes to the activity/trades
 * topic on RTDS_WS, keeps only trades whose proxyWallet is an active leader, de-duplicates them by
 * tradeKey and fans each new one out to subscribers. Reconnects with exponential backoff
 * (1s doubling up to 30s) whenever the socket closes.
 * @returns {Promise<void>} resolves once the first connection attempt has been started
 */
async function runWebsocket() {
  const { default: WebSocket } = await import('ws');
  // Rehydrate dedup across restarts so a reconnect can't re-copy trades already
  // processed (RTDS may redeliver a recent window on resubscribe). Persisted to the
  // same STATE_FILE the POLL path uses.
  const st0 = loadState();
  const seen = new Set(Array.isArray(st0.seen) ? st0.seen : []);
  // Bounded in memory: past 20000 keys the oldest inserted key is evicted.
  const remember = (k) => { seen.add(k); if (seen.size > 20000) seen.delete(seen.values().next().value); };
  // Only the newest 5000 keys are written to disk; other fields of the state file are kept.
  const persistSeen = () => { try { const s = loadState(); s.seen = Array.from(seen).slice(-5000); saveState(s); } catch (e) { console.log('[ws] persist err:', e.message); } };
  let backoff = 1000;
  const connect = () => {
    const ws = new WebSocket(RTDS_WS);
    let ping;
    ws.on('open', () => {
      backoff = 1000;
      ws.send(JSON.stringify({ action: 'subscribe', subscriptions: [{ topic: 'activity', type: 'trades' }] }));
      ping = setInterval(() => { try { ws.send('PING'); } catch {} }, 5000);
      console.log(`[ws] connected; watching ${leaderIndex.size} leader(s)`);
    });
    ws.on('message', async (d) => {
      // The emitter ignores the promise this async handler returns, so anything thrown here would
      // surface as an unhandled rejection and take the daemon down. Contain it per message.
      try {
        const s = d.toString();
        if (s[0] !== '{' && s[0] !== '[') return;
        let msg; try { msg = JSON.parse(s); } catch { return; }
        for (const m of (Array.isArray(msg) ? msg : [msg])) {
          const t = m?.payload || m;   // `m` can be null in a frame such as `[null]`
          if (!t || !t.proxyWallet || !t.asset) continue;
          if (!leaderIndex.has(String(t.proxyWallet).toLowerCase())) continue;
          const k = tradeKey(t);
          if (seen.has(k)) continue;
          remember(k);
          persistSeen();   // durably mark BEFORE acting → a crash/restart won't re-copy
          await fanout(t);
        }
      } catch (e) {
        console.log('[ws] message err:', e?.message || e);
      }
    });
    ws.on('error', (e) => console.log('[ws] error:', e.message));
    ws.on('close', () => { clearInterval(ping); console.log(`[ws] closed; reconnect in ${backoff}ms`); setTimeout(connect, backoff); backoff = Math.min(backoff * 2, 30000); });
  };
  connect();
}

// --- Polling detection (POLL=1): Data API /trades?user= per active leader, per-leader cursors ---
/**
 * Reads the persisted detection state from STATE_FILE.
 * Returns `{}` when the file is missing, unreadable or not valid JSON — and also when it holds
 * valid JSON that is not a plain object (`null`, an array, a number...). Callers read
 * `state.seen` / `state.lastTs` straight off the result, so anything else would crash them.
 * @returns {object}
 */
const loadState = () => {
  try {
    const parsed = JSON.parse(fs.readFileSync(STATE_FILE, 'utf8'));
    return parsed && typeof parsed === 'object' && !Array.isArray(parsed) ? parsed : {};
  } catch {
    return {};
  }
};
// Atomic write: a crash mid-writeFileSync left a truncated state file, which loadState
// then silently read as {} → re-prime → re-copy. temp+rename is atomic on POSIX.
const saveState = (s) => { const tmp = STATE_FILE + '.tmp'; fs.writeFileSync(tmp, JSON.stringify(s)); fs.renameSync(tmp, STATE_FILE); };

// Re-entrancy guard. tickInner() awaits network I/O, so a run can outlast POLL_MS; without the
// guard the interval would start a second, concurrent tick, and both would load→save the state
// file, clobbering each other's cursor (lost or replayed trades).
let tickRunning = false;

/** Runs one polling cycle unless the previous one is still in flight (then logs and returns). */
async function tick() {
  if (!tickRunning) {
    tickRunning = true;
    try {
      await tickInner();
    } finally {
      // Always release the guard, including when tickInner rejects.
      tickRunning = false;
    }
    return;
  }
  console.log('[poll] skip (previous tick still running)');
}
/**
 * One polling cycle: fetches the latest trades of every active leader, picks the ones not seen
 * before, persists the dedup state and then fans the new trades out in timestamp order.
 *
 * A leader seen for the first time is only "primed": its current trades become the baseline and
 * nothing is copied. A fetch failure for one leader is logged and does not affect the others.
 * @returns {Promise<void>}
 */
async function tickInner() {
  const leaders = [...leaderIndex.keys()];
  if (!leaders.length) { console.log('[poll] no active leaders'); return; }
  const state = loadState();
  const seen = new Set(Array.isArray(state.seen) ? state.seen : []);
  if (!state.lastTs || typeof state.lastTs !== 'object') state.lastTs = {};
  if (!state.primed || typeof state.primed !== 'object') state.primed = {};
  const results = await Promise.allSettled(leaders.map(async (leader) => {
    const r = await fetch(`${DATA_API}/trades?user=${leader}&limit=50`, { signal: AbortSignal.timeout(15000) });
    if (!r.ok) throw new Error(`${leader.slice(0, 8)}… data-api ${r.status}`);
    return { leader, trades: await r.json() };
  }));
  const toCopy = [];
  for (const res of results) {
    if (res.status !== 'fulfilled') { console.log('[poll] err:', res.reason?.message); continue; }
    const { leader, trades } = res.value;
    if (!Array.isArray(trades)) continue;
    if (!state.primed[leader]) {
      for (const t of trades) seen.add(tradeKey(t));
      state.lastTs[leader] = trades.reduce((m, t) => Math.max(m, Number(t.timestamp || 0)), 0);
      state.primed[leader] = true;
      console.log(`[prime] ${leader.slice(0, 8)}… baseline (${trades.length} trades)`);
      continue;
    }
    // Compare every trade of this batch against the cursor as it stood BEFORE the batch, and
    // advance the cursor only once the batch is scanned. Moving it inside the loop would make
    // the first (newest) trade of a newest-first batch raise the cursor above the older-but-new
    // trades behind it, dropping them for good.
    const cursor = state.lastTs[leader] || 0;
    let newest = cursor;
    for (const t of trades) {
      const key = tradeKey(t);
      if (seen.has(key)) continue;
      const ts = Number(t.timestamp || 0);
      if (ts < cursor) continue;
      seen.add(key);
      if (ts > newest) newest = ts;
      toCopy.push(t);
    }
    state.lastTs[leader] = newest;
  }
  toCopy.sort((a, b) => Number(a.timestamp) - Number(b.timestamp));
  // Persist the dedup cursor (seen + lastTs) BEFORE fanning out. If we crash mid-
  // fanout, a restart must NOT re-detect & re-copy these trades. The durable ledger
  // catches users already filled; persisting first closes the remaining replay window.
  state.seen = Array.from(seen).slice(-3000);
  saveState(state);
  for (const t of toCopy) await fanout(t);
  console.log(`[poll] ${leaders.length} leader(s), ${toCopy.length} new trade(s)`);
}

/**
 * Reports the status of manual order `id` back to the service (POST <base>/<id>/status, where
 * <base> is MANUAL_URL with its trailing "/manual-orders" replaced by "/manual-order").
 * Best-effort: network errors and non-2xx responses are logged, never thrown.
 * @param {string|number} id - manual order id
 * @param {string} status - new status value
 * @param {object} [extra] - additional fields merged into the request body
 * @returns {Promise<void>}
 */
async function setManualStatus(id, status, extra = {}) {
  try {
    const base = MANUAL_URL.replace(/\/manual-orders$/, '/manual-order');
    const r = await fetch(`${base}/${id}/status`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', 'x-service-token': SERVICE_TOKEN },
      body: JSON.stringify({ status, ...extra }),
      signal: AbortSignal.timeout(12000),
    });
    // A rejected update would otherwise go unnoticed and leave the order's status stale.
    if (!r.ok) console.log('[manual] status', id, status, 'http', r.status);
  } catch (e) { console.log('[manual] status', id, status, 'err', e.message); }
}

// ---- Manual BUY intents (executeManualOrders, further down) ----
// A manual BUY is a "copy without a leader": same executor.buy() custody path and the same
// ledger lifecycle (reserve -> buy -> reconcile). It bypasses copy-budget/maxOpen (explicit
// user action) but still needs pUSD on the deposit wallet.
/**
 * Asks the service to discard the stored position `token` of `user`
 * (POST POSITIONS_URL/<token>/discard). No-op when POSITIONS_URL is not configured.
 * Best-effort: network errors and non-2xx responses are logged, never thrown.
 * @param {string} user - user address sent as `userAddress`
 * @param {string} token - token id of the position
 * @returns {Promise<void>}
 */
async function discardPosition(user, token) {
  if (!POSITIONS_URL) return;
  try {
    const r = await fetch(`${POSITIONS_URL}/${token}/discard`, {
      method: 'POST',
      headers: { 'content-type': 'application/json', 'x-service-token': SERVICE_TOKEN },
      body: JSON.stringify({ userAddress: user }),
      signal: AbortSignal.timeout(12000),
    });
    // A refused discard leaves the stored row in place; surface it instead of failing silently.
    if (!r.ok) console.log('[manual] discard http', r.status);
  } catch (e) { console.log('[manual] discard err', e.message); }
}

/**
 * Polls MANUAL_URL for manual BUY orders and executes them one by one.
 *
 * For each order: validates it, marks it 'executing', reserves a 'pending' ledger row for the
 * user, places the buy through the user's executor and reports the final status ('done',
 * 'posted' or 'failed') via setManualStatus. Does nothing when MANUAL_URL is unset or a previous
 * run is still in progress (`manualBusy`). In DRY mode orders are logged and marked failed.
 * @returns {Promise<void>}
 */
async function executeManualOrders() {
  if (!MANUAL_URL || manualBusy) return;
  manualBusy = true;
  try {
    let list = [];
    try {
      const r = await fetch(MANUAL_URL, { headers: { 'x-service-token': SERVICE_TOKEN }, signal: AbortSignal.timeout(12000) });
      if (!r.ok) { console.log('[manual] http', r.status); return; }
      list = (await r.json()).data || [];
    } catch (e) { console.log('[manual] poll err', e.message); return; }
    for (const m of list) {
      const tag = `[manual ${String(m.userAddress).slice(0, 12)} #${m.id}]`;
      if (!m.depositWallet || !m.ownerEoa) { await setManualStatus(m.id, 'failed', { error: 'no deposit wallet/eoa' }); console.log(`${tag} βœ— no wallet`); continue; }
      // Reject malformed orders up front. A zero / non-numeric limit price or amount would turn
      // the share count below into Infinity or NaN and send that to the executor.
      const limitPrice = Number(m.limitPrice);
      const amountUsd = Number(m.amountUsd);
      if (!m.tokenId || !Number.isFinite(limitPrice) || limitPrice <= 0 || !Number.isFinite(amountUsd) || amountUsd <= 0) {
        await setManualStatus(m.id, 'failed', { error: 'invalid order (tokenId/amountUsd/limitPrice)' });
        console.log(`${tag} invalid order: tokenId=${m.tokenId} amountUsd=${m.amountUsd} limitPrice=${m.limitPrice}`);
        continue;
      }
      await setManualStatus(m.id, 'executing');
      const u = getUser(String(m.userAddress).toLowerCase());
      u.dw = u.dw || m.depositWallet;
      u.eoa = u.eoa || m.ownerEoa;
      if (isActive(u.ledger.get(m.tokenId))) { await setManualStatus(m.id, 'failed', { error: 'already holding/pending' }); console.log(`${tag} ⏭️ already active`); continue; }
      if (!LIVE) { console.log(`${tag} 🟒 WOULD BUY $${m.amountUsd} @≀${m.limitPrice}`); await setManualStatus(m.id, 'failed', { error: 'daemon not LIVE' }); continue; }
      const exec = await ensureExecutor(u);
      if (!exec) { await setManualStatus(m.id, 'failed', { error: 'no executor' }); console.log(`${tag} βœ— no executor`); continue; }
      // Size against the slippage-adjusted price (capped at 0.99), at least one share.
      const effPrice = Math.min(0.99, limitPrice * (1 + BUY_SLIPPAGE));
      const size = Math.max(1, Math.floor(amountUsd / effPrice));
      const order = { tokenID: m.tokenId, price: limitPrice, size };
      const nowSec = Math.floor(Date.now() / 1000);
      const rec = { tokenId: m.tokenId, leader: 'manual', marketId: m.marketId, outcome: m.outcome || null, title: null, eventSlug: '', side: 'BUY', size, leaderPrice: limitPrice, cost: +(size * effPrice).toFixed(2), openedAt: nowSec, reservedAt: nowSec, buys: 0, status: 'pending', direction: 'manual', leaderAsset: m.tokenId };
      // Reserve first (in memory + durable write); no order is placed if the write fails.
      u.ledger.set(m.tokenId, rec);
      if (!(await saveLedger(u.addr, rec))) { u.ledger.delete(m.tokenId); await setManualStatus(m.id, 'failed', { error: 'reserve write failed' }); console.log(`${tag} βœ— reserve failed`); continue; }
      let resp;
      try { resp = await exec.buy(order); }
      catch (e) { resp = { success: false, errorMsg: e?.message || String(e) }; }
      const p = u.ledger.get(m.tokenId);
      // The row was already resolved elsewhere while the buy was in flight: just report done.
      if (!p || p.status !== 'pending') { await setManualStatus(m.id, 'done', { order_id: resp?.orderID || '' }); continue; }
      const filledNow = resp?.success && (resp.status === 'matched' || Number(resp.takingAmount || 0) > 0 || Number(resp.makingAmount || 0) > 0);
      if (filledNow) {
        p.status = 'open'; p.buys = 1; p.lastOrderId = resp.orderID; await saveLedger(u.addr, p);
        await setManualStatus(m.id, 'done', { order_id: resp.orderID || '' });
        console.log(`${tag} βœ… MANUAL BUY ${size}sh @≀${m.limitPrice} order=${String(resp.orderID || '').slice(0, 12)}`);
      } else if (resp?.success && resp.orderID) {
        p.lastOrderId = resp.orderID; await saveLedger(u.addr, p);
        await setManualStatus(m.id, 'posted', { order_id: resp.orderID });
        console.log(`${tag} βŒ› posted, awaiting fill; reconcile will confirm/cancel`);
      } else {
        u.ledger.delete(m.tokenId); await discardPosition(u.addr, m.tokenId);
        await setManualStatus(m.id, 'failed', { error: String(resp?.errorMsg || resp?.error || resp?.status || 'rejected').slice(0, 200) });
        console.log(`${tag} ❌ rejected: ${resp?.errorMsg || resp?.error || resp?.status}`);
      }
    }
  } finally { manualBusy = false; }
}

/**
 * Daemon entry point.
 *
 * Loads the ledger and subscriptions, refreshes each user's budget (and, when LIVE, initializes
 * their executor), then runs one reconcile and one P&L cycle. With `--selftest` it stops after
 * dumping the resolved per-user context. Unless ONCE=1 it then schedules the periodic jobs, and
 * finally starts detection: polling when POLL is set, the websocket stream otherwise.
 * @returns {Promise<void>}
 */
async function main() {
  await loadLedger();
  await pollSubs();
  const selftest = process.argv.includes('--selftest');
  console.log(`[mu] mode=${LIVE ? 'LIVE' : 'DRY'} detect=${POLL ? 'POLL' : 'WS'} followers=${FOLLOWERS.size || 'ALL'}`);
  for (const [ua, u] of users) {
    await refreshBudget(u);
    if (LIVE && !selftest) await ensureExecutor(u);
    console.log(`  user ${ua.slice(0, 20)}… dw=${u.dw?.slice(0, 10)} subs=${u.subs.size} ledger=${u.ledger.size} budget=$${u.budget.toFixed(2)} exec=${u.executor ? 'ok' : '-'}`);
  }
  if (selftest) { console.log('[mu] selftest done β€” context + budgets resolved, detection + execution wired. exiting.'); return; }
  await reconcileAll();
  await computeAndPostPnLAll();
  // Timers ignore the promise a callback returns, so a rejection inside a periodic job would be
  // an unhandled rejection and terminate the daemon. Each job is wrapped so that a transient
  // failure is logged and the next run simply tries again.
  const guarded = (label, task) => async () => {
    try { await task(); }
    catch (e) { console.log(`[mu] ${label} err:`, e?.message || e); }
  };
  if (process.env.ONCE !== '1') {
    // Budgets are refreshed for all users concurrently; one user's failure is logged on its own.
    const refreshAllBudgets = async () => {
      const outcomes = await Promise.allSettled([...users.values()].map((u) => refreshBudget(u)));
      for (const o of outcomes) {
        if (o.status === 'rejected') console.log('[mu] budget err:', o.reason?.message || o.reason);
      }
    };
    setInterval(guarded('budget', refreshAllBudgets), 60000);
    setInterval(guarded('subs', () => pollSubs()), SUBS_POLL_MS);
    setInterval(guarded('reconcile', () => reconcileAll()), Number(process.env.RECONCILE_MS || '120000'));
    setInterval(guarded('pnl', () => computeAndPostPnLAll()), PNL_POLL_MS);
    setInterval(guarded('manual', () => executeManualOrders()), MANUAL_POLL_MS);
  }
  console.log(`[mu] ${LIVE ? 'LIVE (executes copies)' : 'DRY (decision-only)'} | mode=${POLL ? 'POLL' : 'WS'} | leaders=${leaderIndex.size}`);
  if (POLL) { await tick(); if (process.env.ONCE !== '1') setInterval(guarded('poll', () => tick()), POLL_MS); }
  else { await runWebsocket(); }
}

// Start the daemon. A failure during startup is fatal: log it and exit non-zero so a supervisor
// can restart the process, instead of leaving a floating promise to die as an unhandled rejection.
main().catch((e) => {
  console.error('[mu] fatal:', e);
  process.exit(1);
});