diff --git a/deadpool.py b/deadpool.py index 38ad171..f60128f 100644 --- a/deadpool.py +++ b/deadpool.py @@ -557,19 +557,35 @@ def run_task(self, fn, args, kwargs, timeout, fut: Future): results = worker.get_results() except EOFError: self._statistics.tasks_failed.increment() - fut.set_exception( - ProcessError("Worker process died unexpectedly") - ) + if not fut.done(): + try: + fut.set_exception( + ProcessError("Worker process died unexpectedly") + ) + except InvalidStateError: + pass except BaseException as e: self._statistics.tasks_failed.increment() logger.debug(f"Unexpected exception from worker: {e}") - fut.set_exception(e) + if not fut.done(): + try: + fut.set_exception(e) + except InvalidStateError: + pass else: if isinstance(results, BaseException): self._statistics.tasks_failed.increment() - fut.set_exception(results) + if not fut.done(): + try: + fut.set_exception(results) + except InvalidStateError: + pass else: - fut.set_result(results) + if not fut.done(): + try: + fut.set_result(results) + except InvalidStateError: + pass if isinstance(results, TimeoutError): self._statistics.tasks_failed.increment() diff --git a/tests/test_deadpool.py b/tests/test_deadpool.py index 242a70b..eee376f 100644 --- a/tests/test_deadpool.py +++ b/tests/test_deadpool.py @@ -7,7 +7,8 @@ import sys import time import multiprocessing as mp -from concurrent.futures import CancelledError, as_completed +import threading +from concurrent.futures import CancelledError, InvalidStateError, as_completed from contextlib import contextmanager from functools import partial @@ -634,6 +635,47 @@ def test_cancel_and_kill(): fut.result() +def delayed_error(delay=0.5): + time.sleep(delay) + raise ValueError("delayed error from worker") + + +def test_set_exception_on_cancelled_future(): + """Cancelling a future whose worker raises an exception must not + cause an InvalidStateError in the run_task thread. + + Reproduces the bug: deadpool futures stay PENDING (never set to + RUNNING), so fut.cancel() succeeds even while a worker is + processing. When the worker finishes with an exception, run_task + calls fut.set_exception() on the already-cancelled future, raising + InvalidStateError. + """ + thread_exceptions = [] + original_hook = threading.excepthook + + def capture_hook(args): + thread_exceptions.append(args) + + threading.excepthook = capture_hook + try: + exe = deadpool.Deadpool(max_workers=1) + fut = exe.submit(delayed_error, 0.5) + time.sleep(0.2) # let worker start + fut.cancel() # cancel while worker is still running + time.sleep(1.0) # let worker finish and run_task process results + exe.shutdown(wait=True) + + assert fut.cancelled() + invalid_state_errors = [ + e for e in thread_exceptions if isinstance(e.exc_value, InvalidStateError) + ] + assert ( + not invalid_state_errors + ), f"InvalidStateError raised in background thread: {invalid_state_errors}" + finally: + threading.excepthook = original_hook + + def test_trim_memory(): """Just testing it doesn't fail.""" deadpool.trim_memory()