← Back
"""
OFManager — Order-Flow trade lifecycle.
=======================================
gate → maker entry (chase) → bracket (TP/SL) → time-stop.

DRY_RUN short-circuits every exchange call: it logs the intended plan, registers
a simulated position (assumed filled at signal price) and resolves it on the
time-stop only (no live price feed in dry mode). Real fills / TP / SL touch are
validated on testnet (Chunk 4).

Taker entry is never used — if the maker chase can't fill in CHASE_MAX_SEC the
signal is skipped (the imbalance is gone, and a taker fill would eat the edge).
"""

import logging
import time

from src.config import (
    DRY_RUN, OF_HOLD_MAX_MIN, OF_CAPITAL_USD, OF_LEVERAGE,
    OF_TP_PCT, OF_SL_PCT,
    CHASE_MAX_SEC, CHASE_REPRICE_SEC, CHASE_POLL_SEC,
)

logger = logging.getLogger("of_manager")


class OFManager:
    """Order-flow trade lifecycle: risk gate → maker entry → TP/SL bracket → time-stop.

    Open positions are tracked per symbol in ``self.positions``. Lifecycle
    events (``skip`` / ``open`` / ``close``) are queued in ``self.events`` until
    the caller collects them with :meth:`drain_events`.
    """

    def __init__(self, exchange, risk):
        self.ex = exchange          # Exchange instance, or None in dry mode
        self.risk = risk            # gate object: can_open() / record_close()
        self.positions = {}         # symbol -> position dict
        self.events = []            # drained by main loop (TG + journal)

    def _emit(self, **ev):
        """Queue one event; the keyword arguments become the event dict."""
        self.events.append(ev)

    def drain_events(self):
        """Return all queued events and reset the queue to empty."""
        evs, self.events = self.events, []
        return evs

    # ---------------------------------------------------------------
    def open(self, sig):
        """Try to open a position for signal ``sig``.

        Reads ``sig["symbol"]``, ``sig["id"]``, ``sig["direction"]``,
        ``sig["entry_price"]`` and the optional ``sig["ticket"]`` dict
        (``tp`` / ``sl`` / ``qty``). Always returns None; the outcome is
        reported through a ``skip`` or ``open`` event and, on success, an
        entry in ``self.positions``.
        """
        symbol = sig["symbol"]
        if symbol in self.positions:
            logger.info(f"Skip #{sig['id']} {symbol}: already in a position")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="already_in_position")
            return

        direction = sig["direction"]
        ticket = sig.get("ticket", {})
        entry = sig["entry_price"]
        tp = ticket.get("tp")
        sl = ticket.get("sl")
        # Size from OUR config (not the screener's $250 ticket) — config is the
        # single source of truth for position size.
        notional = OF_CAPITAL_USD * OF_LEVERAGE
        qty = notional / entry if entry else ticket.get("qty")

        ok, reason = self.risk.can_open(len(self.positions), notional)
        if not ok:
            logger.info(f"Skip #{sig['id']} {symbol}: risk gate → {reason}")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason=reason)
            return

        # Time-stop is measured from signal handling, not from the fill.
        deadline = time.time() + OF_HOLD_MAX_MIN * 60

        if DRY_RUN or self.ex is None:
            # Dry mode: no exchange calls; assume a fill at the signal price.
            logger.info(
                f"[DRY would-open] #{sig['id']} {direction} {symbol} maker@~{entry} "
                f"qty={qty} → TP={tp} SL={sl} hold≤{OF_HOLD_MAX_MIN}m (${notional:.0f})"
            )
            self.positions[symbol] = {
                "id": sig["id"], "direction": direction, "entry": entry,
                "tp": tp, "sl": sl, "qty": qty, "deadline": deadline, "dry": True,
            }
            self._emit(type="open", symbol=symbol, id=sig["id"], direction=direction,
                       entry=entry, tp=tp, sl=sl, qty=qty, notional=notional, dry=True)
            return

        # ---- LIVE ----
        # Any direction other than "LONG" is treated as a short.
        entry_side = "BUY" if direction == "LONG" else "SELL"
        exit_side = "SELL" if direction == "LONG" else "BUY"
        si = self.ex.get_symbol_info(symbol)
        if not si:
            logger.warning(f"No symbol info for {symbol}, skip")
            # Report it like every other skip path so the main loop sees it.
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="no_symbol_info")
            return
        if not entry or not qty:
            # Without a usable price/size the rounding and min-notional maths
            # below would fail on None (or size a zero-value order).
            logger.warning(f"Skip #{sig['id']} {symbol}: signal has no usable entry price / qty")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="invalid_signal")
            return
        qty = self.ex.round_qty(si, qty)
        min_notional = si.get("min_notional") or 5   # fall back to $5 when absent
        if qty <= 0 or qty * entry < min_notional:
            logger.info(f"Skip {symbol}: qty {qty} below min notional ${min_notional} (size too small)")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="below_min_notional")
            return
        self.ex.set_leverage(symbol)
        self.ex.set_margin_type(symbol)

        # Stamp BEFORE the first entry order is sent: _resolve_realized() keeps
        # only trades with time >= opened_ts, and the entry fill's commission
        # must pass that filter for the PnL to be net of both fees.
        chase_started = time.time()
        filled = self._chase_entry(symbol, entry_side, qty, si, direction)
        if not filled:
            logger.info(f"Maker not filled in {CHASE_MAX_SEC}s for {symbol} — signal skipped")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="maker_no_fill")
            return
        fill_price, filled_qty = filled

        # TP/SL anchored to OUR actual fill (not the screener's signal entry) so
        # the bracket distance equals the configured pct regardless of chase slip.
        if direction == "LONG":
            tp = fill_price * (1 + OF_TP_PCT / 100)
            sl = fill_price * (1 - OF_SL_PCT / 100)
        else:
            tp = fill_price * (1 - OF_TP_PCT / 100)
            sl = fill_price * (1 + OF_SL_PCT / 100)
        tp = self.ex.round_price(si, tp)   # clean values for orders + notifications
        sl = self.ex.round_price(si, sl)

        # Bracket — closePosition stops protect the whole position. If EITHER leg
        # fails to register (None OR any exception), never hold a naked position.
        try:
            tp_o = self.ex.place_tp_market(symbol, exit_side, tp, si)
            sl_o = self.ex.place_sl_market(symbol, exit_side, sl, si)
            bracket_ok = bool(self.ex._order_id(tp_o)) and bool(self.ex._order_id(sl_o))
        except Exception as e:
            logger.error(f"Bracket placement raised for {symbol}: {e}")
            bracket_ok = False
        if not bracket_ok:
            logger.error(f"Bracket failed for {symbol} — closing naked position")
            # A failing cancel must not prevent the market close below: flattening
            # the unprotected position is the whole point of this branch.
            try:
                self.ex.cancel_all_orders(symbol)
            except Exception as e:
                logger.error(f"Cancel after bracket failure raised for {symbol}: {e}")
            self.ex.close_position_market(symbol, exit_side, filled_qty, si)
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="bracket_failed_closed")
            return
        logger.info(f"OPENED {direction} {symbol} @ {fill_price} qty={filled_qty}, bracket set")
        self.positions[symbol] = {
            "id": sig["id"], "direction": direction, "entry": fill_price,
            "tp": tp, "sl": sl, "qty": filled_qty, "exit_side": exit_side,
            "deadline": deadline, "si": si, "dry": False, "opened_ts": chase_started,
        }
        self._emit(type="open", symbol=symbol, id=sig["id"], direction=direction,
                   entry=fill_price, tp=tp, sl=sl, qty=filled_qty, notional=notional, dry=False)

    # ---------------------------------------------------------------
    def _chase_entry(self, symbol, side, qty, si, direction):
        """Post-only chase: join best price, reprice if unfilled, give up after CHASE_MAX_SEC.

        Returns ``(avg_price, executed_qty)`` for a full fill or a salvaged
        partial fill, or None when nothing filled before the chase budget ran
        out. A LONG joins the bid; any other direction joins the ask.
        """
        start = time.time()
        while time.time() - start < CHASE_MAX_SEC:
            bid, ask = self.ex.get_best_bid_ask(symbol)
            # Need BOTH sides: a SHORT prices off the ask, so checking only the
            # bid would let a None price reach the order call.
            if bid is None or ask is None:
                time.sleep(CHASE_POLL_SEC)
                continue
            price = bid if direction == "LONG" else ask   # join the maker side
            order = self.ex.place_limit_maker(symbol, side, qty, price, si)
            if not order:
                time.sleep(CHASE_POLL_SEC)   # post-only rejected (would take) → retry
                continue
            oid = order["orderId"]

            # Poll this order for one reprice window. `waited` counts sleep time
            # only, not the latency of the status calls themselves.
            waited = 0.0
            while waited < CHASE_REPRICE_SEC:
                st = self.ex.get_order_status(symbol, oid)
                if st and st.get("status") == "FILLED":
                    return float(st["avgPrice"]), float(st["executedQty"])
                time.sleep(CHASE_POLL_SEC)
                waited += CHASE_POLL_SEC

            # unfilled in this window → cancel & reprice; salvage any partial
            self.ex.cancel_order(symbol, oid)
            st = self.ex.get_order_status(symbol, oid)
            exec_qty = float(st.get("executedQty", 0)) if st else 0.0
            if exec_qty > 0:
                avg = float(st.get("avgPrice", price))
                logger.info(f"Partial fill salvaged {symbol}: {exec_qty} @ {avg}")
                return avg, exec_qty
        return None

    # ---------------------------------------------------------------
    def tick(self):
        """Resolve open positions (time-stop; live TP/SL handled exchange-side).

        Dry positions close only on their deadline. A live position is closed
        out locally when the exchange no longer reports it, and market-closed
        here once its deadline passes. When the exchange position list is
        unavailable, live positions are left untouched for this tick.
        """
        now = time.time()
        # Fetch live positions ONCE per tick (not per symbol) to spare the rate limit.
        live_syms = None   # None = state unknown, NOT "everything is closed"
        has_live = any(not p.get("dry") for p in self.positions.values())
        if has_live and self.ex is not None:
            raw = self.ex.get_positions()
            if raw is None:
                logger.warning("Live position list unavailable — skipping live resolution this tick")
            else:
                # NOTE(review): if the exchange wrapper returns [] (rather than
                # None) on an API failure, this still reads as "all closed" —
                # confirm against the wrapper.
                live_syms = {p["symbol"] for p in raw if float(p["positionAmt"]) != 0}

        for symbol in list(self.positions.keys()):
            pos = self.positions[symbol]

            if pos.get("dry"):
                if now >= pos["deadline"]:
                    logger.info(f"[DRY would-close] {symbol} #{pos['id']} (time-stop {OF_HOLD_MAX_MIN}m)")
                    self.risk.record_close(0.0)
                    self._emit(type="close", symbol=symbol, id=pos["id"], reason="time_stop",
                               pnl=0.0, dry=True)
                    del self.positions[symbol]
                continue

            if live_syms is None:
                # Unknown exchange state: treating it as "closed" would cancel the
                # protective bracket and drop tracking of a position that may
                # still be open. Retry on the next tick instead.
                continue

            # LIVE: did the exchange-side bracket already close it?
            still_open = symbol in live_syms
            if not still_open:
                self.ex.cancel_all_orders(symbol)   # clear the unfilled bracket leg
                pnl = self._resolve_realized(symbol, pos)
                # which leg fired? TP closes in profit, SL in loss (pnl sign).
                reason = "TP" if pnl > 0 else "SL" if pnl < 0 else "bracket"
                logger.info(f"CLOSED {symbol} #{pos['id']} ({reason}, pnl={pnl:+.3f})")
                self._emit(type="close", symbol=symbol, id=pos["id"], reason=reason,
                           pnl=pnl, dry=False)
                del self.positions[symbol]
                continue

            if now >= pos["deadline"]:
                logger.info(f"Time-stop {symbol} #{pos['id']} — market close")
                self.ex.close_position_market(symbol, pos["exit_side"], pos["qty"], pos["si"])
                self.ex.cancel_all_orders(symbol)
                pnl = self._resolve_realized(symbol, pos)
                self._emit(type="close", symbol=symbol, id=pos["id"], reason="time_stop",
                           pnl=pnl, dry=False)
                del self.positions[symbol]

    def _resolve_realized(self, symbol, pos):
        """Net realized PnL (USD) for THIS position for the daily-loss gate (live only).
        Only counts fills since the position opened (not stale re-trades), net of fees.

        Sums ``realizedPnl - commission`` over the most recent 50 account trades
        whose ``time`` is at or after ``pos["opened_ts"]``. The result (0.0 if
        the fetch fails) is passed to ``self.risk.record_close`` and returned.
        """
        pnl = 0.0
        try:
            since_ms = int(pos.get("opened_ts", 0) * 1000)
            trades = self.ex.client.futures_account_trades(symbol=symbol, limit=50)
            for t in trades:
                if int(t.get("time", 0)) < since_ms:
                    continue
                pnl += float(t.get("realizedPnl", 0)) - float(t.get("commission", 0))
        except Exception as e:
            # Best-effort, but loud: whatever was summed so far (0.0 when the
            # fetch itself failed) is what the daily-loss gate records.
            logger.warning(f"Realized PnL fetch failed {symbol}: {e}")
        self.risk.record_close(pnl)
        return pnl

    def close_all(self, reason="shutdown"):
        """Market-close every tracked position and stop tracking it.

        Live positions are closed on the exchange and their orders cancelled;
        dry positions are simply dropped. A failure on one symbol is logged and
        that position stays tracked, but the remaining symbols are still
        processed. No events are emitted and no PnL is recorded here.
        """
        for symbol in list(self.positions.keys()):
            pos = self.positions[symbol]
            if not pos.get("dry") and self.ex is not None:
                try:
                    self.ex.close_position_market(symbol, pos["exit_side"], pos["qty"], pos["si"])
                    self.ex.cancel_all_orders(symbol)
                except Exception as e:
                    # One bad symbol must not leave the others open at shutdown.
                    logger.error(f"close_all failed for {symbol} ({reason}): {e}")
                    continue
            logger.info(f"Closed {symbol} ({reason})")
            del self.positions[symbol]

📜 Git History

12371c2feat(of-trader): show real close reason (TP/SL/time-stop) instead of 'bracket'4 weeks ago
80435dffix(of-trader): round TP/SL to tick before order+notification (clean display)4 weeks ago
2d5387ffix(of-trader): 6 bug fixes from live review4 weeks ago
495965ffix(of-trader): robust bracket — closePosition + algoId + naked-position guard4 weeks ago
92496f2feat(of-trader): size from config not screener ticket + min-notional guard4 weeks ago
793f4d9feat(of-trader): telegram notifications + control (chunk 3a)4 weeks ago
c2258e0feat(of-trader): maker chase + bracket + time-stop lifecycle (chunk 2b)4 weeks ago
Show last diff
Loading...