"""
SignalReader — reads new Order-Flow Imbalance signals from the screener DB.
============================================================================
Read-only access to the Futures Screener's signal_log. Tracks the last seen
signal id in a file so a restart never replays history. On first run it seeds
last_seen to the current MAX(id) → the bot only acts on signals emitted AFTER
it started (we are not back-trading old rows).
"""
import json
import logging
import os
import sqlite3
from pathlib import Path
from src.config import (
SCREENER_DB_PATH, OF_SIGNAL_TYPE, LAST_SEEN_FILE, DATA_DIR,
OF_MIN_CONFIDENCE, OF_MIN_IMBALANCE,
)
logger = logging.getLogger("signal_reader")
class SignalReader:
    """Poll the screener's ``signal_log`` table for new signals of one type.

    The reader keeps a cursor (``last_seen_id``) that is persisted to
    ``LAST_SEEN_FILE`` so a restart resumes where it left off instead of
    replaying old rows. When no cursor has been persisted yet, it is seeded
    with the current ``MAX(id)`` for ``OF_SIGNAL_TYPE``, so only rows inserted
    afterwards are returned by :meth:`poll`.
    """

    def __init__(self):
        self.db_path = SCREENER_DB_PATH
        # None means "not initialised yet"; poll() seeds it lazily.
        self.last_seen_id = None

    # ---- connection (read-only) ----
    def _connect(self):
        """Open a new read-only connection to the screener database.

        The database is opened through a SQLite URI with ``mode=ro``, so this
        process cannot modify it. Rows come back as ``sqlite3.Row`` and can be
        indexed by column name. The caller owns the connection and must close
        it.
        """
        uri = f"file:{self.db_path}?mode=ro"
        conn = sqlite3.connect(uri, uri=True, timeout=5)
        conn.row_factory = sqlite3.Row
        return conn

    def _max_id(self, conn):
        """Return the highest ``id`` for ``OF_SIGNAL_TYPE``, or 0 if none exist."""
        row = conn.execute(
            "SELECT MAX(id) AS m FROM signal_log WHERE type = ?", (OF_SIGNAL_TYPE,)
        ).fetchone()
        return row["m"] if row and row["m"] is not None else 0

    # ---- last-seen persistence ----
    def _load_last_seen(self):
        """Return the persisted cursor, or None if it is missing or unusable.

        A missing file is the normal first-run case and is silent. An
        unreadable, corrupt, or non-integer cursor is logged and ignored so the
        caller falls back to seeding from the database.
        """
        path = Path(LAST_SEEN_FILE)
        try:
            value = json.loads(path.read_text()).get("last_seen_id")
        except FileNotFoundError:
            return None
        except (OSError, ValueError, AttributeError) as exc:
            # ValueError covers invalid JSON and undecodable bytes;
            # AttributeError covers valid JSON that is not an object.
            logger.warning("Ignoring unreadable last-seen file %s: %s", path, exc)
            return None
        if value is None:
            return None
        # bool is a subclass of int, but it is never a valid row id.
        if isinstance(value, bool) or not isinstance(value, int):
            logger.warning("Ignoring invalid last_seen_id %r in %s", value, path)
            return None
        return value

    def _save_last_seen(self, sid):
        """Persist ``sid`` as the cursor, replacing the file atomically.

        The payload is written to a sibling temp file and then moved over the
        real one, so an interrupted write cannot leave a truncated cursor file
        behind (which would otherwise be treated as "no cursor" on restart).
        """
        Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
        target = Path(LAST_SEEN_FILE)
        target.parent.mkdir(parents=True, exist_ok=True)
        tmp = target.with_name(target.name + ".tmp")
        tmp.write_text(json.dumps({"last_seen_id": sid}))
        os.replace(tmp, target)

    def initialize(self):
        """Seed last_seen: resume from file if present, else current MAX(id).

        Raises:
            sqlite3.Error: if the database cannot be read while seeding.
        """
        persisted = self._load_last_seen()
        if persisted is not None:
            self.last_seen_id = persisted
            logger.info("Resuming from persisted last_seen_id=%s", self.last_seen_id)
            return
        conn = self._connect()
        try:
            self.last_seen_id = self._max_id(conn)
        finally:
            # A sqlite3 connection used as a context manager only commits or
            # rolls back; it has to be closed explicitly.
            conn.close()
        self._save_last_seen(self.last_seen_id)
        logger.info(
            "Seeded last_seen_id=%s (only NEW signals from here)", self.last_seen_id
        )

    # ---- parse one row into a trade intent ----
    @staticmethod
    def _parse(row):
        """Convert one ``signal_log`` row into a trade-intent dict.

        ``row`` must support lookup by column name. The ``metadata`` column is
        expected to hold a JSON object; if it is empty, invalid, or not an
        object it is treated as ``{}``. ``imbalance`` is returned as a
        non-negative float and falls back to ``0.0`` when missing, null,
        non-numeric, or NaN. ``ticket`` falls back to ``{}`` when it is not a
        JSON object.
        """
        raw_meta = row["metadata"]
        try:
            meta = json.loads(raw_meta) if raw_meta else {}
        except (TypeError, ValueError):
            meta = {}
        if not isinstance(meta, dict):
            meta = {}

        try:
            imbalance = abs(float(meta.get("imbalance") or 0))
        except (TypeError, ValueError):
            imbalance = 0.0
        if imbalance != imbalance:
            # NaN compares False against any threshold and would slip past
            # the minimum-imbalance filter in poll().
            imbalance = 0.0

        ticket = meta.get("ticket")
        if not isinstance(ticket, dict):
            ticket = {}

        return {
            "id": row["id"],
            "symbol": row["symbol"],
            "direction": row["direction"],  # LONG / SHORT
            "entry_price": row["entry_price"],
            "confidence": row["confidence"],
            "imbalance": imbalance,
            "bid_vol": meta.get("bidVol"),
            "ask_vol": meta.get("askVol"),
            "ticket": ticket,  # tp/sl/qty/notional/leverage...
            "created_at": row["created_at"],
        }

    # ---- poll for new signals ----
    def poll(self):
        """Return list of new trade intents (id > last_seen), oldest first.

        Every fetched row advances the cursor, including rows rejected by the
        confidence / imbalance filters, so a rejected row is never examined
        again. The cursor is persisted once per poll, and only when at least
        one row was fetched. Database errors are logged and produce an empty
        list.
        """
        try:
            if self.last_seen_id is None:
                # Inside the try so that a database error during the lazy
                # first-time seed is handled like any other read error.
                self.initialize()
            conn = self._connect()
            try:
                rows = conn.execute(
                    "SELECT id, symbol, direction, entry_price, confidence, metadata, created_at "
                    "FROM signal_log WHERE type = ? AND id > ? ORDER BY id ASC",
                    (OF_SIGNAL_TYPE, self.last_seen_id),
                ).fetchall()
            finally:
                conn.close()
        except sqlite3.Error as e:
            logger.error("DB read error: %s", e)
            return []

        out = []
        for row in rows:
            self.last_seen_id = row["id"]
            sig = self._parse(row)
            # optional extra filters
            if sig["confidence"] is not None and sig["confidence"] < OF_MIN_CONFIDENCE:
                continue
            if sig["imbalance"] < OF_MIN_IMBALANCE:
                continue
            out.append(sig)
        if rows:
            self._save_last_seen(self.last_seen_id)
        return out
# Git history:
#   4480f03  feat(of-trader): signal reader + dry-run loop  (4 weeks ago)