-
Notifications
You must be signed in to change notification settings - Fork 1
feat: resume runtime on fired triggers #70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,11 +12,16 @@ | |
| ) | ||
| from uipath.runtime.debug.breakpoint import UiPathBreakpointResult | ||
| from uipath.runtime.events import UiPathRuntimeEvent | ||
| from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus | ||
| from uipath.runtime.result import ( | ||
| UiPathRuntimeResult, | ||
| UiPathRuntimeStatus, | ||
| UiPathSuspensionResult, | ||
| ) | ||
| from uipath.runtime.resumable.protocols import ( | ||
| UiPathResumableStorageProtocol, | ||
| UiPathResumeTriggerProtocol, | ||
| ) | ||
| from uipath.runtime.resumable.trigger import UiPathResumeTrigger | ||
| from uipath.runtime.schema import UiPathRuntimeSchema | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -56,6 +61,7 @@ async def execute( | |
| self, | ||
| input: dict[str, Any] | None = None, | ||
| options: UiPathExecuteOptions | None = None, | ||
| fired_triggers_map: dict[str, Any] | None = None, | ||
| ) -> UiPathRuntimeResult: | ||
| """Execute with resume trigger handling. | ||
|
|
||
|
|
@@ -66,19 +72,35 @@ async def execute( | |
| Returns: | ||
| Execution result, potentially with resume trigger attached | ||
| """ | ||
| # If resuming, restore trigger from storage | ||
| # check if we are resuming | ||
| if options and options.resume: | ||
| input = await self._restore_resume_input(input) | ||
| if fired_triggers_map: | ||
| input = fired_triggers_map | ||
| else: | ||
| # restore trigger from storage | ||
| input = await self._restore_resume_input(input) | ||
|
|
||
| # Execute the delegate | ||
| result = await self.delegate.execute(input, options=options) | ||
| # If suspended, create and persist trigger | ||
| return await self._handle_suspension(result) | ||
| suspension_result = await self._handle_suspension(result) | ||
| if not suspension_result.fired_triggers_map: | ||
| return suspension_result.runtime_result | ||
|
|
||
| # some triggers are already fired, runtime can be resumed | ||
| resume_options = options or UiPathExecuteOptions(resume=True) | ||
| if not resume_options.resume: | ||
| resume_options = UiPathExecuteOptions(resume=True) | ||
| return await self.execute( | ||
| fired_triggers_map=suspension_result.fired_triggers_map, | ||
| options=resume_options, | ||
| ) | ||
|
Comment on lines
+90
to
+97
|
||
|
|
||
| async def stream( | ||
| self, | ||
| input: dict[str, Any] | None = None, | ||
| options: UiPathStreamOptions | None = None, | ||
| fired_triggers_map: dict[str, Any] | None = None, | ||
| ) -> AsyncGenerator[UiPathRuntimeEvent, None]: | ||
| """Stream with resume trigger handling. | ||
|
|
||
|
|
@@ -89,9 +111,13 @@ async def stream( | |
| Yields: | ||
| Runtime events during execution, final event is UiPathRuntimeResult | ||
| """ | ||
| # If resuming, restore trigger from storage | ||
| # check if we are resuming | ||
| if options and options.resume: | ||
| input = await self._restore_resume_input(input) | ||
| if fired_triggers_map: | ||
| input = fired_triggers_map | ||
| else: | ||
| # restore trigger from storage | ||
| input = await self._restore_resume_input(input) | ||
|
|
||
| final_result: UiPathRuntimeResult | None = None | ||
| async for event in self.delegate.stream(input, options=options): | ||
|
|
@@ -102,7 +128,21 @@ async def stream( | |
|
|
||
| # If suspended, create and persist trigger | ||
| if final_result: | ||
| yield await self._handle_suspension(final_result) | ||
| suspension_result = await self._handle_suspension(final_result) | ||
|
|
||
| if not suspension_result.fired_triggers_map: | ||
| yield suspension_result.runtime_result | ||
| return | ||
|
|
||
| # some triggers are already fired, runtime can be resumed | ||
| resume_options = options or UiPathStreamOptions(resume=True) | ||
| if not resume_options.resume: | ||
| resume_options = UiPathStreamOptions(resume=True) | ||
|
Comment on lines
+138
to
+140
|
||
| async for event in self.stream( | ||
| fired_triggers_map=suspension_result.fired_triggers_map, | ||
| options=resume_options, | ||
| ): | ||
| yield event | ||
|
Comment on lines
+137
to
+145
|
||
|
|
||
| async def _restore_resume_input( | ||
| self, input: dict[str, Any] | None | ||
|
|
@@ -142,6 +182,11 @@ async def _restore_resume_input( | |
| if not triggers: | ||
| return None | ||
|
|
||
| return await self._build_resume_map(triggers) | ||
|
|
||
| async def _build_resume_map( | ||
| self, triggers: list[UiPathResumeTrigger] | ||
| ) -> dict[str, Any]: | ||
| # Build resume map: {interrupt_id: resume_data} | ||
| resume_map: dict[str, Any] = {} | ||
| for trigger in triggers: | ||
|
|
@@ -160,18 +205,17 @@ async def _restore_resume_input( | |
|
|
||
| async def _handle_suspension( | ||
| self, result: UiPathRuntimeResult | ||
| ) -> UiPathRuntimeResult: | ||
| ) -> UiPathSuspensionResult: | ||
| """Create and persist resume trigger if execution was suspended. | ||
|
|
||
| Args: | ||
| result: The execution result to check for suspension | ||
| """ | ||
| # Only handle suspensions | ||
| if result.status != UiPathRuntimeStatus.SUSPENDED: | ||
| return result | ||
|
|
||
| if isinstance(result, UiPathBreakpointResult): | ||
| return result | ||
| # Only handle interrupt suspensions | ||
| if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance( | ||
| result, UiPathBreakpointResult | ||
| ): | ||
| return UiPathSuspensionResult(runtime_result=result) | ||
|
|
||
| suspended_result = UiPathRuntimeResult( | ||
| status=UiPathRuntimeStatus.SUSPENDED, | ||
|
|
@@ -205,7 +249,17 @@ async def _handle_suspension( | |
| # Backward compatibility: set single trigger directly | ||
| suspended_result.trigger = suspended_result.triggers[0] | ||
|
|
||
| return suspended_result | ||
| # check if any trigger can be resumed | ||
| # Note: when resuming a job, orchestrator deletes all triggers associated with it, | ||
| # thus we can resume the runtime at this point without worrying a trigger may be fired 'twice' | ||
| triggers = await self.storage.get_triggers(self.runtime_id) | ||
|
|
||
| return UiPathSuspensionResult( | ||
| runtime_result=suspended_result, | ||
| fired_triggers_map=await self._build_resume_map(triggers) | ||
| if triggers | ||
| else None, | ||
| ) | ||
|
Comment on lines
+257
to
+262
|
||
|
|
||
| async def get_schema(self) -> UiPathRuntimeSchema: | ||
| """Passthrough schema from delegate runtime.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -257,3 +257,192 @@ async def read_trigger_impl_3(trigger: UiPathResumeTrigger) -> dict[str, Any]: | |||||
| assert result.output["completed"] is True | ||||||
| assert "int-2" in result.output["resume_data"] | ||||||
| assert "int-3" in result.output["resume_data"] | ||||||
|
|
||||||
|
|
||||||
| @pytest.mark.asyncio | ||||||
| async def test_resumable_auto_resumes_when_triggers_already_fired(): | ||||||
| """When triggers are already fired during suspension, runtime should auto-resume.""" | ||||||
|
|
||||||
| runtime_impl = MultiTriggerMockRuntime() | ||||||
| storage = StatefulStorageMock() | ||||||
| trigger_manager = make_trigger_manager_mock() | ||||||
|
|
||||||
| read_count = {"count": 0} | ||||||
|
|
||||||
| # configure trigger manager to return triggers as already fired only on first batch | ||||||
| async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: | ||||||
| read_count["count"] += 1 | ||||||
| # first two triggers (int-1, int-2) are immediately available | ||||||
| # subsequent triggers are pending | ||||||
| if trigger.interrupt_id in ["int-1", "int-2"] and read_count["count"] <= 2: | ||||||
| return {"approved": True} | ||||||
| raise UiPathPendingTriggerError("pending") | ||||||
|
|
||||||
| trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore | ||||||
|
|
||||||
| resumable = UiPathResumableRuntime( | ||||||
| delegate=runtime_impl, | ||||||
| storage=storage, | ||||||
| trigger_manager=trigger_manager, | ||||||
| runtime_id="runtime-1", | ||||||
| ) | ||||||
|
|
||||||
| # First execution - should suspend with int-1 and int-2, but since both are | ||||||
| # already fired, it should auto-resume and suspend again with int-2 and int-3 | ||||||
| result = await resumable.execute({}) | ||||||
|
|
||||||
| # The runtime should have auto-resumed once and suspended again | ||||||
| assert result.status == UiPathRuntimeStatus.SUSPENDED | ||||||
| assert result.triggers is not None | ||||||
| assert len(result.triggers) == 2 | ||||||
| # After auto-resume, we should be at second suspension with int-2, int-3 | ||||||
| assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"} | ||||||
|
|
||||||
| # Delegate should have been executed twice (initial + auto-resume) | ||||||
| assert runtime_impl.execution_count == 2 | ||||||
|
|
||||||
|
|
||||||
| @pytest.mark.asyncio | ||||||
| async def test_resumable_auto_resumes_partial_fired_triggers(): | ||||||
| """When only some triggers are fired during suspension, auto-resume with those.""" | ||||||
|
|
||||||
| runtime_impl = MultiTriggerMockRuntime() | ||||||
| storage = StatefulStorageMock() | ||||||
| trigger_manager = make_trigger_manager_mock() | ||||||
|
|
||||||
| # Configure trigger manager so int-1 is fired but int-2 is pending | ||||||
| async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: | ||||||
| if trigger.interrupt_id == "int-1": | ||||||
| return {"approved": True} | ||||||
| raise UiPathPendingTriggerError("still pending") | ||||||
|
|
||||||
| trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore | ||||||
|
|
||||||
| resumable = UiPathResumableRuntime( | ||||||
| delegate=runtime_impl, | ||||||
| storage=storage, | ||||||
| trigger_manager=trigger_manager, | ||||||
| runtime_id="runtime-1", | ||||||
| ) | ||||||
|
|
||||||
| # First execution - int-1 fires immediately, int-2 stays pending | ||||||
| # Should auto-resume with int-1 and suspend with int-2, int-3 | ||||||
| result = await resumable.execute({}) | ||||||
|
|
||||||
| assert result.status == UiPathRuntimeStatus.SUSPENDED | ||||||
| assert result.triggers is not None | ||||||
| # After auto-resume with int-1, should have int-2 (still pending) + int-3 (new) | ||||||
| assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"} | ||||||
|
|
||||||
| # Verify int-1 was consumed (deleted from storage) | ||||||
| remaining_triggers = await storage.get_triggers("runtime-1") | ||||||
| assert all(t.interrupt_id != "int-1" for t in remaining_triggers) | ||||||
|
|
||||||
|
|
||||||
| @pytest.mark.asyncio | ||||||
| async def test_resumable_auto_resumes_multiple_times(): | ||||||
| """When triggers keep being fired immediately, keep auto-resuming until complete.""" | ||||||
|
|
||||||
| runtime_impl = MultiTriggerMockRuntime() | ||||||
| storage = StatefulStorageMock() | ||||||
| trigger_manager = make_trigger_manager_mock() | ||||||
|
|
||||||
| # All triggers are always immediately available | ||||||
| async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: | ||||||
| return {"approved": True} | ||||||
|
|
||||||
| trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore | ||||||
|
|
||||||
| resumable = UiPathResumableRuntime( | ||||||
| delegate=runtime_impl, | ||||||
| storage=storage, | ||||||
| trigger_manager=trigger_manager, | ||||||
| runtime_id="runtime-1", | ||||||
| ) | ||||||
|
|
||||||
| # Execute once - should auto-resume through all suspensions | ||||||
| result = await resumable.execute({}) | ||||||
|
|
||||||
| # Should complete successfully after auto-resuming twice | ||||||
| # 1st exec: suspend with int-1, int-2 -> auto-resume | ||||||
| # 2nd exec: suspend with int-2, int-3 -> auto-resume | ||||||
| # 3rd exec: complete | ||||||
| assert result.status == UiPathRuntimeStatus.SUCCESSFUL | ||||||
| assert isinstance(result.output, dict) | ||||||
| assert result.output["completed"] is True | ||||||
|
|
||||||
| # Delegate should have been executed 3 times | ||||||
| assert runtime_impl.execution_count == 3 | ||||||
|
|
||||||
|
|
||||||
| @pytest.mark.asyncio | ||||||
| async def test_resumable_stream_auto_resumes_when_triggers_fired(): | ||||||
| """Stream auto-resume when triggers are already fired.""" | ||||||
|
|
||||||
| runtime_impl = MultiTriggerMockRuntime() | ||||||
| storage = StatefulStorageMock() | ||||||
| trigger_manager = make_trigger_manager_mock() | ||||||
|
|
||||||
| read_attempts = {"count": 0} | ||||||
|
|
||||||
| # Configure int-1 to be immediately fired, int-2 pending | ||||||
| async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: | ||||||
| read_attempts["count"] += 1 | ||||||
| if trigger.interrupt_id == "int-1" and read_attempts["count"] <= 1: | ||||||
| return {"approved": True} | ||||||
| raise UiPathPendingTriggerError("pending") | ||||||
|
|
||||||
| trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore | ||||||
|
|
||||||
| resumable = UiPathResumableRuntime( | ||||||
| delegate=runtime_impl, | ||||||
| storage=storage, | ||||||
| trigger_manager=trigger_manager, | ||||||
| runtime_id="runtime-1", | ||||||
| ) | ||||||
|
|
||||||
| # Stream should auto-resume and yield final result after auto-resume | ||||||
| events = [] | ||||||
| async for event in resumable.stream({}): | ||||||
| events.append(event) | ||||||
|
|
||||||
| # Should have received exactly one final result (after auto-resume) | ||||||
| assert len(events) == 1 | ||||||
| assert isinstance(events[0], UiPathRuntimeResult) | ||||||
| assert events[0].status == UiPathRuntimeStatus.SUSPENDED | ||||||
|
|
||||||
| # Should be at second suspension (after auto-resume with int-1) | ||||||
| assert events[0].triggers is not None | ||||||
| assert {t.interrupt_id for t in events[0].triggers} == {"int-2", "int-3"} | ||||||
|
|
||||||
|
|
||||||
| @pytest.mark.asyncio | ||||||
| async def test_resumable_no_auto_resume_when_all_triggers_pending(): | ||||||
| """When all triggers are pending, should NOT auto-resume.""" | ||||||
|
|
||||||
| runtime_impl = MultiTriggerMockRuntime() | ||||||
| storage = StatefulStorageMock() | ||||||
| trigger_manager = make_trigger_manager_mock() | ||||||
|
|
||||||
| # All triggers are pending | ||||||
| async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: | ||||||
| raise UiPathPendingTriggerError("pending") | ||||||
|
|
||||||
| trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore | ||||||
|
|
||||||
| resumable = UiPathResumableRuntime( | ||||||
| delegate=runtime_impl, | ||||||
| storage=storage, | ||||||
| trigger_manager=trigger_manager, | ||||||
| runtime_id="runtime-1", | ||||||
| ) | ||||||
|
|
||||||
| # Execute - should suspend | ||||||
| result = await resumable.execute({}) | ||||||
|
|
||||||
| assert result.status == UiPathRuntimeStatus.SUSPENDED | ||||||
| assert result.triggers is not None | ||||||
| assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"} | ||||||
|
|
||||||
| # Delegate should have been executed only once) | ||||||
|
||||||
| # Delegate should have been executed only once) | |
| # Delegate should have been executed only once. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When auto-resuming, the code replaces user-provided
optionswith a freshUiPathExecuteOptions(resume=True)ifoptions.resumeis false. This drops any other option fields (e.g.,breakpoints). Preserve existing options by copying/updatingresume=Trueinstead of creating a new instance that loses other settings.