← Back
"""
SignalReader — reads new Order-Flow Imbalance signals from the screener DB.
============================================================================
Read-only access to the Futures Screener's signal_log. Tracks the last seen
signal id in a file so a restart never replays history. On first run it seeds
last_seen to the current MAX(id) → the bot only acts on signals emitted AFTER
it started (we are not back-trading old rows).
"""

import json
import logging
import os
import sqlite3
from pathlib import Path

from src.config import (
    SCREENER_DB_PATH, OF_SIGNAL_TYPE, LAST_SEEN_FILE, DATA_DIR,
    OF_MIN_CONFIDENCE, OF_MIN_IMBALANCE,
)

logger = logging.getLogger("signal_reader")


class SignalReader:
    def __init__(self):
        self.db_path = SCREENER_DB_PATH
        self.last_seen_id = None

    # ---- connection (read-only, never locks the screener's writer) ----
    def _connect(self):
        uri = f"file:{self.db_path}?mode=ro"
        conn = sqlite3.connect(uri, uri=True, timeout=5)
        conn.row_factory = sqlite3.Row
        return conn

    def _max_id(self, conn):
        row = conn.execute(
            "SELECT MAX(id) AS m FROM signal_log WHERE type = ?", (OF_SIGNAL_TYPE,)
        ).fetchone()
        return row["m"] if row and row["m"] is not None else 0

    # ---- last-seen persistence ----
    def _load_last_seen(self):
        try:
            return json.loads(Path(LAST_SEEN_FILE).read_text()).get("last_seen_id")
        except Exception:
            return None

    def _save_last_seen(self, sid):
        Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
        Path(LAST_SEEN_FILE).write_text(json.dumps({"last_seen_id": sid}))

    def initialize(self):
        """Seed last_seen: resume from file if present, else current MAX(id)."""
        persisted = self._load_last_seen()
        if persisted is not None:
            self.last_seen_id = persisted
            logger.info(f"Resuming from persisted last_seen_id={self.last_seen_id}")
            return
        with self._connect() as conn:
            self.last_seen_id = self._max_id(conn)
        self._save_last_seen(self.last_seen_id)
        logger.info(f"Seeded last_seen_id={self.last_seen_id} (only NEW signals from here)")

    # ---- parse one row into a trade intent ----
    @staticmethod
    def _parse(row):
        try:
            meta = json.loads(row["metadata"]) if row["metadata"] else {}
        except Exception:
            meta = {}
        ticket = meta.get("ticket", {})
        return {
            "id": row["id"],
            "symbol": row["symbol"],
            "direction": row["direction"],          # LONG / SHORT
            "entry_price": row["entry_price"],
            "confidence": row["confidence"],
            "imbalance": abs(float(meta.get("imbalance", 0))),
            "bid_vol": meta.get("bidVol"),
            "ask_vol": meta.get("askVol"),
            "ticket": ticket,                        # tp/sl/qty/notional/leverage...
            "created_at": row["created_at"],
        }

    # ---- poll for new signals ----
    def poll(self):
        """Return list of new trade intents (id > last_seen), oldest first."""
        if self.last_seen_id is None:
            self.initialize()
        try:
            with self._connect() as conn:
                rows = conn.execute(
                    "SELECT id, symbol, direction, entry_price, confidence, metadata, created_at "
                    "FROM signal_log WHERE type = ? AND id > ? ORDER BY id ASC",
                    (OF_SIGNAL_TYPE, self.last_seen_id),
                ).fetchall()
        except sqlite3.Error as e:
            logger.error(f"DB read error: {e}")
            return []

        out = []
        for row in rows:
            self.last_seen_id = row["id"]
            sig = self._parse(row)
            # optional extra filters
            if sig["confidence"] is not None and sig["confidence"] < OF_MIN_CONFIDENCE:
                continue
            if sig["imbalance"] < OF_MIN_IMBALANCE:
                continue
            out.append(sig)

        if rows:
            self._save_last_seen(self.last_seen_id)
        return out

📜 Git History

4480f03feat(of-trader): signal reader + dry-run loop4 weeks ago
Show last diff
Loading...