Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions src/ml4t/backtest/_validation/case_study_lean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
"""Case-study LEAN parity: actual LEAN vs ml4t-backtest[lean] on real book weights.

This reconstructs the case-study parity harness used by the Chapter 16
``16_case_study_lean_parity`` notebook. Each ``chapter16_<case_study>`` LEAN
workspace is self-contained (``main.py`` + ``weights.csv`` + ``rebalance_dates.csv``
+ ``asset_symbols.csv`` + LEAN daily zips), so both sides reproduce from it:

* **LEAN side** -- run the workspace project through actual LEAN (Docker) via
:func:`run_lean_backtest`; the algorithm logs its fills to ``ml4t_order_events.csv``
and equity to ``ml4t_daily_equity.csv``.
* **ml4t side** -- decode the workspace's own daily zips (so both engines consume
byte-identical prices), then replay the identical target-weight strategy through
Comment on lines +8 to +12
the ``lean`` profile.

Parity is asserted on the sorted daily fill multiset
``(timestamp, asset, side, quantity, 4-decimal price)`` plus terminal portfolio value.

The ml4t strategy mirrors the workspace ``main.py`` exactly, including LEAN's
fill-forward semantics: a name dropped from the target universe is still sized
(off its last known close) and liquidated at the next real bar, rather than left
untouched once it has no bar on a rebalance day.
"""

from __future__ import annotations

import csv
import json
import math
import re
import zipfile
from pathlib import Path

import pandas as pd
import polars as pl

from ..config import BacktestConfig, CommissionType, SlippageType
from ..datafeed import DataFeed
from ..engine import Engine
from ..strategy import Strategy


def parse_workspace_params(workspace_dir: Path) -> dict:
"""Read start/end/cash/fee from a chapter16 LEAN workspace ``main.py``."""
text = (workspace_dir / "main.py").read_text(encoding="utf-8")

def _date(macro: str) -> str:
m = re.search(rf"set_{macro}_date\((\d+),\s*(\d+),\s*(\d+)\)", text)
if not m:
raise ValueError(f"no set_{macro}_date in {workspace_dir / 'main.py'}")
y, mo, d = (int(x) for x in m.groups())
return f"{y:04d}-{mo:02d}-{d:02d}"

cash = re.search(r"set_cash\(([\d.]+)\)", text)
fee = re.search(r"Ml4tPercentFeeModel\(([\d.]+)\)", text)
return {
"start": _date("start"),
"end": _date("end"),
"initial_cash": float(cash.group(1)) if cash else 1_000_000.0,
"fee": float(fee.group(1)) if fee else 0.0,
}


def _decode_lean_zip(zip_path: Path) -> pd.DataFrame:
"""Decode a LEAN daily OHLCV zip (prices stored as x10000 integers)."""
with zipfile.ZipFile(zip_path) as zf:
raw = zf.read(zf.namelist()[0]).decode()
rows = []
for line in raw.splitlines():
if not line.strip():
continue
dt, o, h, lo, c, v = line.split(",")
rows.append(
{
"timestamp": pd.Timestamp(dt.split()[0]),
"open": int(o) / 10000.0,
"high": int(h) / 10000.0,
"low": int(lo) / 10000.0,
"close": int(c) / 10000.0,
"volume": float(v),
}
)
return pd.DataFrame(rows)


def load_workspace(workspace_dir: Path, data_daily: Path) -> dict:
"""Load weights, rebalance dates, asset map and price panel for one workspace."""
params = parse_workspace_params(workspace_dir)

asset_to_ticker: dict[str, str] = {}
with (workspace_dir / "asset_symbols.csv").open(newline="") as f:
for row in csv.DictReader(f):
a, t = row["asset"].strip(), row["ticker"].strip()
if a and t:
asset_to_ticker[a] = t
asset_order = list(asset_to_ticker)

targets: dict[str, dict[str, float]] = {}
with (workspace_dir / "weights.csv").open(newline="") as f:
for row in csv.DictReader(f):
targets.setdefault(row["timestamp"], {})[row["asset"]] = float(row["target_weight"])

rebalance_dates = {
ln.strip()
for ln in (workspace_dir / "rebalance_dates.csv").read_text().splitlines()
if ln.strip()
}

start, end = pd.Timestamp(params["start"]), pd.Timestamp(params["end"])
frames = []
for asset, ticker in asset_to_ticker.items():
df = _decode_lean_zip(data_daily / f"{ticker.lower()}.zip")
df["symbol"] = asset
frames.append(df)
prices = (
pl.from_pandas(pd.concat(frames, ignore_index=True))
.select("symbol", "timestamp", "open", "high", "low", "close", "volume")
.filter((pl.col("timestamp") >= start) & (pl.col("timestamp") <= end))
.sort("timestamp", "symbol")
)
return {
"params": params,
"asset_order": asset_order,
"asset_to_ticker": asset_to_ticker,
"targets": targets,
"rebalance_dates": rebalance_dates,
"prices": prices,
}


class CaseStudyWeightStrategy(Strategy):
"""Target-weight rebalance mirroring the LEAN workspace ``main.py``.

On each rebalance date, size ``target_qty = signed_floor(weight * equity / close)``
and submit the delta as a market order. Includes LEAN-style fill-forward: an
asset with no real bar on a rebalance day is still sized off its last known
close (so a dropped name liquidates), until its final real bar (delisting).
"""

def __init__(self, asset_order, targets, rebalance_dates, last_active):
self.asset_order = asset_order
self.targets = targets
self.rebalance_dates = rebalance_dates
self.last_active = last_active
self.last_close: dict[str, float] = {}

@staticmethod
def _target_quantity(weight: float, portfolio_value: float, price: float) -> int:
raw = (weight * portfolio_value) / price
if raw >= 0:
return int(math.floor(raw + 1e-12))
return -int(math.floor(abs(raw) + 1e-12))

def on_data(self, timestamp, data, context, broker):
for asset in self.asset_order:
bar = data.get(asset)
if bar is not None:
px = float(bar["close"])
if px > 0:
self.last_close[asset] = px

ts = pd.Timestamp(timestamp).normalize()
if ts.strftime("%Y-%m-%d") not in self.rebalance_dates:
return
targets = self.targets.get(ts.strftime("%Y-%m-%d"), {})
pv = broker.equity()
for asset in self.asset_order:
if ts > self.last_active.get(asset, ts):
continue
price = self.last_close.get(asset)
if price is None or price <= 0:
continue
target_qty = self._target_quantity(targets.get(asset, 0.0), pv, price)
pos = broker.get_position(asset)
current = int(pos.quantity) if pos else 0
delta = target_qty - current
if delta != 0:
broker.submit_order(asset, delta)


def _surface(trades: pd.DataFrame) -> pd.DataFrame:
out = trades.copy()
out["timestamp"] = pd.to_datetime(out["timestamp"]).dt.normalize()
out["asset"] = out["asset"].astype(str)
out["side"] = out["side"].astype(str).str.lower()
out["quantity"] = out["quantity"].astype(float).abs()
out["price"] = out["price"].astype(float).round(4)
return out[["timestamp", "asset", "side", "quantity", "price"]].reset_index(drop=True)


def run_ml4t_lean(workspace_dir: Path, data_daily: Path) -> dict:
"""Run ml4t-backtest[lean] on a workspace's weights; return value + fill surface."""
wd = load_workspace(workspace_dir, data_daily)
cfg = BacktestConfig.from_preset("lean")
cfg.initial_cash = wd["params"]["initial_cash"]
cfg.allow_short_selling = True
cfg.allow_leverage = True
cfg.commission_type = CommissionType.PERCENTAGE
cfg.commission_rate = wd["params"]["fee"]
cfg.commission_per_share = 0.0
cfg.commission_minimum = 0.0
cfg.slippage_type = SlippageType.NONE
cfg.slippage_rate = 0.0

last_active = {
row["symbol"]: pd.Timestamp(row["timestamp"])
for row in wd["prices"].group_by("symbol").agg(pl.col("timestamp").max()).to_dicts()
}
feed = DataFeed(prices_df=wd["prices"])
strat = CaseStudyWeightStrategy(
wd["asset_order"], wd["targets"], wd["rebalance_dates"], last_active
)
result = Engine.from_config(feed, strat, config=cfg).run()
fills = result.to_fills_dataframe().to_pandas().rename(columns={"symbol": "asset"})
return {"final_value": float(result["final_value"]), "fills": _surface(fills)}


def lean_side(workspace_dir: Path) -> dict:
"""Load the LEAN-side fills + terminal value the workspace algorithm logged.

Reads the artifacts the LEAN ``main.py`` writes at the workspace root:
``ml4t_order_events.csv`` (fills), ``ml4t_daily_equity.csv`` (terminal value),
``ml4t_symbol_map.json`` (obfuscated ticker -> real asset).
"""
symbol_map = json.loads((workspace_dir / "ml4t_symbol_map.json").read_text(encoding="utf-8"))
equity = pd.read_csv(workspace_dir / "ml4t_daily_equity.csv")
final_value = float(equity["equity"].iloc[-1])

events = pd.read_csv(workspace_dir / "ml4t_order_events.csv", low_memory=False)
filled = events[events["status"].astype(str) == "Filled"].copy()
filled["asset"] = filled["symbol"].astype(str).map(symbol_map).fillna(filled["symbol"])
filled = filled.rename(columns={"fill_quantity": "quantity", "fill_price": "price"})
filled["side"] = filled["direction"].astype(str).str.lower()
fills = _surface(filled)
return {"final_value": final_value, "fills": fills, "n_trades": len(fills)}


def compare(lean: dict, ml4t: dict) -> dict:
"""Compare LEAN vs ml4t fill surfaces + terminal value."""
cols = ["timestamp", "asset", "side", "quantity", "price"]
ls = lean["fills"].sort_values(cols).reset_index(drop=True)
ms = ml4t["fills"].sort_values(cols).reset_index(drop=True)
multiset_match = len(ls) == len(ms) and not (ls[cols] != ms[cols]).any().any()
raw_match = (
len(ls) == len(ms)
and not (lean["fills"][cols].reset_index(drop=True) != ml4t["fills"][cols]).any().any()
)
return {
"lean_final_value": lean["final_value"],
"ml4t_final_value": ml4t["final_value"],
"final_value_gap_usd": ml4t["final_value"] - lean["final_value"],
"final_value_gap_pct": ml4t["final_value"] / lean["final_value"] - 1.0,
"lean_fills": len(lean["fills"]),
"ml4t_fills": len(ml4t["fills"]),
"fill_gap": len(ml4t["fills"]) - len(lean["fills"]),
"sorted_fill_multiset_match": bool(multiset_match),
"raw_row_order_match": bool(raw_match),
}
47 changes: 47 additions & 0 deletions tests/benchmark/test_case_study_lean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Regression test: ml4t-backtest[lean] reproduces LEAN on real case-study weights.

Uses the committed chapter16 LEAN workspaces (weights + daily zips + the LEAN-side
fills the algorithm logged). The ml4t[lean] reconstruction must match LEAN's fill
multiset and terminal value exactly. No Docker required — the LEAN side is the
committed reference artifact; only the ml4t side is recomputed here.
"""

from __future__ import annotations

from pathlib import Path

import pytest

from ml4t.backtest._validation.case_study_lean import compare, lean_side, run_ml4t_lean

WORKSPACE = Path(__file__).resolve().parents[2] / "validation" / "lean" / "workspace"
DATA_DAILY = WORKSPACE / "data" / "equity" / "usa" / "daily"
CASE_STUDIES = [
"chapter16_etfs",
"chapter16_sp500_equity_option_analytics",
"chapter16_us_equities_panel",
]


# The generic equity-decomposition invariant (initial + closed_pnl + open_pnl ==
# final_value) does not model 2x margin, but parity here is asserted directly
# against LEAN's terminal value (external ground truth), which matches exactly.
@pytest.mark.no_invariant_check
@pytest.mark.parametrize("project", CASE_STUDIES)
def test_case_study_lean_parity(project: str) -> None:
workspace_dir = WORKSPACE / project
if not (workspace_dir / "ml4t_order_events.csv").exists():
pytest.skip(f"LEAN reference artifacts missing for {project}")

Comment on lines +32 to +35
lean = lean_side(workspace_dir)
ml4t = run_ml4t_lean(workspace_dir, DATA_DAILY)
result = compare(lean, ml4t)

assert result["sorted_fill_multiset_match"], (
f"{project}: fill multiset diverges "
f"(lean={result['lean_fills']} ml4t={result['ml4t_fills']})"
)
assert result["fill_gap"] == 0
assert abs(result["final_value_gap_usd"]) < 1e-4, (
f"{project}: terminal value gap ${result['final_value_gap_usd']:.6f}"
)
41 changes: 33 additions & 8 deletions validation/benchmark_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,28 +1076,44 @@ def on_data(self, timestamp, data, context, broker):
broker.cancel_order(pending_order.order_id)
broker.submit_order(asset_name, target_qty - current_qty)

def _disable_commission(cfg: BacktestConfig) -> None:
# Clear every direct commission field, not just type+rate. The broker
# auto-activates a cost model whenever any direct field (per_share /
# per_trade / rate) is non-zero, even when commission_type is NONE, so a
# partial reset would leave a preset's per-share/minimum commission live
# (e.g. zipline_strict carries per_share=0.005, minimum=1.0) and silently
# re-enable it — breaking parity against the zero-commission reference run.
cfg.commission_type = CommissionType.NONE
cfg.commission_rate = 0.0
cfg.commission_per_share = 0.0
cfg.commission_per_trade = 0.0
cfg.commission_minimum = 0.0

def _disable_slippage(cfg: BacktestConfig) -> None:
cfg.slippage_type = SlippageType.NONE
cfg.slippage_rate = 0.0
cfg.slippage_fixed = 0.0
cfg.slippage_spread = 0.0
cfg.slippage_spread_by_asset = {}

def build_ml4t_config(no_costs: bool) -> BacktestConfig:
cfg = BacktestConfig.from_preset(profile_name)
cfg.initial_cash = config.initial_cash
cfg.allow_short_selling = True
if no_costs:
cfg.commission_type = CommissionType.NONE
cfg.commission_rate = 0.0
cfg.slippage_type = SlippageType.NONE
cfg.slippage_rate = 0.0
_disable_commission(cfg)
_disable_slippage(cfg)
else:
if config.commission_pct > 0:
cfg.commission_type = CommissionType.PERCENTAGE
cfg.commission_rate = config.commission_pct
else:
cfg.commission_type = CommissionType.NONE
cfg.commission_rate = 0.0
_disable_commission(cfg)
if config.slippage_pct > 0:
cfg.slippage_type = SlippageType.PERCENTAGE
cfg.slippage_rate = config.slippage_pct
else:
cfg.slippage_type = SlippageType.NONE
cfg.slippage_rate = 0.0
_disable_slippage(cfg)
return cfg

# Warm-up run (smaller data)
Expand Down Expand Up @@ -2093,6 +2109,15 @@ def on_order_event(self, order_event: OrderEvent):
target_trace_df=target_trace,
)

# Report the decoded fill count as num_trades for an apples-to-apples
# cross-engine comparison. load_lean_artifacts prefers LEAN's
# tradeStatistics.totalNumberOfTrades, which counts closed round-trip
# trades, whereas every other engine in this suite reports individual
# fills via len(trades_df). Using fills here keeps the parity surface
# consistent (LEAN fills == ml4t fills on the canonical benchmark).
if trades_df is not None and not trades_df.empty:
num_trades = int(len(trades_df))
Comment on lines +2118 to +2119

return BenchmarkResult(
framework="LEAN CLI",
scenario=config.name,
Expand Down