diff --git a/src/ml4t/backtest/_validation/case_study_lean.py b/src/ml4t/backtest/_validation/case_study_lean.py
new file mode 100644
index 0000000..b61f091
--- /dev/null
+++ b/src/ml4t/backtest/_validation/case_study_lean.py
@@ -0,0 +1,280 @@
+"""Case-study LEAN parity: actual LEAN vs ml4t-backtest[lean] on real book weights.
+
+This reconstructs the case-study parity harness used by the Chapter 16
+``16_case_study_lean_parity`` notebook. Each ``chapter16_*`` LEAN
+workspace is self-contained (``main.py`` + ``weights.csv`` + ``rebalance_dates.csv``
++ ``asset_symbols.csv`` + LEAN daily zips), so both sides reproduce from it:
+
+* **LEAN side** -- run the workspace project through actual LEAN (Docker) via
+  :func:`run_lean_backtest`; the algorithm logs its fills to ``ml4t_order_events.csv``
+  and equity to ``ml4t_daily_equity.csv``.
+* **ml4t side** -- decode the workspace's own daily zips (so both engines consume
+  byte-identical prices), then replay the identical target-weight strategy through
+  the ``lean`` profile.
+
+Parity is asserted on the sorted daily fill multiset
+``(timestamp, asset, side, quantity, 4-decimal price)`` plus terminal portfolio value.
+
+The ml4t strategy mirrors the workspace ``main.py`` exactly, including LEAN's
+fill-forward semantics: a name dropped from the target universe is still sized
+(off its last known close) and liquidated at the next real bar, rather than left
+untouched once it has no bar on a rebalance day.
+"""
+
+from __future__ import annotations
+
+import csv
+import json
+import math
+import re
+import zipfile
+from pathlib import Path
+
+import pandas as pd
+import polars as pl
+
+from ..config import BacktestConfig, CommissionType, SlippageType
+from ..datafeed import DataFeed
+from ..engine import Engine
+from ..strategy import Strategy
+
+
+def parse_workspace_params(workspace_dir: Path) -> dict:
+    """Read start/end/cash/fee from a chapter16 LEAN workspace ``main.py``.
+
+    Returns ``start``/``end`` as ``YYYY-MM-DD`` strings, ``initial_cash``
+    (1,000,000 when no literal ``set_cash(...)`` is found) and ``fee`` (0.0 when no
+    literal ``Ml4tPercentFeeModel(...)`` is found). Raises ``ValueError`` when a
+    ``set_start_date``/``set_end_date`` call is missing.
+    """
+    text = (workspace_dir / "main.py").read_text(encoding="utf-8")
+
+    def _date(macro: str) -> str:
+        m = re.search(rf"set_{macro}_date\((\d+),\s*(\d+),\s*(\d+)\)", text)
+        if not m:
+            raise ValueError(f"no set_{macro}_date in {workspace_dir / 'main.py'}")
+        y, mo, d = (int(x) for x in m.groups())
+        return f"{y:04d}-{mo:02d}-{d:02d}"
+
+    # Accept digit separators (``set_cash(1_000_000)``) and padding inside the
+    # parentheses; previously such a literal silently fell back to the default.
+    cash = re.search(r"set_cash\(\s*([\d_.]+)\s*\)", text)
+    fee = re.search(r"Ml4tPercentFeeModel\(\s*([\d_.]+)\s*\)", text)
+    return {
+        "start": _date("start"),
+        "end": _date("end"),
+        "initial_cash": float(cash.group(1)) if cash else 1_000_000.0,
+        "fee": float(fee.group(1)) if fee else 0.0,
+    }
+
+
+def _decode_lean_zip(zip_path: Path) -> pd.DataFrame:
+    """Decode a LEAN daily OHLCV zip (prices stored as x10000 integers)."""
+    with zipfile.ZipFile(zip_path) as zf:
+        raw = zf.read(zf.namelist()[0]).decode()
+    rows = []
+    for line in raw.splitlines():
+        if not line.strip():
+            continue
+        # Six comma-separated fields; only the first token of the time field is parsed.
+        dt, o, h, lo, c, v = line.split(",")
+        rows.append(
+            {
+                "timestamp": pd.Timestamp(dt.split()[0]),
+                "open": int(o) / 10000.0,
+                "high": int(h) / 10000.0,
+                "low": int(lo) / 10000.0,
+                "close": int(c) / 10000.0,
+                "volume": float(v),
+            }
+        )
+    return pd.DataFrame(rows)
+
+
+def load_workspace(workspace_dir: Path, data_daily: Path) -> dict:
+    """Load weights, rebalance dates, asset map and price panel for one workspace."""
+    params = parse_workspace_params(workspace_dir)
+
+    asset_to_ticker: dict[str, str] = {}
+    with (workspace_dir / "asset_symbols.csv").open(newline="", encoding="utf-8") as f:
+        for row in csv.DictReader(f):
+            a, t = row["asset"].strip(), row["ticker"].strip()
+            if a and t:
+                asset_to_ticker[a] = t
+    asset_order = list(asset_to_ticker)
+
+    targets: dict[str, dict[str, float]] = {}
+    with (workspace_dir / "weights.csv").open(newline="", encoding="utf-8") as f:
+        for row in csv.DictReader(f):
+            targets.setdefault(row["timestamp"], {})[row["asset"]] = float(row["target_weight"])
+
+    rebalance_dates = {
+        ln.strip()
+        for ln in (workspace_dir / "rebalance_dates.csv").read_text(encoding="utf-8").splitlines()
+        if ln.strip()
+    }
+
+    start, end = pd.Timestamp(params["start"]), pd.Timestamp(params["end"])
+    frames = []
+    for asset, ticker in asset_to_ticker.items():
+        df = _decode_lean_zip(data_daily / f"{ticker.lower()}.zip")
+        df["symbol"] = asset
+        frames.append(df)
+    prices = (
+        pl.from_pandas(pd.concat(frames, ignore_index=True))
+        .select("symbol", "timestamp", "open", "high", "low", "close", "volume")
+        .filter((pl.col("timestamp") >= start) & (pl.col("timestamp") <= end))
+        .sort("timestamp", "symbol")
+    )
+    return {
+        "params": params,
+        "asset_order": asset_order,
+        "asset_to_ticker": asset_to_ticker,
+        "targets": targets,
+        "rebalance_dates": rebalance_dates,
+        "prices": prices,
+    }
+
+
+class CaseStudyWeightStrategy(Strategy):
+    """Target-weight rebalance mirroring the LEAN workspace ``main.py``.
+
+    On each rebalance date, size ``target_qty = signed_floor(weight * equity / close)``
+    and submit the delta as a market order. Includes LEAN-style fill-forward: an
+    asset with no real bar on a rebalance day is still sized off its last known
+    close (so a dropped name liquidates), until its final real bar (delisting).
+    """
+
+    def __init__(self, asset_order, targets, rebalance_dates, last_active):
+        self.asset_order = asset_order
+        self.targets = targets
+        self.rebalance_dates = rebalance_dates
+        self.last_active = last_active
+        self.last_close: dict[str, float] = {}
+
+    @staticmethod
+    def _target_quantity(weight: float, portfolio_value: float, price: float) -> int:
+        """Return whole shares for ``weight`` of ``portfolio_value``, truncated toward zero."""
+        raw = (weight * portfolio_value) / price
+        if raw >= 0:
+            return int(math.floor(raw + 1e-12))
+        return -int(math.floor(abs(raw) + 1e-12))
+
+    def on_data(self, timestamp, data, context, broker):
+        """Record the latest positive closes, then submit deltas on rebalance dates."""
+        for asset in self.asset_order:
+            bar = data.get(asset)
+            if bar is not None:
+                px = float(bar["close"])
+                if px > 0:
+                    self.last_close[asset] = px
+
+        ts = pd.Timestamp(timestamp).normalize()
+        day = ts.strftime("%Y-%m-%d")
+        if day not in self.rebalance_dates:
+            return
+        targets = self.targets.get(day, {})
+        pv = broker.equity()
+        for asset in self.asset_order:
+            if ts > self.last_active.get(asset, ts):
+                continue
+            price = self.last_close.get(asset)
+            if price is None or price <= 0:
+                continue
+            target_qty = self._target_quantity(targets.get(asset, 0.0), pv, price)
+            pos = broker.get_position(asset)
+            current = int(pos.quantity) if pos else 0
+            delta = target_qty - current
+            if delta != 0:
+                broker.submit_order(asset, delta)
+
+
+def _surface(trades: pd.DataFrame) -> pd.DataFrame:
+    """Normalise fills to the compared (timestamp, asset, side, quantity, price) columns."""
+    out = trades.copy()
+    out["timestamp"] = pd.to_datetime(out["timestamp"]).dt.normalize()
+    out["asset"] = out["asset"].astype(str)
+    out["side"] = out["side"].astype(str).str.lower()
+    out["quantity"] = out["quantity"].astype(float).abs()
+    out["price"] = out["price"].astype(float).round(4)
+    return out[["timestamp", "asset", "side", "quantity", "price"]].reset_index(drop=True)
+
+
+def run_ml4t_lean(workspace_dir: Path, data_daily: Path) -> dict:
+    """Run ml4t-backtest[lean] on a workspace's weights; return value + fill surface."""
+    wd = load_workspace(workspace_dir, data_daily)
+    cfg = BacktestConfig.from_preset("lean")
+    cfg.initial_cash = wd["params"]["initial_cash"]
+    cfg.allow_short_selling = True
+    cfg.allow_leverage = True
+    cfg.commission_type = CommissionType.PERCENTAGE
+    cfg.commission_rate = wd["params"]["fee"]
+    # Zero every other direct cost field so nothing inherited from the preset can
+    # add costs on top of the workspace's percentage fee.
+    cfg.commission_per_share = 0.0
+    cfg.commission_per_trade = 0.0
+    cfg.commission_minimum = 0.0
+    cfg.slippage_type = SlippageType.NONE
+    cfg.slippage_rate = 0.0
+    cfg.slippage_fixed = 0.0
+    cfg.slippage_spread = 0.0
+    cfg.slippage_spread_by_asset = {}
+
+    last_active = {
+        row["symbol"]: pd.Timestamp(row["timestamp"])
+        for row in wd["prices"].group_by("symbol").agg(pl.col("timestamp").max()).to_dicts()
+    }
+    feed = DataFeed(prices_df=wd["prices"])
+    strat = CaseStudyWeightStrategy(
+        wd["asset_order"], wd["targets"], wd["rebalance_dates"], last_active
+    )
+    result = Engine.from_config(feed, strat, config=cfg).run()
+    fills = result.to_fills_dataframe().to_pandas().rename(columns={"symbol": "asset"})
+    return {"final_value": float(result["final_value"]), "fills": _surface(fills)}
+
+
+def lean_side(workspace_dir: Path) -> dict:
+    """Load the LEAN-side fills + terminal value the workspace algorithm logged.
+
+    Reads the artifacts the LEAN ``main.py`` writes at the workspace root:
+    ``ml4t_order_events.csv`` (fills), ``ml4t_daily_equity.csv`` (terminal value),
+    ``ml4t_symbol_map.json`` (obfuscated ticker -> real asset).
+    """
+    symbol_map = json.loads((workspace_dir / "ml4t_symbol_map.json").read_text(encoding="utf-8"))
+    equity = pd.read_csv(workspace_dir / "ml4t_daily_equity.csv")
+    final_value = float(equity["equity"].iloc[-1])
+
+    events = pd.read_csv(workspace_dir / "ml4t_order_events.csv", low_memory=False)
+    filled = events[events["status"].astype(str) == "Filled"].copy()
+    filled["asset"] = filled["symbol"].astype(str).map(symbol_map).fillna(filled["symbol"])
+    filled = filled.rename(columns={"fill_quantity": "quantity", "fill_price": "price"})
+    filled["side"] = filled["direction"].astype(str).str.lower()
+    fills = _surface(filled)
+    return {"final_value": final_value, "fills": fills, "n_trades": len(fills)}
+
+
+def compare(lean: dict, ml4t: dict) -> dict:
+    """Compare LEAN vs ml4t fill surfaces + terminal value."""
+    cols = ["timestamp", "asset", "side", "quantity", "price"]
+    ls = lean["fills"].sort_values(cols).reset_index(drop=True)
+    ms = ml4t["fills"].sort_values(cols).reset_index(drop=True)
+    multiset_match = len(ls) == len(ms) and not (ls[cols] != ms[cols]).any().any()
+    # Reset the index on both sides: ``!=`` between frames needs identical labels.
+    lean_raw = lean["fills"][cols].reset_index(drop=True)
+    ml4t_raw = ml4t["fills"][cols].reset_index(drop=True)
+    raw_match = len(ls) == len(ms) and not (lean_raw != ml4t_raw).any().any()
+    # A zero LEAN terminal value has no meaningful relative gap; report NaN instead
+    # of raising ZeroDivisionError.
+    lean_value = lean["final_value"]
+    gap_pct = (ml4t["final_value"] / lean_value - 1.0) if lean_value else math.nan
+    return {
+        "lean_final_value": lean["final_value"],
+        "ml4t_final_value": ml4t["final_value"],
+        "final_value_gap_usd": ml4t["final_value"] - lean["final_value"],
+        "final_value_gap_pct": gap_pct,
+        "lean_fills": len(lean["fills"]),
+        "ml4t_fills": len(ml4t["fills"]),
+        "fill_gap": len(ml4t["fills"]) - len(lean["fills"]),
+        "sorted_fill_multiset_match": bool(multiset_match),
+        "raw_row_order_match": bool(raw_match),
+    }
diff --git a/tests/benchmark/test_case_study_lean.py b/tests/benchmark/test_case_study_lean.py
new file mode 100644
index 0000000..992f6a8
--- /dev/null
+++ b/tests/benchmark/test_case_study_lean.py
@@ -0,0 +1,47 @@
+"""Regression test: ml4t-backtest[lean] reproduces LEAN on real case-study weights.
+
+Uses the committed chapter16 LEAN workspaces (weights + daily zips + the LEAN-side
+fills the algorithm logged). The ml4t[lean] reconstruction must match LEAN's fill
+multiset and terminal value exactly. No Docker required — the LEAN side is the
+committed reference artifact; only the ml4t side is recomputed here.
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import pytest
+
+from ml4t.backtest._validation.case_study_lean import compare, lean_side, run_ml4t_lean
+
+WORKSPACE = Path(__file__).resolve().parents[2] / "validation" / "lean" / "workspace"
+DATA_DAILY = WORKSPACE / "data" / "equity" / "usa" / "daily"
+CASE_STUDIES = [
+    "chapter16_etfs",
+    "chapter16_sp500_equity_option_analytics",
+    "chapter16_us_equities_panel",
+]
+
+
+# The generic equity-decomposition invariant (initial + closed_pnl + open_pnl ==
+# final_value) does not model 2x margin, but parity here is asserted directly
+# against LEAN's terminal value (external ground truth), which matches exactly.
+@pytest.mark.no_invariant_check
+@pytest.mark.parametrize("project", CASE_STUDIES)
+def test_case_study_lean_parity(project: str) -> None:
+    workspace_dir = WORKSPACE / project
+    if not (workspace_dir / "ml4t_order_events.csv").exists():
+        pytest.skip(f"LEAN reference artifacts missing for {project}")
+
+    lean = lean_side(workspace_dir)
+    ml4t = run_ml4t_lean(workspace_dir, DATA_DAILY)
+    result = compare(lean, ml4t)
+
+    assert result["sorted_fill_multiset_match"], (
+        f"{project}: fill multiset diverges "
+        f"(lean={result['lean_fills']} ml4t={result['ml4t_fills']})"
+    )
+    assert result["fill_gap"] == 0
+    assert abs(result["final_value_gap_usd"]) < 1e-4, (
+        f"{project}: terminal value gap ${result['final_value_gap_usd']:.6f}"
+    )
diff --git a/validation/benchmark_suite.py b/validation/benchmark_suite.py
index cdfdcb9..d1427bc 100644
--- a/validation/benchmark_suite.py
+++ b/validation/benchmark_suite.py
@@ -1076,28 +1076,44 @@ def on_data(self, timestamp, data, context, broker):
                         broker.cancel_order(pending_order.order_id)
                     broker.submit_order(asset_name, target_qty - current_qty)
 
+    def _disable_commission(cfg: BacktestConfig) -> None:
+        # Clear every direct commission field, not just type+rate. The broker
+        # auto-activates a cost model whenever any direct field (per_share /
+        # per_trade / rate) is non-zero, even when commission_type is NONE, so a
+        # partial reset would leave a preset's per-share/minimum commission live
+        # (e.g. zipline_strict carries per_share=0.005, minimum=1.0) and silently
+        # re-enable it — breaking parity against the zero-commission reference run.
+        cfg.commission_type = CommissionType.NONE
+        cfg.commission_rate = 0.0
+        cfg.commission_per_share = 0.0
+        cfg.commission_per_trade = 0.0
+        cfg.commission_minimum = 0.0
+
+    def _disable_slippage(cfg: BacktestConfig) -> None:
+        cfg.slippage_type = SlippageType.NONE
+        cfg.slippage_rate = 0.0
+        cfg.slippage_fixed = 0.0
+        cfg.slippage_spread = 0.0
+        cfg.slippage_spread_by_asset = {}
+
     def build_ml4t_config(no_costs: bool) -> BacktestConfig:
         cfg = BacktestConfig.from_preset(profile_name)
         cfg.initial_cash = config.initial_cash
         cfg.allow_short_selling = True
         if no_costs:
-            cfg.commission_type = CommissionType.NONE
-            cfg.commission_rate = 0.0
-            cfg.slippage_type = SlippageType.NONE
-            cfg.slippage_rate = 0.0
+            _disable_commission(cfg)
+            _disable_slippage(cfg)
         else:
             if config.commission_pct > 0:
                 cfg.commission_type = CommissionType.PERCENTAGE
                 cfg.commission_rate = config.commission_pct
             else:
-                cfg.commission_type = CommissionType.NONE
-                cfg.commission_rate = 0.0
+                _disable_commission(cfg)
             if config.slippage_pct > 0:
                 cfg.slippage_type = SlippageType.PERCENTAGE
                 cfg.slippage_rate = config.slippage_pct
             else:
-                cfg.slippage_type = SlippageType.NONE
-                cfg.slippage_rate = 0.0
+                _disable_slippage(cfg)
         return cfg
 
     # Warm-up run (smaller data)
@@ -2093,6 +2109,15 @@ def on_order_event(self, order_event: OrderEvent):
         target_trace_df=target_trace,
     )
 
+    # Report the decoded fill count as num_trades for an apples-to-apples
+    # cross-engine comparison. load_lean_artifacts prefers LEAN's
+    # tradeStatistics.totalNumberOfTrades, which counts closed round-trip
+    # trades, whereas every other engine in this suite reports individual
+    # fills via len(trades_df). Using fills here keeps the parity surface
+    # consistent (LEAN fills == ml4t fills on the canonical benchmark).
+    if trades_df is not None and not trades_df.empty:
+        num_trades = int(len(trades_df))
+
     return BenchmarkResult(
         framework="LEAN CLI",
         scenario=config.name,