Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ dotnet_diagnostic.SA1513.severity = none
dotnet_diagnostic.SA1515.severity = none

# Do not require XML doc in samples
dotnet_diagnostic.CS1591.severity = none
dotnet_diagnostic.SA1600.severity = none
dotnet_diagnostic.SA1602.severity = none

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ obj/
/.vs
/.vscode
/.idea
.claude/
4 changes: 1 addition & 3 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
<Authors>Temporal</Authors>
<ContinuousIntegrationBuild Condition="'$(GITHUB_ACTIONS)' == 'true'">true</ContinuousIntegrationBuild>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<!--
TODO(cretz): Reenable when https://github.com/dotnet/format/issues/1800 fixed
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
-->
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
47 changes: 47 additions & 0 deletions src/ContextPropagation/ContextPropagationInterceptor.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace TemporalioSamples.ContextPropagation;

using System.Threading.Tasks;
using NexusRpc.Handlers;
using Temporalio.Api.Common.V1;
using Temporalio.Client;
using Temporalio.Client.Interceptors;
Expand Down Expand Up @@ -39,6 +40,10 @@ public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor n
public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) =>
new ContextPropagationActivityInboundInterceptor(this, nextInterceptor);

public NexusOperationInboundInterceptor InterceptNexusOperation(
NexusOperationInboundInterceptor nextInterceptor) =>
new ContextPropagationNexusOperationInboundInterceptor(this, nextInterceptor);

private Dictionary<string, Payload> HeaderFromContext(IDictionary<string, Payload>? existing)
{
var ret = existing != null ?
Expand Down Expand Up @@ -67,6 +72,28 @@ private TResult WithHeadersApplied<TResult>(
return func();
}

private Dictionary<string, string> HeaderFromContextForNexus(IDictionary<string, string>? existing)
{
var ret = existing != null ?
new Dictionary<string, string>(existing) : new Dictionary<string, string>(1);
// Nexus headers are string-based, so serialize context value to JSON.
// Alternative approach: could use payload converter and put entire payload as JSON on header.
ret[headerKey] = System.Text.Json.JsonSerializer.Serialize(context.Value);
return ret;
}

private Task<TResult> WithHeadersAppliedForNexusAsync<TResult>(
IReadOnlyDictionary<string, string>? headers, Func<Task<TResult>> func)
{
if (headers?.TryGetValue(headerKey, out var value) == true)
{
// Deserialize can return null for nullable types, which is expected
context.Value = System.Text.Json.JsonSerializer.Deserialize<T>(value)!;
}
// These are async local, no need to unapply afterwards
return func();
}

private class ContextPropagationClientOutboundInterceptor : ClientOutboundInterceptor
{
private readonly ContextPropagationInterceptor<T> root;
Expand Down Expand Up @@ -153,6 +180,11 @@ public override Task<ChildWorkflowHandle<TWorkflow, TResult>> StartChildWorkflow
StartChildWorkflowInput input) =>
Next.StartChildWorkflowAsync<TWorkflow, TResult>(
input with { Headers = root.HeaderFromContext(input.Headers) });

public override Task<NexusOperationHandle<TResult>> StartNexusOperationAsync<TResult>(
StartNexusOperationInput input) =>
Next.StartNexusOperationAsync<TResult>(
input with { Headers = root.HeaderFromContextForNexus(input.Headers) });
}

private class ContextPropagationActivityInboundInterceptor : ActivityInboundInterceptor
Expand All @@ -166,4 +198,19 @@ public ContextPropagationActivityInboundInterceptor(
public override Task<object?> ExecuteActivityAsync(ExecuteActivityInput input) =>
root.WithHeadersApplied(input.Headers, () => Next.ExecuteActivityAsync(input));
}

private class ContextPropagationNexusOperationInboundInterceptor : NexusOperationInboundInterceptor
{
private readonly ContextPropagationInterceptor<T> root;

public ContextPropagationNexusOperationInboundInterceptor(
ContextPropagationInterceptor<T> root, NexusOperationInboundInterceptor next)
: base(next) => this.root = root;

public override Task<OperationStartResult<object?>> ExecuteNexusOperationStartAsync(
ExecuteNexusOperationStartInput input) =>
root.WithHeadersAppliedForNexusAsync(
input.Context.Headers,
() => base.ExecuteNexusOperationStartAsync(input));
}
}
16 changes: 16 additions & 0 deletions src/ContextPropagation/INexusGreetingService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace TemporalioSamples.ContextPropagation;

using NexusRpc;

[NexusService]
public interface INexusGreetingService
{
static readonly string EndpointName = "context-propagation-greeting-service";

[NexusOperation]
GreetingOutput SayGreeting(GreetingInput input);

public record GreetingInput(string Name);

public record GreetingOutput(string Message);
}
27 changes: 27 additions & 0 deletions src/ContextPropagation/NexusGreetingHandlerWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace TemporalioSamples.ContextPropagation;

using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Temporalio.Workflows;

[Workflow]
public class NexusGreetingHandlerWorkflow
{
[WorkflowQuery]
public string? CapturedUserId { get; private set; }

[WorkflowRun]
public async Task<INexusGreetingService.GreetingOutput> RunAsync(
INexusGreetingService.GreetingInput input)
{
// Capture context to prove propagation through Nexus to handler workflow
CapturedUserId = MyContext.UserId;
Workflow.Logger.LogInformation(
"Handler workflow executing for {Name}, called by user {UserId}",
input.Name,
CapturedUserId);

var message = $"Greeting for {input.Name} (processed by user: {CapturedUserId})";
return await Task.FromResult(new INexusGreetingService.GreetingOutput(message));
}
}
23 changes: 23 additions & 0 deletions src/ContextPropagation/NexusGreetingService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace TemporalioSamples.ContextPropagation;

using Microsoft.Extensions.Logging;
using NexusRpc.Handlers;
using Temporalio.Nexus;

[NexusServiceHandler(typeof(INexusGreetingService))]
public class NexusGreetingService
{
[NexusOperationHandler]
public IOperationHandler<INexusGreetingService.GreetingInput, INexusGreetingService.GreetingOutput> SayGreeting() =>
WorkflowRunOperationHandler.FromHandleFactory(
(WorkflowRunOperationContext context, INexusGreetingService.GreetingInput input) =>
{
// Log context to show it was propagated to the handler
NexusOperationExecutionContext.Current.Logger.LogInformation(
"Nexus greeting service called by user {UserId}", MyContext.UserId);

return context.StartWorkflowAsync(
(NexusGreetingHandlerWorkflow wf) => wf.RunAsync(input),
new() { Id = context.HandlerContext.RequestId });
});
}
4 changes: 3 additions & 1 deletion src/ContextPropagation/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ async Task RunWorkerAsync()
client,
new TemporalWorkerOptions(taskQueue: "interceptors-sample").
AddAllActivities<SayHelloActivities>(new()).
AddWorkflow<SayHelloWorkflow>());
AddWorkflow<SayHelloWorkflow>().
AddNexusService(new NexusGreetingService()).
AddWorkflow<NexusGreetingHandlerWorkflow>());
try
{
await worker.ExecuteAsync(tokenSource.Token);
Expand Down
8 changes: 4 additions & 4 deletions src/ContextPropagation/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Interceptors

This sample demonstrates how to use interceptors to propagate contextual information from an `AsyncLocal` throughout the
workflows and activities. While this demonstrates context propagation specifically, it can also be used to show how to
create interceptors for any other purpose.
This sample demonstrates how to use interceptors to propagate contextual information from an `AsyncLocal` throughout
workflows, activities, and Nexus operations. While this demonstrates context propagation specifically, it can also be
used to show how to create interceptors for any other purpose.

To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory in a
separate terminal to start the worker:
Expand All @@ -14,4 +14,4 @@ Then in another terminal, run the workflow from this directory:
dotnet run workflow

The workflow terminal will show the completed workflow result and the worker terminal will show the contextual user ID
is present in the workflow and activity.
is present in the workflow, Nexus operation handler, Nexus handler workflow, and activity.
10 changes: 9 additions & 1 deletion src/ContextPropagation/SayHelloWorkflow.workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@ public async Task<string> RunAsync(string name)
{
Workflow.Logger.LogInformation("Workflow called by user {UserId}", MyContext.UserId);

// Wait for signal then run activity
// Wait for signal then call Nexus service and run activity
await Workflow.WaitConditionAsync(() => complete);

// Call Nexus service to demonstrate context propagation through Nexus
var nexusClient = Workflow.CreateNexusClient<INexusGreetingService>(
INexusGreetingService.EndpointName);
var nexusResult = await nexusClient.ExecuteNexusOperationAsync(
svc => svc.SayGreeting(new(name)));
Workflow.Logger.LogInformation("Nexus result: {Result}", nexusResult.Message);

return await Workflow.ExecuteActivityAsync(
(SayHelloActivities act) => act.SayHello(name),
new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@
<OutputType>Exe</OutputType>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NexusRpc" Version="0.2.0" />
</ItemGroup>

</Project>
20 changes: 17 additions & 3 deletions tests/ContextPropagation/SayHelloWorkflowTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ public SayHelloWorkflowTests(ITestOutputHelper output, WorkflowEnvironment env)
[Fact]
public async Task RunAsync_ContextPropagation_ReachesActivity()
{
// Create Nexus endpoint for testing context propagation through Nexus
var taskQueue = $"tq-{Guid.NewGuid()}";
await Env.TestEnv.CreateNexusEndpointAsync(
INexusGreetingService.EndpointName, taskQueue);

// Update the client to use the interceptor
var clientOptions = (TemporalClientOptions)Client.Options.Clone();
clientOptions.Interceptors =
Expand All @@ -44,12 +49,14 @@ static string SayHello(string name)
return $"Mock for {name}";
}

// Run worker
// Run worker with Nexus service, handler workflow, and activity
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions($"tq-{Guid.NewGuid()}").
new TemporalWorkerOptions(taskQueue).
AddActivity(SayHello).
AddWorkflow<SayHelloWorkflow>());
AddWorkflow<SayHelloWorkflow>().
AddNexusService(new NexusGreetingService()).
AddWorkflow<NexusGreetingHandlerWorkflow>());
await worker.ExecuteAsync(async () =>
{
// Set context value, start workflow, set to something else
Expand All @@ -62,6 +69,13 @@ await worker.ExecuteAsync(async () =>
// Send signal, check result
await handle.SignalAsync(wf => wf.SignalCompleteAsync());
Assert.Equal("Mock for some-name", await handle.GetResultAsync());

// Verify context propagated through Nexus to handler workflow
var history = await handle.FetchHistoryAsync();
var nexusStartedEvent = history.Events.First(e => e.NexusOperationStartedEventAttributes != null);
var handlerWorkflowId = nexusStartedEvent.Links.First().WorkflowEvent!.WorkflowId;
var handlerHandle = client.GetWorkflowHandle<NexusGreetingHandlerWorkflow>(handlerWorkflowId);
Assert.Equal("test-user", await handlerHandle.QueryAsync(wf => wf.CapturedUserId));
});
}
}
2 changes: 1 addition & 1 deletion tests/TimeSkippingServerFact.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace TemporalioSamples.Tests;

/// <summary>
/// The time-skipping test server can only run on x86/x64 currently.
/// </remarks>
/// </summary>
public sealed class TimeSkippingServerFactAttribute : FactAttribute
{
public TimeSkippingServerFactAttribute()
Expand Down
Loading