"""
Step 7.1: Analyze sweep results — comprehensive report.
Reads sweep_results_Xd.json and produces:
1. Top 20 by PnL, PF (min 30 trades), WR (min 30 trades)
2. Best R:R ratio (aggregate stats)
3. Parameter sensitivity (which param matters most)
4. Stability: top combos that appear in multiple R:R groups
5. Saves report to analyze_report_Xd.txt
Usage:
python3 backtests/analyze_sweep.py --days 7
python3 backtests/analyze_sweep.py --days 14
"""
import os
import sys
import json
import argparse
from collections import defaultdict
def load_results(days):
    """Load sweep_results_{days}d.json from this script's directory.

    Exits the process with status 1 when the file does not exist.
    Returns the parsed JSON payload (a dict with at least 'valid_results',
    'date', and 'results' keys, per the print below and analyze()).
    """
    result_file = os.path.join(os.path.dirname(__file__), f"sweep_results_{days}d.json")
    if not os.path.exists(result_file):
        print(f"β Not found: {result_file}")
        sys.exit(1)
    with open(result_file) as fh:
        payload = json.load(fh)
    print(f"Loaded {result_file}: {payload['valid_results']} results, date {payload['date']}")
    return payload
def section(title):
    """Return *title* framed above and below by 70-char '=' rules."""
    bar = "=" * 70
    return f"\n{bar}\n {title}\n{bar}\n"
def fmt_row(r, rank=None):
    """Render one sweep-result dict as a single aligned report line.

    A truthy *rank* is shown as a '#N' prefix; otherwise a blank prefix
    keeps columns aligned across ranked and unranked rows.
    """
    lead = f" #{rank:<3}" if rank else " "
    params = (
        f"R:R={r['rr']:<4} TP={r['tp_pct']:<4}% SL={r['sl_pct']:<4}% "
        f"Z={r['z_entry']:.1f}/{r['z_max']:.1f} "
        f"NATR={r['natr_min']:.2f}-{r['natr_max']:.1f} CHOPβ₯{r['chop_min']} "
    )
    stats = f"| Tr={r['trades']:>4} WR={r['wr']:>5.1f}% PnL={r['pnl']:>+7.1f} PF={r['pf']:>5.2f}"
    return f"{lead} {params}{stats}"
def _group_stats(group):
    """Aggregate a non-empty list of result dicts.

    Returns (count, avg_pnl, median_pnl, avg_wr, pct_profitable). The median
    is the upper median (element at len//2 of the sorted PnLs), preserving the
    report's original convention for even-sized groups.
    """
    pnls = [r["pnl"] for r in group]
    wrs = [r["wr"] for r in group]
    profitable = sum(1 for p in pnls if p > 0)
    median_pnl = sorted(pnls)[len(pnls) // 2]
    return (
        len(group),
        sum(pnls) / len(pnls),
        median_pnl,
        sum(wrs) / len(wrs),
        profitable / len(group) * 100,
    )


def analyze(data):
    """Build the sweep-analysis report and select Step-8 candidates.

    data: dict loaded from sweep_results_Xd.json; must provide keys
    'results', 'days', 'date', 'valid_results', 'elapsed_sec'.
    Returns (report_text, candidates): the full multi-section report as one
    string, and up to 5 result dicts with diverse filter sets recommended
    for the 14d robustness check.
    """
    results = data["results"]
    report = []
    report.append(f"Sweep Analysis Report β {data['days']}d data")
    report.append(f"Date: {data['date']}")
    report.append(f"Total valid combos: {data['valid_results']}")
    report.append(f"Elapsed: {data['elapsed_sec']}s")

    # 1. Top 20 by raw PnL (no trade-count filter).
    report.append(section("1. TOP 20 BY PNL"))
    by_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True)
    for i, r in enumerate(by_pnl[:20], 1):
        report.append(fmt_row(r, i))

    # 2. Top 20 by profit factor, requiring a minimum sample of 30 trades.
    report.append(section("2. TOP 20 BY PROFIT FACTOR (min 30 trades)"))
    qualified = [r for r in results if r["trades"] >= 30]
    by_pf = sorted(qualified, key=lambda x: x["pf"], reverse=True)
    for i, r in enumerate(by_pf[:20], 1):
        report.append(fmt_row(r, i))

    # 3. Top 20 by win rate, same 30-trade minimum.
    report.append(section("3. TOP 20 BY WIN RATE (min 30 trades)"))
    by_wr = sorted(qualified, key=lambda x: x["wr"], reverse=True)
    for i, r in enumerate(by_wr[:20], 1):
        report.append(fmt_row(r, i))

    # 4. Aggregate stats per R:R ratio.
    report.append(section("4. BEST R:R RATIO (aggregate)"))
    rr_groups = defaultdict(list)
    for r in results:
        rr_groups[r["rr"]].append(r)
    report.append(f" {'R:R':<6} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'Avg PF':>7} {'%Profit':>8}")
    report.append(" " + "-" * 60)
    for rr in sorted(rr_groups.keys()):
        group = rr_groups[rr]
        # PF values >= 100 are treated as inf-like and excluded from the average.
        pfs = [r["pf"] for r in group if r["pf"] < 100]
        avg_pf = sum(pfs) / len(pfs) if pfs else 0
        count, avg_pnl, med_pnl, avg_wr, pct_prof = _group_stats(group)
        report.append(
            f" {rr:<6} {count:>6} {avg_pnl:>+7.1f} {med_pnl:>+7.1f} "
            f"{avg_wr:>6.1f}% {avg_pf:>6.2f} "
            f"{pct_prof:>6.1f}%"
        )

    # 5. Aggregate stats per TP/SL pair, ranked by average PnL.
    report.append(section("5. BEST TP/SL PAIR (aggregate across all filters)"))
    tpsl_groups = defaultdict(list)
    for r in results:
        key = f"TP={r['tp_pct']}% SL={r['sl_pct']}%"
        tpsl_groups[key].append(r)
    report.append(f" {'TP/SL Pair':<22} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'%Profit':>8}")
    report.append(" " + "-" * 65)
    tpsl_sorted = sorted(tpsl_groups.items(), key=lambda x: sum(r["pnl"] for r in x[1]) / len(x[1]), reverse=True)
    for key, group in tpsl_sorted:
        count, avg_pnl, med_pnl, avg_wr, pct_prof = _group_stats(group)
        report.append(
            f" {key:<22} {count:>6} {avg_pnl:>+7.1f} {med_pnl:>+7.1f} "
            f"{avg_wr:>6.1f}% {pct_prof:>6.1f}%"
        )

    # 6. One aggregate table per filter parameter, to see which one moves results.
    report.append(section("6. PARAMETER SENSITIVITY (which param matters most)"))
    param_names = {
        "z_entry": "Z Entry",
        "z_max": "Z Max",
        "natr_min": "NATR Min",
        "natr_max": "NATR Max",
        "chop_min": "CHOP Min",
    }
    for param_key, param_label in param_names.items():
        report.append(f"\n --- {param_label} ---")
        groups = defaultdict(list)
        for r in results:
            groups[r[param_key]].append(r)
        report.append(f" {'Value':<8} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'%Profit':>8}")
        report.append(" " + "-" * 50)
        for val in sorted(groups.keys()):
            count, avg_pnl, med_pnl, avg_wr, pct_prof = _group_stats(groups[val])
            report.append(
                f" {val:<8} {count:>6} {avg_pnl:>+7.1f} {med_pnl:>+7.1f} "
                f"{avg_wr:>6.1f}% {pct_prof:>6.1f}%"
            )

    # 7. Which filter sets recur among the 50 best-PnL combos (stability signal).
    report.append(section("7. STABILITY: Common filter sets in Top 50 PnL"))
    top50 = by_pnl[:50]
    filter_freq = defaultdict(int)
    for r in top50:
        key = f"Z={r['z_entry']}/{r['z_max']} NATR={r['natr_min']}-{r['natr_max']} CHOPβ₯{r['chop_min']}"
        filter_freq[key] += 1
    report.append(f" {'Filter Set':<50} {'Count':>5}")
    report.append(" " + "-" * 58)
    for key, cnt in sorted(filter_freq.items(), key=lambda x: -x[1]):
        report.append(f" {key:<50} {cnt:>5}")

    # 8. Bottom 10 by PnL, as a negative example set.
    report.append(section("8. WORST 10 COMBOS (what to avoid)"))
    worst = sorted(results, key=lambda x: x["pnl"])[:10]
    for i, r in enumerate(worst, 1):
        report.append(fmt_row(r, i))

    # 9. "Golden" combos: profitable, PF > 1.3, and at least 50 trades.
    report.append(section("9. GOLDEN COMBOS (PnL>0 AND PF>1.3 AND tradesβ₯50)"))
    golden = [r for r in results if r["pnl"] > 0 and r["pf"] > 1.3 and r["trades"] >= 50]
    golden.sort(key=lambda x: x["pnl"], reverse=True)
    report.append(f" Found {len(golden)} golden combos out of {len(results)} total")
    for i, r in enumerate(golden[:20], 1):
        report.append(fmt_row(r, i))

    # 10. Up to 5 golden combos with pairwise-distinct filter sets, for Step 8.
    report.append(section("10. RECOMMENDED CANDIDATES FOR STEP 8 (14d robustness)"))
    seen_filters = set()
    candidates = []
    for r in golden:
        fkey = f"{r['z_entry']}/{r['z_max']}/{r['natr_min']}/{r['natr_max']}/{r['chop_min']}"
        if fkey not in seen_filters:
            seen_filters.add(fkey)
            candidates.append(r)
            if len(candidates) >= 5:
                break
    if not candidates:
        # Fallback when no golden combo exists: top 5 by PnL with >= 30 trades.
        candidates = [r for r in by_pnl if r["trades"] >= 30][:5]
    report.append(f" {len(candidates)} diverse candidates:")
    for i, r in enumerate(candidates, 1):
        report.append(fmt_row(r, i))
    report.append(f"\n >>> Next: run these {len(candidates)} combos on 14d data (Step 8)")
    return "\n".join(report), candidates
def main():
    """CLI entry point: load sweep results, build the report, persist outputs.

    Writes analyze_report_{days}d.txt and candidates_{days}d.json next to
    this script, and prints the full report to stdout.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    args = parser.parse_args()

    data = load_results(args.days)
    report_text, candidates = analyze(data)

    # Full report goes to the console as well as to disk.
    print(report_text)

    base_dir = os.path.dirname(__file__)
    report_path = os.path.join(base_dir, f"analyze_report_{args.days}d.txt")
    with open(report_path, "w") as out:
        out.write(report_text)
    print(f"\nπ Report saved to {report_path}")

    # Candidates are consumed by the Step-8 robustness run.
    cand_path = os.path.join(base_dir, f"candidates_{args.days}d.json")
    with open(cand_path, "w") as out:
        json.dump(candidates, out, indent=2)
    print(f"π Candidates saved to {cand_path}")


if __name__ == "__main__":
    main()