← Back
"""
Step 1: Fetch and cache 30-day kline data for all liquid symbols.
Saves to data_cache_30d.json so we don't re-download each sweep.

Usage:
  cd /home/app/trading-bot-bybit
  python3 backtests/step1_fetch_data.py
"""

import json
import time
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP

TIMEFRAME = "5"
DAYS = 30
VWAP_PERIOD = 50
MIN_VOLUME_24H = 20_000_000
MAX_SYMBOLS = 60
BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

session = HTTP(testnet=False)


def get_symbols():
    resp = session.get_tickers(category="linear")
    if resp["retCode"] != 0:
        return []
    symbols = []
    for t in resp["result"]["list"]:
        sym = t["symbol"]
        if not sym.endswith("USDT") or sym in BLACKLIST:
            continue
        vol = float(t.get("turnover24h", 0))
        if vol >= MIN_VOLUME_24H:
            symbols.append({"symbol": sym, "volume_24h": vol})
    symbols.sort(key=lambda x: x["volume_24h"], reverse=True)
    return symbols[:MAX_SYMBOLS]


def fetch_klines(symbol, interval, days):
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)
    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(category="linear", symbol=symbol,
                                     interval=interval, limit=1000, end=end_time)
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append([
                    int(item[0]), float(item[1]), float(item[2]),
                    float(item[3]), float(item[4]), float(item[5]),
                ])
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break
            time.sleep(0.05)
        except Exception as e:
            print(f"    ERR: {e}")
            time.sleep(1)
    all_klines.reverse()
    # Deduplicate
    seen = set()
    unique = []
    for k in all_klines:
        if k[0] not in seen:
            seen.add(k[0])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique


def calc_indicators(closes, highs, lows, volumes):
    n = len(closes)

    # Z-VWAP
    z_scores = np.zeros(n)
    for i in range(VWAP_PERIOD, n):
        h = highs[i-VWAP_PERIOD:i]
        l = lows[i-VWAP_PERIOD:i]
        c = closes[i-VWAP_PERIOD:i]
        v = volumes[i-VWAP_PERIOD:i]
        tp = (h + l + c) / 3
        ctv = np.cumsum(tp * v)
        cv = np.cumsum(v)
        cv_safe = np.where(cv == 0, 1, cv)
        vwap_arr = ctv / cv_safe
        vwap = vwap_arr[-1]
        dev = c - vwap_arr
        std = np.std(dev)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std

    # NATR (14)
    natr = np.zeros(n)
    for i in range(14, n):
        trs = []
        for j in range(i-13, i+1):
            tr = max(highs[j] - lows[j],
                     abs(highs[j] - closes[j-1]),
                     abs(lows[j] - closes[j-1]))
            trs.append(tr)
        atr = np.mean(trs)
        natr[i] = (atr / closes[i]) * 100 if closes[i] > 0 else 0

    # CHOP (14)
    chop = np.full(n, 50.0)
    for i in range(14, n):
        atr_sum = 0
        for j in range(i-13, i+1):
            tr = max(highs[j] - lows[j],
                     abs(highs[j] - closes[j-1]),
                     abs(lows[j] - closes[j-1]))
            atr_sum += tr
        hi = np.max(highs[i-13:i+1])
        lo = np.min(lows[i-13:i+1])
        rng = hi - lo
        if rng > 0:
            chop[i] = 100 * np.log10(atr_sum / rng) / np.log10(14)

    return z_scores.tolist(), natr.tolist(), chop.tolist()


def main():
    print("=" * 60)
    print("  STEP 1: Fetch & Cache 30-day Data")
    print("=" * 60)

    symbols_data = get_symbols()
    print(f"\n  {len(symbols_data)} symbols with vol >= ${MIN_VOLUME_24H/1e6:.0f}M")

    cache = {"fetched_at": datetime.now().isoformat(), "days": DAYS, "timeframe": TIMEFRAME, "symbols": {}}

    for idx, sd in enumerate(symbols_data):
        sym = sd["symbol"]
        print(f"  [{idx+1}/{len(symbols_data)}] {sym} (${sd['volume_24h']/1e6:.0f}M)...", end=" ", flush=True)
        time.sleep(0.15)

        klines = fetch_klines(sym, TIMEFRAME, DAYS)
        if len(klines) < VWAP_PERIOD + 100:
            print(f"skip ({len(klines)} bars)")
            continue

        closes = np.array([k[4] for k in klines])
        highs = np.array([k[2] for k in klines])
        lows = np.array([k[3] for k in klines])
        volumes = np.array([k[5] for k in klines])

        z, natr, chop = calc_indicators(closes, highs, lows, volumes)

        cache["symbols"][sym] = {
            "volume_24h": sd["volume_24h"],
            "bars": len(klines),
            "closes": closes.tolist(),
            "highs": highs.tolist(),
            "lows": lows.tolist(),
            "z": z,
            "natr": natr,
            "chop": chop,
        }
        print(f"OK ({len(klines)} bars)")

    out = "/home/app/trading-bot-bybit/backtests/data_cache_30d.json"
    with open(out, "w") as f:
        json.dump(cache, f)

    total_bars = sum(s["bars"] for s in cache["symbols"].values())
    print(f"\n  Cached {len(cache['symbols'])} symbols, {total_bars:,} total bars")
    print(f"  Saved to {out} ({os.path.getsize(out) / 1e6:.1f} MB)")


import os
if __name__ == "__main__":
    main()

📜 Git History

dd32dfdchore: initial commit — version control setup5 weeks ago
Show last diff
Loading...