← Back
β˜†
"""
Step 7.1: Analyze sweep results β€” comprehensive report.

Reads sweep_results_Xd.json and produces:
1. Top 20 by PnL, PF (min 30 trades), WR (min 30 trades)
2. Best R:R ratio (aggregate stats)
3. Parameter sensitivity (which param matters most)
4. Stability: top combos that appear in multiple R:R groups
5. Saves report to analyze_report_Xd.txt

Usage:
    python3 backtests/analyze_sweep.py --days 7
    python3 backtests/analyze_sweep.py --days 14
"""

import os
import sys
import json
import argparse
from collections import defaultdict


def load_results(days):
    path = os.path.join(os.path.dirname(__file__), f"sweep_results_{days}d.json")
    if not os.path.exists(path):
        print(f"❌ Not found: {path}")
        sys.exit(1)
    with open(path) as f:
        data = json.load(f)
    print(f"Loaded {path}: {data['valid_results']} results, date {data['date']}")
    return data


def section(title):
    return f"\n{'='*70}\n  {title}\n{'='*70}\n"


def fmt_row(r, rank=None):
    prefix = f"  #{rank:<3}" if rank else "  "
    return (
        f"{prefix} R:R={r['rr']:<4} TP={r['tp_pct']:<4}% SL={r['sl_pct']:<4}% "
        f"Z={r['z_entry']:.1f}/{r['z_max']:.1f} "
        f"NATR={r['natr_min']:.2f}-{r['natr_max']:.1f} CHOPβ‰₯{r['chop_min']} "
        f"| Tr={r['trades']:>4} WR={r['wr']:>5.1f}% PnL={r['pnl']:>+7.1f} PF={r['pf']:>5.2f}"
    )


def analyze(data):
    results = data["results"]
    report = []
    report.append(f"Sweep Analysis Report β€” {data['days']}d data")
    report.append(f"Date: {data['date']}")
    report.append(f"Total valid combos: {data['valid_results']}")
    report.append(f"Elapsed: {data['elapsed_sec']}s")

    # ── 1. TOP 20 BY PNL ──
    report.append(section("1. TOP 20 BY PNL"))
    by_pnl = sorted(results, key=lambda x: x["pnl"], reverse=True)
    for i, r in enumerate(by_pnl[:20], 1):
        report.append(fmt_row(r, i))

    # ── 2. TOP 20 BY PF (min 30 trades) ──
    report.append(section("2. TOP 20 BY PROFIT FACTOR (min 30 trades)"))
    qualified = [r for r in results if r["trades"] >= 30]
    by_pf = sorted(qualified, key=lambda x: x["pf"], reverse=True)
    for i, r in enumerate(by_pf[:20], 1):
        report.append(fmt_row(r, i))

    # ── 3. TOP 20 BY WIN RATE (min 30 trades) ──
    report.append(section("3. TOP 20 BY WIN RATE (min 30 trades)"))
    by_wr = sorted(qualified, key=lambda x: x["wr"], reverse=True)
    for i, r in enumerate(by_wr[:20], 1):
        report.append(fmt_row(r, i))

    # ── 4. BEST R:R RATIO ──
    report.append(section("4. BEST R:R RATIO (aggregate)"))
    rr_groups = defaultdict(list)
    for r in results:
        rr_groups[r["rr"]].append(r)

    report.append(f"  {'R:R':<6} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'Avg PF':>7} {'%Profit':>8}")
    report.append("  " + "-" * 60)
    for rr in sorted(rr_groups.keys()):
        group = rr_groups[rr]
        pnls = [r["pnl"] for r in group]
        wrs = [r["wr"] for r in group]
        pfs = [r["pf"] for r in group if r["pf"] < 100]  # exclude inf PFs
        profitable = sum(1 for p in pnls if p > 0)
        pnls_sorted = sorted(pnls)
        median_pnl = pnls_sorted[len(pnls_sorted)//2]
        report.append(
            f"  {rr:<6} {len(group):>6} {sum(pnls)/len(pnls):>+7.1f} {median_pnl:>+7.1f} "
            f"{sum(wrs)/len(wrs):>6.1f}% {sum(pfs)/len(pfs) if pfs else 0:>6.2f} "
            f"{profitable/len(group)*100:>6.1f}%"
        )

    # ── 5. BEST R:R+TP/SL PAIR ──
    report.append(section("5. BEST TP/SL PAIR (aggregate across all filters)"))
    tpsl_groups = defaultdict(list)
    for r in results:
        key = f"TP={r['tp_pct']}% SL={r['sl_pct']}%"
        tpsl_groups[key].append(r)

    report.append(f"  {'TP/SL Pair':<22} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'%Profit':>8}")
    report.append("  " + "-" * 65)
    tpsl_sorted = sorted(tpsl_groups.items(), key=lambda x: sum(r["pnl"] for r in x[1])/len(x[1]), reverse=True)
    for key, group in tpsl_sorted:
        pnls = [r["pnl"] for r in group]
        wrs = [r["wr"] for r in group]
        profitable = sum(1 for p in pnls if p > 0)
        pnls_sorted = sorted(pnls)
        median_pnl = pnls_sorted[len(pnls_sorted)//2]
        report.append(
            f"  {key:<22} {len(group):>6} {sum(pnls)/len(pnls):>+7.1f} {median_pnl:>+7.1f} "
            f"{sum(wrs)/len(wrs):>6.1f}% {profitable/len(group)*100:>6.1f}%"
        )

    # ── 6. PARAMETER SENSITIVITY ──
    report.append(section("6. PARAMETER SENSITIVITY (which param matters most)"))

    param_names = {
        "z_entry": "Z Entry",
        "z_max": "Z Max",
        "natr_min": "NATR Min",
        "natr_max": "NATR Max",
        "chop_min": "CHOP Min",
    }

    for param_key, param_label in param_names.items():
        report.append(f"\n  --- {param_label} ---")
        groups = defaultdict(list)
        for r in results:
            groups[r[param_key]].append(r)

        report.append(f"  {'Value':<8} {'Count':>6} {'Avg PnL':>8} {'Med PnL':>8} {'Avg WR':>7} {'%Profit':>8}")
        report.append("  " + "-" * 50)
        for val in sorted(groups.keys()):
            group = groups[val]
            pnls = [r["pnl"] for r in group]
            wrs = [r["wr"] for r in group]
            profitable = sum(1 for p in pnls if p > 0)
            pnls_sorted = sorted(pnls)
            median_pnl = pnls_sorted[len(pnls_sorted)//2]
            report.append(
                f"  {val:<8} {len(group):>6} {sum(pnls)/len(pnls):>+7.1f} {median_pnl:>+7.1f} "
                f"{sum(wrs)/len(wrs):>6.1f}% {profitable/len(group)*100:>6.1f}%"
            )

    # ── 7. STABILITY: Combos in Top 50 PnL ──
    report.append(section("7. STABILITY: Common filter sets in Top 50 PnL"))
    top50 = by_pnl[:50]
    filter_freq = defaultdict(int)
    for r in top50:
        key = f"Z={r['z_entry']}/{r['z_max']} NATR={r['natr_min']}-{r['natr_max']} CHOPβ‰₯{r['chop_min']}"
        filter_freq[key] += 1

    report.append(f"  {'Filter Set':<50} {'Count':>5}")
    report.append("  " + "-" * 58)
    for key, cnt in sorted(filter_freq.items(), key=lambda x: -x[1]):
        report.append(f"  {key:<50} {cnt:>5}")

    # ── 8. WORST COMBOS (what to avoid) ──
    report.append(section("8. WORST 10 COMBOS (what to avoid)"))
    worst = sorted(results, key=lambda x: x["pnl"])[:10]
    for i, r in enumerate(worst, 1):
        report.append(fmt_row(r, i))

    # ── 9. GOLDEN COMBOS: PnL > 0 AND PF > 1.3 AND trades >= 50 ──
    report.append(section("9. GOLDEN COMBOS (PnL>0 AND PF>1.3 AND tradesβ‰₯50)"))
    golden = [r for r in results if r["pnl"] > 0 and r["pf"] > 1.3 and r["trades"] >= 50]
    golden.sort(key=lambda x: x["pnl"], reverse=True)
    report.append(f"  Found {len(golden)} golden combos out of {len(results)} total")
    for i, r in enumerate(golden[:20], 1):
        report.append(fmt_row(r, i))

    # ── 10. RECOMMENDATION CANDIDATES ──
    report.append(section("10. RECOMMENDED CANDIDATES FOR STEP 8 (14d robustness)"))
    # Pick top 5 from golden, diverse filter sets
    seen_filters = set()
    candidates = []
    for r in golden:
        fkey = f"{r['z_entry']}/{r['z_max']}/{r['natr_min']}/{r['natr_max']}/{r['chop_min']}"
        if fkey not in seen_filters:
            seen_filters.add(fkey)
            candidates.append(r)
            if len(candidates) >= 5:
                break

    if not candidates:
        # Fallback: top 5 by PnL with trades >= 30
        candidates = [r for r in by_pnl if r["trades"] >= 30][:5]

    report.append(f"  {len(candidates)} diverse candidates:")
    for i, r in enumerate(candidates, 1):
        report.append(fmt_row(r, i))

    report.append(f"\n  >>> Next: run these {len(candidates)} combos on 14d data (Step 8)")

    return "\n".join(report), candidates


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=7)
    args = parser.parse_args()

    data = load_results(args.days)
    report_text, candidates = analyze(data)

    # Print to console
    print(report_text)

    # Save report
    report_path = os.path.join(os.path.dirname(__file__), f"analyze_report_{args.days}d.txt")
    with open(report_path, "w") as f:
        f.write(report_text)
    print(f"\nπŸ“„ Report saved to {report_path}")

    # Save candidates for step 8
    cand_path = os.path.join(os.path.dirname(__file__), f"candidates_{args.days}d.json")
    with open(cand_path, "w") as f:
        json.dump(candidates, f, indent=2)
    print(f"πŸ“‹ Candidates saved to {cand_path}")


if __name__ == "__main__":
    main()

πŸ“œ Git History

dd32dfdchore: initial commit β€” version control setup5 weeks ago
Show last diff
Loading...