← Назад
"""
Step 7.1: Analyze sweep results — comprehensive report.

Reads sweep_results_Xd.json and produces:
  1. Top 20 by PnL, PF (min 30 trades), WR (min 30 trades)
  2. Best R:R ratio (aggregate stats)
  3. Parameter sensitivity (which param matters most)
  4. Stability: top combos that appear in multiple R:R groups
  5. Saves report to analyze_report_Xd.txt

Usage:
    python3 backtests/analyze_sweep.py --days 7
    python3 backtests/analyze_sweep.py --days 14
"""

import os
import sys
import json
import argparse
import statistics
from collections import Counter, defaultdict


def load_results(days):
    """Load ``sweep_results_{days}d.json`` from this script's directory.

    Prints a one-line summary built from the ``valid_results`` and ``date``
    keys of the loaded document and returns the parsed JSON object.
    If the file does not exist, prints an error and exits with status 1.
    """
    path = os.path.join(os.path.dirname(__file__), f"sweep_results_{days}d.json")
    if not os.path.exists(path):
        print(f"❌ Not found: {path}")
        sys.exit(1)
    # Explicit encoding so loading does not depend on the platform locale.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    print(f"Loaded {path}: {data['valid_results']} results, date {data['date']}")
    return data


def section(title):
    """Return *title* framed by two 70-character ``=`` rules."""
    return f"\n{'='*70}\n {title}\n{'='*70}\n"


def fmt_row(r, rank=None):
    """Format one result dict *r* as a single report line.

    *r* must provide the keys ``rr``, ``tp_pct``, ``sl_pct``, ``z_entry``,
    ``z_max``, ``natr_min``, ``natr_max``, ``chop_min``, ``trades``, ``wr``,
    ``pnl`` and ``pf``. When *rank* is truthy the line is prefixed with
    ``#<rank>``; otherwise a blank prefix is used.
    """
    prefix = f" #{rank:<3}" if rank else " "
    return (
        f"{prefix} R:R={r['rr']:<4} TP={r['tp_pct']:<4}% SL={r['sl_pct']:<4}% "
        f"Z={r['z_entry']:.1f}/{r['z_max']:.1f} "
        f"NATR={r['natr_min']:.2f}-{r['natr_max']:.1f} CHOP≥{r['chop_min']} "
        f"| Tr={r['trades']:>4} WR={r['wr']:>5.1f}% PnL={r['pnl']:>+7.1f} PF={r['pf']:>5.2f}"
    )


def _group_stats(group):
    """Return (count, mean PnL, median PnL, mean WR, % with PnL > 0) for a non-empty group."""
    pnls = [r["pnl"] for r in group]
    count = len(group)
    return (
        count,
        sum(pnls) / count,
        # True median: averages the two middle values for even-sized groups
        # (indexing sorted[len // 2] would report the upper-middle value).
        statistics.median(pnls),
        sum(r["wr"] for r in group) / count,
        sum(1 for p in pnls if p > 0) / count * 100,
    )


def analyze(data):
    """Build the text report for a loaded sweep document.

    *data* must contain ``results`` (a list of result dicts as accepted by
    :func:`fmt_row`) plus the header keys ``days``, ``date``,
    ``valid_results`` and ``elapsed_sec``.

    Returns a ``(report_text, candidates)`` tuple, where *candidates* is a
    list of up to five result dicts selected in section 10.
    """
    results = data["results"]
    report = []
    report.append(f"Sweep Analysis Report — {data['days']}d data")
    report.append(f"Date: {data['date']}")
    report.append(f"Total valid combos: {data['valid_results']}")
    report.append(f"Elapsed: {data['elapsed_sec']}s")

    # ── 1. TOP 20 BY PNL ──
    report.append(section("1. TOP 20 BY PNL"))
    by_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True)
    for i, r in enumerate(by_pnl[:20], 1):
        report.append(fmt_row(r, i))

    # ── 2. TOP 20 BY PF (min 30 trades) ──
    report.append(section("2. TOP 20 BY PROFIT FACTOR (min 30 trades)"))
    qualified = [r for r in results if r["trades"] >= 30]
    by_pf = sorted(qualified, key=lambda x: x["pf"], reverse=True)
    for i, r in enumerate(by_pf[:20], 1):
        report.append(fmt_row(r, i))

    # ── 3. TOP 20 BY WIN RATE (min 30 trades) ──
    report.append(section("3. TOP 20 BY WIN RATE (min 30 trades)"))
    by_wr = sorted(qualified, key=lambda x: x["wr"], reverse=True)
    for i, r in enumerate(by_wr[:20], 1):
        report.append(fmt_row(r, i))

    # ── 4. BEST R:R RATIO ──
    report.append(section("4. BEST R:R RATIO (aggregate)"))
    rr_groups = defaultdict(list)
    for r in results:
        rr_groups[r["rr"]].append(r)
    report.append(f" {'R:R':<6} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'Avg PF':>7} {'%Profit':>8}")
    report.append(" " + "-" * 60)
    for rr in sorted(rr_groups):
        group = rr_groups[rr]
        count, avg_pnl, median_pnl, avg_wr, pct_profit = _group_stats(group)
        pfs = [r["pf"] for r in group if r["pf"] < 100]  # exclude inf PFs
        report.append(
            f" {rr:<6} {count:>6} {avg_pnl:>+7.1f} {median_pnl:>+7.1f} "
            f"{avg_wr:>6.1f}% {sum(pfs)/len(pfs) if pfs else 0:>6.2f} "
            f"{pct_profit:>6.1f}%"
        )

    # ── 5. BEST R:R+TP/SL PAIR ──
    report.append(section("5. BEST TP/SL PAIR (aggregate across all filters)"))
    tpsl_groups = defaultdict(list)
    for r in results:
        key = f"TP={r['tp_pct']}% SL={r['sl_pct']}%"
        tpsl_groups[key].append(r)
    report.append(f" {'TP/SL Pair':<22} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'%Profit':>8}")
    report.append(" " + "-" * 65)
    # Pairs are listed from highest to lowest mean PnL.
    tpsl_sorted = sorted(
        tpsl_groups.items(),
        key=lambda x: sum(r["pnl"] for r in x[1]) / len(x[1]),
        reverse=True,
    )
    for key, group in tpsl_sorted:
        count, avg_pnl, median_pnl, avg_wr, pct_profit = _group_stats(group)
        report.append(
            f" {key:<22} {count:>6} {avg_pnl:>+7.1f} {median_pnl:>+7.1f} "
            f"{avg_wr:>6.1f}% {pct_profit:>6.1f}%"
        )

    # ── 6. PARAMETER SENSITIVITY ──
    report.append(section("6. PARAMETER SENSITIVITY (which param matters most)"))
    param_names = {
        "z_entry": "Z Entry",
        "z_max": "Z Max",
        "natr_min": "NATR Min",
        "natr_max": "NATR Max",
        "chop_min": "CHOP Min",
    }
    for param_key, param_label in param_names.items():
        report.append(f"\n --- {param_label} ---")
        groups = defaultdict(list)
        for r in results:
            groups[r[param_key]].append(r)
        report.append(f" {'Value':<8} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'%Profit':>8}")
        report.append(" " + "-" * 50)
        for val in sorted(groups):
            count, avg_pnl, median_pnl, avg_wr, pct_profit = _group_stats(groups[val])
            report.append(
                f" {val:<8} {count:>6} {avg_pnl:>+7.1f} {median_pnl:>+7.1f} "
                f"{avg_wr:>6.1f}% {pct_profit:>6.1f}%"
            )

    # ── 7. STABILITY: Combos in Top 50 PnL ──
    report.append(section("7. STABILITY: Common filter sets in Top 50 PnL"))
    top50 = by_pnl[:50]
    filter_freq = Counter(
        f"Z={r['z_entry']}/{r['z_max']} NATR={r['natr_min']}-{r['natr_max']} CHOP≥{r['chop_min']}"
        for r in top50
    )
    report.append(f" {'Filter Set':<50} {'Count':>5}")
    report.append(" " + "-" * 58)
    # most_common() orders by descending count, ties in first-seen order.
    for key, cnt in filter_freq.most_common():
        report.append(f" {key:<50} {cnt:>5}")

    # ── 8. WORST COMBOS (what to avoid) ──
    report.append(section("8. WORST 10 COMBOS (what to avoid)"))
    worst = sorted(results, key=lambda x: x["pnl"])[:10]
    for i, r in enumerate(worst, 1):
        report.append(fmt_row(r, i))

    # ── 9. GOLDEN COMBOS: PnL > 0 AND PF > 1.3 AND trades >= 50 ──
    report.append(section("9. GOLDEN COMBOS (PnL>0 AND PF>1.3 AND trades≥50)"))
    golden = [r for r in results if r["pnl"] > 0 and r["pf"] > 1.3 and r["trades"] >= 50]
    golden.sort(key=lambda x: x["pnl"], reverse=True)
    report.append(f" Found {len(golden)} golden combos out of {len(results)} total")
    for i, r in enumerate(golden[:20], 1):
        report.append(fmt_row(r, i))

    # ── 10. RECOMMENDATION CANDIDATES ──
    report.append(section("10. RECOMMENDED CANDIDATES FOR STEP 8 (14d robustness)"))
    # Pick top 5 from golden, diverse filter sets
    seen_filters = set()
    candidates = []
    for r in golden:
        fkey = f"{r['z_entry']}/{r['z_max']}/{r['natr_min']}/{r['natr_max']}/{r['chop_min']}"
        if fkey not in seen_filters:
            seen_filters.add(fkey)
            candidates.append(r)
        if len(candidates) >= 5:
            break
    if not candidates:
        # Fallback: top 5 by PnL with trades >= 30
        candidates = [r for r in by_pnl if r["trades"] >= 30][:5]
    report.append(f" {len(candidates)} diverse candidates:")
    for i, r in enumerate(candidates, 1):
        report.append(fmt_row(r, i))
    report.append(f"\n >>> Next: run these {len(candidates)} combos on 14d data (Step 8)")

    return "\n".join(report), candidates


def main():
    """CLI entry point: load the sweep, print the report, and save both outputs.

    Writes ``analyze_report_{days}d.txt`` and ``candidates_{days}d.json``
    next to this script.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    args = parser.parse_args()

    data = load_results(args.days)
    report_text, candidates = analyze(data)

    # Print to console
    print(report_text)

    # Save report (UTF-8: the report contains non-ASCII characters)
    report_path = os.path.join(os.path.dirname(__file__), f"analyze_report_{args.days}d.txt")
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(report_text)
    print(f"\n📄 Report saved to {report_path}")

    # Save candidates for step 8
    cand_path = os.path.join(os.path.dirname(__file__), f"candidates_{args.days}d.json")
    with open(cand_path, "w", encoding="utf-8") as f:
        json.dump(candidates, f, indent=2)
    print(f"📋 Candidates saved to {cand_path}")


if __name__ == "__main__":
    main()