Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 1ff4521

Browse files
committed
fix: add graceful shutdown to response_stream thread
Signed-off-by: Samantha Coyle <sam@diagrid.io>
1 parent 51bfe1c commit 1ff4521

1 file changed

Lines changed: 95 additions & 15 deletions

File tree

durabletask/worker.py

Lines changed: 95 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,7 @@ def invalidate_connection():
363363

364364
# Wait for the reader thread to finish
365365
if current_reader_thread is not None:
366-
try:
367-
current_reader_thread.join(timeout=2)
368-
if current_reader_thread.is_alive():
369-
self._logger.warning("Stream reader thread did not shut down gracefully")
370-
except Exception:
371-
pass
366+
current_reader_thread.join(timeout=1)
372367
current_reader_thread = None
373368

374369
# Close the channel
@@ -434,20 +429,46 @@ def stream_reader():
434429
stream = self._response_stream
435430
if stream is None:
436431
return
432+
433+
# Use next() to allow shutdown check between items
434+
# This matches Go's pattern: check ctx.Err() after each stream.Recv()
437435
while True:
438436
if self._shutdown.is_set():
439437
break
440-
438+
441439
try:
440+
# NOTE: next(stream) blocks until gRPC returns the next work item or cancels the stream.
441+
# There is no way to interrupt this blocking call in Python gRPC. When shutdown is
442+
# initiated, the channel closure propagates to this call, which can take several seconds.
443+
# The thread will exit once gRPC raises grpc.RpcError with StatusCode.CANCELLED.
442444
work_item = next(stream)
445+
# Check shutdown again after getting item (in case shutdown happened during next())
446+
if self._shutdown.is_set():
447+
break
443448
work_item_queue.put(work_item)
444449
except StopIteration:
445450
# stream ended naturally
446451
break
447-
except Exception as e:
448-
work_item_queue.put(e)
452+
except grpc.RpcError as rpc_error:
453+
# Check if this is due to shutdown/cancellation
454+
if self._shutdown.is_set() or rpc_error.code() == grpc.StatusCode.CANCELLED:
455+
self._logger.debug(f"Stream reader: stream cancelled during shutdown (code={rpc_error.code()})")
456+
break
457+
# Other RPC errors - log a warning and stop reading (the error is not put in the queue)
458+
self._logger.warning(f"Stream reader: RPC error (code={rpc_error.code()}): {rpc_error}")
459+
break
460+
except Exception as stream_error:
461+
# Check if this is due to shutdown
462+
if self._shutdown.is_set():
463+
self._logger.info(f"Stream reader: exception during shutdown: {type(stream_error).__name__}: {stream_error}")
464+
break
465+
# Other stream errors - log a warning and stop reading (the error is not put in the queue)
466+
self._logger.warning(f"Stream reader: unexpected error: {stream_error}")
467+
break
468+
449469
except Exception as e:
450-
work_item_queue.put(e)
470+
if not self._shutdown.is_set():
471+
work_item_queue.put(e)
451472
finally:
452473
# signal that the stream reader is done (i.e., matching Go's context cancellation)
453474
try:
@@ -477,6 +498,13 @@ def stream_reader():
477498
# Essentially check for ctx.Done() in Go
478499
if work_item == SHUTDOWN_SENTINEL:
479500
break
501+
502+
if self._shutdown.is_set():
503+
self._logger.debug("Shutdown detected, ignoring work item")
504+
break
505+
if self._async_worker_manager._shutdown or loop.is_closed():
506+
self._logger.debug("Async worker manager shut down or loop closed, exiting work item processing")
507+
break
480508
if isinstance(work_item, Exception):
481509
raise work_item
482510
request_type = work_item.WhichOneof("request")
@@ -504,11 +532,25 @@ def stream_reader():
504532
except grpc.RpcError:
505533
raise # let it be captured/parsed by outer except and avoid noisy log
506534
except Exception as e:
507-
self._logger.warning(f"Error in work item stream: {e}")
535+
if self._async_worker_manager._shutdown or loop.is_closed():
536+
break
537+
invalidate_connection()
508538
raise e
509539
current_reader_thread.join(timeout=1)
510-
self._logger.info("Work item stream ended normally")
540+
541+
if self._shutdown.is_set():
542+
self._logger.info(f"Disconnected from {self._host_address}")
543+
else:
544+
self._logger.info("Work item stream ended normally")
545+
# When stream ends (SHUTDOWN_SENTINEL received), always break outer loop
546+
# The stream reader has exited, so we should exit too, not reconnect
547+
# This matches Go SDK behavior where stream ending causes the listener to exit
548+
break
511549
except grpc.RpcError as rpc_error:
550+
# Check shutdown first - if shutting down, exit immediately
551+
if self._shutdown.is_set():
552+
self._logger.debug("Shutdown detected during RPC error handling, exiting")
553+
break
512554
should_invalidate = should_invalidate_connection(rpc_error)
513555
if should_invalidate:
514556
invalidate_connection()
@@ -539,11 +581,41 @@ def stream_reader():
539581
self._logger.warning(
540582
f"Application-level gRPC error ({error_code}): {rpc_error}"
541583
)
542-
self._shutdown.wait(1)
584+
except RuntimeError as ex:
585+
# RuntimeError often indicates asyncio loop issues (e.g., "cannot schedule new futures after shutdown")
586+
# Check shutdown state first
587+
if self._shutdown.is_set():
588+
self._logger.debug(f"Shutdown detected during RuntimeError handling, exiting: {ex}")
589+
break
590+
# Check if async worker manager is shut down or loop is closed
591+
try:
592+
loop = asyncio.get_running_loop()
593+
if self._async_worker_manager._shutdown or loop.is_closed():
594+
self._logger.debug(f"Async worker manager shut down or loop closed, exiting: {ex}")
595+
break
596+
except RuntimeError:
597+
# No event loop running, treat as shutdown
598+
self._logger.debug(f"No event loop running, exiting: {ex}")
599+
break
600+
# If we can't get the loop or it's in a bad state, and we got a RuntimeError,
601+
# it's likely shutdown-related. Break to prevent infinite retries.
602+
break
543603
except Exception as ex:
604+
if self._shutdown.is_set():
605+
self._logger.debug(f"Shutdown detected during exception handling, exiting: {ex}")
606+
break
607+
# Check if async worker manager is shut down or loop is closed
608+
try:
609+
loop = asyncio.get_running_loop()
610+
if self._async_worker_manager._shutdown or loop.is_closed():
611+
self._logger.debug(f"Async worker manager shut down or loop closed, exiting: {ex}")
612+
break
613+
except RuntimeError:
614+
# No event loop running, treat as shutdown
615+
self._logger.debug(f"No event loop running, exiting: {ex}")
616+
break
544617
invalidate_connection()
545618
self._logger.warning(f"Unexpected error: {ex}")
546-
self._shutdown.wait(1)
547619
invalidate_connection()
548620
self._logger.info("No longer listening for work items")
549621
self._async_worker_manager.shutdown()
@@ -566,9 +638,9 @@ def stop(self):
566638
return
567639

568640
self._logger.info("Stopping gRPC worker...")
569-
self._shutdown.set()
570641
if self._response_stream is not None:
571642
self._response_stream.cancel()
643+
self._shutdown.set()
572644
# Explicitly close the gRPC channel to ensure OTel interceptors and other resources are cleaned up
573645
if self._current_channel is not None:
574646
try:
@@ -1517,8 +1589,12 @@ def __init__(self, concurrency_options: ConcurrencyOptions):
15171589

15181590
def _ensure_queues_for_current_loop(self):
15191591
"""Ensure queues are bound to the current event loop."""
1592+
if self._shutdown:
1593+
return
15201594
try:
15211595
current_loop = asyncio.get_running_loop()
1596+
if current_loop.is_closed():
1597+
return
15221598
except RuntimeError:
15231599
# No event loop running, can't create queues
15241600
return
@@ -1658,6 +1734,8 @@ async def _run_func(self, func, *args, **kwargs):
16581734
return result
16591735

16601736
def submit_activity(self, func, *args, **kwargs):
1737+
if self._shutdown:
1738+
return
16611739
work_item = (func, args, kwargs)
16621740
self._ensure_queues_for_current_loop()
16631741
if self.activity_queue is not None:
@@ -1667,6 +1745,8 @@ def submit_activity(self, func, *args, **kwargs):
16671745
self._pending_activity_work.append(work_item)
16681746

16691747
def submit_orchestration(self, func, *args, **kwargs):
1748+
if self._shutdown:
1749+
return
16701750
work_item = (func, args, kwargs)
16711751
self._ensure_queues_for_current_loop()
16721752
if self.orchestration_queue is not None:

0 commit comments

Comments
 (0)