Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 1a30848

Browse files
committed
fix: address feedback
Signed-off-by: Samantha Coyle <sam@diagrid.io>
1 parent 1ff4521 commit 1a30848

1 file changed

Lines changed: 9 additions & 11 deletions

File tree

durabletask/worker.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -460,11 +460,10 @@ def stream_reader():
460460
except Exception as stream_error:
461461
# Check if this is due to shutdown
462462
if self._shutdown.is_set():
463-
self._logger.info(f"Stream reader: exception during shutdown: {type(stream_error).__name__}: {stream_error}")
464-
break
463+
self._logger.debug(f"Stream reader: exception during shutdown: {type(stream_error).__name__}: {stream_error}")
465464
# Other stream errors - put in queue for async loop to handle
466465
self._logger.warning(f"Stream reader: unexpected error: {stream_error}")
467-
break
466+
raise
468467

469468
except Exception as e:
470469
if not self._shutdown.is_set():
@@ -499,9 +498,6 @@ def stream_reader():
499498
if work_item == SHUTDOWN_SENTINEL:
500499
break
501500

502-
if self._shutdown.is_set():
503-
self._logger.debug("Shutdown detected, ignoring work item")
504-
break
505501
if self._async_worker_manager._shutdown or loop.is_closed():
506502
self._logger.debug("Async worker manager shut down or loop closed, exiting work item processing")
507503
break
@@ -618,7 +614,6 @@ def stream_reader():
618614
self._logger.warning(f"Unexpected error: {ex}")
619615
invalidate_connection()
620616
self._logger.info("No longer listening for work items")
621-
self._async_worker_manager.shutdown()
622617

623618
# Cancel worker_task to ensure shutdown completes even if tasks are still running
624619
worker_task.cancel()
@@ -725,8 +720,9 @@ def _execute_orchestrator(
725720

726721
try:
727722
stub.CompleteOrchestratorTask(res)
728-
except grpc.RpcError as rpc_error: # type: ignore
729-
self._handle_grpc_execution_error(rpc_error, "orchestrator")
723+
except grpc.RpcError:
724+
# except grpc.RpcError as rpc_error: # type: ignore
725+
raise # self._handle_grpc_execution_error(rpc_error, "orchestrator")
730726
except Exception as ex:
731727
self._logger.exception(
732728
f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}"
@@ -758,8 +754,10 @@ def _execute_activity(
758754

759755
try:
760756
stub.CompleteActivityTask(res)
761-
except grpc.RpcError as rpc_error: # type: ignore
762-
self._handle_grpc_execution_error(rpc_error, "activity")
757+
# except grpc.RpcError as rpc_error: # type: ignore
758+
# self._handle_grpc_execution_error(rpc_error, "activity")
759+
except grpc.RpcError:
760+
raise
763761
except Exception as ex:
764762
self._logger.exception(
765763
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"

0 commit comments

Comments
 (0)