#!/usr/bin/env python3
# Order-Flow deep analysis: window<=1%, anti-spoof, TP/SL/time-stop sweep
# Runs ON the VPS (cwd=/home/app/futures-screener). Dumps JSON to stdout.
import sqlite3, json, math
from datetime import datetime, timezone
FEE = 0.036  # round-trip maker fee, in percent; subtracted from every resolved trade's % result
def to_ms(s):
    """Convert a 'YYYY-MM-DD HH:MM:SS' string, interpreted as UTC, to epoch milliseconds."""
    naive = datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
    # The parsed value has no tzinfo; pin it to UTC so timestamp() does not
    # depend on the local timezone of the machine.
    aware = naive.replace(tzinfo=timezone.utc)
    return int(aware.timestamp() * 1000)
# ---- load OF signals ----
# Every 'orderflow_imbalance' row of signal_log is pulled into memory as a
# tuple whose fields follow the column order of the SELECT below.
u = sqlite3.connect("/home/app/futures-screener/server/data/users.db")
of_cursor = u.cursor()
of_cursor.execute(
    "select id,symbol,direction,entry_price,confidence,mfe_pct,mae_pct,metadata,created_at "
    "from signal_log where type='orderflow_imbalance'"
)
of = of_cursor.fetchall()
u.close()
# ---- load depth snapshots grouped by symbol, sorted by ts ----
# snaps maps symbol -> list of (ts, mark_price, data) tuples. The ORDER BY
# makes every per-symbol list ascending in ts.
d = sqlite3.connect("/home/app/futures-screener/data/depth.db")
snaps = {}
try:
    for sym, ts, mark, data in d.execute(
        "select symbol,ts,mark_price,data from snapshots order by symbol,ts"
    ):
        snaps.setdefault(sym, []).append((ts, mark, data))
finally:
    # Release the connection even if the query itself fails.
    d.close()
# Overall time span covered by the depth data. Each list is already sorted by
# ts, so its first/last entries hold that symbol's extremes and no full scan
# is needed. default=0 keeps the script alive on an empty snapshots table:
# no signal can then match a symbol, and the report is emitted with zero paths
# instead of dying on "min() arg is an empty sequence".
dmin = min((arr[0][0] for arr in snaps.values()), default=0)
dmax = max((arr[-1][0] for arr in snaps.values()), default=0)
def bins_for(data_json, mark, maxpct):
    """Aggregate order-book volume into 0.1%-wide distance bins.

    data_json is a JSON object whose optional "bids" / "asks" members map
    price strings to volumes. The distance of each level from ``mark`` is
    measured in percent of ``mark``; levels further away than ``maxpct`` are
    ignored.

    Returns a dict {(side, k): volume} where side is 'bid' or 'ask' and
    k = int(dist // 0.1) is the index of the 0.1% bin.
    """
    book = json.loads(data_json)
    binned = {}
    for side, field in (("bid", "bids"), ("ask", "asks")):
        levels = book.get(field, {})
        for price_str, vol in levels.items():
            dist = abs(float(price_str) - mark) / mark * 100.0
            if dist > maxpct:
                continue
            slot = (side, int(dist // 0.1))
            binned[slot] = binned.get(slot, 0.0) + vol
    return binned
def imb_from_bins(b, maxk, persistent=None):
    """Bid/ask volume imbalance over the bins with index below ``maxk``.

    b is a {(side, k): volume} mapping. When ``persistent`` is given, only
    bins whose (side, k) key is a member of it are counted.

    Returns (bid - ask) / (bid + ask), or 0.0 when the combined volume is not
    positive.
    """
    bid_vol = 0
    ask_vol = 0
    for slot, vol in b.items():
        side, k = slot
        if k >= maxk:
            continue
        if persistent is not None and slot not in persistent:
            continue
        if side == "bid":
            bid_vol += vol
        elif side == "ask":
            ask_vol += vol
    total = bid_vol + ask_vol
    if total > 0:
        return (bid_vol - ask_vol) / total
    return 0.0
def nearest_idx(arr, t, tol=15000):
    """Index of the entry in ``arr`` whose first field is closest to ``t``.

    arr is a sequence of 3-tuples with the timestamp first. Only entries
    strictly closer than ``tol`` qualify; on equal distance the earliest index
    wins. Returns None when nothing qualifies (including an empty ``arr``).
    """
    if not arr:
        return None
    gaps = [abs(rec[0] - t) for rec in arr]
    # min() returns the first minimal element, which keeps the earliest index on ties.
    idx = min(range(len(gaps)), key=gaps.__getitem__)
    return idx if gaps[idx] < tol else None
rows = []  # one computed record per usable signal
for sid, sym, direction, entry, conf, mfe, mae, meta, created in of:
    arr = snaps.get(sym)
    if not arr:
        continue  # no depth data for this symbol
    t0 = to_ms(created)
    if not (dmin <= t0 <= dmax):
        continue  # signal lies outside the span covered by depth data
    i0 = nearest_idx(arr, t0)
    if i0 in (None, 0):
        continue  # no snapshot close enough, or no earlier snapshot to compare with
    ts0, mark0, data0 = arr[i0]
    tsP, markP, dataP = arr[i0 - 1]
    if ts0 - tsP > 25000:
        continue  # need a recent prior snapshot for anti-spoof
    cur_bins = bins_for(data0, mark0, 3.0)
    prev_bins = bins_for(dataP, markP, 3.0)
    # persistent bins: prior snapshot holds >=50% of the current bin's volume
    persistent = {
        slot
        for slot, vol in cur_bins.items()
        if vol > 0 and prev_bins.get(slot, 0.0) >= 0.5 * vol
    }
    # Bin index k counts 0.1% steps, so 30 / 10 / 5 bins mean 3% / 1% / 0.5% windows.
    record = {
        "sid": sid,
        "sym": sym,
        "side": 1 if direction == "LONG" else -1,
        # fall back to the snapshot mark when the signal carries no entry price
        "entry": float(entry) if entry else mark0,
        "conf": conf,
        "imb_flat3": imb_from_bins(cur_bins, 30),
        "imb_w1": imb_from_bins(cur_bins, 10),
        "imb_clean": imb_from_bins(cur_bins, 10, persistent),  # window<=1% + anti-spoof
        "imb_w05": imb_from_bins(cur_bins, 5),
        "imb_clean05": imb_from_bins(cur_bins, 5, persistent),  # window<=0.5% + anti-spoof
        "t0": ts0,
        # path resolution: (ts, mark) of every snapshot from the matched one onward
        "path": [(ts, mk) for ts, mk, _ in arr[i0:]],
    }
    rows.append(record)
def resolve(r, tp, sl, holdmin):
    """Net % result of one trade, resolved along its recorded mark path.

    r is a record with 'entry', 'side', 't0' (ms) and 'path' (a list of
    (ts, mark) pairs). ``tp`` and ``sl`` are percent distances from entry;
    ``holdmin`` is the time stop in minutes. For side > 0 the take-profit sits
    above entry and the stop below; for other sides the levels are mirrored.

    Returns, with FEE already subtracted:
      * -sl  at the first mark at or beyond the stop (checked before TP, so a
        mark satisfying both counts as a stop-out),
      * tp   at the first mark at or beyond the take-profit,
      * the mark-to-market result at the first snapshot at or after the deadline,
      * the mark-to-market result at the last mark if the path ends earlier.
    Returns None when the path is empty.
    """
    entry = r["entry"]
    side = r["side"]
    path = r["path"]
    deadline = r["t0"] + holdmin * 60000
    tp_p = entry * (1 + side * tp / 100.0)
    sl_p = entry * (1 - side * sl / 100.0)
    is_long = side > 0
    for ts, mk in path:
        if is_long:
            stopped, took = mk <= sl_p, mk >= tp_p
        else:
            stopped, took = mk >= sl_p, mk <= tp_p
        if stopped:
            return -sl - FEE  # conservative: SL wins on same/gap
        if took:
            return tp - FEE
        if ts >= deadline:
            g = side * (mk - entry) / entry * 100.0
            return g - FEE
    if not path:
        return None
    # ran out of path -> exit at last mark
    last_mk = path[-1][1]
    g = side * (last_mk - entry) / entry * 100.0
    return g - FEE
def quartile_stats(recs, key, tp, sl, hold):
    """Per-quartile trade statistics of ``recs`` ranked by |rec[key]| ascending.

    Returns None when there are fewer than 8 records. Otherwise returns a list
    of four entries, weakest quartile first. Each entry is None if no trade in
    that quartile resolves, else a dict with:
      n   - number of resolved trades,
      wr  - share of trades with a positive result, in percent,
      net - mean result in percent,
      lo / hi - |rec[key]| of the first / last record of the quartile.
    """
    ranked = sorted(recs, key=lambda r: abs(r[key]))
    n = len(ranked)
    if n < 8:
        return None
    stats = []
    for qi in range(4):
        lo = qi * n // 4
        hi = (qi + 1) * n // 4
        outcomes = (resolve(r, tp, sl, hold) for r in ranked[lo:hi])
        pls = [p for p in outcomes if p is not None]
        if not pls:
            stats.append(None)
            continue
        wins = sum(1 for p in pls if p > 0)
        stats.append({
            "n": len(pls),
            "wr": round(100.0 * wins / len(pls), 1),
            "net": round(sum(pls) / len(pls), 3),
            "lo": round(abs(ranked[lo][key]), 3),
            "hi": round(abs(ranked[hi - 1][key]), 3),
        })
    return stats
# Baseline exit parameters: take-profit %, stop-loss %, time stop in minutes.
base_tp = 1.5
base_sl = 0.5
base_hold = 10
out = {
    "fee": FEE,
    "n_signals": len(of),
    "n_path": len(rows),
    # timestamps are in ms; 3.6e6 ms = 1 hour
    "depth_span_h": round((dmax - dmin) / 3.6e6, 2),
}
# 1) metric comparison: per-quartile stats of every imbalance metric at base
#    params (records are ranked by |metric|, so Q4 holds the strongest signals)
metric_labels = (
    ("imb_flat3", "flat ±3% (текущий)"),
    ("imb_w1", "окно ≤1%"),
    ("imb_clean", "окно ≤1% + антиспуф"),
    ("imb_w05", "окно ≤0.5%"),
    ("imb_clean05", "окно ≤0.5% + антиспуф"),
)
out["metric_cmp"] = {
    key: {
        "label": label,
        "quartiles": quartile_stats(rows, key, base_tp, base_sl, base_hold),
    }
    for key, label in metric_labels
}
# 2) Q4 of PRIMARY metric (window<=0.5% + anti-spoof) -> sweeps
PRIMARY = "imb_clean05"
clean_sorted = sorted(rows, key=lambda r: abs(r[PRIMARY]))
q4_start = 3 * len(clean_sorted) // 4
q4 = clean_sorted[q4_start:]
out["q4_n"] = len(q4)
# Sweep grids: take-profit %, stop-loss %, hold minutes.
TPg = [0.8, 1.0, 1.5, 2.0]
SLg = [0.3, 0.5, 0.8, 1.0]
Hg = [3, 5, 10, 15, 30]
def cell(recs, tp, sl, hold):
    """Summarise ``recs`` traded with one (tp, sl, hold) parameter triple.

    Returns None when no trade resolves, otherwise a dict with the mean net
    result in percent ('net'), the share of positive trades in percent ('wr')
    and the number of resolved trades ('n').
    """
    pls = []
    for r in recs:
        p = resolve(r, tp, sl, hold)
        if p is not None:
            pls.append(p)
    if not pls:
        return None
    count = len(pls)
    wins = sum(1 for p in pls if p > 0)
    return dict(net=round(sum(pls) / count, 3), wr=round(100 * wins / count, 1), n=count)
def _q4_net(tp, sl, hold):
    """Mean net % of the Q4 trades for one parameter triple, or None if none resolve."""
    stats = cell(q4, tp, sl, hold)
    return stats["net"] if stats else None


# Three 2-D slices through the (tp, sl, hold) grid; the third parameter is
# held at its base value. Outer list follows the first axis named in the dict.
out["tp_sl"] = {
    "tp": TPg,
    "sl": SLg,
    "hold": base_hold,
    "cells": [[_q4_net(tp, sl, base_hold) for sl in SLg] for tp in TPg],
}
out["hold_tp"] = {
    "hold": Hg,
    "tp": TPg,
    "sl": base_sl,
    "cells": [[_q4_net(tp, base_sl, h) for tp in TPg] for h in Hg],
}
out["sl_hold"] = {
    "sl": SLg,
    "hold": Hg,
    "tp": base_tp,
    "cells": [[_q4_net(base_tp, sl, h) for h in Hg] for sl in SLg],
}
# 3) best combos: every (tp, sl, hold) triple that resolves at least one
#    trade, ranked by mean net result, best first
scored = [(tp, sl, h, cell(q4, tp, sl, h)) for tp in TPg for sl in SLg for h in Hg]
combos = [dict(tp=tp, sl=sl, hold=h, **c) for tp, sl, h, c in scored if c]
combos.sort(key=lambda x: -x["net"])
out["best"] = combos[:8]
# realistic best (SL>=0.5 to survive spread/slippage)
real = [c for c in combos if c["sl"] >= 0.5]
out["best_real"] = real[:5]


def _is_rick(c):
    """True for the TP2 / SL0.5 / hold10 combination."""
    return c["tp"] == 2.0 and c["sl"] == 0.5 and c["hold"] == 10


# Rick's proposed combo: TP2 / SL0.5 / hold10 + its 1-based rank in both lists
rc = cell(q4, 2.0, 0.5, 10)
rank = next((pos for pos, c in enumerate(combos, 1) if _is_rick(c)), None)
rank_real = next((pos for pos, c in enumerate(real, 1) if _is_rick(c)), None)
out["rick_combo"] = {
    "tp": 2.0,
    "sl": 0.5,
    "hold": 10,
    "rank": rank,
    "rank_real": rank_real,
    "n_combos": len(combos),
    "n_real": len(real),
    **(rc or {}),
}
# 4) anti-spoof impact: how many bins dropped on avg in Q4
# TODO: not implemented. The per-signal records keep only the finished
# imbalance values, not the raw or persistent bin sets, so the number of
# dropped bins cannot be derived from q4 here. Nothing is written to `out`
# for this section; the former no-op loop over q4 has been removed.
drop = []
# 5) per-symbol edge on q4 (clustering / noise symbols)
from collections import defaultdict

# Group the base-parameter results of the Q4 trades by symbol.
persym = defaultdict(list)
for r in q4:
    p = resolve(r, base_tp, base_sl, base_hold)
    if p is None:
        continue
    persym[r["sym"]].append(p)
# Only symbols with at least 3 resolved trades are reported, best mean net first.
syms = [
    dict(
        sym=s,
        n=len(pls),
        net=round(sum(pls) / len(pls), 3),
        wr=round(100 * sum(1 for p in pls if p > 0) / len(pls), 1),
    )
    for s, pls in persym.items()
    if len(pls) >= 3
]
syms.sort(key=lambda x: -x["net"])
out["per_symbol"] = syms
print(json.dumps(out))