From 5523b65b17ec77aafe88cb5cb099b7b18927e0a0 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 27 Jan 2026 15:31:44 +0100 Subject: [PATCH 1/7] python tutorial for workflow versioning Signed-off-by: Albert Callarisa --- .../challenges-tips/versioning_workflow.py | 64 ----------- .../workflow/python/versioning/README.md | 104 ++++++++++++++++++ .../workflow/python/versioning/dapr.yaml | 11 ++ tutorials/workflow/python/versioning/makefile | 2 + .../python/versioning/versioning.http | 11 ++ .../python/versioning/versioning/app.py | 43 ++++++++ .../versioning/named_versioned_workflow.py | 23 ++++ .../versioning/patching_workflow.py | 20 ++++ .../versioning/versioning/requirements.txt | 4 + .../python/versioning/versioning/runtime.py | 3 + 10 files changed, 221 insertions(+), 64 deletions(-) delete mode 100644 tutorials/workflow/python/challenges-tips/versioning_workflow.py create mode 100644 tutorials/workflow/python/versioning/README.md create mode 100644 tutorials/workflow/python/versioning/dapr.yaml create mode 100644 tutorials/workflow/python/versioning/makefile create mode 100644 tutorials/workflow/python/versioning/versioning.http create mode 100644 tutorials/workflow/python/versioning/versioning/app.py create mode 100644 tutorials/workflow/python/versioning/versioning/named_versioned_workflow.py create mode 100644 tutorials/workflow/python/versioning/versioning/patching_workflow.py create mode 100644 tutorials/workflow/python/versioning/versioning/requirements.txt create mode 100644 tutorials/workflow/python/versioning/versioning/runtime.py diff --git a/tutorials/workflow/python/challenges-tips/versioning_workflow.py b/tutorials/workflow/python/challenges-tips/versioning_workflow.py deleted file mode 100644 index 6637325e7..000000000 --- a/tutorials/workflow/python/challenges-tips/versioning_workflow.py +++ /dev/null @@ -1,64 +0,0 @@ -import dapr.ext.workflow as wf - -wf_runtime = wf.WorkflowRuntime() - -""" -This is the initial version of the workflow. -Note that the input argument for both activities is the orderItem (string). -""" -@wf_runtime.workflow(name='versioning_workflow_1') -def versioning_workflow_1(ctx: wf.DaprWorkflowContext, order_item: str): - result_a = yield ctx.call_activity(activity_a, input=order_item) - result_b = yield ctx.call_activity(activity_b, input=order_item) - - return result_a + result_b - -@wf_runtime.activity(name='activity_a') -def activity_a(ctx: wf.WorkflowActivityContext, order_item: str) -> int: - """ - This activity processes the order item and returns an integer result. - """ - print(f'activity_a: Received input: {order_item}.', flush=True) - return 10 - -@wf_runtime.activity(name='activity_b') -def activity_b(ctx: wf.WorkflowActivityContext, order_item: str) -> int: - """ - This activity processes the order item and returns another integer result. - """ - print(f'activity_b: Received input: {order_item}.', flush=True) - return 20 - -""" -This is the updated version of the workflow. -The input for activity_b has changed from order_item (string) to result_a (int). -If there are in-flight workflow instances that were started with the previous version -of this workflow, these will fail when the new version of the workflow is deployed -and the workflow name remains the same, since the runtime parameters do not match with the persisted state. -It is recommended to version workflows by creating a new workflow class with a new name: -{workflowname}_1 -> {workflowname}_2 -Try to avoid making breaking changes in perpetual workflows (that use the `continue_as_new` method) -since these are difficult to replace with a new version. -""" -@wf_runtime.workflow(name='versioning_workflow_2') -def versioning_workflow_2(ctx: wf.DaprWorkflowContext, order_item: str): - result_a = yield ctx.call_activity(activity_a, input=order_item) - result_b = yield ctx.call_activity(activity_b, input=result_a) - - return result_a + result_b - -@wf_runtime.activity(name='activity_a') -def activity_a(ctx: wf.WorkflowActivityContext, order_item: str) -> int: - """ - This activity processes the order item and returns an integer result. - """ - print(f'activity_a: Received input: {order_item}.', flush=True) - return 10 - -@wf_runtime.activity(name='activity_b') -def activity_b(ctx: wf.WorkflowActivityContext, number: int) -> int: - """ - This activity processes a number and returns another integer result. - """ - print(f'activity_b: Received input: {number}.', flush=True) - return number + 10 \ No newline at end of file diff --git a/tutorials/workflow/python/versioning/README.md b/tutorials/workflow/python/versioning/README.md new file mode 100644 index 000000000..b3b6c28cd --- /dev/null +++ b/tutorials/workflow/python/versioning/README.md @@ -0,0 +1,104 @@ +# Versioning Workflows + +This tutorial demonstrates how to version your workflows. For more information about workflow versioning see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#versioning). + +## Inspect the patching workflow code + +Open the `patching_workflow.py` file in the `tutorials/workflow/python/versioning/versioning` folder. This file contains the definition for the workflow. + +```mermaid +graph LR + SW((Start Workflow)) + A1[activity1] + A2[activity2] + P1[patch1 patched?]@{shape: diamond} + EW((End + Workflow)) + SW --> P1 + P1 --> A1 + P1 --> A2 + A1 --> EW + A2 --> EW +``` + +The workflow starts by evaluating whether the `patch1` patch is patched. If it is, the workflow will continue to the `activity1` activity. If it is not, the workflow will continue to the `activity2` activity. + +## Inspect the named versioned workflow code + +Also open the `named_versioned_workflow.py` file in the `tutorials/workflow/python/versioning/versioning` folder. This file contains the definition for the named versioned workflow. + +```mermaid +graph LR + SW1((Start Version 1)) + SW2((Start Version 2)) + A1[activity1] + A2[activity2] + EW1((End Workflow)) + EW2((End Workflow)) + SW1 --> A1 + SW2 --> A2 + A1 --> EW1 + A2 --> EW2 +``` + +In this case, two versions of the workflow are defined. The first version starts with the `activity1` activity and the second version starts with the `activity2` activity. + +The wokflow engine will always execute the latest version of the workflow. Older versions of the workflow will be only executed for workflows that were started with such version. + + +## Run the tutorial + +1. Use a terminal to navigate to the `tutorials/workflow/python/versioning/versioning` folder. +2. Install the dependencies using pip: + + ```bash + pip3 install -r requirements.txt + ``` + +3. Navigate back one level to the `versioning` folder and use the Dapr CLI to run the Dapr Multi-App run file + + + ```bash + dapr run -f . + ``` + + +4. Use the POST request in the [`versioning.http`](./versioning.http) file to start the workflow, or use this cURL command: + + ```bash + curl -i --request POST http://localhost:5263/start + ``` + + The input for the workflow is a string with the value `This`. The expected app logs are as follows: + + ```text + == APP - versioning == activity1: Received input: This. + == APP - versioning == activity2: Received input: This is. + == APP - versioning == activity3: Received input: This is task. + ``` + +5. Use the GET request in the [`versioning.http`](./versioning.http) file to get the status of the workflow, or use this cURL command: + + ```bash + curl --request GET --url http://localhost:3561/v1.0/workflows/dapr/ + ``` + + Where `` is the workflow instance ID you received in the `instance_id` property in the previous step. + + The expected serialized output of the workflow is: + + ```txt + "\"This is task versioning\"" + ``` + +6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`. diff --git a/tutorials/workflow/python/versioning/dapr.yaml b/tutorials/workflow/python/versioning/dapr.yaml new file mode 100644 index 000000000..c07f1fc15 --- /dev/null +++ b/tutorials/workflow/python/versioning/dapr.yaml @@ -0,0 +1,11 @@ +version: 1 +common: + resourcesPath: ../../resources +apps: + - appID: versioning + appDirPath: versioning + appPort: 5263 + daprHTTPPort: 3561 + command: ["python3", "app.py"] + appLogDestination: console + daprdLogDestination: console diff --git a/tutorials/workflow/python/versioning/makefile b/tutorials/workflow/python/versioning/makefile new file mode 100644 index 000000000..f67239779 --- /dev/null +++ b/tutorials/workflow/python/versioning/makefile @@ -0,0 +1,2 @@ +include ../../../../docker.mk +include ../../../../validate.mk \ No newline at end of file diff --git a/tutorials/workflow/python/versioning/versioning.http b/tutorials/workflow/python/versioning/versioning.http new file mode 100644 index 000000000..e75730566 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning.http @@ -0,0 +1,11 @@ +@apphost=http://localhost:5263 + +### Start the task_chaining workflow +# @name startWorkflowRequest +POST {{ apphost }}/start + + +@instanceId={{startWorkflowRequest.response.body.instance_id}} +@daprHost=http://localhost:3561 +### Get the workflow status +GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }} diff --git a/tutorials/workflow/python/versioning/versioning/app.py b/tutorials/workflow/python/versioning/versioning/app.py new file mode 100644 index 000000000..669cfdd5c --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/app.py @@ -0,0 +1,43 @@ +from fastapi import FastAPI, status +from contextlib import asynccontextmanager +from runtime import wf_runtime +from patching_workflow import patching_workflow +from named_versioned_workflow import named_versioned_workflow +import dapr.ext.workflow as wf +import uvicorn + +@asynccontextmanager +async def lifespan(app: FastAPI): + wf_runtime.start() + yield + wf_runtime.shutdown() + +app = FastAPI(lifespan=lifespan) + +@app.post("/start", status_code=status.HTTP_202_ACCEPTED) +async def start_workflow(): + wf_client = wf.DaprWorkflowClient() + patching_instance_id = wf_client.schedule_new_workflow( + workflow=patching_workflow, + input="input" + + ) + named_versioned_instance_id = wf_client.schedule_new_workflow( + workflow=named_versioned_workflow, + input="input" + ) + return { + "workflows_instances": [ + { + "instance_id": patching_instance_id, + "workflow_name": "patching_workflow" + }, + { + "instance_id": named_versioned_instance_id, + "workflow_name": "named_versioned_workflow" + } + ] + } + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=5263) diff --git a/tutorials/workflow/python/versioning/versioning/named_versioned_workflow.py b/tutorials/workflow/python/versioning/versioning/named_versioned_workflow.py new file mode 100644 index 000000000..23975b190 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/named_versioned_workflow.py @@ -0,0 +1,23 @@ +import dapr.ext.workflow as wf + +from runtime import wf_runtime + +@wf_runtime.versioned_workflow(name='named_versioned_workflow', is_latest=False) +def named_versioned_workflow(ctx: wf.DaprWorkflowContext, wf_input: str): + result = yield ctx.call_activity(named_versioned_activity_1, input=wf_input) + return result + +@wf_runtime.versioned_workflow(name='named_versioned_workflow', is_latest=True) +def named_versioned_workflow_fixed(ctx: wf.DaprWorkflowContext, wf_input: str): + result = yield ctx.call_activity(named_versioned_activity_2, input=wf_input) + return result + +@wf_runtime.activity +def named_versioned_activity_1(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'named_versioned_activity_1: Received input: {act_input}.', flush=True) + return f"{act_input} is" + +@wf_runtime.activity +def named_versioned_activity_2(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'named_versioned_activity_2: Received input: {act_input}.', flush=True) + return f"{act_input} task" diff --git a/tutorials/workflow/python/versioning/versioning/patching_workflow.py b/tutorials/workflow/python/versioning/versioning/patching_workflow.py new file mode 100644 index 000000000..e0f247a41 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/patching_workflow.py @@ -0,0 +1,20 @@ +import dapr.ext.workflow as wf +from runtime import wf_runtime + +@wf_runtime.workflow +def patching_workflow(ctx: wf.DaprWorkflowContext, wf_input: str): + if ctx.is_patched("patch1"): + result = yield ctx.call_activity(patching_workflow_activity_2, input=wf_input) + else: + result = yield ctx.call_activity(patching_workflow_activity_1, input=wf_input) + return result + +@wf_runtime.activity +def patching_workflow_activity_1(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'patching_workflow_activity_1: Received input: {act_input}.', flush=True) + return f"activity1: {act_input}" + +@wf_runtime.activity +def patching_workflow_activity_2(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'patching_workflow_activity_2: Received input: {act_input}.', flush=True) + return f"activity2: {act_input}" diff --git a/tutorials/workflow/python/versioning/versioning/requirements.txt b/tutorials/workflow/python/versioning/versioning/requirements.txt new file mode 100644 index 000000000..d8ca0a7c7 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/requirements.txt @@ -0,0 +1,4 @@ +dapr>=1.17.0rc3 +dapr-ext-workflow>=1.17.0rc3 +fastapi>=0.115.0 +uvicorn>=0.34.2 diff --git a/tutorials/workflow/python/versioning/versioning/runtime.py b/tutorials/workflow/python/versioning/versioning/runtime.py new file mode 100644 index 000000000..1503b7425 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/runtime.py @@ -0,0 +1,3 @@ +import dapr.ext.workflow as wf + +wf_runtime = wf.WorkflowRuntime() From 5d56177964ffa3655a3d1752e582b00b4ae22182 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 27 Jan 2026 15:35:18 +0100 Subject: [PATCH 2/7] Fixed versioning readme with actual output of the app. Signed-off-by: Albert Callarisa --- tutorials/workflow/python/challenges-tips/README.md | 1 - tutorials/workflow/python/versioning/README.md | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tutorials/workflow/python/challenges-tips/README.md b/tutorials/workflow/python/challenges-tips/README.md index e06dfe172..de6432570 100644 --- a/tutorials/workflow/python/challenges-tips/README.md +++ b/tutorials/workflow/python/challenges-tips/README.md @@ -6,5 +6,4 @@ This section provides some tips with code snippets to understand the limitations - [Deterministic workflows](deterministic_workflow.py) - [Idempotent activities](idempotent_activity.py) -- [Versioning workflows](versioning_workflow.py) - [Workflow & activity payload size](payload_size_workflow.py) diff --git a/tutorials/workflow/python/versioning/README.md b/tutorials/workflow/python/versioning/README.md index b3b6c28cd..eb344ab71 100644 --- a/tutorials/workflow/python/versioning/README.md +++ b/tutorials/workflow/python/versioning/README.md @@ -82,9 +82,8 @@ The wokflow engine will always execute the latest version of the workflow. Older The input for the workflow is a string with the value `This`. The expected app logs are as follows: ```text - == APP - versioning == activity1: Received input: This. - == APP - versioning == activity2: Received input: This is. - == APP - versioning == activity3: Received input: This is task. + == APP - versioning == patching_workflow_activity_2: Received input: input. + == APP - versioning == named_versioned_activity_2: Received input: input. ``` 5. Use the GET request in the [`versioning.http`](./versioning.http) file to get the status of the workflow, or use this cURL command: From 7e8dfc0f1934af80fcd2156a9118563e9ffb7664 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 27 Jan 2026 15:51:45 +0100 Subject: [PATCH 3/7] Adjusted examples Signed-off-by: Albert Callarisa --- tutorials/workflow/python/versioning/README.md | 8 ++++---- .../workflow/python/versioning/versioning.http | 13 ++++++++++--- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/tutorials/workflow/python/versioning/README.md b/tutorials/workflow/python/versioning/README.md index eb344ab71..9987f4b21 100644 --- a/tutorials/workflow/python/versioning/README.md +++ b/tutorials/workflow/python/versioning/README.md @@ -21,7 +21,7 @@ graph LR A2 --> EW ``` -The workflow starts by evaluating whether the `patch1` patch is patched. If it is, the workflow will continue to the `activity1` activity. If it is not, the workflow will continue to the `activity2` activity. +The workflow starts by evaluating whether the `patch1` patch is patched. If it is, the workflow will continue to the `activity2` activity. If it is not, the workflow will continue to the `activity1` activity. ## Inspect the named versioned workflow code @@ -79,7 +79,7 @@ The wokflow engine will always execute the latest version of the workflow. Older curl -i --request POST http://localhost:5263/start ``` - The input for the workflow is a string with the value `This`. The expected app logs are as follows: + The input for the workflow is a string with the value `input`. The expected app logs are as follows: ```text == APP - versioning == patching_workflow_activity_2: Received input: input. @@ -94,10 +94,10 @@ The wokflow engine will always execute the latest version of the workflow. Older Where `` is the workflow instance ID you received in the `instance_id` property in the previous step. - The expected serialized output of the workflow is: + The expected serialized output for the named versioned workflow is: ```txt - "\"This is task versioning\"" + "\"input task\"" ``` 6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`. diff --git a/tutorials/workflow/python/versioning/versioning.http b/tutorials/workflow/python/versioning/versioning.http index e75730566..dfc0e1903 100644 --- a/tutorials/workflow/python/versioning/versioning.http +++ b/tutorials/workflow/python/versioning/versioning.http @@ -1,11 +1,18 @@ @apphost=http://localhost:5263 -### Start the task_chaining workflow +### Start the versioning workflows # @name startWorkflowRequest POST {{ apphost }}/start -@instanceId={{startWorkflowRequest.response.body.instance_id}} +@instanceId={{startWorkflowRequest.response.body.workflows_instances[0].instance_id}} @daprHost=http://localhost:3561 -### Get the workflow status +### Get the patching workflow status GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }} + + +@instanceId={{startWorkflowRequest.response.body.workflows_instances[1].instance_id}} +@daprHost=http://localhost:3561 +### Get the named versioned workflow status +GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }} + From 6244b3f13b94c301a8aea9e63a72432f786b003a Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 10 Feb 2026 17:06:35 +0100 Subject: [PATCH 4/7] Removed mechanical markdown Signed-off-by: Albert Callarisa --- tutorials/workflow/python/versioning/README.md | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/tutorials/workflow/python/versioning/README.md b/tutorials/workflow/python/versioning/README.md index 9987f4b21..9c65c5b6a 100644 --- a/tutorials/workflow/python/versioning/README.md +++ b/tutorials/workflow/python/versioning/README.md @@ -57,21 +57,9 @@ The wokflow engine will always execute the latest version of the workflow. Older 3. Navigate back one level to the `versioning` folder and use the Dapr CLI to run the Dapr Multi-App run file - ```bash dapr run -f . ``` - 4. Use the POST request in the [`versioning.http`](./versioning.http) file to start the workflow, or use this cURL command: From c9445433b4032b41a410f9564e57adfa034effb5 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Thu, 12 Feb 2026 14:50:57 +0100 Subject: [PATCH 5/7] Redesigned python tutorial for workflow versioning Signed-off-by: Albert Callarisa --- .../workflow/python/versioning/README.md | 88 +++++++++++-------- .../workflow/python/versioning/dapr.yaml | 2 +- .../python/versioning/versioning.http | 13 +-- .../versioning/after_named_version.py | 49 +++++++++++ .../versioning/versioning/after_patching.py | 48 ++++++++++ .../python/versioning/versioning/app.py | 43 --------- .../python/versioning/versioning/before.py | 40 +++++++++ .../versioning/named_versioned_workflow.py | 23 ----- .../versioning/patching_workflow.py | 20 ----- .../python/versioning/versioning/runtime.py | 3 - 10 files changed, 190 insertions(+), 139 deletions(-) create mode 100644 tutorials/workflow/python/versioning/versioning/after_named_version.py create mode 100644 tutorials/workflow/python/versioning/versioning/after_patching.py delete mode 100644 tutorials/workflow/python/versioning/versioning/app.py create mode 100644 tutorials/workflow/python/versioning/versioning/before.py delete mode 100644 tutorials/workflow/python/versioning/versioning/named_versioned_workflow.py delete mode 100644 tutorials/workflow/python/versioning/versioning/patching_workflow.py delete mode 100644 tutorials/workflow/python/versioning/versioning/runtime.py diff --git a/tutorials/workflow/python/versioning/README.md b/tutorials/workflow/python/versioning/README.md index 9c65c5b6a..47212582c 100644 --- a/tutorials/workflow/python/versioning/README.md +++ b/tutorials/workflow/python/versioning/README.md @@ -2,49 +2,58 @@ This tutorial demonstrates how to version your workflows. For more information about workflow versioning see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#versioning). -## Inspect the patching workflow code +This tutorial uses a scenario where a `notify_user` workflow initially sends an email to users. The workflow is then updated to send an SMS instead. Two versioning strategies are demonstrated: **named versions** and **patching**. -Open the `patching_workflow.py` file in the `tutorials/workflow/python/versioning/versioning` folder. This file contains the definition for the workflow. +## Inspect the original workflow + +Open the `before.py` file in the `tutorials/workflow/python/versioning/versioning` folder. This file contains the original workflow definition that sends an email to notify a user. ```mermaid graph LR SW((Start Workflow)) - A1[activity1] - A2[activity2] - P1[patch1 patched?]@{shape: diamond} - EW((End - Workflow)) - SW --> P1 - P1 --> A1 - P1 --> A2 - A1 --> EW - A2 --> EW + SE[send_email] + EW((End Workflow)) + SW --> SE --> EW ``` -The workflow starts by evaluating whether the `patch1` patch is patched. If it is, the workflow will continue to the `activity2` activity. If it is not, the workflow will continue to the `activity1` activity. - -## Inspect the named versioned workflow code +## Inspect the named versioned workflow -Also open the `named_versioned_workflow.py` file in the `tutorials/workflow/python/versioning/versioning` folder. This file contains the definition for the named versioned workflow. +Open the `after_named_version.py` file. This file shows how to migrate the workflow using named versions. A new version of the workflow (`notify_user`) is created that calls `send_sms`, while the original version is kept as `notify_user_v1` for any in-flight workflow instances. ```mermaid graph LR - SW1((Start Version 1)) - SW2((Start Version 2)) - A1[activity1] - A2[activity2] + SW1((notify_user + old)) + SW2((notify_user + new)) + SE[send_email] + SS[send_sms] EW1((End Workflow)) EW2((End Workflow)) - SW1 --> A1 - SW2 --> A2 - A1 --> EW1 - A2 --> EW2 + SW1 --> SE --> EW1 + SW2 --> SS --> EW2 ``` -In this case, two versions of the workflow are defined. The first version starts with the `activity1` activity and the second version starts with the `activity2` activity. +The workflow engine will always execute the latest version of the workflow for new instances. Older versions will only be executed for workflows that were already in-flight when the update was deployed. + +## Inspect the patching workflow + +Open the `after_patching.py` file. This file shows how to migrate the workflow using patching. The `ctx.is_patched("send_sms")` check determines whether the workflow should execute the new `send_sms` activity or the original `send_email` activity. -The wokflow engine will always execute the latest version of the workflow. Older versions of the workflow will be only executed for workflows that were started with such version. +```mermaid +graph LR + SW((Start Workflow)) + P{is_patched + send_sms?} + SE[send_email] + SS[send_sms] + EW((End Workflow)) + SW --> P + P -- Yes --> SS --> EW + P -- No --> SE --> EW +``` +New workflow instances will execute the patched code path (`send_sms`), while in-flight workflows will continue using the original code path (`send_email`). ## Run the tutorial @@ -55,26 +64,33 @@ The wokflow engine will always execute the latest version of the workflow. Older pip3 install -r requirements.txt ``` -3. Navigate back one level to the `versioning` folder and use the Dapr CLI to run the Dapr Multi-App run file +3. Navigate back one level to the `versioning` folder. The `dapr.yaml` file is configured to run `before.py` by default. To try a versioned workflow, update the `command` field in `dapr.yaml` to run `after_named_version.py` or `after_patching.py` instead. + +4. Use the Dapr CLI to run the application: ```bash dapr run -f . ``` -4. Use the POST request in the [`versioning.http`](./versioning.http) file to start the workflow, or use this cURL command: +5. Use the POST request in the [`versioning.http`](./versioning.http) file to start the workflow, or use this cURL command: ```bash curl -i --request POST http://localhost:5263/start ``` - The input for the workflow is a string with the value `input`. The expected app logs are as follows: + When running `before.py`, the expected app logs are: + + ```text + == APP - versioning == send_email: Received input: user_id. + ``` + + When running `after_named_version.py` or `after_patching.py`, the expected app logs are: ```text - == APP - versioning == patching_workflow_activity_2: Received input: input. - == APP - versioning == named_versioned_activity_2: Received input: input. + == APP - versioning == send_sms: Received input: user_id. ``` -5. Use the GET request in the [`versioning.http`](./versioning.http) file to get the status of the workflow, or use this cURL command: +6. Use the GET request in the [`versioning.http`](./versioning.http) file to get the status of the workflow, or use this cURL command: ```bash curl --request GET --url http://localhost:3561/v1.0/workflows/dapr/ @@ -82,10 +98,4 @@ The wokflow engine will always execute the latest version of the workflow. Older Where `` is the workflow instance ID you received in the `instance_id` property in the previous step. - The expected serialized output for the named versioned workflow is: - - ```txt - "\"input task\"" - ``` - -6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`. +7. Stop the Dapr Multi-App run process by pressing `Ctrl+C`. diff --git a/tutorials/workflow/python/versioning/dapr.yaml b/tutorials/workflow/python/versioning/dapr.yaml index c07f1fc15..a1f141956 100644 --- a/tutorials/workflow/python/versioning/dapr.yaml +++ b/tutorials/workflow/python/versioning/dapr.yaml @@ -6,6 +6,6 @@ apps: appDirPath: versioning appPort: 5263 daprHTTPPort: 3561 - command: ["python3", "app.py"] + command: ["python3", "before.py"] appLogDestination: console daprdLogDestination: console diff --git a/tutorials/workflow/python/versioning/versioning.http b/tutorials/workflow/python/versioning/versioning.http index dfc0e1903..4fe189d5c 100644 --- a/tutorials/workflow/python/versioning/versioning.http +++ b/tutorials/workflow/python/versioning/versioning.http @@ -1,18 +1,11 @@ @apphost=http://localhost:5263 -### Start the versioning workflows +### Start the workflow # @name startWorkflowRequest POST {{ apphost }}/start -@instanceId={{startWorkflowRequest.response.body.workflows_instances[0].instance_id}} +@instanceId={{startWorkflowRequest.response.body.instance_id}} @daprHost=http://localhost:3561 -### Get the patching workflow status +### Get the workflow status GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }} - - -@instanceId={{startWorkflowRequest.response.body.workflows_instances[1].instance_id}} -@daprHost=http://localhost:3561 -### Get the named versioned workflow status -GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }} - diff --git a/tutorials/workflow/python/versioning/versioning/after_named_version.py b/tutorials/workflow/python/versioning/versioning/after_named_version.py new file mode 100644 index 000000000..e7aa13742 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/after_named_version.py @@ -0,0 +1,49 @@ +from fastapi import FastAPI, status +from contextlib import asynccontextmanager +import uvicorn + +import dapr.ext.workflow as wf + +wf_runtime = wf.WorkflowRuntime() + +@asynccontextmanager +async def lifespan(app: FastAPI): + wf_runtime.start() + yield + wf_runtime.shutdown() + +app = FastAPI(lifespan=lifespan) + +@app.post("/start", status_code=status.HTTP_202_ACCEPTED) +async def start_workflow(): + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=notify_user, + input="user_id" + + ) + return {"instance_id": instance_id} + +@wf_runtime.versioned_workflow(name='notify_user', is_latest=False) +def notify_user(ctx: wf.DaprWorkflowContext, wf_input: str): + result = yield ctx.call_activity(send_email, input=wf_input) + return result + +@wf_runtime.versioned_workflow(name='notify_user', is_latest=True) +def notify_user_new(ctx: wf.DaprWorkflowContext, wf_input: str): + result = yield ctx.call_activity(send_sms, input=wf_input) + return result + +@wf_runtime.activity +def send_email(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_email: Received input: {act_input}.', flush=True) + return f"Sent email to {act_input}" + +@wf_runtime.activity +def send_sms(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_sms: Received input: {act_input}.', flush=True) + return f"Sent SMS to {act_input}" + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=5263) diff --git a/tutorials/workflow/python/versioning/versioning/after_patching.py b/tutorials/workflow/python/versioning/versioning/after_patching.py new file mode 100644 index 000000000..8e2c042b4 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/after_patching.py @@ -0,0 +1,48 @@ +from fastapi import FastAPI, status +from contextlib import asynccontextmanager +import uvicorn + +import dapr.ext.workflow as wf + +wf_runtime = wf.WorkflowRuntime() + +@asynccontextmanager +async def lifespan(app: FastAPI): + wf_runtime.start() + yield + wf_runtime.shutdown() + +app = FastAPI(lifespan=lifespan) + +@app.post("/start", status_code=status.HTTP_202_ACCEPTED) +async def start_workflow(): + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=notify_user, + input="user_id" + + ) + return {"instance_id": instance_id} + +@wf_runtime.versioned_workflow(name='notify_user', is_latest=True) +def notify_user(ctx: wf.DaprWorkflowContext, wf_input: str): + if ctx.is_patched("send_sms"): + result = yield ctx.call_activity(send_sms, input=wf_input) + else: + result = yield ctx.call_activity(send_email, input=wf_input) + return result + + +@wf_runtime.activity +def send_email(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_email: Received input: {act_input}.', flush=True) + return f"Sent email to {act_input}" + +@wf_runtime.activity +def send_sms(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_sms: Received input: {act_input}.', flush=True) + return f"Sent SMS to {act_input}" + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=5263) diff --git a/tutorials/workflow/python/versioning/versioning/app.py b/tutorials/workflow/python/versioning/versioning/app.py deleted file mode 100644 index 669cfdd5c..000000000 --- a/tutorials/workflow/python/versioning/versioning/app.py +++ /dev/null @@ -1,43 +0,0 @@ -from fastapi import FastAPI, status -from contextlib import asynccontextmanager -from runtime import wf_runtime -from patching_workflow import patching_workflow -from named_versioned_workflow import named_versioned_workflow -import dapr.ext.workflow as wf -import uvicorn - -@asynccontextmanager -async def lifespan(app: FastAPI): - wf_runtime.start() - yield - wf_runtime.shutdown() - -app = FastAPI(lifespan=lifespan) - -@app.post("/start", status_code=status.HTTP_202_ACCEPTED) -async def start_workflow(): - wf_client = wf.DaprWorkflowClient() - patching_instance_id = wf_client.schedule_new_workflow( - workflow=patching_workflow, - input="input" - - ) - named_versioned_instance_id = wf_client.schedule_new_workflow( - workflow=named_versioned_workflow, - input="input" - ) - return { - "workflows_instances": [ - { - "instance_id": patching_instance_id, - "workflow_name": "patching_workflow" - }, - { - "instance_id": named_versioned_instance_id, - "workflow_name": "named_versioned_workflow" - } - ] - } - -if __name__ == "__main__": - uvicorn.run(app, host="0.0.0.0", port=5263) diff --git a/tutorials/workflow/python/versioning/versioning/before.py b/tutorials/workflow/python/versioning/versioning/before.py new file mode 100644 index 000000000..fcb567df2 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/before.py @@ -0,0 +1,40 @@ +from fastapi import FastAPI, status +from contextlib import asynccontextmanager +import uvicorn + +import dapr.ext.workflow as wf + +wf_runtime = wf.WorkflowRuntime() + +@asynccontextmanager +async def lifespan(app: FastAPI): + wf_runtime.start() + yield + wf_runtime.shutdown() + +app = FastAPI(lifespan=lifespan) + +@app.post("/start", status_code=status.HTTP_202_ACCEPTED) +async def start_workflow(): + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=notify_user, + input="user_id" + + ) + return {"instance_id": instance_id} + + +@wf_runtime.versioned_workflow(name='notify_user', is_latest=True) +def notify_user(ctx: wf.DaprWorkflowContext, wf_input: str): + result = yield ctx.call_activity(send_email, input=wf_input) + return result + +@wf_runtime.activity +def send_email(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_email: Received input: {act_input}.', flush=True) + return f"Sent email to {act_input}" + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=5263) diff --git a/tutorials/workflow/python/versioning/versioning/named_versioned_workflow.py b/tutorials/workflow/python/versioning/versioning/named_versioned_workflow.py deleted file mode 100644 index 23975b190..000000000 --- a/tutorials/workflow/python/versioning/versioning/named_versioned_workflow.py +++ /dev/null @@ -1,23 +0,0 @@ -import dapr.ext.workflow as wf - -from runtime import wf_runtime - -@wf_runtime.versioned_workflow(name='named_versioned_workflow', is_latest=False) -def named_versioned_workflow(ctx: wf.DaprWorkflowContext, wf_input: str): - result = yield ctx.call_activity(named_versioned_activity_1, input=wf_input) - return result - -@wf_runtime.versioned_workflow(name='named_versioned_workflow', is_latest=True) -def named_versioned_workflow_fixed(ctx: wf.DaprWorkflowContext, wf_input: str): - result = yield ctx.call_activity(named_versioned_activity_2, input=wf_input) - return result - -@wf_runtime.activity -def named_versioned_activity_1(ctx: wf.WorkflowActivityContext, act_input: str) -> str: - print(f'named_versioned_activity_1: Received input: {act_input}.', flush=True) - return f"{act_input} is" - -@wf_runtime.activity -def named_versioned_activity_2(ctx: wf.WorkflowActivityContext, act_input: str) -> str: - print(f'named_versioned_activity_2: Received input: {act_input}.', flush=True) - return f"{act_input} task" diff --git a/tutorials/workflow/python/versioning/versioning/patching_workflow.py b/tutorials/workflow/python/versioning/versioning/patching_workflow.py deleted file mode 100644 index e0f247a41..000000000 --- a/tutorials/workflow/python/versioning/versioning/patching_workflow.py +++ /dev/null @@ -1,20 +0,0 @@ -import dapr.ext.workflow as wf -from runtime import wf_runtime - -@wf_runtime.workflow -def patching_workflow(ctx: wf.DaprWorkflowContext, wf_input: str): - if ctx.is_patched("patch1"): - result = yield ctx.call_activity(patching_workflow_activity_2, input=wf_input) - else: - result = yield ctx.call_activity(patching_workflow_activity_1, input=wf_input) - return result - -@wf_runtime.activity -def patching_workflow_activity_1(ctx: wf.WorkflowActivityContext, act_input: str) -> str: - print(f'patching_workflow_activity_1: Received input: {act_input}.', flush=True) - return f"activity1: {act_input}" - -@wf_runtime.activity -def patching_workflow_activity_2(ctx: wf.WorkflowActivityContext, act_input: str) -> str: - print(f'patching_workflow_activity_2: Received input: {act_input}.', flush=True) - return f"activity2: {act_input}" diff --git a/tutorials/workflow/python/versioning/versioning/runtime.py b/tutorials/workflow/python/versioning/versioning/runtime.py deleted file mode 100644 index 1503b7425..000000000 --- a/tutorials/workflow/python/versioning/versioning/runtime.py +++ /dev/null @@ -1,3 +0,0 @@ -import dapr.ext.workflow as wf - -wf_runtime = wf.WorkflowRuntime() From a011eda748e4771ab21e189ce11390da7df4aed2 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Fri, 13 Feb 2026 12:25:36 +0100 Subject: [PATCH 6/7] Addressed comments from @alicejgibbons Signed-off-by: Albert Callarisa --- .../workflow/python/versioning/README.md | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/tutorials/workflow/python/versioning/README.md b/tutorials/workflow/python/versioning/README.md index 47212582c..70a1342e8 100644 --- a/tutorials/workflow/python/versioning/README.md +++ b/tutorials/workflow/python/versioning/README.md @@ -18,7 +18,7 @@ graph LR ## Inspect the named versioned workflow -Open the `after_named_version.py` file. This file shows how to migrate the workflow using named versions. A new version of the workflow (`notify_user`) is created that calls `send_sms`, while the original version is kept as `notify_user_v1` for any in-flight workflow instances. +Review the `after_named_version.py` file. This file shows how to create a new workflow version using named versions. A new version of the workflow (`notify_user`) is created that calls the `send_sms` activity, while the original version is kept as `notify_user_v1` for any workflow instances that might already be running. ```mermaid graph LR @@ -34,11 +34,11 @@ graph LR SW2 --> SS --> EW2 ``` -The workflow engine will always execute the latest version of the workflow for new instances. Older versions will only be executed for workflows that were already in-flight when the update was deployed. +The workflow engine will always execute the latest version of the workflow when new instances are started. Older versions will only be executed for workflows that were already in-flight when the update was deployed. ## Inspect the patching workflow -Open the `after_patching.py` file. This file shows how to migrate the workflow using patching. The `ctx.is_patched("send_sms")` check determines whether the workflow should execute the new `send_sms` activity or the original `send_email` activity. +Review the `after_patching.py` file. This file shows how to create a new workflow version using patching. The `ctx.is_patched("send_sms")` check determines whether the workflow should execute the new `send_sms` activity or the original `send_email` activity based on if the patch `send_sms` is applied. ```mermaid graph LR @@ -53,7 +53,7 @@ graph LR P -- No --> SE --> EW ``` -New workflow instances will execute the patched code path (`send_sms`), while in-flight workflows will continue using the original code path (`send_email`). +New workflow instances will execute the patched code path (`send_sms`) since the workflow has already had a patch applied, while already started, in-flight workflows will continue using the original code path (`send_email`). ## Run the tutorial @@ -64,7 +64,24 @@ New workflow instances will execute the patched code path (`send_sms`), while in pip3 install -r requirements.txt ``` -3. Navigate back one level to the `versioning` folder. The `dapr.yaml` file is configured to run `before.py` by default. To try a versioned workflow, update the `command` field in `dapr.yaml` to run `after_named_version.py` or `after_patching.py` instead. +3. Navigate back one level to the `versioning` folder. By default, the `dapr.yaml` file is set to run `before.py`: + + ```yaml + command: ["python3", "before.py"] + ``` + + To try a versioned workflow, change the `command` field in this file to one of the following, depending on which version you'd like to run: + + - For the named version example (`after_named_version.py`): + ```yaml + command: ["python3", "after_named_version.py"] + ``` + - For the patching version example (`after_patching.py`): + ```yaml + command: ["python3", "after_patching.py"] + ``` + + Make sure to save the file after making your changes. 4. Use the Dapr CLI to run the application: @@ -84,7 +101,7 @@ New workflow instances will execute the patched code path (`send_sms`), while in == APP - versioning == send_email: Received input: user_id. ``` - When running `after_named_version.py` or `after_patching.py`, the expected app logs are: + When running `after_named_version.py` or `after_patching.py`, the expected app logs change from `send_email` to `send_sms` and are: ```text == APP - versioning == send_sms: Received input: user_id. From 4631195a5635d3daa6675fd3d290037ad98424df Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Fri, 13 Feb 2026 14:22:04 +0100 Subject: [PATCH 7/7] Fixed incorrect version names Signed-off-by: Albert Callarisa --- tutorials/workflow/python/versioning/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/workflow/python/versioning/README.md b/tutorials/workflow/python/versioning/README.md index 70a1342e8..787fa44f1 100644 --- a/tutorials/workflow/python/versioning/README.md +++ b/tutorials/workflow/python/versioning/README.md @@ -18,7 +18,7 @@ graph LR ## Inspect the named versioned workflow -Review the `after_named_version.py` file. This file shows how to create a new workflow version using named versions. A new version of the workflow (`notify_user`) is created that calls the `send_sms` activity, while the original version is kept as `notify_user_v1` for any workflow instances that might already be running. +Review the `after_named_version.py` file. This file shows how to create a new workflow version using named versions. A new version of the workflow (`notify_user_new`) is created that calls the `send_sms` activity, while the original version is kept as `notify_user` for any workflow instances that might already be running. ```mermaid graph LR