Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Prerequisites:
* [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors.
* [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example.
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Dsl](src/Dsl) - Workflow that interprets and executes workflow steps from a YAML-based DSL.
* [EagerWorkflowStart](src/EagerWorkflowStart) - Demonstrates usage of Eager Workflow Start to reduce latency for workflows that start with a local activity.
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [EnvConfig](src/EnvConfig) - Load client configuration from TOML files with programmatic overrides
Expand Down
15 changes: 15 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Timer", "
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.UpdatableTimer", "src\UpdatableTimer\TemporalioSamples.UpdatableTimer.csproj", "{5D02DFEA-DC08-4B7B-8E26-EDAC1942D347}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Dsl", "src\Dsl\TemporalioSamples.Dsl.csproj", "{AF077751-E4B9-4696-93CB-74653F0BB6C4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -603,6 +605,18 @@ Global
{5D02DFEA-DC08-4B7B-8E26-EDAC1942D347}.Release|x64.Build.0 = Release|Any CPU
{5D02DFEA-DC08-4B7B-8E26-EDAC1942D347}.Release|x86.ActiveCfg = Release|Any CPU
{5D02DFEA-DC08-4B7B-8E26-EDAC1942D347}.Release|x86.Build.0 = Release|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|x64.ActiveCfg = Debug|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|x64.Build.0 = Debug|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|x86.ActiveCfg = Debug|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Debug|x86.Build.0 = Debug|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|Any CPU.Build.0 = Release|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x64.ActiveCfg = Release|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x64.Build.0 = Release|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x86.ActiveCfg = Release|Any CPU
{AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -653,5 +667,6 @@ Global
{52CE80AF-09C3-4209-8A21-6CFFAA3B2B01} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{B37B3E98-4B04-48B8-9017-F0EDEDC7BD98} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{5D02DFEA-DC08-4B7B-8E26-EDAC1942D347} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{AF077751-E4B9-4696-93CB-74653F0BB6C4} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
42 changes: 42 additions & 0 deletions src/Dsl/DslActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace TemporalioSamples.Dsl;

using Microsoft.Extensions.Logging;
using Temporalio.Activities;

public static class DslActivities
{
[Activity("activity1")]
public static string Activity1(string arg)
{
ActivityExecutionContext.Current.Logger.LogInformation("Executing activity1 with arg: {Arg}", arg);
return $"[result from activity1: {arg}]";
}

[Activity("activity2")]
public static string Activity2(string arg)
{
ActivityExecutionContext.Current.Logger.LogInformation("Executing activity2 with arg: {Arg}", arg);
return $"[result from activity2: {arg}]";
}

[Activity("activity3")]
public static string Activity3(string arg1, string arg2)
{
ActivityExecutionContext.Current.Logger.LogInformation("Executing activity3 with args: {Arg1} and {Arg2}", arg1, arg2);
return $"[result from activity3: {arg1} {arg2}]";
}

[Activity("activity4")]
public static string Activity4(string arg)
{
ActivityExecutionContext.Current.Logger.LogInformation("Executing activity4 with arg: {Arg}", arg);
return $"[result from activity4: {arg}]";
}

[Activity("activity5")]
public static string Activity5(string arg1, string arg2)
{
ActivityExecutionContext.Current.Logger.LogInformation("Executing activity5 with args: {Arg1} and {Arg2}", arg1, arg2);
return $"[result from activity5: {arg1} {arg2}]";
}
}
136 changes: 136 additions & 0 deletions src/Dsl/DslInput.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
using System.Text.Json.Serialization;
using YamlDotNet.Serialization;

namespace TemporalioSamples.Dsl;

public record DslInput
{
public record ActivityInvocation
{
required public string Name { get; init; }

public IReadOnlyList<string> Arguments { get; init; } = Array.Empty<string>();

public string? Result { get; init; }
}

[JsonPolymorphic(TypeDiscriminatorPropertyName = "statementType")]
[JsonDerivedType(typeof(ActivityStatement), "activity")]
[JsonDerivedType(typeof(SequenceStatement), "sequence")]
[JsonDerivedType(typeof(ParallelStatement), "parallel")]
public abstract record Statement;

public record ActivityStatement : Statement
{
required public ActivityInvocation Activity { get; init; }
}

public record Sequence
{
required public IReadOnlyList<Statement> Elements { get; init; }
}

public record SequenceStatement : Statement
{
required public Sequence Sequence { get; init; }
}

public record ParallelBranches
{
required public IReadOnlyList<Statement> Branches { get; init; }
}

public record ParallelStatement : Statement
{
required public ParallelBranches Parallel { get; init; }
}

required public Statement Root { get; init; }

public Dictionary<string, object> Variables { get; init; } = new();

public static DslInput Parse(string yamlContent)
{
var deserializer = new DeserializerBuilder().Build();

var yamlObject = deserializer.Deserialize<Dictionary<string, object>>(yamlContent)
?? throw new InvalidOperationException("Failed to parse YAML");

return ConvertToDslInput(yamlObject);
}

private static DslInput ConvertToDslInput(Dictionary<string, object> yaml)
{
var variables = new Dictionary<string, object>();
if (yaml.TryGetValue("variables", out var varsObj) && varsObj is Dictionary<object, object> varsDict)
{
foreach (var kvp in varsDict)
{
variables[kvp.Key.ToString() ?? string.Empty] = kvp.Value;
}
}

var rootObj = yaml["root"];
var root = ConvertToStatement(rootObj);

return new DslInput { Root = root, Variables = variables };
}

private static Statement ConvertToStatement(object obj)
{
if (obj is not Dictionary<object, object> dict)
{
throw new ArgumentException("Statement must be a dictionary");
}

if (dict.TryGetValue("activity", out var activityObj))
{
return new ActivityStatement { Activity = ConvertToActivityInvocation(activityObj) };
}

if (dict.TryGetValue("sequence", out var sequenceObj))
{
var seqDict = (Dictionary<object, object>)sequenceObj;
var elements = ((List<object>)seqDict["elements"])
.Select(ConvertToStatement)
.ToList();
return new SequenceStatement { Sequence = new Sequence { Elements = elements } };
}

if (dict.TryGetValue("parallel", out var parallelObj))
{
var parDict = (Dictionary<object, object>)parallelObj;
var branches = ((List<object>)parDict["branches"])
.Select(ConvertToStatement)
.ToList();
return new ParallelStatement { Parallel = new ParallelBranches { Branches = branches } };
}

throw new ArgumentException("Unknown statement type");
}

private static ActivityInvocation ConvertToActivityInvocation(object obj)
{
var dict = (Dictionary<object, object>)obj;
var name = dict["name"].ToString() ?? throw new ArgumentException("Activity name is required");

var arguments = new List<string>();
if (dict.TryGetValue("arguments", out var argsObj) && argsObj is List<object> argsList)
{
arguments = argsList.Select(a => a.ToString() ?? string.Empty).ToList();
}

string? result = null;
if (dict.TryGetValue("result", out var resultObj))
{
result = resultObj.ToString();
}

return new ActivityInvocation
{
Name = name,
Arguments = arguments,
Result = result,
};
}
}
52 changes: 52 additions & 0 deletions src/Dsl/DslWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace TemporalioSamples.Dsl;

using Microsoft.Extensions.Logging;
using Temporalio.Workflows;

[Workflow]
public class DslWorkflow
{
private readonly Dictionary<string, object> variables;

[WorkflowInit]
public DslWorkflow(DslInput input) => variables = input.Variables;

[WorkflowRun]
public async Task<Dictionary<string, object>> RunAsync(DslInput input)
{
Workflow.Logger.LogInformation("Running DSL workflow");
await ExecuteStatementAsync(input.Root);
Workflow.Logger.LogInformation("DSL workflow completed");
return variables;
}

private async Task ExecuteStatementAsync(DslInput.Statement statement)
{
switch (statement)
{
case DslInput.ActivityStatement stmt:
// Invoke activity loading arguments from variables and optionally storing result as a variable
var result = await Workflow.ExecuteActivityAsync<object>(
stmt.Activity.Name,
stmt.Activity.Arguments.Select(arg => variables[arg]).ToArray(),
new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(1) });

if (!string.IsNullOrEmpty(stmt.Activity.Result))
{
variables[stmt.Activity.Result] = result;
}
break;
case DslInput.SequenceStatement stmt:
foreach (var element in stmt.Sequence.Elements)
{
await ExecuteStatementAsync(element);
}
break;
case DslInput.ParallelStatement stmt:
await Workflow.WhenAllAsync(stmt.Parallel.Branches.Select(ExecuteStatementAsync));
break;
default:
throw new InvalidOperationException($"Unknown statement type: {statement.GetType().Name}");
}
}
}
76 changes: 76 additions & 0 deletions src/Dsl/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Client.EnvConfig;
using Temporalio.Worker;
using TemporalioSamples.Dsl;

// Create a client to localhost on default namespace
var connectOptions = ClientEnvConfig.LoadClientConnectOptions();
if (string.IsNullOrEmpty(connectOptions.TargetHost))
{
connectOptions.TargetHost = "localhost:7233";
}

connectOptions.LoggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information));
var client = await TemporalClient.ConnectAsync(connectOptions);

async Task RunWorkerAsync()
{
// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

// Run worker until cancelled
Console.WriteLine("Running worker");
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: "dsl-sample")
.AddAllActivities(typeof(DslActivities), null)
.AddWorkflow<DslWorkflow>());
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}

async Task ExecuteWorkflowAsync(string yamlFile)
{
var yamlContent = await File.ReadAllTextAsync(yamlFile);
var dslInput = DslInput.Parse(yamlContent);

Console.WriteLine($"Executing workflow from {yamlFile}");
var result = await client.ExecuteWorkflowAsync(
(DslWorkflow wf) => wf.RunAsync(dslInput),
new(id: $"dsl-workflow-{Guid.NewGuid()}", taskQueue: "dsl-sample"));

Console.WriteLine("Workflow completed. Final variables:");
foreach (var kvp in result)
{
Console.WriteLine($" {kvp.Key}: {kvp.Value}");
}
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
await RunWorkerAsync();
break;
case "workflow":
var yamlFile = args.ElementAtOrDefault(1)
?? throw new ArgumentException("Must provide YAML file path as second argument");
await ExecuteWorkflowAsync(yamlFile);
break;
default:
throw new ArgumentException("Must pass 'worker' or 'workflow' as the first argument");
}
20 changes: 20 additions & 0 deletions src/Dsl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# DSL

This sample demonstrates a Temporal workflow that interprets and executes arbitrary workflow steps defined in a
YAML-based Domain Specific Language (DSL).

To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory
in a separate terminal to start the worker:

dotnet run worker

Then in another terminal, run a workflow from this directory:

dotnet run workflow workflow1.yaml

Or run the more complex parallel workflow:

dotnet run workflow workflow2.yaml

The worker terminal will show logs of activities being executed, and the workflow terminal will display the final
variables after the workflow completes.
20 changes: 20 additions & 0 deletions src/Dsl/TemporalioSamples.Dsl.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="YamlDotNet" Version="16.2.0" />
</ItemGroup>

<ItemGroup>
<None Update="workflow1.yaml">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="workflow2.yaml">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>
Loading