diff --git a/src/uipath/runtime/result.py b/src/uipath/runtime/result.py index 57159c8..8697f28 100644 --- a/src/uipath/runtime/result.py +++ b/src/uipath/runtime/result.py @@ -61,3 +61,10 @@ def to_dict(self) -> dict[str, Any]: result["error"] = self.error.model_dump() return result + + +class UiPathSuspensionResult(BaseModel): + """Result of a runtime suspension.""" + + runtime_result: UiPathRuntimeResult + fired_triggers_map: dict[str, Any] | None = Field(default=None) diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index ccac3b6..5913236 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -12,11 +12,16 @@ ) from uipath.runtime.debug.breakpoint import UiPathBreakpointResult from uipath.runtime.events import UiPathRuntimeEvent -from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus +from uipath.runtime.result import ( + UiPathRuntimeResult, + UiPathRuntimeStatus, + UiPathSuspensionResult, +) from uipath.runtime.resumable.protocols import ( UiPathResumableStorageProtocol, UiPathResumeTriggerProtocol, ) +from uipath.runtime.resumable.trigger import UiPathResumeTrigger from uipath.runtime.schema import UiPathRuntimeSchema logger = logging.getLogger(__name__) @@ -56,6 +61,7 @@ async def execute( self, input: dict[str, Any] | None = None, options: UiPathExecuteOptions | None = None, + fired_triggers_map: dict[str, Any] | None = None, ) -> UiPathRuntimeResult: """Execute with resume trigger handling. @@ -66,19 +72,35 @@ async def execute( Returns: Execution result, potentially with resume trigger attached """ - # If resuming, restore trigger from storage + # check if we are resuming if options and options.resume: - input = await self._restore_resume_input(input) + if fired_triggers_map: + input = fired_triggers_map + else: + # restore trigger from storage + input = await self._restore_resume_input(input) # Execute the delegate result = await self.delegate.execute(input, options=options) # If suspended, create and persist trigger - return await self._handle_suspension(result) + suspension_result = await self._handle_suspension(result) + if not suspension_result.fired_triggers_map: + return suspension_result.runtime_result + + # some triggers are already fired, runtime can be resumed + resume_options = options or UiPathExecuteOptions(resume=True) + if not resume_options.resume: + resume_options = UiPathExecuteOptions(resume=True) + return await self.execute( + fired_triggers_map=suspension_result.fired_triggers_map, + options=resume_options, + ) async def stream( self, input: dict[str, Any] | None = None, options: UiPathStreamOptions | None = None, + fired_triggers_map: dict[str, Any] | None = None, ) -> AsyncGenerator[UiPathRuntimeEvent, None]: """Stream with resume trigger handling. @@ -89,9 +111,13 @@ async def stream( Yields: Runtime events during execution, final event is UiPathRuntimeResult """ - # If resuming, restore trigger from storage + # check if we are resuming if options and options.resume: - input = await self._restore_resume_input(input) + if fired_triggers_map: + input = fired_triggers_map + else: + # restore trigger from storage + input = await self._restore_resume_input(input) final_result: UiPathRuntimeResult | None = None async for event in self.delegate.stream(input, options=options): @@ -102,7 +128,21 @@ async def stream( # If suspended, create and persist trigger if final_result: - yield await self._handle_suspension(final_result) + suspension_result = await self._handle_suspension(final_result) + + if not suspension_result.fired_triggers_map: + yield suspension_result.runtime_result + return + + # some triggers are already fired, runtime can be resumed + resume_options = options or UiPathStreamOptions(resume=True) + if not resume_options.resume: + resume_options = UiPathStreamOptions(resume=True) + async for event in self.stream( + fired_triggers_map=suspension_result.fired_triggers_map, + options=resume_options, + ): + yield event async def _restore_resume_input( self, input: dict[str, Any] | None @@ -142,6 +182,11 @@ async def _restore_resume_input( if not triggers: return None + return await self._build_resume_map(triggers) + + async def _build_resume_map( + self, triggers: list[UiPathResumeTrigger] + ) -> dict[str, Any]: # Build resume map: {interrupt_id: resume_data} resume_map: dict[str, Any] = {} for trigger in triggers: @@ -160,18 +205,17 @@ async def _restore_resume_input( async def _handle_suspension( self, result: UiPathRuntimeResult - ) -> UiPathRuntimeResult: + ) -> UiPathSuspensionResult: """Create and persist resume trigger if execution was suspended. Args: result: The execution result to check for suspension """ - # Only handle suspensions - if result.status != UiPathRuntimeStatus.SUSPENDED: - return result - - if isinstance(result, UiPathBreakpointResult): - return result + # Only handle interrupt suspensions + if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance( + result, UiPathBreakpointResult + ): + return UiPathSuspensionResult(runtime_result=result) suspended_result = UiPathRuntimeResult( status=UiPathRuntimeStatus.SUSPENDED, @@ -205,7 +249,17 @@ async def _handle_suspension( # Backward compatibility: set single trigger directly suspended_result.trigger = suspended_result.triggers[0] - return suspended_result + # check if any trigger can be resumed + # Note: when resuming a job, orchestrator deletes all triggers associated with it, + # thus we can resume the runtime at this point without worrying a trigger may be fired 'twice' + triggers = await self.storage.get_triggers(self.runtime_id) + + return UiPathSuspensionResult( + runtime_result=suspended_result, + fired_triggers_map=await self._build_resume_map(triggers) + if triggers + else None, + ) async def get_schema(self) -> UiPathRuntimeSchema: """Passthrough schema from delegate runtime.""" diff --git a/tests/test_resumable.py b/tests/test_resumable.py index ab3398d..6da2350 100644 --- a/tests/test_resumable.py +++ b/tests/test_resumable.py @@ -257,3 +257,192 @@ async def read_trigger_impl_3(trigger: UiPathResumeTrigger) -> dict[str, Any]: assert result.output["completed"] is True assert "int-2" in result.output["resume_data"] assert "int-3" in result.output["resume_data"] + + +@pytest.mark.asyncio +async def test_resumable_auto_resumes_when_triggers_already_fired(): + """When triggers are already fired during suspension, runtime should auto-resume.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + read_count = {"count": 0} + + # configure trigger manager to return triggers as already fired only on first batch + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + read_count["count"] += 1 + # first two triggers (int-1, int-2) are immediately available + # subsequent triggers are pending + if trigger.interrupt_id in ["int-1", "int-2"] and read_count["count"] <= 2: + return {"approved": True} + raise UiPathPendingTriggerError("pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + # First execution - should suspend with int-1 and int-2, but since both are + # already fired, it should auto-resume and suspend again with int-2 and int-3 + result = await resumable.execute({}) + + # The runtime should have auto-resumed once and suspended again + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert len(result.triggers) == 2 + # After auto-resume, we should be at second suspension with int-2, int-3 + assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"} + + # Delegate should have been executed twice (initial + auto-resume) + assert runtime_impl.execution_count == 2 + + +@pytest.mark.asyncio +async def test_resumable_auto_resumes_partial_fired_triggers(): + """When only some triggers are fired during suspension, auto-resume with those.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # Configure trigger manager so int-1 is fired but int-2 is pending + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if trigger.interrupt_id == "int-1": + return {"approved": True} + raise UiPathPendingTriggerError("still pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + # First execution - int-1 fires immediately, int-2 stays pending + # Should auto-resume with int-1 and suspend with int-2, int-3 + result = await resumable.execute({}) + + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + # After auto-resume with int-1, should have int-2 (still pending) + int-3 (new) + assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"} + + # Verify int-1 was consumed (deleted from storage) + remaining_triggers = await storage.get_triggers("runtime-1") + assert all(t.interrupt_id != "int-1" for t in remaining_triggers) + + +@pytest.mark.asyncio +async def test_resumable_auto_resumes_multiple_times(): + """When triggers keep being fired immediately, keep auto-resuming until complete.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # All triggers are always immediately available + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + return {"approved": True} + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + # Execute once - should auto-resume through all suspensions + result = await resumable.execute({}) + + # Should complete successfully after auto-resuming twice + # 1st exec: suspend with int-1, int-2 -> auto-resume + # 2nd exec: suspend with int-2, int-3 -> auto-resume + # 3rd exec: complete + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert isinstance(result.output, dict) + assert result.output["completed"] is True + + # Delegate should have been executed 3 times + assert runtime_impl.execution_count == 3 + + +@pytest.mark.asyncio +async def test_resumable_stream_auto_resumes_when_triggers_fired(): + """Stream auto-resume when triggers are already fired.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + read_attempts = {"count": 0} + + # Configure int-1 to be immediately fired, int-2 pending + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + read_attempts["count"] += 1 + if trigger.interrupt_id == "int-1" and read_attempts["count"] <= 1: + return {"approved": True} + raise UiPathPendingTriggerError("pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + # Stream should auto-resume and yield final result after auto-resume + events = [] + async for event in resumable.stream({}): + events.append(event) + + # Should have received exactly one final result (after auto-resume) + assert len(events) == 1 + assert isinstance(events[0], UiPathRuntimeResult) + assert events[0].status == UiPathRuntimeStatus.SUSPENDED + + # Should be at second suspension (after auto-resume with int-1) + assert events[0].triggers is not None + assert {t.interrupt_id for t in events[0].triggers} == {"int-2", "int-3"} + + +@pytest.mark.asyncio +async def test_resumable_no_auto_resume_when_all_triggers_pending(): + """When all triggers are pending, should NOT auto-resume.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # All triggers are pending + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + raise UiPathPendingTriggerError("pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + # Execute - should suspend + result = await resumable.execute({}) + + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"} + + # Delegate should have been executed only once) + assert runtime_impl.execution_count == 1