"""
Step 1: Fetch and cache 30-day kline data for all liquid symbols.
Saves to data_cache_30d.json so we don't re-download each sweep.
Usage:
cd /home/app/trading-bot-bybit
python3 backtests/step1_fetch_data.py
"""
import json
import time
import numpy as np
from datetime import datetime
from pybit.unified_trading import HTTP
# --- Sweep configuration ---
TIMEFRAME = "5"              # kline interval, in minutes
DAYS = 30                    # history window to download and cache
VWAP_PERIOD = 50             # lookback bars for the rolling VWAP z-score
MIN_VOLUME_24H = 20_000_000  # 24h USD turnover floor for a symbol to qualify
MAX_SYMBOLS = 60             # cap on how many symbols get cached
# Majors / stables deliberately excluded from the sweep universe.
BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}
# Public (unauthenticated) Bybit V5 client against mainnet.
session = HTTP(testnet=False)
def get_symbols():
    """Return the most liquid USDT perpetuals, sorted by 24h turnover.

    Filters out blacklisted symbols and anything below MIN_VOLUME_24H,
    then keeps at most MAX_SYMBOLS entries (descending turnover).
    Returns an empty list when the tickers request fails.
    """
    resp = session.get_tickers(category="linear")
    if resp["retCode"] != 0:
        return []
    candidates = []
    for ticker in resp["result"]["list"]:
        name = ticker["symbol"]
        # Only USDT-quoted perps, excluding the blacklist.
        if name in BLACKLIST or not name.endswith("USDT"):
            continue
        turnover = float(ticker.get("turnover24h", 0))
        if turnover >= MIN_VOLUME_24H:
            candidates.append({"symbol": name, "volume_24h": turnover})
    candidates.sort(key=lambda entry: entry["volume_24h"], reverse=True)
    return candidates[:MAX_SYMBOLS]
def fetch_klines(symbol, interval, days):
    """Download ~`days` worth of klines for `symbol`, paging backwards in time.

    Args:
        symbol: Bybit linear contract symbol, e.g. "SOLUSDT".
        interval: kline interval in minutes, as a string (e.g. "5").
        days: how many days of history to fetch.

    Returns:
        An ascending, timestamp-deduplicated list of
        [timestamp_ms, open, high, low, close, volume] rows, truncated to
        the requested window.
    """
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)
    # Fix: the original retried a failing request forever (the except path
    # neither advanced end_time nor exited). Cap consecutive failures.
    consecutive_errors = 0
    max_consecutive_errors = 5
    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(category="linear", symbol=symbol,
                                     interval=interval, limit=1000, end=end_time)
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append([
                    int(item[0]), float(item[1]), float(item[2]),
                    float(item[3]), float(item[4]), float(item[5]),
                ])
            # Bybit returns newest-first, so the last item is the oldest bar;
            # continue the next page just before it.
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:
                break  # short page => no older data available
            consecutive_errors = 0
            time.sleep(0.05)  # rate-limit courtesy between pages
        except Exception as e:
            print(f" ERR: {e}")
            consecutive_errors += 1
            if consecutive_errors >= max_consecutive_errors:
                break  # persistent failure: return whatever we have
            time.sleep(1)
    all_klines.reverse()  # now oldest-first
    # Deduplicate by timestamp (pages can overlap at their boundaries).
    seen = set()
    unique = []
    for k in all_klines:
        if k[0] not in seen:
            seen.add(k[0])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique
def calc_indicators(closes, highs, lows, volumes, vwap_period=50):
    """Compute per-bar indicator series: rolling Z-VWAP, NATR(14), CHOP(14).

    Args:
        closes, highs, lows, volumes: 1-D numpy arrays of equal length,
            in ascending time order.
        vwap_period: lookback window (bars) for the VWAP z-score. Default 50
            mirrors the module-level VWAP_PERIOD constant; made a parameter
            so the window can be tuned per call.

    Returns:
        Tuple of three plain Python lists (z_scores, natr, chop), each the
        same length as the inputs. Warm-up bars before a full window are
        0.0 for z-score and NATR, and 50.0 (neutral midpoint) for CHOP.
    """
    n = len(closes)
    # --- Z-score of the current close vs. the trailing-window VWAP ---
    z_scores = np.zeros(n)
    for i in range(vwap_period, n):
        h = highs[i - vwap_period:i]
        l = lows[i - vwap_period:i]
        c = closes[i - vwap_period:i]
        v = volumes[i - vwap_period:i]
        tp = (h + l + c) / 3  # typical price
        ctv = np.cumsum(tp * v)
        cv = np.cumsum(v)
        cv_safe = np.where(cv == 0, 1, cv)  # guard div-by-zero on dead volume
        vwap_arr = ctv / cv_safe
        vwap = vwap_arr[-1]
        dev = c - vwap_arr
        std = np.std(dev)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std
    # --- NATR(14): average true range normalized by close, in percent ---
    natr = np.zeros(n)
    for i in range(14, n):
        trs = []
        for j in range(i - 13, i + 1):
            tr = max(highs[j] - lows[j],
                     abs(highs[j] - closes[j - 1]),
                     abs(lows[j] - closes[j - 1]))
            trs.append(tr)
        atr = np.mean(trs)
        natr[i] = (atr / closes[i]) * 100 if closes[i] > 0 else 0
    # --- CHOP(14): choppiness index (100 = flat range, 0 = strong trend) ---
    chop = np.full(n, 50.0)
    for i in range(14, n):
        atr_sum = 0
        for j in range(i - 13, i + 1):
            tr = max(highs[j] - lows[j],
                     abs(highs[j] - closes[j - 1]),
                     abs(lows[j] - closes[j - 1]))
            atr_sum += tr
        hi = np.max(highs[i - 13:i + 1])
        lo = np.min(lows[i - 13:i + 1])
        rng = hi - lo
        if rng > 0:
            chop[i] = 100 * np.log10(atr_sum / rng) / np.log10(14)
    return z_scores.tolist(), natr.tolist(), chop.tolist()
def main():
    """Fetch klines for all liquid symbols, compute indicators, cache to JSON.

    Writes data_cache_30d.json so later sweep steps can run without
    re-downloading. Symbols without enough history are skipped.
    """
    # Fix: `import os` previously sat at the bottom of the module, *after*
    # its use site here (it only worked because module-level imports run
    # before the __main__ guard). Import it where it is used.
    import os

    print("=" * 60)
    print(" STEP 1: Fetch & Cache 30-day Data")
    print("=" * 60)
    symbols_data = get_symbols()
    print(f"\n {len(symbols_data)} symbols with vol >= ${MIN_VOLUME_24H/1e6:.0f}M")
    cache = {"fetched_at": datetime.now().isoformat(), "days": DAYS, "timeframe": TIMEFRAME, "symbols": {}}
    for idx, sd in enumerate(symbols_data):
        sym = sd["symbol"]
        print(f" [{idx+1}/{len(symbols_data)}] {sym} (${sd['volume_24h']/1e6:.0f}M)...", end=" ", flush=True)
        time.sleep(0.15)  # spacing between symbols to respect API rate limits
        klines = fetch_klines(sym, TIMEFRAME, DAYS)
        # Need the z-score warm-up window plus a usable amount of history.
        if len(klines) < VWAP_PERIOD + 100:
            print(f"skip ({len(klines)} bars)")
            continue
        # Kline row layout: [ts, open, high, low, close, volume].
        closes = np.array([k[4] for k in klines])
        highs = np.array([k[2] for k in klines])
        lows = np.array([k[3] for k in klines])
        volumes = np.array([k[5] for k in klines])
        z, natr, chop = calc_indicators(closes, highs, lows, volumes)
        cache["symbols"][sym] = {
            "volume_24h": sd["volume_24h"],
            "bars": len(klines),
            "closes": closes.tolist(),
            "highs": highs.tolist(),
            "lows": lows.tolist(),
            "z": z,
            "natr": natr,
            "chop": chop,
        }
        print(f"OK ({len(klines)} bars)")
    out = "/home/app/trading-bot-bybit/backtests/data_cache_30d.json"
    with open(out, "w") as f:
        json.dump(cache, f)
    total_bars = sum(s["bars"] for s in cache["symbols"].values())
    print(f"\n Cached {len(cache['symbols'])} symbols, {total_bars:,} total bars")
    print(f" Saved to {out} ({os.path.getsize(out) / 1e6:.1f} MB)")


if __name__ == "__main__":
    main()