diff --git a/tutorials/workflow/python/challenges-tips/README.md b/tutorials/workflow/python/challenges-tips/README.md index e06dfe172..de6432570 100644 --- a/tutorials/workflow/python/challenges-tips/README.md +++ b/tutorials/workflow/python/challenges-tips/README.md @@ -6,5 +6,4 @@ This section provides some tips with code snippets to understand the limitations - [Deterministic workflows](deterministic_workflow.py) - [Idempotent activities](idempotent_activity.py) -- [Versioning workflows](versioning_workflow.py) - [Workflow & activity payload size](payload_size_workflow.py) diff --git a/tutorials/workflow/python/challenges-tips/versioning_workflow.py b/tutorials/workflow/python/challenges-tips/versioning_workflow.py deleted file mode 100644 index 6637325e7..000000000 --- a/tutorials/workflow/python/challenges-tips/versioning_workflow.py +++ /dev/null @@ -1,64 +0,0 @@ -import dapr.ext.workflow as wf - -wf_runtime = wf.WorkflowRuntime() - -""" -This is the initial version of the workflow. -Note that the input argument for both activities is the orderItem (string). -""" -@wf_runtime.workflow(name='versioning_workflow_1') -def versioning_workflow_1(ctx: wf.DaprWorkflowContext, order_item: str): - result_a = yield ctx.call_activity(activity_a, input=order_item) - result_b = yield ctx.call_activity(activity_b, input=order_item) - - return result_a + result_b - -@wf_runtime.activity(name='activity_a') -def activity_a(ctx: wf.WorkflowActivityContext, order_item: str) -> int: - """ - This activity processes the order item and returns an integer result. - """ - print(f'activity_a: Received input: {order_item}.', flush=True) - return 10 - -@wf_runtime.activity(name='activity_b') -def activity_b(ctx: wf.WorkflowActivityContext, order_item: str) -> int: - """ - This activity processes the order item and returns another integer result. - """ - print(f'activity_b: Received input: {order_item}.', flush=True) - return 20 - -""" -This is the updated version of the workflow. -The input for activity_b has changed from order_item (string) to result_a (int). -If there are in-flight workflow instances that were started with the previous version -of this workflow, these will fail when the new version of the workflow is deployed -and the workflow name remains the same, since the runtime parameters do not match with the persisted state. -It is recommended to version workflows by creating a new workflow class with a new name: -{workflowname}_1 -> {workflowname}_2 -Try to avoid making breaking changes in perpetual workflows (that use the `continue_as_new` method) -since these are difficult to replace with a new version. -""" -@wf_runtime.workflow(name='versioning_workflow_2') -def versioning_workflow_2(ctx: wf.DaprWorkflowContext, order_item: str): - result_a = yield ctx.call_activity(activity_a, input=order_item) - result_b = yield ctx.call_activity(activity_b, input=result_a) - - return result_a + result_b - -@wf_runtime.activity(name='activity_a') -def activity_a(ctx: wf.WorkflowActivityContext, order_item: str) -> int: - """ - This activity processes the order item and returns an integer result. - """ - print(f'activity_a: Received input: {order_item}.', flush=True) - return 10 - -@wf_runtime.activity(name='activity_b') -def activity_b(ctx: wf.WorkflowActivityContext, number: int) -> int: - """ - This activity processes a number and returns another integer result. - """ - print(f'activity_b: Received input: {number}.', flush=True) - return number + 10 \ No newline at end of file diff --git a/tutorials/workflow/python/versioning/README.md b/tutorials/workflow/python/versioning/README.md new file mode 100644 index 000000000..787fa44f1 --- /dev/null +++ b/tutorials/workflow/python/versioning/README.md @@ -0,0 +1,118 @@ +# Versioning Workflows + +This tutorial demonstrates how to version your workflows. For more information about workflow versioning see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#versioning). + +This tutorial uses a scenario where a `notify_user` workflow initially sends an email to users. The workflow is then updated to send an SMS instead. Two versioning strategies are demonstrated: **named versions** and **patching**. + +## Inspect the original workflow + +Open the `before.py` file in the `tutorials/workflow/python/versioning/versioning` folder. This file contains the original workflow definition that sends an email to notify a user. + +```mermaid +graph LR + SW((Start Workflow)) + SE[send_email] + EW((End Workflow)) + SW --> SE --> EW +``` + +## Inspect the named versioned workflow + +Review the `after_named_version.py` file. This file shows how to create a new workflow version using named versions. A new version of the workflow (`notify_user_new`) is created that calls the `send_sms` activity, while the original version is kept as `notify_user` for any workflow instances that might already be running. + +```mermaid +graph LR + SW1((notify_user + old)) + SW2((notify_user + new)) + SE[send_email] + SS[send_sms] + EW1((End Workflow)) + EW2((End Workflow)) + SW1 --> SE --> EW1 + SW2 --> SS --> EW2 +``` + +The workflow engine will always execute the latest version of the workflow when new instances are started. Older versions will only be executed for workflows that were already in-flight when the update was deployed. + +## Inspect the patching workflow + +Review the `after_patching.py` file. This file shows how to create a new workflow version using patching. The `ctx.is_patched("send_sms")` check determines whether the workflow should execute the new `send_sms` activity or the original `send_email` activity based on if the patch `send_sms` is applied. + +```mermaid +graph LR + SW((Start Workflow)) + P{is_patched + send_sms?} + SE[send_email] + SS[send_sms] + EW((End Workflow)) + SW --> P + P -- Yes --> SS --> EW + P -- No --> SE --> EW +``` + +New workflow instances will execute the patched code path (`send_sms`) since the workflow has already had a patch applied, while already started, in-flight workflows will continue using the original code path (`send_email`). + +## Run the tutorial + +1. Use a terminal to navigate to the `tutorials/workflow/python/versioning/versioning` folder. +2. Install the dependencies using pip: + + ```bash + pip3 install -r requirements.txt + ``` + +3. Navigate back one level to the `versioning` folder. By default, the `dapr.yaml` file is set to run `before.py`: + + ```yaml + command: ["python3", "before.py"] + ``` + + To try a versioned workflow, change the `command` field in this file to one of the following, depending on which version you'd like to run: + + - For the named version example (`after_named_version.py`): + ```yaml + command: ["python3", "after_named_version.py"] + ``` + - For the patching version example (`after_patching.py`): + ```yaml + command: ["python3", "after_patching.py"] + ``` + + Make sure to save the file after making your changes. + +4. Use the Dapr CLI to run the application: + + ```bash + dapr run -f . + ``` + +5. Use the POST request in the [`versioning.http`](./versioning.http) file to start the workflow, or use this cURL command: + + ```bash + curl -i --request POST http://localhost:5263/start + ``` + + When running `before.py`, the expected app logs are: + + ```text + == APP - versioning == send_email: Received input: user_id. + ``` + + When running `after_named_version.py` or `after_patching.py`, the expected app logs change from `send_email` to `send_sms` and are: + + ```text + == APP - versioning == send_sms: Received input: user_id. + ``` + +6. Use the GET request in the [`versioning.http`](./versioning.http) file to get the status of the workflow, or use this cURL command: + + ```bash + curl --request GET --url http://localhost:3561/v1.0/workflows/dapr/ + ``` + + Where `` is the workflow instance ID you received in the `instance_id` property in the previous step. + +7. Stop the Dapr Multi-App run process by pressing `Ctrl+C`. diff --git a/tutorials/workflow/python/versioning/dapr.yaml b/tutorials/workflow/python/versioning/dapr.yaml new file mode 100644 index 000000000..a1f141956 --- /dev/null +++ b/tutorials/workflow/python/versioning/dapr.yaml @@ -0,0 +1,11 @@ +version: 1 +common: + resourcesPath: ../../resources +apps: + - appID: versioning + appDirPath: versioning + appPort: 5263 + daprHTTPPort: 3561 + command: ["python3", "before.py"] + appLogDestination: console + daprdLogDestination: console diff --git a/tutorials/workflow/python/versioning/makefile b/tutorials/workflow/python/versioning/makefile new file mode 100644 index 000000000..f67239779 --- /dev/null +++ b/tutorials/workflow/python/versioning/makefile @@ -0,0 +1,2 @@ +include ../../../../docker.mk +include ../../../../validate.mk \ No newline at end of file diff --git a/tutorials/workflow/python/versioning/versioning.http b/tutorials/workflow/python/versioning/versioning.http new file mode 100644 index 000000000..4fe189d5c --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning.http @@ -0,0 +1,11 @@ +@apphost=http://localhost:5263 + +### Start the workflow +# @name startWorkflowRequest +POST {{ apphost }}/start + + +@instanceId={{startWorkflowRequest.response.body.instance_id}} +@daprHost=http://localhost:3561 +### Get the workflow status +GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }} diff --git a/tutorials/workflow/python/versioning/versioning/after_named_version.py b/tutorials/workflow/python/versioning/versioning/after_named_version.py new file mode 100644 index 000000000..e7aa13742 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/after_named_version.py @@ -0,0 +1,49 @@ +from fastapi import FastAPI, status +from contextlib import asynccontextmanager +import uvicorn + +import dapr.ext.workflow as wf + +wf_runtime = wf.WorkflowRuntime() + +@asynccontextmanager +async def lifespan(app: FastAPI): + wf_runtime.start() + yield + wf_runtime.shutdown() + +app = FastAPI(lifespan=lifespan) + +@app.post("/start", status_code=status.HTTP_202_ACCEPTED) +async def start_workflow(): + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=notify_user, + input="user_id" + + ) + return {"instance_id": instance_id} + +@wf_runtime.versioned_workflow(name='notify_user', is_latest=False) +def notify_user(ctx: wf.DaprWorkflowContext, wf_input: str): + result = yield ctx.call_activity(send_email, input=wf_input) + return result + +@wf_runtime.versioned_workflow(name='notify_user', is_latest=True) +def notify_user_new(ctx: wf.DaprWorkflowContext, wf_input: str): + result = yield ctx.call_activity(send_sms, input=wf_input) + return result + +@wf_runtime.activity +def send_email(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_email: Received input: {act_input}.', flush=True) + return f"Sent email to {act_input}" + +@wf_runtime.activity +def send_sms(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_sms: Received input: {act_input}.', flush=True) + return f"Sent SMS to {act_input}" + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=5263) diff --git a/tutorials/workflow/python/versioning/versioning/after_patching.py b/tutorials/workflow/python/versioning/versioning/after_patching.py new file mode 100644 index 000000000..8e2c042b4 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/after_patching.py @@ -0,0 +1,48 @@ +from fastapi import FastAPI, status +from contextlib import asynccontextmanager +import uvicorn + +import dapr.ext.workflow as wf + +wf_runtime = wf.WorkflowRuntime() + +@asynccontextmanager +async def lifespan(app: FastAPI): + wf_runtime.start() + yield + wf_runtime.shutdown() + +app = FastAPI(lifespan=lifespan) + +@app.post("/start", status_code=status.HTTP_202_ACCEPTED) +async def start_workflow(): + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=notify_user, + input="user_id" + + ) + return {"instance_id": instance_id} + +@wf_runtime.versioned_workflow(name='notify_user', is_latest=True) +def notify_user(ctx: wf.DaprWorkflowContext, wf_input: str): + if ctx.is_patched("send_sms"): + result = yield ctx.call_activity(send_sms, input=wf_input) + else: + result = yield ctx.call_activity(send_email, input=wf_input) + return result + + +@wf_runtime.activity +def send_email(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_email: Received input: {act_input}.', flush=True) + return f"Sent email to {act_input}" + +@wf_runtime.activity +def send_sms(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_sms: Received input: {act_input}.', flush=True) + return f"Sent SMS to {act_input}" + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=5263) diff --git a/tutorials/workflow/python/versioning/versioning/before.py b/tutorials/workflow/python/versioning/versioning/before.py new file mode 100644 index 000000000..fcb567df2 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/before.py @@ -0,0 +1,40 @@ +from fastapi import FastAPI, status +from contextlib import asynccontextmanager +import uvicorn + +import dapr.ext.workflow as wf + +wf_runtime = wf.WorkflowRuntime() + +@asynccontextmanager +async def lifespan(app: FastAPI): + wf_runtime.start() + yield + wf_runtime.shutdown() + +app = FastAPI(lifespan=lifespan) + +@app.post("/start", status_code=status.HTTP_202_ACCEPTED) +async def start_workflow(): + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + workflow=notify_user, + input="user_id" + + ) + return {"instance_id": instance_id} + + +@wf_runtime.versioned_workflow(name='notify_user', is_latest=True) +def notify_user(ctx: wf.DaprWorkflowContext, wf_input: str): + result = yield ctx.call_activity(send_email, input=wf_input) + return result + +@wf_runtime.activity +def send_email(ctx: wf.WorkflowActivityContext, act_input: str) -> str: + print(f'send_email: Received input: {act_input}.', flush=True) + return f"Sent email to {act_input}" + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=5263) diff --git a/tutorials/workflow/python/versioning/versioning/requirements.txt b/tutorials/workflow/python/versioning/versioning/requirements.txt new file mode 100644 index 000000000..d8ca0a7c7 --- /dev/null +++ b/tutorials/workflow/python/versioning/versioning/requirements.txt @@ -0,0 +1,4 @@ +dapr>=1.17.0rc3 +dapr-ext-workflow>=1.17.0rc3 +fastapi>=0.115.0 +uvicorn>=0.34.2