diff --git a/README.md b/README.md index a67753e..fc7f2b4 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ Prerequisites: * [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors. * [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example. * [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers +* [Dsl](src/Dsl) - Workflow that interprets and executes workflow steps from a YAML-based DSL. * [EagerWorkflowStart](src/EagerWorkflowStart) - Demonstrates usage of Eager Workflow Start to reduce latency for workflows that start with a local activity. * [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs. * [EnvConfig](src/EnvConfig) - Load client configuration from TOML files with programmatic overrides diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 6ec4192..44388c5 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -101,6 +101,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Timer", " EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.UpdatableTimer", "src\UpdatableTimer\TemporalioSamples.UpdatableTimer.csproj", "{5D02DFEA-DC08-4B7B-8E26-EDAC1942D347}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Dsl", "src\Dsl\TemporalioSamples.Dsl.csproj", "{AF077751-E4B9-4696-93CB-74653F0BB6C4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -603,6 +605,18 @@ Global {5D02DFEA-DC08-4B7B-8E26-EDAC1942D347}.Release|x64.Build.0 = Release|Any CPU {5D02DFEA-DC08-4B7B-8E26-EDAC1942D347}.Release|x86.ActiveCfg = Release|Any CPU {5D02DFEA-DC08-4B7B-8E26-EDAC1942D347}.Release|x86.Build.0 = Release|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|x64.ActiveCfg = Debug|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|x64.Build.0 = Debug|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|x86.ActiveCfg = Debug|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|x86.Build.0 = Debug|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|Any CPU.Build.0 = Release|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x64.ActiveCfg = Release|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x64.Build.0 = Release|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x86.ActiveCfg = Release|Any CPU + {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -653,5 +667,6 @@ Global {52CE80AF-09C3-4209-8A21-6CFFAA3B2B01} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {B37B3E98-4B04-48B8-9017-F0EDEDC7BD98} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {5D02DFEA-DC08-4B7B-8E26-EDAC1942D347} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {AF077751-E4B9-4696-93CB-74653F0BB6C4} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal diff --git a/src/Dsl/DslActivities.cs b/src/Dsl/DslActivities.cs new file mode 100644 index 0000000..fc66fa2 --- /dev/null +++ b/src/Dsl/DslActivities.cs @@ -0,0 +1,42 @@ +namespace TemporalioSamples.Dsl; + +using Microsoft.Extensions.Logging; +using Temporalio.Activities; + +public static class DslActivities +{ + [Activity("activity1")] + public static string Activity1(string arg) + { + ActivityExecutionContext.Current.Logger.LogInformation("Executing activity1 with arg: {Arg}", arg); + return $"[result from activity1: {arg}]"; + } + + [Activity("activity2")] + public static string Activity2(string arg) + { + ActivityExecutionContext.Current.Logger.LogInformation("Executing activity2 with arg: {Arg}", arg); + return $"[result from activity2: {arg}]"; + } + + [Activity("activity3")] + public static string Activity3(string arg1, string arg2) + { + ActivityExecutionContext.Current.Logger.LogInformation("Executing activity3 with args: {Arg1} and {Arg2}", arg1, arg2); + return $"[result from activity3: {arg1} {arg2}]"; + } + + [Activity("activity4")] + public static string Activity4(string arg) + { + ActivityExecutionContext.Current.Logger.LogInformation("Executing activity4 with arg: {Arg}", arg); + return $"[result from activity4: {arg}]"; + } + + [Activity("activity5")] + public static string Activity5(string arg1, string arg2) + { + ActivityExecutionContext.Current.Logger.LogInformation("Executing activity5 with args: {Arg1} and {Arg2}", arg1, arg2); + return $"[result from activity5: {arg1} {arg2}]"; + } +} diff --git a/src/Dsl/DslInput.cs b/src/Dsl/DslInput.cs new file mode 100644 index 0000000..690d6a9 --- /dev/null +++ b/src/Dsl/DslInput.cs @@ -0,0 +1,136 @@ +using System.Text.Json.Serialization; +using YamlDotNet.Serialization; + +namespace TemporalioSamples.Dsl; + +public record DslInput +{ + public record ActivityInvocation + { + required public string Name { get; init; } + + public IReadOnlyList Arguments { get; init; } = Array.Empty(); + + public string? Result { get; init; } + } + + [JsonPolymorphic(TypeDiscriminatorPropertyName = "statementType")] + [JsonDerivedType(typeof(ActivityStatement), "activity")] + [JsonDerivedType(typeof(SequenceStatement), "sequence")] + [JsonDerivedType(typeof(ParallelStatement), "parallel")] + public abstract record Statement; + + public record ActivityStatement : Statement + { + required public ActivityInvocation Activity { get; init; } + } + + public record Sequence + { + required public IReadOnlyList Elements { get; init; } + } + + public record SequenceStatement : Statement + { + required public Sequence Sequence { get; init; } + } + + public record ParallelBranches + { + required public IReadOnlyList Branches { get; init; } + } + + public record ParallelStatement : Statement + { + required public ParallelBranches Parallel { get; init; } + } + + required public Statement Root { get; init; } + + public Dictionary Variables { get; init; } = new(); + + public static DslInput Parse(string yamlContent) + { + var deserializer = new DeserializerBuilder().Build(); + + var yamlObject = deserializer.Deserialize>(yamlContent) + ?? throw new InvalidOperationException("Failed to parse YAML"); + + return ConvertToDslInput(yamlObject); + } + + private static DslInput ConvertToDslInput(Dictionary yaml) + { + var variables = new Dictionary(); + if (yaml.TryGetValue("variables", out var varsObj) && varsObj is Dictionary varsDict) + { + foreach (var kvp in varsDict) + { + variables[kvp.Key.ToString() ?? string.Empty] = kvp.Value; + } + } + + var rootObj = yaml["root"]; + var root = ConvertToStatement(rootObj); + + return new DslInput { Root = root, Variables = variables }; + } + + private static Statement ConvertToStatement(object obj) + { + if (obj is not Dictionary dict) + { + throw new ArgumentException("Statement must be a dictionary"); + } + + if (dict.TryGetValue("activity", out var activityObj)) + { + return new ActivityStatement { Activity = ConvertToActivityInvocation(activityObj) }; + } + + if (dict.TryGetValue("sequence", out var sequenceObj)) + { + var seqDict = (Dictionary)sequenceObj; + var elements = ((List)seqDict["elements"]) + .Select(ConvertToStatement) + .ToList(); + return new SequenceStatement { Sequence = new Sequence { Elements = elements } }; + } + + if (dict.TryGetValue("parallel", out var parallelObj)) + { + var parDict = (Dictionary)parallelObj; + var branches = ((List)parDict["branches"]) + .Select(ConvertToStatement) + .ToList(); + return new ParallelStatement { Parallel = new ParallelBranches { Branches = branches } }; + } + + throw new ArgumentException("Unknown statement type"); + } + + private static ActivityInvocation ConvertToActivityInvocation(object obj) + { + var dict = (Dictionary)obj; + var name = dict["name"].ToString() ?? throw new ArgumentException("Activity name is required"); + + var arguments = new List(); + if (dict.TryGetValue("arguments", out var argsObj) && argsObj is List argsList) + { + arguments = argsList.Select(a => a.ToString() ?? string.Empty).ToList(); + } + + string? result = null; + if (dict.TryGetValue("result", out var resultObj)) + { + result = resultObj.ToString(); + } + + return new ActivityInvocation + { + Name = name, + Arguments = arguments, + Result = result, + }; + } +} \ No newline at end of file diff --git a/src/Dsl/DslWorkflow.workflow.cs b/src/Dsl/DslWorkflow.workflow.cs new file mode 100644 index 0000000..9714d3c --- /dev/null +++ b/src/Dsl/DslWorkflow.workflow.cs @@ -0,0 +1,52 @@ +namespace TemporalioSamples.Dsl; + +using Microsoft.Extensions.Logging; +using Temporalio.Workflows; + +[Workflow] +public class DslWorkflow +{ + private readonly Dictionary variables; + + [WorkflowInit] + public DslWorkflow(DslInput input) => variables = input.Variables; + + [WorkflowRun] + public async Task> RunAsync(DslInput input) + { + Workflow.Logger.LogInformation("Running DSL workflow"); + await ExecuteStatementAsync(input.Root); + Workflow.Logger.LogInformation("DSL workflow completed"); + return variables; + } + + private async Task ExecuteStatementAsync(DslInput.Statement statement) + { + switch (statement) + { + case DslInput.ActivityStatement stmt: + // Invoke activity loading arguments from variables and optionally storing result as a variable + var result = await Workflow.ExecuteActivityAsync( + stmt.Activity.Name, + stmt.Activity.Arguments.Select(arg => variables[arg]).ToArray(), + new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(1) }); + + if (!string.IsNullOrEmpty(stmt.Activity.Result)) + { + variables[stmt.Activity.Result] = result; + } + break; + case DslInput.SequenceStatement stmt: + foreach (var element in stmt.Sequence.Elements) + { + await ExecuteStatementAsync(element); + } + break; + case DslInput.ParallelStatement stmt: + await Workflow.WhenAllAsync(stmt.Parallel.Branches.Select(ExecuteStatementAsync)); + break; + default: + throw new InvalidOperationException($"Unknown statement type: {statement.GetType().Name}"); + } + } +} diff --git a/src/Dsl/Program.cs b/src/Dsl/Program.cs new file mode 100644 index 0000000..454af8f --- /dev/null +++ b/src/Dsl/Program.cs @@ -0,0 +1,76 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Client.EnvConfig; +using Temporalio.Worker; +using TemporalioSamples.Dsl; + +// Create a client to localhost on default namespace +var connectOptions = ClientEnvConfig.LoadClientConnectOptions(); +if (string.IsNullOrEmpty(connectOptions.TargetHost)) +{ + connectOptions.TargetHost = "localhost:7233"; +} + +connectOptions.LoggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)); +var client = await TemporalClient.ConnectAsync(connectOptions); + +async Task RunWorkerAsync() +{ + // Cancellation token cancelled on ctrl+c + using var tokenSource = new CancellationTokenSource(); + Console.CancelKeyPress += (_, eventArgs) => + { + tokenSource.Cancel(); + eventArgs.Cancel = true; + }; + + // Run worker until cancelled + Console.WriteLine("Running worker"); + using var worker = new TemporalWorker( + client, + new TemporalWorkerOptions(taskQueue: "dsl-sample") + .AddAllActivities(typeof(DslActivities), null) + .AddWorkflow()); + try + { + await worker.ExecuteAsync(tokenSource.Token); + } + catch (OperationCanceledException) + { + Console.WriteLine("Worker cancelled"); + } +} + +async Task ExecuteWorkflowAsync(string yamlFile) +{ + var yamlContent = await File.ReadAllTextAsync(yamlFile); + var dslInput = DslInput.Parse(yamlContent); + + Console.WriteLine($"Executing workflow from {yamlFile}"); + var result = await client.ExecuteWorkflowAsync( + (DslWorkflow wf) => wf.RunAsync(dslInput), + new(id: $"dsl-workflow-{Guid.NewGuid()}", taskQueue: "dsl-sample")); + + Console.WriteLine("Workflow completed. Final variables:"); + foreach (var kvp in result) + { + Console.WriteLine($" {kvp.Key}: {kvp.Value}"); + } +} + +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(); + break; + case "workflow": + var yamlFile = args.ElementAtOrDefault(1) + ?? throw new ArgumentException("Must provide YAML file path as second argument"); + await ExecuteWorkflowAsync(yamlFile); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'workflow' as the first argument"); +} diff --git a/src/Dsl/README.md b/src/Dsl/README.md new file mode 100644 index 0000000..3588b97 --- /dev/null +++ b/src/Dsl/README.md @@ -0,0 +1,20 @@ +# DSL + +This sample demonstrates a Temporal workflow that interprets and executes arbitrary workflow steps defined in a +YAML-based Domain Specific Language (DSL). + +To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory +in a separate terminal to start the worker: + + dotnet run worker + +Then in another terminal, run a workflow from this directory: + + dotnet run workflow workflow1.yaml + +Or run the more complex parallel workflow: + + dotnet run workflow workflow2.yaml + +The worker terminal will show logs of activities being executed, and the workflow terminal will display the final +variables after the workflow completes. diff --git a/src/Dsl/TemporalioSamples.Dsl.csproj b/src/Dsl/TemporalioSamples.Dsl.csproj new file mode 100644 index 0000000..a135da0 --- /dev/null +++ b/src/Dsl/TemporalioSamples.Dsl.csproj @@ -0,0 +1,20 @@ + + + + Exe + + + + + + + + + PreserveNewest + + + PreserveNewest + + + + diff --git a/src/Dsl/workflow1.yaml b/src/Dsl/workflow1.yaml new file mode 100644 index 0000000..85da523 --- /dev/null +++ b/src/Dsl/workflow1.yaml @@ -0,0 +1,28 @@ +# This sample workflows execute 3 steps in sequence. +# 1) Activity1, takes arg1 as input, and put result as result1. +# 2) Activity2, takes result1 as input, and put result as result2. +# 3) Activity3, takes args2 and result2 as input, and put result as result3. + +variables: + arg1: value1 + arg2: value2 + +root: + sequence: + elements: + - activity: + name: activity1 + arguments: + - arg1 + result: result1 + - activity: + name: activity2 + arguments: + - result1 + result: result2 + - activity: + name: activity3 + arguments: + - arg2 + - result2 + result: result3 \ No newline at end of file diff --git a/src/Dsl/workflow2.yaml b/src/Dsl/workflow2.yaml new file mode 100644 index 0000000..cf19fdd --- /dev/null +++ b/src/Dsl/workflow2.yaml @@ -0,0 +1,58 @@ +# This sample workflow executes 3 steps in sequence. +# 1) activity1, takes arg1 as input, and put result as result1. +# 2) it runs a parallel block which runs below sequence branches in parallel +# 2.1) sequence 1 +# 2.1.1) activity2, takes result1 as input, and put result as result2 +# 2.1.2) activity3, takes arg2 and result2 as input, and put result as result3 +# 2.2) sequence 2 +# 2.2.1) activity4, takes result1 as input, and put result as result4 +# 2.2.2) activity5, takes arg3 and result4 as input, and put result as result5 +# 3) activity3, takes result3 and result5 as input, and put result as result6. + +variables: + arg1: value1 + arg2: value2 + arg3: value3 + +root: + sequence: + elements: + - activity: + name: activity1 + arguments: + - arg1 + result: result1 + - parallel: + branches: + - sequence: + elements: + - activity: + name: activity2 + arguments: + - result1 + result: result2 + - activity: + name: activity3 + arguments: + - arg2 + - result2 + result: result3 + - sequence: + elements: + - activity: + name: activity4 + arguments: + - result1 + result: result4 + - activity: + name: activity5 + arguments: + - arg3 + - result4 + result: result5 + - activity: + name: activity3 + arguments: + - result3 + - result5 + result: result6 \ No newline at end of file diff --git a/tests/Dsl/DslWorkflowTests.cs b/tests/Dsl/DslWorkflowTests.cs new file mode 100644 index 0000000..62ed62f --- /dev/null +++ b/tests/Dsl/DslWorkflowTests.cs @@ -0,0 +1,163 @@ +namespace TemporalioSamples.Tests.Dsl; + +using Temporalio.Api.Enums.V1; +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.Dsl; +using Xunit; +using Xunit.Abstractions; + +public class DslWorkflowTests(ITestOutputHelper output, WorkflowEnvironment env) + : WorkflowEnvironmentTestBase(output, env) +{ + [Fact] + public async Task RunAsync_Workflow1_SimpleSequence_Succeeds() + { + const string yaml = + """ + variables: + arg1: value1 + arg2: value2 + + root: + sequence: + elements: + - activity: + name: activity1 + arguments: + - arg1 + result: result1 + - activity: + name: activity2 + arguments: + - result1 + result: result2 + - activity: + name: activity3 + arguments: + - arg2 + - result2 + result: result3 + """; + + var input = DslInput.Parse(yaml); + + using var worker = new TemporalWorker( + Client, + new TemporalWorkerOptions($"dsl-test-{Guid.NewGuid()}") + .AddAllActivities(typeof(DslActivities), null) + .AddWorkflow()); + + await worker.ExecuteAsync(async () => + { + var result = await Client.ExecuteWorkflowAsync( + (DslWorkflow wf) => wf.RunAsync(input), + new(id: $"dsl-workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + + Assert.Equal("value1", result["arg1"].ToString()); + Assert.Equal("value2", result["arg2"].ToString()); + Assert.Equal("[result from activity1: value1]", result["result1"].ToString()); + Assert.Equal("[result from activity2: [result from activity1: value1]]", result["result2"].ToString()); + Assert.Equal("[result from activity3: value2 [result from activity2: [result from activity1: value1]]]", result["result3"].ToString()); + }); + } + + [Fact] + public async Task RunAsync_Workflow2_ReturnsExpectedVariables() + { + const string yaml = + """ + variables: + arg1: value1 + arg2: value2 + arg3: value3 + + root: + sequence: + elements: + - activity: + name: activity1 + arguments: + - arg1 + result: result1 + - parallel: + branches: + - sequence: + elements: + - activity: + name: activity2 + arguments: + - result1 + result: result2 + - activity: + name: activity3 + arguments: + - arg2 + - result2 + result: result3 + - sequence: + elements: + - activity: + name: activity4 + arguments: + - result1 + result: result4 + - activity: + name: activity5 + arguments: + - arg3 + - result4 + result: result5 + - activity: + name: activity3 + arguments: + - result3 + - result5 + result: result6 + """; + + var input = DslInput.Parse(yaml); + + using var worker = new TemporalWorker( + Client, + new TemporalWorkerOptions($"dsl-test-{Guid.NewGuid()}") + .AddAllActivities(typeof(DslActivities), null) + .AddWorkflow()); + + await worker.ExecuteAsync(async () => + { + var handle = await Client.StartWorkflowAsync( + (DslWorkflow wf) => wf.RunAsync(input), + new(id: $"dsl-workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + + var result = await handle.GetResultAsync(); + + Assert.Equal("value1", result["arg1"].ToString()); + Assert.Equal("value2", result["arg2"].ToString()); + Assert.Equal("value3", result["arg3"].ToString()); + + Assert.Equal("[result from activity1: value1]", result["result1"].ToString()); + Assert.Equal("[result from activity2: [result from activity1: value1]]", result["result2"].ToString()); + Assert.Equal("[result from activity3: value2 [result from activity2: [result from activity1: value1]]]", result["result3"].ToString()); + Assert.Equal("[result from activity4: [result from activity1: value1]]", result["result4"].ToString()); + Assert.Equal("[result from activity5: value3 [result from activity4: [result from activity1: value1]]]", result["result5"].ToString()); + Assert.Equal( + "[result from activity3: [result from activity3: value2 [result from activity2: " + + "[result from activity1: value1]]] [result from activity5: " + + "value3 [result from activity4: [result from activity1: value1]]]]", + result["result6"].ToString()); + + // Collect all activity events and confirm they are in order expected + var history = await handle.FetchHistoryAsync(); + var activityNames = history.Events + .Where(e => e.EventType == EventType.ActivityTaskScheduled) + .Select(e => e.ActivityTaskScheduledEventAttributes.ActivityType.Name) + .ToList(); + + Assert.Equal(6, activityNames.Count); + Assert.Equal("activity1", activityNames[0]); + Assert.Equal(["activity2", "activity3", "activity4", "activity5"], activityNames.Skip(1).Take(4).Order().ToList()); + Assert.Equal("activity3", activityNames[5]); + }); + } +} diff --git a/tests/TemporalioSamples.Tests.csproj b/tests/TemporalioSamples.Tests.csproj index d734711..2fa5780 100644 --- a/tests/TemporalioSamples.Tests.csproj +++ b/tests/TemporalioSamples.Tests.csproj @@ -38,6 +38,7 @@ +