"""
OFManager — Order-Flow trade lifecycle.
=======================================
gate → maker entry (chase) → bracket (TP/SL) → time-stop.
DRY_RUN short-circuits every exchange call: it logs the intended plan, registers
a simulated position (assumed filled at signal price) and resolves it on the
time-stop only (no live price feed in dry mode). Real fills / TP / SL touch are
validated on testnet (Chunk 4).
Taker entry is never used — if the maker chase can't fill in CHASE_MAX_SEC the
signal is skipped (the imbalance is gone, and a taker fill would eat the edge).
"""
import logging
import time
from src.config import (
DRY_RUN, OF_HOLD_MAX_MIN, OF_CAPITAL_USD, OF_LEVERAGE,
OF_TP_PCT, OF_SL_PCT,
CHASE_MAX_SEC, CHASE_REPRICE_SEC, CHASE_POLL_SEC,
)
# Module-level logger shared by everything in this file; records are emitted
# under the fixed logger name "of_manager".
logger = logging.getLogger("of_manager")
class OFManager:
    """Order-flow trade lifecycle manager.

    Flow per signal: risk gate → post-only maker entry (chase) → TP/SL bracket
    → time-stop. In dry mode (``DRY_RUN`` set, or no exchange object) no
    exchange call is made: the position is registered as filled at the signal
    price and is resolved by the time-stop only.

    Attributes:
        ex: exchange wrapper, or None in dry mode.
        risk: risk gate; only ``can_open(open_count, notional)`` and
            ``record_close(pnl)`` are used here.
        positions: symbol -> position dict for every position tracked here.
        events: pending event dicts, handed to the caller by ``drain_events``.
    """

    def __init__(self, exchange, risk):
        self.ex = exchange    # Exchange instance, or None in dry mode
        self.risk = risk
        self.positions = {}   # symbol -> position dict
        self.events = []      # drained by main loop (TG + journal)

    def _emit(self, **ev):
        """Queue one event (keyword args become the event dict)."""
        self.events.append(ev)

    def drain_events(self):
        """Return all queued events and reset the queue to empty."""
        evs, self.events = self.events, []
        return evs

    # ---------------------------------------------------------------
    def open(self, sig):
        """Try to open a position for signal ``sig``.

        ``sig`` must carry ``symbol``, ``id``, ``direction`` and ``entry_price``;
        an optional ``ticket`` dict supplies ``tp`` / ``sl`` / ``qty`` (used for
        the dry-run plan and as a qty fallback when there is no entry price).

        Every outcome is reported through the event queue: ``type="open"`` on
        success, ``type="skip"`` with a ``reason`` otherwise. Returns None.
        """
        symbol = sig["symbol"]
        if symbol in self.positions:
            logger.info(f"Skip #{sig['id']} {symbol}: already in a position")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="already_in_position")
            return

        direction = sig["direction"]
        # `or {}` also covers a ticket key that is present but None.
        ticket = sig.get("ticket") or {}
        entry = sig["entry_price"]
        tp = ticket.get("tp")
        sl = ticket.get("sl")

        # Size from OUR config (not the screener's $250 ticket) — config is the
        # single source of truth for position size.
        notional = OF_CAPITAL_USD * OF_LEVERAGE
        qty = notional / entry if entry else ticket.get("qty")

        ok, reason = self.risk.can_open(len(self.positions), notional)
        if not ok:
            logger.info(f"Skip #{sig['id']} {symbol}: risk gate → {reason}")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason=reason)
            return

        # The hold window is measured from signal handling, so chase time counts.
        deadline = time.time() + OF_HOLD_MAX_MIN * 60

        if DRY_RUN or self.ex is None:
            logger.info(
                f"[DRY would-open] #{sig['id']} {direction} {symbol} maker@~{entry} "
                f"qty={qty} → TP={tp} SL={sl} hold≤{OF_HOLD_MAX_MIN}m (${notional:.0f})"
            )
            self.positions[symbol] = {
                "id": sig["id"], "direction": direction, "entry": entry,
                "tp": tp, "sl": sl, "qty": qty, "deadline": deadline, "dry": True,
            }
            self._emit(type="open", symbol=symbol, id=sig["id"], direction=direction,
                       entry=entry, tp=tp, sl=sl, qty=qty, notional=notional, dry=True)
            return

        # ---- LIVE ----
        # Sizing and the min-notional check below both need a usable price and
        # quantity; without them `qty * entry` would raise instead of skipping.
        if not entry or not qty:
            logger.warning(f"Skip #{sig['id']} {symbol}: signal has no usable entry price/qty")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="invalid_signal")
            return

        if direction == "LONG":
            entry_side, exit_side = "BUY", "SELL"
        else:
            entry_side, exit_side = "SELL", "BUY"

        si = self.ex.get_symbol_info(symbol)
        if not si:
            logger.warning(f"No symbol info for {symbol}, skip")
            # Report the skip like every other skip path so the journal sees it.
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="no_symbol_info")
            return

        qty = self.ex.round_qty(si, qty)
        min_notional = si.get("min_notional") or 5
        if qty <= 0 or qty * entry < min_notional:
            logger.info(f"Skip {symbol}: qty {qty} below min notional ${min_notional} (size too small)")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="below_min_notional")
            return

        self.ex.set_leverage(symbol)
        self.ex.set_margin_type(symbol)

        # Stamp BEFORE the chase: _resolve_realized only counts trades at or
        # after this time, and the entry fills (with their commission) happen
        # during the chase.
        # NOTE(review): this is the local clock; trade timestamps come from the
        # exchange — a local clock running ahead could still drop entry fills.
        opened_ts = time.time()

        filled = self._chase_entry(symbol, entry_side, qty, si, direction)
        if not filled:
            logger.info(f"Maker not filled in {CHASE_MAX_SEC}s for {symbol} — signal skipped")
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="maker_no_fill")
            return
        fill_price, filled_qty = filled

        # TP/SL anchored to OUR actual fill (not the screener's signal entry) so
        # the bracket distance equals the configured pct regardless of chase slip.
        if direction == "LONG":
            tp = fill_price * (1 + OF_TP_PCT / 100)
            sl = fill_price * (1 - OF_SL_PCT / 100)
        else:
            tp = fill_price * (1 - OF_TP_PCT / 100)
            sl = fill_price * (1 + OF_SL_PCT / 100)
        tp = self.ex.round_price(si, tp)  # clean values for orders + notifications
        sl = self.ex.round_price(si, sl)

        # Bracket — closePosition stops protect the whole position. If EITHER leg
        # fails to register (None OR any exception), never hold a naked position.
        try:
            tp_o = self.ex.place_tp_market(symbol, exit_side, tp, si)
            sl_o = self.ex.place_sl_market(symbol, exit_side, sl, si)
            bracket_ok = bool(self.ex._order_id(tp_o)) and bool(self.ex._order_id(sl_o))
        except Exception as e:
            logger.error(f"Bracket placement raised for {symbol}: {e}")
            bracket_ok = False

        if not bracket_ok:
            logger.error(f"Bracket failed for {symbol} — closing naked position")
            self.ex.cancel_all_orders(symbol)
            self.ex.close_position_market(symbol, exit_side, filled_qty, si)
            # NOTE(review): the cost of this emergency round-trip is not passed
            # to risk.record_close — confirm whether the daily-loss gate needs it.
            self._emit(type="skip", symbol=symbol, id=sig["id"], reason="bracket_failed_closed")
            return

        logger.info(f"OPENED {direction} {symbol} @ {fill_price} qty={filled_qty}, bracket set")
        self.positions[symbol] = {
            "id": sig["id"], "direction": direction, "entry": fill_price,
            "tp": tp, "sl": sl, "qty": filled_qty, "exit_side": exit_side,
            "deadline": deadline, "si": si, "dry": False, "opened_ts": opened_ts,
        }
        self._emit(type="open", symbol=symbol, id=sig["id"], direction=direction,
                   entry=fill_price, tp=tp, sl=sl, qty=filled_qty, notional=notional, dry=False)

    # ---------------------------------------------------------------
    def _chase_entry(self, symbol, side, qty, si, direction):
        """Post-only chase: join best price, reprice if unfilled, give up after CHASE_MAX_SEC.

        Returns ``(avg_price, executed_qty)`` on a full fill or a salvaged
        partial fill, or None when nothing filled inside the chase window.
        """
        # Elapsed time is measured on the monotonic clock so wall-clock jumps
        # cannot stretch or shrink the chase window.
        give_up_at = time.monotonic() + CHASE_MAX_SEC
        while time.monotonic() < give_up_at:
            bid, ask = self.ex.get_best_bid_ask(symbol)
            price = bid if direction == "LONG" else ask  # join the maker side
            if price is None:  # the side we need to quote on is missing
                time.sleep(CHASE_POLL_SEC)
                continue

            order = self.ex.place_limit_maker(symbol, side, qty, price, si)
            if not order:
                time.sleep(CHASE_POLL_SEC)  # post-only rejected (would take) → retry
                continue
            oid = order["orderId"]

            # One reprice window, clipped so the whole chase honours CHASE_MAX_SEC.
            reprice_at = min(time.monotonic() + CHASE_REPRICE_SEC, give_up_at)
            while time.monotonic() < reprice_at:
                st = self.ex.get_order_status(symbol, oid)
                if st and st.get("status") == "FILLED":
                    return float(st["avgPrice"]), float(st["executedQty"])
                time.sleep(CHASE_POLL_SEC)

            # unfilled in this window → cancel & reprice; salvage any partial
            try:
                self.ex.cancel_order(symbol, oid)
            except Exception as e:
                # The order may have filled in the race with the cancel; the
                # status check below decides, so a failed cancel must not abort
                # the chase and leave a fill without a bracket.
                logger.warning(f"Cancel failed for {symbol} order {oid}: {e}")
            st = self.ex.get_order_status(symbol, oid)
            exec_qty = float(st.get("executedQty", 0)) if st else 0.0
            if exec_qty > 0:
                avg = float(st.get("avgPrice", price))
                logger.info(f"Partial fill salvaged {symbol}: {exec_qty} @ {avg}")
                return avg, exec_qty
        return None

    # ---------------------------------------------------------------
    def tick(self):
        """Resolve open positions (time-stop; live TP/SL handled exchange-side)."""
        now = time.time()

        # Fetch live positions ONCE per tick (not per symbol) to spare the rate limit.
        # None means "unknown this tick" — distinct from an empty set.
        live_syms = None
        if self.ex is not None and any(not p.get("dry") for p in self.positions.values()):
            try:
                live_syms = {
                    p["symbol"] for p in self.ex.get_positions()
                    if float(p["positionAmt"]) != 0
                }
            except Exception as e:
                # Covers a raising call and a None/malformed payload alike.
                logger.warning(f"Position fetch failed — live positions left untouched this tick: {e}")
                live_syms = None
            # NOTE(review): if the wrapper returns [] on an API error, every
            # live position would look closed below — verify its error contract.

        for symbol in list(self.positions):
            pos = self.positions[symbol]

            if pos.get("dry"):
                if now >= pos["deadline"]:
                    logger.info(f"[DRY would-close] {symbol} #{pos['id']} (time-stop {OF_HOLD_MAX_MIN}m)")
                    self.risk.record_close(0.0)
                    self._emit(type="close", symbol=symbol, id=pos["id"], reason="time_stop",
                               pnl=0.0, dry=True)
                    del self.positions[symbol]
                continue

            if live_syms is None:
                # Exchange state unknown: never cancel a bracket or drop tracking
                # on a guess; retry on the next tick.
                continue

            # LIVE: did the exchange-side bracket already close it?
            if symbol not in live_syms:
                self.ex.cancel_all_orders(symbol)  # clear the unfilled bracket leg
                pnl = self._resolve_realized(symbol, pos)
                # which leg fired? TP closes in profit, SL in loss (pnl sign).
                reason = "TP" if pnl > 0 else "SL" if pnl < 0 else "bracket"
                logger.info(f"CLOSED {symbol} #{pos['id']} ({reason}, pnl={pnl:+.3f})")
                self._emit(type="close", symbol=symbol, id=pos["id"], reason=reason,
                           pnl=pnl, dry=False)
                del self.positions[symbol]
                continue

            if now >= pos["deadline"]:
                logger.info(f"Time-stop {symbol} #{pos['id']} — market close")
                self.ex.close_position_market(symbol, pos["exit_side"], pos["qty"], pos["si"])
                self.ex.cancel_all_orders(symbol)
                pnl = self._resolve_realized(symbol, pos)
                self._emit(type="close", symbol=symbol, id=pos["id"], reason="time_stop",
                           pnl=pnl, dry=False)
                del self.positions[symbol]

    def _resolve_realized(self, symbol, pos):
        """Net realized PnL (USD) for THIS position for the daily-loss gate (live only).

        Only counts fills since the position opened (not stale re-trades), net
        of fees. The result is always passed to ``risk.record_close``; when the
        trade fetch fails, whatever was summed so far (0.0 if nothing) is used.
        """
        pnl = 0.0
        try:
            since_ms = int(pos.get("opened_ts", 0) * 1000)
            trades = self.ex.client.futures_account_trades(symbol=symbol, limit=50)
            for t in trades:
                if int(t.get("time", 0)) < since_ms:
                    continue
                # NOTE(review): commission is subtracted as if quoted in USD —
                # confirm fees are not paid in another asset.
                pnl += float(t.get("realizedPnl", 0)) - float(t.get("commission", 0))
        except Exception as e:
            # Warning, not debug: a failed fetch feeds a wrong PnL to the
            # daily-loss gate and must be visible in normal logs.
            logger.warning(f"Realized PnL fetch failed {symbol}: {e}")
        self.risk.record_close(pnl)
        return pnl

    def close_all(self, reason="shutdown"):
        """Close every tracked position (market close for live ones) and forget it.

        A position whose market close raises stays in ``self.positions`` so the
        failure remains visible; the remaining positions are still processed.
        """
        for symbol in list(self.positions):
            pos = self.positions[symbol]
            if not pos.get("dry") and self.ex is not None:
                try:
                    self.ex.close_position_market(symbol, pos["exit_side"], pos["qty"], pos["si"])
                except Exception as e:
                    logger.error(f"Close failed for {symbol} ({reason}): {e} — position left tracked")
                    continue
                try:
                    self.ex.cancel_all_orders(symbol)
                except Exception as e:
                    logger.warning(f"Order cleanup failed for {symbol} ({reason}): {e}")
            logger.info(f"Closed {symbol} ({reason})")
            del self.positions[symbol]
# Git history:
# 12371c2 feat(of-trader): show real close reason (TP/SL/time-stop) instead of 'bracket' (4 weeks ago)
# 80435df fix(of-trader): round TP/SL to tick before order+notification (clean display) (4 weeks ago)
# 2d5387f fix(of-trader): 6 bug fixes from live review (4 weeks ago)