Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [4.18.17RC] - 2026-06-13 Unreleased in PyPI

- [ADDED] Mode A pre-build sweep runner (`openTEPES_Runner.py` + `openTEPES_Cases.py`): `run(cases, solver_name, backend=...)` runs many cases through `openTEPES_run`, one independent build-and-solve per case, over a `serial` (default), `multiprocessing`, or `joblib` backend. A `Case` names one input source (a CSV directory or a `.duckdb` file) plus an optional output directory and label. The runner reads back each case's `openTEPES_run_status_*.json` and returns one summary dict per case in input order, so nothing has to pickle the Pyomo model across workers; a case that raises is captured as `status="error"` instead of aborting the sweep. Additive only — no existing module changes, and a single-case serial sweep reproduces a direct `openTEPES_run`. Mode B (in-memory overlay) and Mode C (`openTEPES_ProblemSolvingResolve`) are separate. New tests in `tests/test_run.py`. The architecture diagram (`doc/img/openTEPES_architecture.svg` and the rendered `.png`) marks the `runner.py` and `cases.py` boxes implemented, and also the `resolve.py` box now that Mode C has merged.
- [CHANGED] fix some errors in writing H2 and heat network output results
- [FIXED] `setup_solver` no longer crashes on Windows when an earlier in-process solve still holds the stale log file open.
- [FIXED] deprecated `datetime.utcnow()` replaced with timezone-aware UTC; emitted timestamp unchanged.
Expand Down
Binary file modified doc/img/openTEPES_architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions doc/img/openTEPES_architecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
34 changes: 34 additions & 0 deletions openTEPES/openTEPES_Cases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
Open Generation, Storage, and Transmission Operation and Expansion Planning Model with RES and ESS (openTEPES) - June 17, 2026

openTEPES.openTEPES_Cases — one unit of work for the sweep runner.

A ``Case`` names one input source (a CSV directory or a ``.duckdb`` file, as
``openTEPES_run`` accepts) and where it writes results. ``overlay`` is reserved
for the Mode B sweep and rejected by the Mode A runner.
"""
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Case:
"""One sweep entry.

``dir_name`` / ``case_name`` are the standard openTEPES pair (``case_name``
may be a CSV directory or a ``.duckdb`` file). ``out_path`` redirects this
case's results — cases sharing a ``(dir_name, case_name)`` need distinct
``out_path`` so writers do not collide. ``label`` (default ``case_name``)
tags the summary. ``overlay`` is reserved for Mode B.
"""

dir_name: str
case_name: str
out_path: str | None = None
label: str | None = None
overlay: dict | None = None

def __post_init__(self):
if self.label is None:
object.__setattr__(self, "label", self.case_name)
120 changes: 120 additions & 0 deletions openTEPES/openTEPES_Runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
Open Generation, Storage, and Transmission Operation and Expansion Planning Model with RES and ESS (openTEPES) - June 17, 2026

openTEPES.openTEPES_Runner — Mode A pre-build sweep runner.

Run many cases through ``openTEPES_run`` with a chosen backend: each case loads
its own source, builds, solves, and writes its own results (RFC §4.1). Mode B
(in-memory overlay) and Mode C (``openTEPES_ProblemSolvingResolve``) are separate.
"""
from __future__ import annotations

import glob
import json
import os

try:
from .openTEPES import openTEPES_run
except ImportError:
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from openTEPES.openTEPES import openTEPES_run


def _status_dir(case):
"""Directory ``openTEPES_run`` writes this case's status JSON to (its own rule)."""
if case.out_path:
return case.out_path
if case.case_name.endswith(".duckdb"):
return os.path.dirname(os.path.abspath(os.path.join(case.dir_name, case.case_name)))
return os.path.join(case.dir_name, case.case_name)


def _read_status(case):
"""Return the case's ``openTEPES_run_status_*.json``: a summary dict, not the model.

The model cannot cross a worker process boundary, so every backend returns
the JSON ``openTEPES_run`` already persists. CSV cases name it after the
case; DuckDB inputs use the embedded name, hence the glob fallback.
"""
out_dir = _status_dir(case)
candidate = os.path.join(out_dir, f"openTEPES_run_status_{case.case_name}.json")
if not os.path.exists(candidate):
matches = sorted(glob.glob(os.path.join(out_dir, "openTEPES_run_status_*.json")),
key=os.path.getmtime)
candidate = matches[-1] if matches else None
if candidate and os.path.exists(candidate):
with open(candidate) as handle:
return json.load(handle)
return {"case": case.case_name, "dir": case.dir_name, "status": "unknown",
"error": "status JSON not found after run"}


def _run_one(case, solver_name, output_spec, gzip_patterns, pIndOutputResults, pIndLogConsole):
"""Solve one case; capture a raise as ``status="error"`` so the sweep survives it.

Module-level so it pickles for the multiprocessing and joblib backends.
"""
try:
openTEPES_run(
case.dir_name, case.case_name, solver_name,
pIndOutputResults, pIndLogConsole,
output_spec=output_spec, out_path=case.out_path, gzip_patterns=gzip_patterns,
)
except Exception as exc:
return {
"label": case.label,
"case": case.case_name,
"dir": case.dir_name,
"status": "error",
"error": f"{type(exc).__name__}: {exc}",
}
summary = _read_status(case)
summary.setdefault("label", case.label)
return summary


def run(cases, solver_name, *, mode="pre-build", backend="serial", n_workers=1,
output_spec=None, gzip_patterns=None, pIndOutputResults=1, pIndLogConsole=0):
"""Run ``Case`` objects through ``openTEPES_run`` and return one summary per case.

``mode`` must be ``"pre-build"`` (Mode A); ``backend`` is ``"serial"`` (default,
no extra dependency), ``"multiprocessing"`` (stdlib ``Pool``), or ``"joblib"``.
``output_spec``, ``gzip_patterns``, ``pIndOutputResults`` and ``pIndLogConsole``
pass through to ``openTEPES_run``. Summaries keep input order on every backend;
a failed case carries ``status="error"``.
"""
if mode != "pre-build":
raise NotImplementedError(
f"openTEPES_Runner.run supports mode='pre-build' (Mode A) only; got mode={mode!r}. "
"Mode B (in-memory overlay) is a later PR; Mode C (hot-swap) lives in "
"openTEPES_ProblemSolvingResolve.resolve."
)

cases = list(cases)
for case in cases:
if case.overlay is not None:
raise NotImplementedError(
f"Case.overlay is reserved for Mode B (in-memory overlay) and is not supported "
f"in Mode A; case label={case.label!r} set an overlay."
)

work = [(case, solver_name, output_spec, gzip_patterns, pIndOutputResults, pIndLogConsole)
for case in cases]

if backend == "serial":
return [_run_one(*args) for args in work]

if backend == "multiprocessing":
import multiprocessing as mp
with mp.Pool(processes=n_workers) as pool:
return pool.starmap(_run_one, work)

if backend == "joblib":
try:
from joblib import Parallel, delayed
except ImportError as exc:
raise ImportError("backend='joblib' requires joblib (pip install joblib).") from exc
return Parallel(n_jobs=n_workers)(delayed(_run_one)(*args) for args in work)

raise ValueError(f"unknown backend {backend!r}; expected 'serial', 'multiprocessing', or 'joblib'.")
80 changes: 80 additions & 0 deletions tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from openTEPES.openTEPES import openTEPES_run
from openTEPES.openTEPES_ProblemSolvingBenders import lshaped
from openTEPES.openTEPES_ProblemSolvingResolve import resolve, overlay_scaled
from openTEPES import openTEPES_Runner, openTEPES_Cases


# === Fixture: single-stage 7-day system ===
Expand Down Expand Up @@ -374,3 +375,82 @@ def test_binary_investment(case_7d_binary, expected_cost):
print(f"Expected cost: {expected_cost:.5f}, Actual cost: {actual_cost:.5f}")

np.testing.assert_approx_equal(actual_cost, expected_cost)


# === Mode A sweep-runner tests ===
#
# openTEPES_Runner.run drives many cases through openTEPES_run with a chosen backend (Mode A,
# RFC §4.1: each worker loads its own source, builds, solves, writes). The runner never returns
# the Pyomo model — that cannot cross a process boundary — so it reads back the per-case
# openTEPES_run_status_*.json and returns one summary dict per case, in input order, regardless of
# backend. A case that raises is captured as status="error" so one bad pathway does not lose the
# rest of the sweep.
@pytest.mark.solve
@pytest.mark.parametrize("case_7d_system", ["9n"], indirect=["case_7d_system"])
def test_mode_a_runner_serial_parity(case_7d_system, tmp_path):
"""A single-case serial sweep reproduces a direct openTEPES_run cost and echoes the label."""
d = case_7d_system
base_model = openTEPES_run(d["DirName"], d["CaseName"], d["SolverName"], 0, 0)
base_cost = base_model.vTotalSCost()

records = openTEPES_Runner.run(
[openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "c1"), label="first")],
d["SolverName"], mode="pre-build", backend="serial",
pIndOutputResults=0, pIndLogConsole=0,
)
assert len(records) == 1
assert records[0]["status"] == "optimal"
assert records[0]["label"] == "first"
assert records[0]["total_cost_meur"] == pytest.approx(base_cost, rel=1e-6)


@pytest.mark.solve
@pytest.mark.parametrize("case_7d_system", ["9n"], indirect=["case_7d_system"])
def test_mode_a_runner_multi_case_and_error(case_7d_system, tmp_path):
"""A multi-case serial sweep preserves input order, isolates outputs, and captures a bad case."""
d = case_7d_system
cases = [
openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "good1"), label="good1"),
openTEPES_Cases.Case(str(tmp_path), "DOES_NOT_EXIST", out_path=str(tmp_path / "broken"), label="broken"),
openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "good2"), label="good2"),
]
records = openTEPES_Runner.run(cases, d["SolverName"], mode="pre-build", backend="serial",
pIndOutputResults=0, pIndLogConsole=0)

assert [r["label"] for r in records] == ["good1", "broken", "good2"]
assert records[0]["status"] == "optimal"
assert records[2]["status"] == "optimal"
assert records[1]["status"] == "error"
assert "error" in records[1]
# The two good cases solved to the same cost from independent output directories.
assert records[0]["total_cost_meur"] == pytest.approx(records[2]["total_cost_meur"], rel=1e-9)


@pytest.mark.solve
@pytest.mark.parametrize("case_7d_system", ["9n"], indirect=["case_7d_system"])
def test_mode_a_runner_multiprocessing(case_7d_system, tmp_path):
"""The multiprocessing backend returns the same per-case summaries, in input order."""
d = case_7d_system
cases = [
openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "w1"), label="w1"),
openTEPES_Cases.Case(d["DirName"], d["CaseName"], out_path=str(tmp_path / "w2"), label="w2"),
]
records = openTEPES_Runner.run(cases, d["SolverName"], mode="pre-build",
backend="multiprocessing", n_workers=2,
pIndOutputResults=0, pIndLogConsole=0)
assert [r["label"] for r in records] == ["w1", "w2"]
assert all(r["status"] == "optimal" for r in records)
assert records[0]["total_cost_meur"] == pytest.approx(records[1]["total_cost_meur"], rel=1e-9)


def test_mode_a_runner_guards():
"""Unsupported modes, overlays, and backends are rejected without touching a solver."""
case = openTEPES_Cases.Case("dir", "case")
with pytest.raises(NotImplementedError):
openTEPES_Runner.run([case], "highs", mode="in-memory")
with pytest.raises(NotImplementedError):
openTEPES_Runner.run([case], "highs", mode="hot-swap")
with pytest.raises(NotImplementedError):
openTEPES_Runner.run([openTEPES_Cases.Case("dir", "case", overlay={"pDemandElec": {}})], "highs")
with pytest.raises(ValueError):
openTEPES_Runner.run([case], "highs", backend="dask")
Loading