diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bfca7d74..0330f51ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Wrapped isObjIntegral() and test - Added structured_optimization_trace recipe for structured optimization progress tracking - Added methods: getPrimalDualIntegral() +- Added realtime_trace_jsonl recipe for real-time optimization progress tracking with JSONL streaming output ### Fixed - getBestSol() now returns None for infeasible problems instead of a Solution with NULL pointer - all fundamental callbacks now raise an error if not implemented diff --git a/src/pyscipopt/recipes/realtime_trace_jsonl.py b/src/pyscipopt/recipes/realtime_trace_jsonl.py new file mode 100644 index 000000000..ef8f681ab --- /dev/null +++ b/src/pyscipopt/recipes/realtime_trace_jsonl.py @@ -0,0 +1,135 @@ +import json + +from pyscipopt import SCIP_EVENTTYPE, Eventhdlr + + +class _TraceRun: + """ + Record optimization progress in real time while the solver is running. + + Args + ---- + model: pyscipopt.Model + path: str | None + - None: in-memory only + - str : also write JSONL (one JSON object per line) for streaming/real-time consumption + + Returns + ------- + None + Updates `model.data["trace"]` as a side effect. + + Usage + ----- + optimizeTrace(model) # real-time in-memory trace + optimizeTrace(model, path="trace.jsonl") # real-time JSONL stream + in-memory + optimizeNogilTrace(model, path="trace.jsonl") # nogil variant + """ + + def __init__(self, model, path=None): + self.model = model + self.path = path + self._fh = None + self._handler = None + + self._last_snapshot = {} + + def __enter__(self): + if not hasattr(self.model, "data") or self.model.data is None: + self.model.data = {} + self.model.data.setdefault("trace", []) + + if self.path is not None: + self._fh = open(self.path, "w") + + class _TraceEventhdlr(Eventhdlr): + def eventinit(s): + self.model.catchEvent(SCIP_EVENTTYPE.BESTSOLFOUND, s) + self.model.catchEvent(SCIP_EVENTTYPE.DUALBOUNDIMPROVED, s) + + def eventexec(s, event): + et = event.getType() + if et == SCIP_EVENTTYPE.BESTSOLFOUND: + snapshot = self._snapshot_now() + self._last_snapshot = snapshot + self._write_event("bestsol_found", fields=snapshot, flush=True) + elif et == SCIP_EVENTTYPE.DUALBOUNDIMPROVED: + snapshot = self._snapshot_now() + self._last_snapshot = snapshot + self._write_event( + "dualbound_improved", fields=snapshot, flush=False + ) + + self._handler = _TraceEventhdlr() + self.model.includeEventhdlr( + self._handler, "realtime_trace_jsonl", "Realtime trace jsonl handler" + ) + + return self + + def __exit__(self, exc_type, exc, tb): + fields = {} + if self._last_snapshot: + fields.update(self._last_snapshot) + + if exc_type is not None: + fields.update( + { + "status": "exception", + "exception": exc_type.__name__, + "message": str(exc) if exc is not None else None, + } + ) + + try: + self._write_event("run_end", fields=fields, flush=True) + finally: + if self._fh: + try: + self._fh.close() + finally: + self._fh = None + + if self._handler is not None: + for et in ( + SCIP_EVENTTYPE.BESTSOLFOUND, + SCIP_EVENTTYPE.DUALBOUNDIMPROVED, + ): + try: + self.model.dropEvent(et, self._handler) + except Exception: + pass + self._handler = None + + return False + + def _snapshot_now(self) -> dict: + return { + "time": self.model.getSolvingTime(), + "primalbound": self.model.getPrimalbound(), + "dualbound": self.model.getDualbound(), + "gap": self.model.getGap(), + "nodes": self.model.getNNodes(), + "nsol": self.model.getNSols(), + } + + def _write_event(self, event_type, fields=None, flush=True): + event = {"type": event_type} + if fields: + event.update(fields) + + self.model.data["trace"].append(event) + if self._fh is not None: + self._fh.write(json.dumps(event) + "\n") + if flush: + self._fh.flush() + + +def optimizeTrace(model, path=None): + with _TraceRun(model, path): + model.optimize() + + +def optimizeNogilTrace(model, path=None): + with _TraceRun(model, path): + model.optimizeNogil() diff --git a/tests/test_recipe_realtime_trace_jsonl.py b/tests/test_recipe_realtime_trace_jsonl.py new file mode 100644 index 000000000..61db00bbc --- /dev/null +++ b/tests/test_recipe_realtime_trace_jsonl.py @@ -0,0 +1,87 @@ +import json +from random import randint + +import pytest +from helpers.utils import bin_packing_model + +from pyscipopt import SCIP_EVENTTYPE, Eventhdlr +from pyscipopt.recipes.realtime_trace_jsonl import optimizeNogilTrace, optimizeTrace + + +@pytest.fixture( + params=[optimizeTrace, optimizeNogilTrace], ids=["optimize", "optimize_nogil"] +) +def optimize(request): + return request.param + + +def test_realtime_trace_in_memory(optimize): + model = bin_packing_model(sizes=[randint(1, 40) for _ in range(120)], capacity=50) + model.setParam("limits/time", 5) + + model.data = {"test": True} + + optimize(model, path=None) + + assert "test" in model.data + assert "trace" in model.data + + required_fields = {"time", "primalbound", "dualbound", "gap", "nodes", "nsol"} + + types = [r["type"] for r in model.data["trace"]] + assert ("bestsol_found" in types) or ("dualbound_improved" in types) + + for record in model.data["trace"]: + if record["type"] != "run_end": + assert required_fields <= set(record.keys()) + + primalbounds = [r["primalbound"] for r in model.data["trace"] if "primalbound" in r] + for i in range(1, len(primalbounds)): + assert primalbounds[i] <= primalbounds[i - 1] + + dualbounds = [r["dualbound"] for r in model.data["trace"] if "dualbound" in r] + for i in range(1, len(dualbounds)): + assert dualbounds[i] >= dualbounds[i - 1] + + assert "run_end" in types + + +def test_realtime_trace_file_output(optimize, tmp_path): + model = bin_packing_model(sizes=[randint(1, 40) for _ in range(120)], capacity=50) + model.setParam("limits/time", 5) + + path = tmp_path / "trace.jsonl" + + optimize(model, path=str(path)) + + assert path.exists() + + records = [json.loads(line) for line in path.read_text().splitlines()] + assert len(records) > 0 + + types = [r["type"] for r in records] + assert "run_end" in types + + +class _InterruptOnBest(Eventhdlr): + def eventinit(self): + self.model.catchEvent(SCIP_EVENTTYPE.BESTSOLFOUND, self) + + def eventexec(self, event): + self.model.interruptSolve() + + +def test_optimize_with_trace_records_run_end_on_interrupt(optimize): + model = bin_packing_model( + sizes=[randint(1, 40) for _ in range(120)], + capacity=50, + ) + model.setParam("limits/time", 5) + + model.includeEventhdlr(_InterruptOnBest(), "stopper", "Interrupt on bestsol") + + optimize(model, path=None) + + types = [r["type"] for r in model.data["trace"]] + assert "bestsol_found" in types + assert "run_end" in types