Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit b98630e

Browse files
committed
fix: set better type handler and handle both grpc.Future generator based and single grpc.Future _response_stream close methods
Signed-off-by: Casper Nielsen <casper@diagrid.io>
1 parent ef07dbd commit b98630e

2 files changed

Lines changed: 16 additions & 13 deletions

File tree

durabletask/worker.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from datetime import datetime, timedelta
1313
from threading import Event, Thread
1414
from types import GeneratorType
15-
from typing import Any, Generator, Optional, Sequence, TypeVar, Union
15+
from typing import Any, Generator, Iterator, Optional, Sequence, TypeVar, Union
1616

1717
import grpc
1818
from google.protobuf import empty_pb2
@@ -282,7 +282,7 @@ class TaskHubGrpcWorker:
282282
activity function.
283283
"""
284284

285-
_response_stream: Optional[grpc.Future] = None
285+
_response_stream: Optional[Union[Iterator[grpc.Future], grpc.Future]] = None
286286
_interceptors: Optional[list[shared.ClientInterceptor]] = None
287287

288288
def __init__(
@@ -420,9 +420,12 @@ def invalidate_connection():
420420
# Cancel the response stream first to signal the reader thread to stop
421421
if self._response_stream is not None:
422422
try:
423-
self._response_stream.cancel()
424-
except Exception:
425-
pass
423+
if hasattr(self._response_stream, "call"):
424+
self._response_stream.call.cancel() # type: ignore
425+
else:
426+
self._response_stream.cancel() # type: ignore
427+
except Exception as e:
428+
self._logger.warning(f"Error cancelling response stream: {e}")
426429
self._response_stream = None
427430

428431
# Wait for the reader thread to finish
@@ -739,13 +742,13 @@ def stop(self):
739742

740743
self._logger.info("Stopping gRPC worker...")
741744
if self._response_stream is not None:
742-
if isinstance(self._response_stream, grpc.Future):
743-
self._response_stream.cancel()
744-
elif hasattr(self._response_stream, "call"):
745-
# This is a generator returned by the gRPC stub
746-
self._response_stream.call.cancel()
747-
else:
748-
self._logger.warning("Unknown response stream type, cannot cancel directly")
745+
try:
746+
if hasattr(self._response_stream, "call"):
747+
self._response_stream.call.cancel() # type: ignore
748+
else:
749+
self._response_stream.cancel() # type: ignore
750+
except Exception as e:
751+
self._logger.warning(f"Error cancelling response stream: {e}")
749752
self._shutdown.set()
750753
# Explicitly close the gRPC channel to ensure OTel interceptors and other resources are cleaned up
751754
if self._current_channel is not None:

tests/durabletask/test_worker_stop.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def test_stop_with_unknown_stream_type(caplog):
3838
worker._response_stream = object()
3939
with caplog.at_level("WARNING"):
4040
worker.stop()
41-
assert any("Unknown response stream type" in m for m in caplog.text.splitlines())
41+
assert any("Error cancelling response stream: " in m for m in caplog.text.splitlines())
4242

4343

4444
def test_stop_with_none_stream():

0 commit comments

Comments
 (0)