From d678c04d275ebfce83d7cba3a2e91d1b6644316b Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 3 Feb 2026 07:55:32 -0600 Subject: [PATCH] Add Nexus support to ContextPropagation sample --- .editorconfig | 1 + .gitignore | 1 + Directory.Build.props | 4 +- .../ContextPropagationInterceptor.cs | 47 +++++++++++++++++++ .../INexusGreetingService.cs | 16 +++++++ .../NexusGreetingHandlerWorkflow.workflow.cs | 27 +++++++++++ .../NexusGreetingService.cs | 23 +++++++++ src/ContextPropagation/Program.cs | 4 +- src/ContextPropagation/README.md | 8 ++-- .../SayHelloWorkflow.workflow.cs | 10 +++- ...emporalioSamples.ContextPropagation.csproj | 4 ++ .../SayHelloWorkflowTests.cs | 20 ++++++-- tests/TimeSkippingServerFact.cs | 2 +- 13 files changed, 154 insertions(+), 13 deletions(-) create mode 100644 src/ContextPropagation/INexusGreetingService.cs create mode 100644 src/ContextPropagation/NexusGreetingHandlerWorkflow.workflow.cs create mode 100644 src/ContextPropagation/NexusGreetingService.cs diff --git a/.editorconfig b/.editorconfig index 5068e98..14835a8 100644 --- a/.editorconfig +++ b/.editorconfig @@ -98,6 +98,7 @@ dotnet_diagnostic.SA1513.severity = none dotnet_diagnostic.SA1515.severity = none # Do not require XML doc in samples +dotnet_diagnostic.CS1591.severity = none dotnet_diagnostic.SA1600.severity = none dotnet_diagnostic.SA1602.severity = none diff --git a/.gitignore b/.gitignore index 97a330f..7ee9b6d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ obj/ /.vs /.vscode /.idea +.claude/ diff --git a/Directory.Build.props b/Directory.Build.props index 772f1fd..95c3854 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -5,10 +5,8 @@ Temporal true true - + true enable enable MIT diff --git a/src/ContextPropagation/ContextPropagationInterceptor.cs b/src/ContextPropagation/ContextPropagationInterceptor.cs index 4292b66..784070d 100644 --- a/src/ContextPropagation/ContextPropagationInterceptor.cs +++ b/src/ContextPropagation/ContextPropagationInterceptor.cs @@ -1,6 +1,7 @@ namespace TemporalioSamples.ContextPropagation; using System.Threading.Tasks; +using NexusRpc.Handlers; using Temporalio.Api.Common.V1; using Temporalio.Client; using Temporalio.Client.Interceptors; @@ -39,6 +40,10 @@ public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor n public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) => new ContextPropagationActivityInboundInterceptor(this, nextInterceptor); + public NexusOperationInboundInterceptor InterceptNexusOperation( + NexusOperationInboundInterceptor nextInterceptor) => + new ContextPropagationNexusOperationInboundInterceptor(this, nextInterceptor); + private Dictionary HeaderFromContext(IDictionary? existing) { var ret = existing != null ? @@ -67,6 +72,28 @@ private TResult WithHeadersApplied( return func(); } + private Dictionary HeaderFromContextForNexus(IDictionary? existing) + { + var ret = existing != null ? + new Dictionary(existing) : new Dictionary(1); + // Nexus headers are string-based, so serialize context value to JSON. + // Alternative approach: could use payload converter and put entire payload as JSON on header. + ret[headerKey] = System.Text.Json.JsonSerializer.Serialize(context.Value); + return ret; + } + + private Task WithHeadersAppliedForNexusAsync( + IReadOnlyDictionary? headers, Func> func) + { + if (headers?.TryGetValue(headerKey, out var value) == true) + { + // Deserialize can return null for nullable types, which is expected + context.Value = System.Text.Json.JsonSerializer.Deserialize(value)!; + } + // These are async local, no need to unapply afterwards + return func(); + } + private class ContextPropagationClientOutboundInterceptor : ClientOutboundInterceptor { private readonly ContextPropagationInterceptor root; @@ -153,6 +180,11 @@ public override Task> StartChildWorkflow StartChildWorkflowInput input) => Next.StartChildWorkflowAsync( input with { Headers = root.HeaderFromContext(input.Headers) }); + + public override Task> StartNexusOperationAsync( + StartNexusOperationInput input) => + Next.StartNexusOperationAsync( + input with { Headers = root.HeaderFromContextForNexus(input.Headers) }); } private class ContextPropagationActivityInboundInterceptor : ActivityInboundInterceptor @@ -166,4 +198,19 @@ public ContextPropagationActivityInboundInterceptor( public override Task ExecuteActivityAsync(ExecuteActivityInput input) => root.WithHeadersApplied(input.Headers, () => Next.ExecuteActivityAsync(input)); } + + private class ContextPropagationNexusOperationInboundInterceptor : NexusOperationInboundInterceptor + { + private readonly ContextPropagationInterceptor root; + + public ContextPropagationNexusOperationInboundInterceptor( + ContextPropagationInterceptor root, NexusOperationInboundInterceptor next) + : base(next) => this.root = root; + + public override Task> ExecuteNexusOperationStartAsync( + ExecuteNexusOperationStartInput input) => + root.WithHeadersAppliedForNexusAsync( + input.Context.Headers, + () => base.ExecuteNexusOperationStartAsync(input)); + } } \ No newline at end of file diff --git a/src/ContextPropagation/INexusGreetingService.cs b/src/ContextPropagation/INexusGreetingService.cs new file mode 100644 index 0000000..1b504d1 --- /dev/null +++ b/src/ContextPropagation/INexusGreetingService.cs @@ -0,0 +1,16 @@ +namespace TemporalioSamples.ContextPropagation; + +using NexusRpc; + +[NexusService] +public interface INexusGreetingService +{ + static readonly string EndpointName = "context-propagation-greeting-service"; + + [NexusOperation] + GreetingOutput SayGreeting(GreetingInput input); + + public record GreetingInput(string Name); + + public record GreetingOutput(string Message); +} diff --git a/src/ContextPropagation/NexusGreetingHandlerWorkflow.workflow.cs b/src/ContextPropagation/NexusGreetingHandlerWorkflow.workflow.cs new file mode 100644 index 0000000..d64f8e9 --- /dev/null +++ b/src/ContextPropagation/NexusGreetingHandlerWorkflow.workflow.cs @@ -0,0 +1,27 @@ +namespace TemporalioSamples.ContextPropagation; + +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Temporalio.Workflows; + +[Workflow] +public class NexusGreetingHandlerWorkflow +{ + [WorkflowQuery] + public string? CapturedUserId { get; private set; } + + [WorkflowRun] + public async Task RunAsync( + INexusGreetingService.GreetingInput input) + { + // Capture context to prove propagation through Nexus to handler workflow + CapturedUserId = MyContext.UserId; + Workflow.Logger.LogInformation( + "Handler workflow executing for {Name}, called by user {UserId}", + input.Name, + CapturedUserId); + + var message = $"Greeting for {input.Name} (processed by user: {CapturedUserId})"; + return await Task.FromResult(new INexusGreetingService.GreetingOutput(message)); + } +} diff --git a/src/ContextPropagation/NexusGreetingService.cs b/src/ContextPropagation/NexusGreetingService.cs new file mode 100644 index 0000000..6860029 --- /dev/null +++ b/src/ContextPropagation/NexusGreetingService.cs @@ -0,0 +1,23 @@ +namespace TemporalioSamples.ContextPropagation; + +using Microsoft.Extensions.Logging; +using NexusRpc.Handlers; +using Temporalio.Nexus; + +[NexusServiceHandler(typeof(INexusGreetingService))] +public class NexusGreetingService +{ + [NexusOperationHandler] + public IOperationHandler SayGreeting() => + WorkflowRunOperationHandler.FromHandleFactory( + (WorkflowRunOperationContext context, INexusGreetingService.GreetingInput input) => + { + // Log context to show it was propagated to the handler + NexusOperationExecutionContext.Current.Logger.LogInformation( + "Nexus greeting service called by user {UserId}", MyContext.UserId); + + return context.StartWorkflowAsync( + (NexusGreetingHandlerWorkflow wf) => wf.RunAsync(input), + new() { Id = context.HandlerContext.RequestId }); + }); +} diff --git a/src/ContextPropagation/Program.cs b/src/ContextPropagation/Program.cs index 3b19e32..3fd1c10 100644 --- a/src/ContextPropagation/Program.cs +++ b/src/ContextPropagation/Program.cs @@ -40,7 +40,9 @@ async Task RunWorkerAsync() client, new TemporalWorkerOptions(taskQueue: "interceptors-sample"). AddAllActivities(new()). - AddWorkflow()); + AddWorkflow(). + AddNexusService(new NexusGreetingService()). + AddWorkflow()); try { await worker.ExecuteAsync(tokenSource.Token); diff --git a/src/ContextPropagation/README.md b/src/ContextPropagation/README.md index 9d4ee3e..bac10ff 100644 --- a/src/ContextPropagation/README.md +++ b/src/ContextPropagation/README.md @@ -1,8 +1,8 @@ # Interceptors -This sample demonstrates how to use interceptors to propagate contextual information from an `AsyncLocal` throughout the -workflows and activities. While this demonstrates context propagation specifically, it can also be used to show how to -create interceptors for any other purpose. +This sample demonstrates how to use interceptors to propagate contextual information from an `AsyncLocal` throughout +workflows, activities, and Nexus operations. While this demonstrates context propagation specifically, it can also be +used to show how to create interceptors for any other purpose. To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory in a separate terminal to start the worker: @@ -14,4 +14,4 @@ Then in another terminal, run the workflow from this directory: dotnet run workflow The workflow terminal will show the completed workflow result and the worker terminal will show the contextual user ID -is present in the workflow and activity. \ No newline at end of file +is present in the workflow, Nexus operation handler, Nexus handler workflow, and activity. \ No newline at end of file diff --git a/src/ContextPropagation/SayHelloWorkflow.workflow.cs b/src/ContextPropagation/SayHelloWorkflow.workflow.cs index c84c2c3..950dee8 100644 --- a/src/ContextPropagation/SayHelloWorkflow.workflow.cs +++ b/src/ContextPropagation/SayHelloWorkflow.workflow.cs @@ -13,8 +13,16 @@ public async Task RunAsync(string name) { Workflow.Logger.LogInformation("Workflow called by user {UserId}", MyContext.UserId); - // Wait for signal then run activity + // Wait for signal then call Nexus service and run activity await Workflow.WaitConditionAsync(() => complete); + + // Call Nexus service to demonstrate context propagation through Nexus + var nexusClient = Workflow.CreateNexusClient( + INexusGreetingService.EndpointName); + var nexusResult = await nexusClient.ExecuteNexusOperationAsync( + svc => svc.SayGreeting(new(name))); + Workflow.Logger.LogInformation("Nexus result: {Result}", nexusResult.Message); + return await Workflow.ExecuteActivityAsync( (SayHelloActivities act) => act.SayHello(name), new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) }); diff --git a/src/ContextPropagation/TemporalioSamples.ContextPropagation.csproj b/src/ContextPropagation/TemporalioSamples.ContextPropagation.csproj index e3b6154..4f0c9ed 100644 --- a/src/ContextPropagation/TemporalioSamples.ContextPropagation.csproj +++ b/src/ContextPropagation/TemporalioSamples.ContextPropagation.csproj @@ -4,4 +4,8 @@ Exe + + + + \ No newline at end of file diff --git a/tests/ContextPropagation/SayHelloWorkflowTests.cs b/tests/ContextPropagation/SayHelloWorkflowTests.cs index ed44bbf..dc022b8 100644 --- a/tests/ContextPropagation/SayHelloWorkflowTests.cs +++ b/tests/ContextPropagation/SayHelloWorkflowTests.cs @@ -19,6 +19,11 @@ public SayHelloWorkflowTests(ITestOutputHelper output, WorkflowEnvironment env) [Fact] public async Task RunAsync_ContextPropagation_ReachesActivity() { + // Create Nexus endpoint for testing context propagation through Nexus + var taskQueue = $"tq-{Guid.NewGuid()}"; + await Env.TestEnv.CreateNexusEndpointAsync( + INexusGreetingService.EndpointName, taskQueue); + // Update the client to use the interceptor var clientOptions = (TemporalClientOptions)Client.Options.Clone(); clientOptions.Interceptors = @@ -44,12 +49,14 @@ static string SayHello(string name) return $"Mock for {name}"; } - // Run worker + // Run worker with Nexus service, handler workflow, and activity using var worker = new TemporalWorker( client, - new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + new TemporalWorkerOptions(taskQueue). AddActivity(SayHello). - AddWorkflow()); + AddWorkflow(). + AddNexusService(new NexusGreetingService()). + AddWorkflow()); await worker.ExecuteAsync(async () => { // Set context value, start workflow, set to something else @@ -62,6 +69,13 @@ await worker.ExecuteAsync(async () => // Send signal, check result await handle.SignalAsync(wf => wf.SignalCompleteAsync()); Assert.Equal("Mock for some-name", await handle.GetResultAsync()); + + // Verify context propagated through Nexus to handler workflow + var history = await handle.FetchHistoryAsync(); + var nexusStartedEvent = history.Events.First(e => e.NexusOperationStartedEventAttributes != null); + var handlerWorkflowId = nexusStartedEvent.Links.First().WorkflowEvent!.WorkflowId; + var handlerHandle = client.GetWorkflowHandle(handlerWorkflowId); + Assert.Equal("test-user", await handlerHandle.QueryAsync(wf => wf.CapturedUserId)); }); } } diff --git a/tests/TimeSkippingServerFact.cs b/tests/TimeSkippingServerFact.cs index bedd833..7341925 100644 --- a/tests/TimeSkippingServerFact.cs +++ b/tests/TimeSkippingServerFact.cs @@ -5,7 +5,7 @@ namespace TemporalioSamples.Tests; /// /// The time-skipping test server can only run on x86/x64 currently. -/// +/// public sealed class TimeSkippingServerFactAttribute : FactAttribute { public TimeSkippingServerFactAttribute()