Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/uipath/runtime/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,10 @@ def to_dict(self) -> dict[str, Any]:
result["error"] = self.error.model_dump()

return result


class UiPathSuspensionResult(BaseModel):
"""Result of a runtime suspension."""

runtime_result: UiPathRuntimeResult
fired_triggers_map: dict[str, Any] | None = Field(default=None)
84 changes: 69 additions & 15 deletions src/uipath/runtime/resumable/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
)
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.events import UiPathRuntimeEvent
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
from uipath.runtime.result import (
UiPathRuntimeResult,
UiPathRuntimeStatus,
UiPathSuspensionResult,
)
from uipath.runtime.resumable.protocols import (
UiPathResumableStorageProtocol,
UiPathResumeTriggerProtocol,
)
from uipath.runtime.resumable.trigger import UiPathResumeTrigger
from uipath.runtime.schema import UiPathRuntimeSchema

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,6 +61,7 @@ async def execute(
self,
input: dict[str, Any] | None = None,
options: UiPathExecuteOptions | None = None,
fired_triggers_map: dict[str, Any] | None = None,
) -> UiPathRuntimeResult:
"""Execute with resume trigger handling.

Expand All @@ -66,19 +72,35 @@ async def execute(
Returns:
Execution result, potentially with resume trigger attached
"""
# If resuming, restore trigger from storage
# check if we are resuming
if options and options.resume:
input = await self._restore_resume_input(input)
if fired_triggers_map:
input = fired_triggers_map
else:
# restore trigger from storage
input = await self._restore_resume_input(input)

# Execute the delegate
result = await self.delegate.execute(input, options=options)
# If suspended, create and persist trigger
return await self._handle_suspension(result)
suspension_result = await self._handle_suspension(result)
if not suspension_result.fired_triggers_map:
return suspension_result.runtime_result

# some triggers are already fired, runtime can be resumed
resume_options = options or UiPathExecuteOptions(resume=True)
if not resume_options.resume:
resume_options = UiPathExecuteOptions(resume=True)
return await self.execute(
Comment on lines +91 to +94
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When auto-resuming, the code replaces user-provided options with a fresh UiPathExecuteOptions(resume=True) if options.resume is false. This drops any other option fields (e.g., breakpoints). Preserve existing options by copying/updating resume=True instead of creating a new instance that loses other settings.

Copilot uses AI. Check for mistakes.
fired_triggers_map=suspension_result.fired_triggers_map,
options=resume_options,
)
Comment on lines +90 to +97
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-resume is implemented via recursive self.execute(...) calls with no guardrail. If the delegate keeps returning SUSPENDED while triggers appear “fired” immediately (or if there’s a logic loop), this can lead to unbounded recursion and eventually RecursionError/stack growth. Consider rewriting as an iterative loop and/or enforcing a maximum auto-resume depth / progress check.

Copilot uses AI. Check for mistakes.

async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
fired_triggers_map: dict[str, Any] | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream with resume trigger handling.

Expand All @@ -89,9 +111,13 @@ async def stream(
Yields:
Runtime events during execution, final event is UiPathRuntimeResult
"""
# If resuming, restore trigger from storage
# check if we are resuming
if options and options.resume:
input = await self._restore_resume_input(input)
if fired_triggers_map:
input = fired_triggers_map
else:
# restore trigger from storage
input = await self._restore_resume_input(input)

final_result: UiPathRuntimeResult | None = None
async for event in self.delegate.stream(input, options=options):
Expand All @@ -102,7 +128,21 @@ async def stream(

# If suspended, create and persist trigger
if final_result:
yield await self._handle_suspension(final_result)
suspension_result = await self._handle_suspension(final_result)

if not suspension_result.fired_triggers_map:
yield suspension_result.runtime_result
return

# some triggers are already fired, runtime can be resumed
resume_options = options or UiPathStreamOptions(resume=True)
if not resume_options.resume:
resume_options = UiPathStreamOptions(resume=True)
Comment on lines +138 to +140
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When auto-resuming in stream(), the code constructs a new UiPathStreamOptions(resume=True) if options.resume is false, which drops any other option fields (e.g., breakpoints). Preserve the caller’s options by copying/updating resume=True rather than replacing the options object.

Copilot uses AI. Check for mistakes.
async for event in self.stream(
fired_triggers_map=suspension_result.fired_triggers_map,
options=resume_options,
):
yield event
Comment on lines +137 to +145
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-resume in stream() uses recursive calls to self.stream(...) with no depth/progress guard. A run that repeatedly suspends with triggers that immediately appear fired can recurse indefinitely and grow the call stack. Consider an iterative loop and/or a maximum auto-resume depth similar to execute().

Copilot uses AI. Check for mistakes.

async def _restore_resume_input(
self, input: dict[str, Any] | None
Expand Down Expand Up @@ -142,6 +182,11 @@ async def _restore_resume_input(
if not triggers:
return None

return await self._build_resume_map(triggers)

async def _build_resume_map(
self, triggers: list[UiPathResumeTrigger]
) -> dict[str, Any]:
# Build resume map: {interrupt_id: resume_data}
resume_map: dict[str, Any] = {}
for trigger in triggers:
Expand All @@ -160,18 +205,17 @@ async def _restore_resume_input(

async def _handle_suspension(
self, result: UiPathRuntimeResult
) -> UiPathRuntimeResult:
) -> UiPathSuspensionResult:
"""Create and persist resume trigger if execution was suspended.

Args:
result: The execution result to check for suspension
"""
# Only handle suspensions
if result.status != UiPathRuntimeStatus.SUSPENDED:
return result

if isinstance(result, UiPathBreakpointResult):
return result
# Only handle interrupt suspensions
if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance(
result, UiPathBreakpointResult
):
return UiPathSuspensionResult(runtime_result=result)

suspended_result = UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUSPENDED,
Expand Down Expand Up @@ -205,7 +249,17 @@ async def _handle_suspension(
# Backward compatibility: set single trigger directly
suspended_result.trigger = suspended_result.triggers[0]

return suspended_result
# check if any trigger can be resumed
# Note: when resuming a job, orchestrator deletes all triggers associated with it,
# thus we can resume the runtime at this point without worrying a trigger may be fired 'twice'
triggers = await self.storage.get_triggers(self.runtime_id)

return UiPathSuspensionResult(
runtime_result=suspended_result,
fired_triggers_map=await self._build_resume_map(triggers)
if triggers
else None,
)
Comment on lines +257 to +262
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_handle_suspension() now calls _build_resume_map() during suspension to decide whether to auto-resume. UiPathResumeTriggerReaderProtocol.read_trigger() is documented to return None when no data is available; treat that as still pending. Ensure _build_resume_map() does not delete triggers / include {interrupt_id: None} when read_trigger() returns None, otherwise triggers can be prematurely consumed and the runtime can auto-resume with missing data.

Copilot uses AI. Check for mistakes.

async def get_schema(self) -> UiPathRuntimeSchema:
"""Passthrough schema from delegate runtime."""
Expand Down
189 changes: 189 additions & 0 deletions tests/test_resumable.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,192 @@ async def read_trigger_impl_3(trigger: UiPathResumeTrigger) -> dict[str, Any]:
assert result.output["completed"] is True
assert "int-2" in result.output["resume_data"]
assert "int-3" in result.output["resume_data"]


@pytest.mark.asyncio
async def test_resumable_auto_resumes_when_triggers_already_fired():
"""When triggers are already fired during suspension, runtime should auto-resume."""

runtime_impl = MultiTriggerMockRuntime()
storage = StatefulStorageMock()
trigger_manager = make_trigger_manager_mock()

read_count = {"count": 0}

# configure trigger manager to return triggers as already fired only on first batch
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
read_count["count"] += 1
# first two triggers (int-1, int-2) are immediately available
# subsequent triggers are pending
if trigger.interrupt_id in ["int-1", "int-2"] and read_count["count"] <= 2:
return {"approved": True}
raise UiPathPendingTriggerError("pending")

trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore

resumable = UiPathResumableRuntime(
delegate=runtime_impl,
storage=storage,
trigger_manager=trigger_manager,
runtime_id="runtime-1",
)

# First execution - should suspend with int-1 and int-2, but since both are
# already fired, it should auto-resume and suspend again with int-2 and int-3
result = await resumable.execute({})

# The runtime should have auto-resumed once and suspended again
assert result.status == UiPathRuntimeStatus.SUSPENDED
assert result.triggers is not None
assert len(result.triggers) == 2
# After auto-resume, we should be at second suspension with int-2, int-3
assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"}

# Delegate should have been executed twice (initial + auto-resume)
assert runtime_impl.execution_count == 2


@pytest.mark.asyncio
async def test_resumable_auto_resumes_partial_fired_triggers():
"""When only some triggers are fired during suspension, auto-resume with those."""

runtime_impl = MultiTriggerMockRuntime()
storage = StatefulStorageMock()
trigger_manager = make_trigger_manager_mock()

# Configure trigger manager so int-1 is fired but int-2 is pending
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
if trigger.interrupt_id == "int-1":
return {"approved": True}
raise UiPathPendingTriggerError("still pending")

trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore

resumable = UiPathResumableRuntime(
delegate=runtime_impl,
storage=storage,
trigger_manager=trigger_manager,
runtime_id="runtime-1",
)

# First execution - int-1 fires immediately, int-2 stays pending
# Should auto-resume with int-1 and suspend with int-2, int-3
result = await resumable.execute({})

assert result.status == UiPathRuntimeStatus.SUSPENDED
assert result.triggers is not None
# After auto-resume with int-1, should have int-2 (still pending) + int-3 (new)
assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"}

# Verify int-1 was consumed (deleted from storage)
remaining_triggers = await storage.get_triggers("runtime-1")
assert all(t.interrupt_id != "int-1" for t in remaining_triggers)


@pytest.mark.asyncio
async def test_resumable_auto_resumes_multiple_times():
"""When triggers keep being fired immediately, keep auto-resuming until complete."""

runtime_impl = MultiTriggerMockRuntime()
storage = StatefulStorageMock()
trigger_manager = make_trigger_manager_mock()

# All triggers are always immediately available
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
return {"approved": True}

trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore

resumable = UiPathResumableRuntime(
delegate=runtime_impl,
storage=storage,
trigger_manager=trigger_manager,
runtime_id="runtime-1",
)

# Execute once - should auto-resume through all suspensions
result = await resumable.execute({})

# Should complete successfully after auto-resuming twice
# 1st exec: suspend with int-1, int-2 -> auto-resume
# 2nd exec: suspend with int-2, int-3 -> auto-resume
# 3rd exec: complete
assert result.status == UiPathRuntimeStatus.SUCCESSFUL
assert isinstance(result.output, dict)
assert result.output["completed"] is True

# Delegate should have been executed 3 times
assert runtime_impl.execution_count == 3


@pytest.mark.asyncio
async def test_resumable_stream_auto_resumes_when_triggers_fired():
"""Stream auto-resume when triggers are already fired."""

runtime_impl = MultiTriggerMockRuntime()
storage = StatefulStorageMock()
trigger_manager = make_trigger_manager_mock()

read_attempts = {"count": 0}

# Configure int-1 to be immediately fired, int-2 pending
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
read_attempts["count"] += 1
if trigger.interrupt_id == "int-1" and read_attempts["count"] <= 1:
return {"approved": True}
raise UiPathPendingTriggerError("pending")

trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore

resumable = UiPathResumableRuntime(
delegate=runtime_impl,
storage=storage,
trigger_manager=trigger_manager,
runtime_id="runtime-1",
)

# Stream should auto-resume and yield final result after auto-resume
events = []
async for event in resumable.stream({}):
events.append(event)

# Should have received exactly one final result (after auto-resume)
assert len(events) == 1
assert isinstance(events[0], UiPathRuntimeResult)
assert events[0].status == UiPathRuntimeStatus.SUSPENDED

# Should be at second suspension (after auto-resume with int-1)
assert events[0].triggers is not None
assert {t.interrupt_id for t in events[0].triggers} == {"int-2", "int-3"}


@pytest.mark.asyncio
async def test_resumable_no_auto_resume_when_all_triggers_pending():
"""When all triggers are pending, should NOT auto-resume."""

runtime_impl = MultiTriggerMockRuntime()
storage = StatefulStorageMock()
trigger_manager = make_trigger_manager_mock()

# All triggers are pending
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
raise UiPathPendingTriggerError("pending")

trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore

resumable = UiPathResumableRuntime(
delegate=runtime_impl,
storage=storage,
trigger_manager=trigger_manager,
runtime_id="runtime-1",
)

# Execute - should suspend
result = await resumable.execute({})

assert result.status == UiPathRuntimeStatus.SUSPENDED
assert result.triggers is not None
assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"}

# Delegate should have been executed only once)
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra closing parenthesis in comment: “Delegate should have been executed only once)”.

Suggested change
# Delegate should have been executed only once)
# Delegate should have been executed only once.

Copilot uses AI. Check for mistakes.
assert runtime_impl.execution_count == 1
Loading