Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions deadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,19 +557,35 @@ def run_task(self, fn, args, kwargs, timeout, fut: Future):
results = worker.get_results()
except EOFError:
self._statistics.tasks_failed.increment()
fut.set_exception(
ProcessError("Worker process died unexpectedly")
)
if not fut.done():
try:
fut.set_exception(
ProcessError("Worker process died unexpectedly")
)
except InvalidStateError:
pass
except BaseException as e:
self._statistics.tasks_failed.increment()
logger.debug(f"Unexpected exception from worker: {e}")
fut.set_exception(e)
if not fut.done():
try:
fut.set_exception(e)
except InvalidStateError:
pass
else:
if isinstance(results, BaseException):
self._statistics.tasks_failed.increment()
fut.set_exception(results)
if not fut.done():
try:
fut.set_exception(results)
except InvalidStateError:
pass
else:
fut.set_result(results)
if not fut.done():
try:
fut.set_result(results)
except InvalidStateError:
pass

if isinstance(results, TimeoutError):
self._statistics.tasks_failed.increment()
Expand Down
44 changes: 43 additions & 1 deletion tests/test_deadpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import sys
import time
import multiprocessing as mp
from concurrent.futures import CancelledError, as_completed
import threading
from concurrent.futures import CancelledError, InvalidStateError, as_completed
from contextlib import contextmanager
from functools import partial

Expand Down Expand Up @@ -634,6 +635,47 @@ def test_cancel_and_kill():
fut.result()


def delayed_error(delay=0.5):
time.sleep(delay)
raise ValueError("delayed error from worker")


def test_set_exception_on_cancelled_future():
"""Cancelling a future whose worker raises an exception must not
cause an InvalidStateError in the run_task thread.

Reproduces the bug: deadpool futures stay PENDING (never set to
RUNNING), so fut.cancel() succeeds even while a worker is
processing. When the worker finishes with an exception, run_task
calls fut.set_exception() on the already-cancelled future, raising
InvalidStateError.
"""
thread_exceptions = []
original_hook = threading.excepthook

def capture_hook(args):
thread_exceptions.append(args)

threading.excepthook = capture_hook
try:
exe = deadpool.Deadpool(max_workers=1)
fut = exe.submit(delayed_error, 0.5)
time.sleep(0.2) # let worker start
fut.cancel() # cancel while worker is still running
time.sleep(1.0) # let worker finish and run_task process results
exe.shutdown(wait=True)

assert fut.cancelled()
invalid_state_errors = [
e for e in thread_exceptions if isinstance(e.exc_value, InvalidStateError)
]
assert (
not invalid_state_errors
), f"InvalidStateError raised in background thread: {invalid_state_errors}"
finally:
threading.excepthook = original_hook


def test_trim_memory():
"""Just testing it doesn't fail."""
deadpool.trim_memory()
Expand Down
Loading