diff --git a/picopt/config/__init__.py b/picopt/config/__init__.py index d3d3cde..ac16fca 100644 --- a/picopt/config/__init__.py +++ b/picopt/config/__init__.py @@ -16,7 +16,6 @@ from confuse import Configuration, MappingTemplate from confuse.templates import ( - AttrDict, Choice, Integer, Optional, @@ -214,9 +213,6 @@ def get_config( self._set_timestamps(config_program) self.set_format_handler_map(config_program) ad = config.get(_build_template()) - if not isinstance(ad, AttrDict): - msg = f"Expected AttrDict from confuse, got {type(ad).__name__}" - raise TypeError(msg) return _settings_from_attrdict(ad.picopt) diff --git a/picopt/walk/scheduler.py b/picopt/walk/scheduler.py index f2a8bed..eb8a31b 100644 --- a/picopt/walk/scheduler.py +++ b/picopt/walk/scheduler.py @@ -339,20 +339,25 @@ def _inflight_count(self) -> int: + len(self._inflight_repack) ) - def _submit_ready_job(self) -> None: - """Pop and submit one job from the ready dequeue.""" - job, node = self._ready.popleft() - # Skip jobs whose owning node got cancelled while they were queued. - if node is not None and node.state is NodeState.CANCELLED: - match job: - case UnpackJob() | RepackJob(): - # node's own job — the cancel walk already decremented - # the parent counter, nothing to do. - pass - case _: - node.pending = max(0, node.pending - 1) - return - fut = self._executor.submit(job.run) + @staticmethod + def _drop_cancelled_ready_job(job: Job, node: ContainerNode) -> None: + """ + Decrement parent pending counter for a leaf of a cancelled subtree. + + UnpackJob/RepackJob jobs belong to ``node`` itself; the cancel walk + already decremented the parent's pending counter for them, so they + need no further bookkeeping. + """ + match job: + case UnpackJob() | RepackJob(): + pass + case _: + node.pending = max(0, node.pending - 1) + + def _track_submitted_job( + self, fut: Future, job: Job, node: ContainerNode | None + ) -> None: + """Record an in-flight future under the right map and update state.""" match job: case UnpackJob(): assert node is not None @@ -367,6 +372,16 @@ def _submit_ready_job(self) -> None: node.state = NodeState.REPACKING self._inflight_repack[fut] = node + def _submit_ready_job(self) -> None: + """Pop and submit one job from the ready dequeue.""" + job, node = self._ready.popleft() + # Skip jobs whose owning node got cancelled while they were queued. + if node is not None and node.state is NodeState.CANCELLED: + self._drop_cancelled_ready_job(job, node) + return + fut = self._executor.submit(job.run) + self._track_submitted_job(fut, job, node) + def _submit_ready(self) -> None: """Submit ready jobs up to the backpressure cap.""" cap = 2 * self._max_workers