diff --git a/CHANGELOG.md b/CHANGELOG.md index d0ca61de..c07d8d0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [4.18.17RC] - 2026-06-13 Unreleased in PyPI +- [ADDED] Mode A pre-build sweep runner (`openTEPES_Runner.py` + `openTEPES_Cases.py`): `run(cases, solver_name, backend=...)` runs many cases through `openTEPES_run`, one independent build-and-solve per case, over a `serial` (default), `multiprocessing`, or `joblib` backend. A `Case` names one input source (a CSV directory or a `.duckdb` file) plus an optional output directory and label. The runner reads back each case's `openTEPES_run_status_*.json` and returns one summary dict per case in input order, so nothing has to pickle the Pyomo model across workers; a case that raises is captured as `status="error"` instead of aborting the sweep. Additive only — no existing module changes, and a single-case serial sweep reproduces a direct `openTEPES_run`. Mode B (in-memory overlay) and Mode C (`openTEPES_ProblemSolvingResolve`) are separate. New tests in `tests/test_run.py`. The architecture diagram (`doc/img/openTEPES_architecture.svg` and the rendered `.png`) marks the `runner.py` and `cases.py` boxes implemented, and also the `resolve.py` box now that Mode C has merged. - [CHANGED] fix some errors in writing H2 and heat network output results - [FIXED] `setup_solver` no longer crashes on Windows when an earlier in-process solve still holds the stale log file open. - [FIXED] deprecated `datetime.utcnow()` replaced with timezone-aware UTC; emitted timestamp unchanged. diff --git a/doc/img/openTEPES_architecture.png b/doc/img/openTEPES_architecture.png index 17b52464..45509b45 100644 Binary files a/doc/img/openTEPES_architecture.png and b/doc/img/openTEPES_architecture.png differ diff --git a/doc/img/openTEPES_architecture.svg b/doc/img/openTEPES_architecture.svg index c442caea..cc0355b8 100644 --- a/doc/img/openTEPES_architecture.svg +++ b/doc/img/openTEPES_architecture.svg @@ -56,7 +56,7 @@ ▶ sweep - + runner.py parallel sweep orchestrator @@ -98,7 +98,7 @@ DuckDBSource streaming SQL backend - + cases.py ★ sweep-enabling Case = baseline + overlay @@ -243,7 +243,7 @@ …Benders.py L-shaped decomp - + resolve.py ★ hot-swap (Mode C) diff --git a/openTEPES/openTEPES_Cases.py b/openTEPES/openTEPES_Cases.py new file mode 100644 index 00000000..b6c48446 --- /dev/null +++ b/openTEPES/openTEPES_Cases.py @@ -0,0 +1,34 @@ +""" +Open Generation, Storage, and Transmission Operation and Expansion Planning Model with RES and ESS (openTEPES) - June 17, 2026 + +openTEPES.openTEPES_Cases — one unit of work for the sweep runner. + +A ``Case`` names one input source (a CSV directory or a ``.duckdb`` file, as +``openTEPES_run`` accepts) and where it writes results. ``overlay`` is reserved +for the Mode B sweep and rejected by the Mode A runner. +""" +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Case: + """One sweep entry. + + ``dir_name`` / ``case_name`` are the standard openTEPES pair (``case_name`` + may be a CSV directory or a ``.duckdb`` file). ``out_path`` redirects this + case's results — cases sharing a ``(dir_name, case_name)`` need distinct + ``out_path`` so writers do not collide. ``label`` (default ``case_name``) + tags the summary. ``overlay`` is reserved for Mode B. + """ + + dir_name: str + case_name: str + out_path: str | None = None + label: str | None = None + overlay: dict | None = None + + def __post_init__(self): + if self.label is None: + object.__setattr__(self, "label", self.case_name) diff --git a/openTEPES/openTEPES_Runner.py b/openTEPES/openTEPES_Runner.py new file mode 100644 index 00000000..9e4ce4fa --- /dev/null +++ b/openTEPES/openTEPES_Runner.py @@ -0,0 +1,120 @@ +""" +Open Generation, Storage, and Transmission Operation and Expansion Planning Model with RES and ESS (openTEPES) - June 17, 2026 + +openTEPES.openTEPES_Runner — Mode A pre-build sweep runner. + +Run many cases through ``openTEPES_run`` with a chosen backend: each case loads +its own source, builds, solves, and writes its own results (RFC §4.1). Mode B +(in-memory overlay) and Mode C (``openTEPES_ProblemSolvingResolve``) are separate. +""" +from __future__ import annotations + +import glob +import json +import os + +try: + from .openTEPES import openTEPES_run +except ImportError: + import sys + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from openTEPES.openTEPES import openTEPES_run + + +def _status_dir(case): + """Directory ``openTEPES_run`` writes this case's status JSON to (its own rule).""" + if case.out_path: + return case.out_path + if case.case_name.endswith(".duckdb"): + return os.path.dirname(os.path.abspath(os.path.join(case.dir_name, case.case_name))) + return os.path.join(case.dir_name, case.case_name) + + +def _read_status(case): + """Return the case's ``openTEPES_run_status_*.json``: a summary dict, not the model. + + The model cannot cross a worker process boundary, so every backend returns + the JSON ``openTEPES_run`` already persists. CSV cases name it after the + case; DuckDB inputs use the embedded name, hence the glob fallback. + """ + out_dir = _status_dir(case) + candidate = os.path.join(out_dir, f"openTEPES_run_status_{case.case_name}.json") + if not os.path.exists(candidate): + matches = sorted(glob.glob(os.path.join(out_dir, "openTEPES_run_status_*.json")), + key=os.path.getmtime) + candidate = matches[-1] if matches else None + if candidate and os.path.exists(candidate): + with open(candidate) as handle: + return json.load(handle) + return {"case": case.case_name, "dir": case.dir_name, "status": "unknown", + "error": "status JSON not found after run"} + + +def _run_one(case, solver_name, output_spec, gzip_patterns, pIndOutputResults, pIndLogConsole): + """Solve one case; capture a raise as ``status="error"`` so the sweep survives it. + + Module-level so it pickles for the multiprocessing and joblib backends. + """ + try: + openTEPES_run( + case.dir_name, case.case_name, solver_name, + pIndOutputResults, pIndLogConsole, + output_spec=output_spec, out_path=case.out_path, gzip_patterns=gzip_patterns, + ) + except Exception as exc: + return { + "label": case.label, + "case": case.case_name, + "dir": case.dir_name, + "status": "error", + "error": f"{type(exc).__name__}: {exc}", + } + summary = _read_status(case) + summary.setdefault("label", case.label) + return summary + + +def run(cases, solver_name, *, mode="pre-build", backend="serial", n_workers=1, + output_spec=None, gzip_patterns=None, pIndOutputResults=1, pIndLogConsole=0): + """Run ``Case`` objects through ``openTEPES_run`` and return one summary per case. + + ``mode`` must be ``"pre-build"`` (Mode A); ``backend`` is ``"serial"`` (default, + no extra dependency), ``"multiprocessing"`` (stdlib ``Pool``), or ``"joblib"``. + ``output_spec``, ``gzip_patterns``, ``pIndOutputResults`` and ``pIndLogConsole`` + pass through to ``openTEPES_run``. Summaries keep input order on every backend; + a failed case carries ``status="error"``. + """ + if mode != "pre-build": + raise NotImplementedError( + f"openTEPES_Runner.run supports mode='pre-build' (Mode A) only; got mode={mode!r}. " + "Mode B (in-memory overlay) is a later PR; Mode C (hot-swap) lives in " + "openTEPES_ProblemSolvingResolve.resolve." + ) + + cases = list(cases) + for case in cases: + if case.overlay is not None: + raise NotImplementedError( + f"Case.overlay is reserved for Mode B (in-memory overlay) and is not supported " + f"in Mode A; case label={case.label!r} set an overlay." + ) + + work = [(case, solver_name, output_spec, gzip_patterns, pIndOutputResults, pIndLogConsole) + for case in cases] + + if backend == "serial": + return [_run_one(*args) for args in work] + + if backend == "multiprocessing": + import multiprocessing as mp + with mp.Pool(processes=n_workers) as pool: + return pool.starmap(_run_one, work) + + if backend == "joblib": + try: + from joblib import Parallel, delayed + except ImportError as exc: + raise ImportError("backend='joblib' requires joblib (pip install joblib).") from exc + return Parallel(n_jobs=n_workers)(delayed(_run_one)(*args) for args in work) + + raise ValueError(f"unknown backend {backend!r}; expected 'serial', 'multiprocessing', or 'joblib'.") diff --git a/tests/test_run.py b/tests/test_run.py index 4daec41e..d1c2fb51 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -8,6 +8,7 @@ from openTEPES.openTEPES import openTEPES_run from openTEPES.openTEPES_ProblemSolvingBenders import lshaped from openTEPES.openTEPES_ProblemSolvingResolve import resolve, overlay_scaled +from openTEPES import openTEPES_Runner, openTEPES_Cases # === Fixture: single-stage 7-day system === @@ -374,3 +375,82 @@ def test_binary_investment(case_7d_binary, expected_cost): print(f"Expected cost: {expected_cost:.5f}, Actual cost: {actual_cost:.5f}") np.testing.assert_approx_equal(actual_cost, expected_cost) + + +# === Mode A sweep-runner tests === +# +# openTEPES_Runner.run drives many cases through openTEPES_run with a chosen backend (Mode A, +# RFC §4.1: each worker loads its own source, builds, solves, writes). The runner never returns +# the Pyomo model — that cannot cross a process boundary — so it reads back the per-case +# openTEPES_run_status_*.json and returns one summary dict per case, in input order, regardless of +# backend. A case that raises is captured as status="error" so one bad pathway does not lose the +# rest of the sweep. +@pytest.mark.solve +@pytest.mark.parametrize("case_7d_system", ["9n"], indirect=["case_7d_system"]) +def test_mode_a_runner_serial_parity(case_7d_system, tmp_path): + """A single-case serial sweep reproduces a direct openTEPES_run cost and echoes the label.""" + d = case_7d_system + base_model = openTEPES_run(d["DirName"], d["CaseName"], d["SolverName"], 0, 0) + base_cost = base_model.vTotalSCost() + + records = openTEPES_Runner.run( + [openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "c1"), label="first")], + d["SolverName"], mode="pre-build", backend="serial", + pIndOutputResults=0, pIndLogConsole=0, + ) + assert len(records) == 1 + assert records[0]["status"] == "optimal" + assert records[0]["label"] == "first" + assert records[0]["total_cost_meur"] == pytest.approx(base_cost, rel=1e-6) + + +@pytest.mark.solve +@pytest.mark.parametrize("case_7d_system", ["9n"], indirect=["case_7d_system"]) +def test_mode_a_runner_multi_case_and_error(case_7d_system, tmp_path): + """A multi-case serial sweep preserves input order, isolates outputs, and captures a bad case.""" + d = case_7d_system + cases = [ + openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "good1"), label="good1"), + openTEPES_Cases.Case(str(tmp_path), "DOES_NOT_EXIST", out_path=str(tmp_path / "broken"), label="broken"), + openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "good2"), label="good2"), + ] + records = openTEPES_Runner.run(cases, d["SolverName"], mode="pre-build", backend="serial", + pIndOutputResults=0, pIndLogConsole=0) + + assert [r["label"] for r in records] == ["good1", "broken", "good2"] + assert records[0]["status"] == "optimal" + assert records[2]["status"] == "optimal" + assert records[1]["status"] == "error" + assert "error" in records[1] + # The two good cases solved to the same cost from independent output directories. + assert records[0]["total_cost_meur"] == pytest.approx(records[2]["total_cost_meur"], rel=1e-9) + + +@pytest.mark.solve +@pytest.mark.parametrize("case_7d_system", ["9n"], indirect=["case_7d_system"]) +def test_mode_a_runner_multiprocessing(case_7d_system, tmp_path): + """The multiprocessing backend returns the same per-case summaries, in input order.""" + d = case_7d_system + cases = [ + openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "w1"), label="w1"), + openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "w2"), label="w2"), + ] + records = openTEPES_Runner.run(cases, d["SolverName"], mode="pre-build", + backend="multiprocessing", n_workers=2, + pIndOutputResults=0, pIndLogConsole=0) + assert [r["label"] for r in records] == ["w1", "w2"] + assert all(r["status"] == "optimal" for r in records) + assert records[0]["total_cost_meur"] == pytest.approx(records[1]["total_cost_meur"], rel=1e-9) + + +def test_mode_a_runner_guards(): + """Unsupported modes, overlays, and backends are rejected without touching a solver.""" + case = openTEPES_Cases.Case("dir", "case") + with pytest.raises(NotImplementedError): + openTEPES_Runner.run([case], "highs", mode="in-memory") + with pytest.raises(NotImplementedError): + openTEPES_Runner.run([case], "highs", mode="hot-swap") + with pytest.raises(NotImplementedError): + openTEPES_Runner.run([openTEPES_Cases.Case("dir", "case", overlay={"pDemandElec": {}})], "highs") + with pytest.raises(ValueError): + openTEPES_Runner.run([case], "highs", backend="dask")