Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Temporalio" Version="1.11.1" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.11.1" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.11.1" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.11.1" />
<PackageReference Include="Temporalio" Version="1.12.0" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.12.0" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.12.0" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.12.0" />
<!--
Can also reference the SDK downloaded to a local directory:
<ProjectReference Include="$(MSBuildThisFileDirectory)..\temporal-sdk-dotnet\src\Temporalio\Temporalio.csproj" />
Expand All @@ -35,4 +35,4 @@
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="17.10.48" PrivateAssets="all" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435" PrivateAssets="all" />
</ItemGroup>
</Project>
</Project>
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Prerequisites:
* [Saga](src/Saga) - Demonstrates how to implement a saga pattern.
* [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future.
* [SignalsQueries](src/SignalsQueries) - A loyalty program using Signals and Queries.
* [StandaloneActivity](src/StandaloneActivity) - Execute activities directly from a client, without a workflow.
* [SleepForDays](src/SleepForDays/) - Use a timer to send an email every 30 days.
* [Timer](src/Timer) - Use a timer to implement a monthly subscription; handle workflow cancellation.
* [UpdatableTimer](src/UpdatableTimer) - A timer that can be updated while sleeping.
Expand Down
18 changes: 18 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.NexusCanc
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "NexusCancellation", "NexusCancellation", "{7123C63D-3158-4C9A-8EAD-6D4F1295BC04}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "StandaloneActivity", "StandaloneActivity", "{EAB0C45A-7620-D2D2-2901-5E7FCBFFDA77}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.StandaloneActivity", "src\StandaloneActivity\TemporalioSamples.StandaloneActivity.csproj", "{240517A1-13B5-4A67-8519-BFCF2C4591B9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -633,6 +637,18 @@ Global
{6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|x64.Build.0 = Release|Any CPU
{6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|x86.ActiveCfg = Release|Any CPU
{6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|x86.Build.0 = Release|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Debug|x64.ActiveCfg = Debug|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Debug|x64.Build.0 = Debug|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Debug|x86.ActiveCfg = Debug|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Debug|x86.Build.0 = Debug|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Release|Any CPU.Build.0 = Release|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Release|x64.ActiveCfg = Release|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Release|x64.Build.0 = Release|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Release|x86.ActiveCfg = Release|Any CPU
{240517A1-13B5-4A67-8519-BFCF2C4591B9}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -686,5 +702,7 @@ Global
{AF077751-E4B9-4696-93CB-74653F0BB6C4} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{6D0BE4C4-9C4F-4A3D-78F1-B0B761568559} = {7123C63D-3158-4C9A-8EAD-6D4F1295BC04}
{7123C63D-3158-4C9A-8EAD-6D4F1295BC04} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{EAB0C45A-7620-D2D2-2901-5E7FCBFFDA77} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{240517A1-13B5-4A67-8519-BFCF2C4591B9} = {EAB0C45A-7620-D2D2-2901-5E7FCBFFDA77}
EndGlobalSection
EndGlobal
5 changes: 3 additions & 2 deletions src/CounterInterceptor/MyCounterInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ internal ActivityInbound(MyCounterInterceptor root, ActivityInboundInterceptor n

public override Task<object?> ExecuteActivityAsync(ExecuteActivityInput input)
{
var id = ActivityExecutionContext.Current.Info.WorkflowId;
root.Increment(id, c => Interlocked.Increment(ref root.Counts[id].WorkflowActivityExecutions));
var info = ActivityExecutionContext.Current.Info;
var workflowId = info.IsWorkflowActivity ? info.WorkflowId! : throw new InvalidOperationException("Activity must be invoked from a workflow");
root.Increment(workflowId, c => Interlocked.Increment(ref root.Counts[workflowId].WorkflowActivityExecutions));
return base.ExecuteActivityAsync(input);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/Mutex/Impl/MutexActivities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ public MutexActivities(ITemporalClient client)
public async Task SignalWithStartMutexWorkflowAsync(SignalWithStartMutexWorkflowInput input)
{
var activityInfo = ActivityExecutionContext.Current.Info;
var workflowId = activityInfo.IsWorkflowActivity ? activityInfo.WorkflowId! : throw new InvalidOperationException("Activity must be invoked from a workflow");

await this.client.StartWorkflowAsync(
(MutexWorkflow mw) => mw.RunAsync(MutexWorkflowInput.Empty),
new WorkflowOptions(input.MutexWorkflowId, activityInfo.TaskQueue)
{
StartSignal = RequestLockSignalName,
StartSignalArgs = new object[] { new LockRequest(activityInfo.WorkflowId, input.AcquireLockSignalName, input.LockTimeout), },
StartSignalArgs = new object[] { new LockRequest(workflowId, input.AcquireLockSignalName, input.LockTimeout), },
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ public class HelloCallerWorkflow
public async Task<string> RunAsync(string name)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(Workflow.CancellationToken);
var client = Workflow.CreateNexusClient<IHelloService>(IHelloService.EndpointName);
var client = Workflow.CreateNexusWorkflowClient<IHelloService>(IHelloService.EndpointName);

// Concurrently execute an operation per language.
var tasks = Languages.Select(lang =>
client.ExecuteNexusOperationAsync(
svc => svc.SayHello(new IHelloService.HelloInput(name, lang)),
new NexusOperationOptions
new NexusWorkflowOperationOptions
{
// We set the CancellationType to WaitCancellationRequested, which means the caller waits
// for the request to be received by the handler before proceeding with the cancellation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class HelloCallerWorkflow
public async Task<string> RunAsync(string name, IHelloService.HelloLanguage language)
{
Workflow.Logger.LogInformation("Caller workflow called by user {UserId}", MyContext.UserId);
var output = await Workflow.CreateNexusClient<IHelloService>(IHelloService.EndpointName).
var output = await Workflow.CreateNexusWorkflowClient<IHelloService>(IHelloService.EndpointName).
ExecuteNexusOperationAsync(svc => svc.SayHello(new(name, language)));
return output.Message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private class WorkflowOutbound(
string headerKey,
WorkflowOutboundInterceptor next) : WorkflowOutboundInterceptor(next)
{
public override Task<NexusOperationHandle<TResult>> StartNexusOperationAsync<TResult>(
public override Task<NexusWorkflowOperationHandle<TResult>> StartNexusOperationAsync<TResult>(
StartNexusOperationInput input)
{
if (context.Value is { } value)
Expand Down
2 changes: 1 addition & 1 deletion src/NexusMultiArg/Caller/HelloCallerWorkflow.workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class HelloCallerWorkflow
[WorkflowRun]
public async Task<string> RunAsync(string name, IHelloService.HelloLanguage language)
{
var output = await Workflow.CreateNexusClient<IHelloService>(IHelloService.EndpointName).
var output = await Workflow.CreateNexusWorkflowClient<IHelloService>(IHelloService.EndpointName).
ExecuteNexusOperationAsync(svc => svc.SayHello(new(name, language)));
return output.Message;
}
Expand Down
2 changes: 1 addition & 1 deletion src/NexusSimple/Caller/EchoCallerWorkflow.workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class EchoCallerWorkflow
[WorkflowRun]
public async Task<string> RunAsync(string message)
{
var output = await Workflow.CreateNexusClient<IHelloService>(IHelloService.EndpointName).
var output = await Workflow.CreateNexusWorkflowClient<IHelloService>(IHelloService.EndpointName).
ExecuteNexusOperationAsync(svc => svc.Echo(new(message)));
return output.Message;
}
Expand Down
2 changes: 1 addition & 1 deletion src/NexusSimple/Caller/HelloCallerWorkflow.workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class HelloCallerWorkflow
[WorkflowRun]
public async Task<string> RunAsync(string name, IHelloService.HelloLanguage language)
{
var output = await Workflow.CreateNexusClient<IHelloService>(IHelloService.EndpointName).
var output = await Workflow.CreateNexusWorkflowClient<IHelloService>(IHelloService.EndpointName).
ExecuteNexusOperationAsync(svc => svc.SayHello(new(name, language)));
return output.Message;
}
Expand Down
12 changes: 12 additions & 0 deletions src/StandaloneActivity/MyActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace TemporalioSamples.StandaloneActivity;

using Temporalio.Activities;

public static class MyActivities
{
[Activity]
public static Task<string> ComposeGreetingAsync(ComposeGreetingInput input) =>
Task.FromResult($"{input.Greeting}, {input.Name}!");
}

public record ComposeGreetingInput(string Greeting, string Name);
102 changes: 102 additions & 0 deletions src/StandaloneActivity/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Common.EnvConfig;
using Temporalio.Worker;
using TemporalioSamples.StandaloneActivity;

var connectOptions = ClientEnvConfig.LoadClientConnectOptions();
connectOptions.TargetHost ??= "localhost:7233";
connectOptions.LoggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information));
var client = await TemporalClient.ConnectAsync(connectOptions);

const string taskQueue = "standalone-activity-sample";

async Task RunWorkerAsync()
{
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

Console.WriteLine("Running worker");
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue).
AddActivity(MyActivities.ComposeGreetingAsync));
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}

async Task ExecuteActivityAsync()
{
var result = await client.ExecuteActivityAsync(
() => MyActivities.ComposeGreetingAsync(new ComposeGreetingInput("Hello", "World")),
new("standalone-activity-id", taskQueue)
{
ScheduleToCloseTimeout = TimeSpan.FromSeconds(10),
});
Console.WriteLine($"Activity result: {result}");
}

async Task StartActivityAsync()
{
var handle = await client.StartActivityAsync(
() => MyActivities.ComposeGreetingAsync(new ComposeGreetingInput("Hello", "World")),
new("standalone-activity-id", taskQueue)
{
ScheduleToCloseTimeout = TimeSpan.FromSeconds(10),
});
Console.WriteLine($"Started activity: {handle.Id}");

var result = await handle.GetResultAsync();
Console.WriteLine($"Activity result: {result}");
}

async Task ListActivitiesAsync()
{
await foreach (var info in client.ListActivitiesAsync(
$"TaskQueue = '{taskQueue}'"))
{
Console.WriteLine($"ActivityID: {info.ActivityId}, Type: {info.ActivityType}, Status: {info.Status}");
}
}

async Task CountActivitiesAsync()
{
var resp = await client.CountActivitiesAsync(
$"TaskQueue = '{taskQueue}'");
Console.WriteLine($"Total activities: {resp.Count}");
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
await RunWorkerAsync();
break;
case "execute-activity":
await ExecuteActivityAsync();
break;
case "start-activity":
await StartActivityAsync();
break;
case "list-activities":
await ListActivitiesAsync();
break;
case "count-activities":
await CountActivitiesAsync();
break;
default:
throw new ArgumentException(
"Must pass 'worker', 'execute-activity', 'start-activity', 'list-activities', or 'count-activities' as the single argument");
}
36 changes: 36 additions & 0 deletions src/StandaloneActivity/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Standalone Activity

This sample shows how to execute Activities directly from a Temporal Client, without a Workflow.

For full documentation, see [Standalone Activities](https://docs.temporal.io/develop/dotnet/standalone-activities).

### Sample directory structure

- [MyActivities.cs](MyActivities.cs) - Activity definition with `[Activity]` attribute
- [Program.cs](Program.cs) - Worker, execute, start, list, and count commands

### Steps to run this sample

To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory
in a separate terminal to start the worker:

dotnet run worker

Then in another terminal, execute a standalone activity and wait for the result:

dotnet run execute-activity

Or start a standalone activity, get a handle, then wait for the result:

dotnet run start-activity

List standalone activity executions:

dotnet run list-activities

Count standalone activity executions:

dotnet run count-activities

Note: `list-activities` and `count-activities` are only available in the
[Standalone Activity prerelease CLI](https://github.com/temporalio/cli/releases/tag/v1.6.2-standalone-activity).
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
</PropertyGroup>

</Project>
36 changes: 36 additions & 0 deletions tests/StandaloneActivity/MyActivityTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace TemporalioSamples.Tests.StandaloneActivity;

using Temporalio.Client;
using Temporalio.Testing;
using Temporalio.Worker;
using TemporalioSamples.StandaloneActivity;
using Xunit;
using Xunit.Abstractions;

public class MyActivityTests : TestBase
{
public MyActivityTests(ITestOutputHelper output)
: base(output)
{
}

[Fact(Skip = "Standalone Activity is not yet supported by WorkflowEnvironment.StartTimeSkippingAsync")]
public async Task ExecuteActivityAsync_SimpleRun_Succeeds()
{
await using var env = await WorkflowEnvironment.StartTimeSkippingAsync();
using var worker = new TemporalWorker(
env.Client,
new TemporalWorkerOptions("my-task-queue").
AddActivity(MyActivities.ComposeGreetingAsync));
await worker.ExecuteAsync(async () =>
{
var result = await env.Client.ExecuteActivityAsync(
() => MyActivities.ComposeGreetingAsync(new ComposeGreetingInput("Hello", "World")),
new($"act-{Guid.NewGuid()}", worker.Options.TaskQueue!)
{
ScheduleToCloseTimeout = TimeSpan.FromSeconds(10),
});
Assert.Equal("Hello, World!", result);
});
}
}
1 change: 1 addition & 0 deletions tests/TemporalioSamples.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
<ProjectReference Include="..\src\WorkerVersioning\TemporalioSamples.WorkerVersioning.csproj" />
<ProjectReference Include="..\src\SleepForDays\TemporalioSamples.SleepForDays.csproj" />
<ProjectReference Include="..\src\Dsl\TemporalioSamples.Dsl.csproj" />
<ProjectReference Include="..\src\StandaloneActivity\TemporalioSamples.StandaloneActivity.csproj" />
</ItemGroup>

</Project>
Loading