Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions picopt/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from confuse import Configuration, MappingTemplate
from confuse.templates import (
AttrDict,
Choice,
Integer,
Optional,
Expand Down Expand Up @@ -214,9 +213,6 @@ def get_config(
self._set_timestamps(config_program)
self.set_format_handler_map(config_program)
ad = config.get(_build_template())
if not isinstance(ad, AttrDict):
msg = f"Expected AttrDict from confuse, got {type(ad).__name__}"
raise TypeError(msg)
return _settings_from_attrdict(ad.picopt)


Expand Down
43 changes: 29 additions & 14 deletions picopt/walk/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,20 +339,25 @@ def _inflight_count(self) -> int:
+ len(self._inflight_repack)
)

def _submit_ready_job(self) -> None:
"""Pop and submit one job from the ready dequeue."""
job, node = self._ready.popleft()
# Skip jobs whose owning node got cancelled while they were queued.
if node is not None and node.state is NodeState.CANCELLED:
match job:
case UnpackJob() | RepackJob():
# node's own job — the cancel walk already decremented
# the parent counter, nothing to do.
pass
case _:
node.pending = max(0, node.pending - 1)
return
fut = self._executor.submit(job.run)
@staticmethod
def _drop_cancelled_ready_job(job: Job, node: ContainerNode) -> None:
"""
Decrement parent pending counter for a leaf of a cancelled subtree.

UnpackJob/RepackJob jobs belong to ``node`` itself; the cancel walk
already decremented the parent's pending counter for them, so they
need no further bookkeeping.
"""
match job:
case UnpackJob() | RepackJob():
pass
case _:
node.pending = max(0, node.pending - 1)

def _track_submitted_job(
self, fut: Future, job: Job, node: ContainerNode | None
) -> None:
"""Record an in-flight future under the right map and update state."""
match job:
case UnpackJob():
assert node is not None
Expand All @@ -367,6 +372,16 @@ def _submit_ready_job(self) -> None:
node.state = NodeState.REPACKING
self._inflight_repack[fut] = node

def _submit_ready_job(self) -> None:
"""Pop and submit one job from the ready dequeue."""
job, node = self._ready.popleft()
# Skip jobs whose owning node got cancelled while they were queued.
if node is not None and node.state is NodeState.CANCELLED:
self._drop_cancelled_ready_job(job, node)
return
fut = self._executor.submit(job.run)
self._track_submitted_job(fut, job, node)

def _submit_ready(self) -> None:
"""Submit ready jobs up to the backpressure cap."""
cap = 2 * self._max_workers
Expand Down