From 017179d97267b0bdf3f640a1c68af792cabaec5d Mon Sep 17 00:00:00 2001 From: Caleb Hattingh Date: Wed, 8 Apr 2026 01:48:49 +0200 Subject: [PATCH 1/2] fix: handle situation where fut is cancelled before results --- deadpool.py | 30 ++++++++++++++++++++++------ tests/test_deadpool.py | 45 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/deadpool.py b/deadpool.py index 38ad171..bd9e95b 100644 --- a/deadpool.py +++ b/deadpool.py @@ -557,19 +557,37 @@ def run_task(self, fn, args, kwargs, timeout, fut: Future): results = worker.get_results() except EOFError: self._statistics.tasks_failed.increment() - fut.set_exception( - ProcessError("Worker process died unexpectedly") - ) + if not fut.done(): + try: + fut.set_exception( + ProcessError( + "Worker process died unexpectedly" + ) + ) + except InvalidStateError: + pass except BaseException as e: self._statistics.tasks_failed.increment() logger.debug(f"Unexpected exception from worker: {e}") - fut.set_exception(e) + if not fut.done(): + try: + fut.set_exception(e) + except InvalidStateError: + pass else: if isinstance(results, BaseException): self._statistics.tasks_failed.increment() - fut.set_exception(results) + if not fut.done(): + try: + fut.set_exception(results) + except InvalidStateError: + pass else: - fut.set_result(results) + if not fut.done(): + try: + fut.set_result(results) + except InvalidStateError: + pass if isinstance(results, TimeoutError): self._statistics.tasks_failed.increment() diff --git a/tests/test_deadpool.py b/tests/test_deadpool.py index 242a70b..2863aae 100644 --- a/tests/test_deadpool.py +++ b/tests/test_deadpool.py @@ -7,7 +7,8 @@ import sys import time import multiprocessing as mp -from concurrent.futures import CancelledError, as_completed +import threading +from concurrent.futures import CancelledError, InvalidStateError, as_completed from contextlib import contextmanager from functools import partial @@ -634,6 +635,48 @@ def test_cancel_and_kill(): fut.result() +def delayed_error(delay=0.5): + time.sleep(delay) + raise ValueError("delayed error from worker") + + +def test_set_exception_on_cancelled_future(): + """Cancelling a future whose worker raises an exception must not + cause an InvalidStateError in the run_task thread. + + Reproduces the bug: deadpool futures stay PENDING (never set to + RUNNING), so fut.cancel() succeeds even while a worker is + processing. When the worker finishes with an exception, run_task + calls fut.set_exception() on the already-cancelled future, raising + InvalidStateError. + """ + thread_exceptions = [] + original_hook = threading.excepthook + + def capture_hook(args): + thread_exceptions.append(args) + + threading.excepthook = capture_hook + try: + exe = deadpool.Deadpool(max_workers=1) + fut = exe.submit(delayed_error, 0.5) + time.sleep(0.2) # let worker start + fut.cancel() # cancel while worker is still running + time.sleep(1.0) # let worker finish and run_task process results + exe.shutdown(wait=True) + + assert fut.cancelled() + invalid_state_errors = [ + e for e in thread_exceptions + if isinstance(e.exc_value, InvalidStateError) + ] + assert not invalid_state_errors, ( + f"InvalidStateError raised in background thread: {invalid_state_errors}" + ) + finally: + threading.excepthook = original_hook + + def test_trim_memory(): """Just testing it doesn't fail.""" deadpool.trim_memory() From e18a20f2777f93b980e047b683894994c31ddd6e Mon Sep 17 00:00:00 2001 From: Caleb Hattingh Date: Wed, 8 Apr 2026 01:57:47 +0200 Subject: [PATCH 2/2] Black formatter --- deadpool.py | 4 +--- tests/test_deadpool.py | 9 ++++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/deadpool.py b/deadpool.py index bd9e95b..f60128f 100644 --- a/deadpool.py +++ b/deadpool.py @@ -560,9 +560,7 @@ def run_task(self, fn, args, kwargs, timeout, fut: Future): if not fut.done(): try: fut.set_exception( - ProcessError( - "Worker process died unexpectedly" - ) + ProcessError("Worker process died unexpectedly") ) except InvalidStateError: pass diff --git a/tests/test_deadpool.py b/tests/test_deadpool.py index 2863aae..eee376f 100644 --- a/tests/test_deadpool.py +++ b/tests/test_deadpool.py @@ -667,12 +667,11 @@ def capture_hook(args): assert fut.cancelled() invalid_state_errors = [ - e for e in thread_exceptions - if isinstance(e.exc_value, InvalidStateError) + e for e in thread_exceptions if isinstance(e.exc_value, InvalidStateError) ] - assert not invalid_state_errors, ( - f"InvalidStateError raised in background thread: {invalid_state_errors}" - ) + assert ( + not invalid_state_errors + ), f"InvalidStateError raised in background thread: {invalid_state_errors}" finally: threading.excepthook = original_hook