"""
Step 1: Fetch and cache 30-day kline data for all liquid symbols.

Saves to data_cache_30d.json so we don't re-download each sweep.

Usage:
    cd /home/app/trading-bot-bybit
    python3 backtests/step1_fetch_data.py
"""
import json
import os
import time
from datetime import datetime

import numpy as np
from pybit.unified_trading import HTTP

# --- Tunables ---
TIMEFRAME = "5"                 # kline interval, minutes
DAYS = 30                       # history depth to fetch
VWAP_PERIOD = 50                # rolling window for the Z-VWAP score
MIN_VOLUME_24H = 20_000_000     # minimum 24h USDT turnover to include a symbol
MAX_SYMBOLS = 60                # cap on number of symbols cached
BLACKLIST = {"BTCUSDT", "ETHUSDT", "TRXUSDT", "USDCUSDT", "BTCPERP", "ETHPERP"}

session = HTTP(testnet=False)


def get_symbols():
    """Return up to MAX_SYMBOLS liquid USDT-linear symbols.

    Filters Bybit linear tickers by the USDT suffix, the BLACKLIST and the
    MIN_VOLUME_24H turnover floor, then sorts by 24h turnover descending.
    Returns a list of {"symbol": str, "volume_24h": float} dicts
    (empty list on API error).
    """
    resp = session.get_tickers(category="linear")
    if resp["retCode"] != 0:
        return []
    symbols = []
    for t in resp["result"]["list"]:
        sym = t["symbol"]
        if not sym.endswith("USDT") or sym in BLACKLIST:
            continue
        vol = float(t.get("turnover24h", 0))
        if vol >= MIN_VOLUME_24H:
            symbols.append({"symbol": sym, "volume_24h": vol})
    symbols.sort(key=lambda x: x["volume_24h"], reverse=True)
    return symbols[:MAX_SYMBOLS]


def fetch_klines(symbol, interval, days, max_errors=5):
    """Fetch ~`days` of klines for `symbol`, paging backwards from now.

    Returns a chronologically sorted, timestamp-deduplicated list of
    [ts_ms, open, high, low, close, volume] rows, trimmed to the number
    of bars implied by `days` and `interval`.

    `max_errors` bounds *consecutive* request failures so a dead symbol or
    network outage cannot spin the loop forever (the original retried
    without limit).
    """
    all_klines = []
    bars_needed = days * 24 * 60 // int(interval)
    end_time = int(datetime.now().timestamp() * 1000)
    errors = 0

    while len(all_klines) < bars_needed:
        try:
            resp = session.get_kline(category="linear", symbol=symbol,
                                     interval=interval, limit=1000, end=end_time)
            if resp["retCode"] != 0:
                break
            items = resp["result"]["list"]
            if not items:
                break
            for item in items:
                all_klines.append([
                    int(item[0]),
                    float(item[1]),
                    float(item[2]),
                    float(item[3]),
                    float(item[4]),
                    float(item[5]),
                ])
            # Bybit returns newest-first; step the cursor just before the
            # oldest bar of this page to fetch the next (older) page.
            end_time = int(items[-1][0]) - 1
            if len(items) < 1000:  # short page => no more history available
                break
            errors = 0  # successful page resets the failure counter
            time.sleep(0.05)  # rate-limit courtesy
        except Exception as e:
            print(f" ERR: {e}")
            errors += 1
            if errors >= max_errors:
                break
            time.sleep(1)

    all_klines.reverse()  # newest-first pages -> chronological order

    # Deduplicate by timestamp (adjacent pages can overlap at boundaries).
    seen = set()
    unique = []
    for k in all_klines:
        if k[0] not in seen:
            seen.add(k[0])
            unique.append(k)
    return unique[-bars_needed:] if len(unique) > bars_needed else unique


def calc_indicators(closes, highs, lows, volumes):
    """Compute per-bar indicator series from OHLCV arrays.

    Args:
        closes, highs, lows, volumes: 1-D numpy arrays of equal length n.

    Returns:
        (z_scores, natr, chop) as plain Python lists of length n:
        - z_scores: z-score of the close vs. a rolling VWAP over the
          previous VWAP_PERIOD bars (0.0 until enough history).
        - natr: normalized ATR(14) in percent of close (0.0 warm-up).
        - chop: 14-bar Choppiness Index (50.0 warm-up default).
    """
    n = len(closes)

    # True range per bar, computed once and shared by NATR and CHOP
    # (the original rebuilt per-bar TR lists inside both loops).
    tr = np.zeros(n)
    if n > 1:
        tr[1:] = np.maximum.reduce([
            highs[1:] - lows[1:],
            np.abs(highs[1:] - closes[:-1]),
            np.abs(lows[1:] - closes[:-1]),
        ])

    # Z-VWAP: deviation of the current close from the VWAP of the
    # *previous* VWAP_PERIOD bars, scaled by the std-dev of the window's
    # close-vs-VWAP deviations.
    z_scores = np.zeros(n)
    for i in range(VWAP_PERIOD, n):
        h = highs[i - VWAP_PERIOD:i]
        l = lows[i - VWAP_PERIOD:i]
        c = closes[i - VWAP_PERIOD:i]
        v = volumes[i - VWAP_PERIOD:i]
        tp = (h + l + c) / 3  # typical price
        ctv = np.cumsum(tp * v)
        cv = np.cumsum(v)
        cv_safe = np.where(cv == 0, 1, cv)  # guard zero-volume prefixes
        vwap_arr = ctv / cv_safe
        vwap = vwap_arr[-1]
        dev = c - vwap_arr
        std = np.std(dev)
        if std > 0:
            z_scores[i] = (closes[i] - vwap) / std

    # NATR(14): 14-bar ATR as a percentage of the close.
    natr = np.zeros(n)
    for i in range(14, n):
        atr = np.mean(tr[i - 13:i + 1])
        natr[i] = (atr / closes[i]) * 100 if closes[i] > 0 else 0

    # CHOP(14): choppiness index; ~100 = sideways, ~0 = strong trend.
    chop = np.full(n, 50.0)
    for i in range(14, n):
        atr_sum = np.sum(tr[i - 13:i + 1])
        hi = np.max(highs[i - 13:i + 1])
        lo = np.min(lows[i - 13:i + 1])
        rng = hi - lo
        if rng > 0:
            chop[i] = 100 * np.log10(atr_sum / rng) / np.log10(14)

    return z_scores.tolist(), natr.tolist(), chop.tolist()


def main():
    """Fetch klines for all liquid symbols, compute indicators, write the JSON cache."""
    print("=" * 60)
    print(" STEP 1: Fetch & Cache 30-day Data")
    print("=" * 60)

    symbols_data = get_symbols()
    print(f"\n {len(symbols_data)} symbols with vol >= ${MIN_VOLUME_24H/1e6:.0f}M")

    cache = {
        "fetched_at": datetime.now().isoformat(),
        "days": DAYS,
        "timeframe": TIMEFRAME,
        "symbols": {},
    }

    for idx, sd in enumerate(symbols_data):
        sym = sd["symbol"]
        print(f" [{idx+1}/{len(symbols_data)}] {sym} (${sd['volume_24h']/1e6:.0f}M)...",
              end=" ", flush=True)
        time.sleep(0.15)  # pace requests between symbols

        klines = fetch_klines(sym, TIMEFRAME, DAYS)
        # Need enough bars for the VWAP warm-up plus usable history.
        if len(klines) < VWAP_PERIOD + 100:
            print(f"skip ({len(klines)} bars)")
            continue

        closes = np.array([k[4] for k in klines])
        highs = np.array([k[2] for k in klines])
        lows = np.array([k[3] for k in klines])
        volumes = np.array([k[5] for k in klines])
        z, natr, chop = calc_indicators(closes, highs, lows, volumes)

        cache["symbols"][sym] = {
            "volume_24h": sd["volume_24h"],
            "bars": len(klines),
            "closes": closes.tolist(),
            "highs": highs.tolist(),
            "lows": lows.tolist(),
            "z": z,
            "natr": natr,
            "chop": chop,
        }
        print(f"OK ({len(klines)} bars)")

    out = "/home/app/trading-bot-bybit/backtests/data_cache_30d.json"
    with open(out, "w") as f:
        json.dump(cache, f)

    total_bars = sum(s["bars"] for s in cache["symbols"].values())
    print(f"\n Cached {len(cache['symbols'])} symbols, {total_bars:,} total bars")
    print(f" Saved to {out} ({os.path.getsize(out) / 1e6:.1f} MB)")


if __name__ == "__main__":
    main()