From de9df70a0f6f98cbd9ce836f9c9a516f9d6393cc Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Fri, 26 Jun 2026 11:17:06 -0700 Subject: [PATCH 1/3] Fix DurableTask CustomStatus 16 KB overflow on multi-executor workflows (#5745) Publish only a bounded trailing window of events to the orchestration CustomStatus (tagged with an absolute EventsStartIndex) instead of the full cumulative event log, which could exceed the Durable Functions 16 KB CustomStatus cap and fail the orchestration. The complete event log still flows through the uncapped orchestration output and is drained by the streaming consumer at completion, so no event is lost or reordered. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../CHANGELOG.md | 1 + .../Workflows/DurableStreamingWorkflowRun.cs | 30 +++- .../Workflows/DurableWorkflowLiveStatus.cs | 21 ++- .../Workflows/DurableWorkflowRunner.cs | 127 +++++++++++++++- .../DurableStreamingWorkflowRunTests.cs | 141 ++++++++++++++++++ .../DurableWorkflowRunnerEventWindowTests.cs | 120 +++++++++++++++ 6 files changed, 429 insertions(+), 11 deletions(-) create mode 100644 dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowRunnerEventWindowTests.cs diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md b/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md index 2264d994280..b8e44109bb4 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md @@ -2,6 +2,7 @@ ## [Unreleased] +- Bound the live workflow status to a trailing event window so multi-executor workflows with large typed outputs no longer overflow the Durable Functions 16 KB custom status cap ([#5745](https://github.com/microsoft/agent-framework/issues/5745)) - Fix issue with resuming checkpoint after package version upgrade ([#6670](https://github.com/microsoft/agent-framework/pull/6670)) - Bind MCP threadId to the current agent and guard cross-agent session dispatch ([#6531](https://github.com/microsoft/agent-framework/pull/6531)) - Added support for durable workflows ([#4436](https://github.com/microsoft/agent-framework/pull/4436)) diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs index d05d01b4549..4d6c8c3c978 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs @@ -131,7 +131,7 @@ private async IAsyncEnumerable WatchStreamAsync( { if (DurableWorkflowLiveStatus.TryParse(metadata.SerializedCustomStatus, out DurableWorkflowLiveStatus liveStatus)) { - (List events, lastReadEventIndex) = DrainNewEvents(liveStatus.Events, lastReadEventIndex); + (List events, lastReadEventIndex) = DrainNewEvents(liveStatus.Events, liveStatus.EventsStartIndex, lastReadEventIndex); foreach (WorkflowEvent evt in events) { hasNewEvents = true; @@ -175,7 +175,9 @@ private async IAsyncEnumerable WatchStreamAsync( // SerializedOutput as a DurableWorkflowResult wrapper. if (TryParseWorkflowResult(metadata.SerializedOutput, out DurableWorkflowResult? outputResult)) { - (List events, _) = DrainNewEvents(outputResult.Events, lastReadEventIndex); + // The output carries the full, untrimmed event log starting at absolute index 0, + // so any events that scrolled out of the live window are backfilled here. + (List events, _) = DrainNewEvents(outputResult.Events, windowStartIndex: 0, lastReadEventIndex); foreach (WorkflowEvent evt in events) { yield return evt; @@ -285,14 +287,30 @@ await this._client.RaiseEventAsync( } /// - /// Deserializes and returns any events beyond from the list. + /// Deserializes and returns any events not yet read, given a published window that begins at + /// absolute index and the consumer's absolute + /// . /// - private static (List Events, int UpdatedIndex) DrainNewEvents(List serializedEvents, int lastReadIndex) + /// + /// When the consumer has fallen behind the window (its is older than + /// the window's first element), the missing events are not in this window. They are not skipped: the + /// index is left unchanged so they are recovered from the full workflow output at completion. + /// + private static (List Events, int UpdatedIndex) DrainNewEvents(List windowEvents, int windowStartIndex, int lastReadIndex) { List events = []; - while (lastReadIndex < serializedEvents.Count) + + // Consumer is behind the published window — defer the gap to the completion backfill. + if (lastReadIndex < windowStartIndex) + { + return (events, lastReadIndex); + } + + int relativeIndex = lastReadIndex - windowStartIndex; + while (relativeIndex < windowEvents.Count) { - string serializedEvent = serializedEvents[lastReadIndex]; + string serializedEvent = windowEvents[relativeIndex]; + relativeIndex++; lastReadIndex++; WorkflowEvent? workflowEvent = TryDeserializeEvent(serializedEvent); diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowLiveStatus.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowLiveStatus.cs index 5e381ce0eb2..7ea608094df 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowLiveStatus.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowLiveStatus.cs @@ -28,10 +28,29 @@ internal sealed class DurableWorkflowLiveStatus public List PendingEvents { get; set; } = []; /// - /// Gets or sets the serialized workflow events emitted so far. + /// Gets or sets the serialized workflow events currently published in the live status. /// + /// + /// This may be a bounded trailing window of the workflow's full event sequence rather than + /// every event emitted so far. Durable Functions caps custom status at 16 KB (UTF-16), so + /// older events are omitted once the cumulative size would exceed that ceiling. + /// gives the absolute position of the first element here, and the complete, untrimmed event log + /// is always available from the workflow output () at completion. + /// public List Events { get; set; } = []; + /// + /// Gets or sets the absolute index, within the workflow's full event sequence, of the first + /// element of . + /// + /// + /// This is 0 when carries the full log from the beginning. It is greater + /// than zero when only a trailing window is published (older events trimmed to respect the custom + /// status size limit). Consumers use it to map window positions back to absolute event indices so + /// no event is delivered twice or skipped. + /// + public int EventsStartIndex { get; set; } + /// /// Attempts to deserialize a serialized custom status string into a . /// diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs index b458bf98b08..398807e82e2 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs @@ -71,6 +71,19 @@ internal sealed class DurableWorkflowRunner { private const int MaxSupersteps = 100; + // Durable Functions caps orchestration custom status at 16 KB (UTF-16), i.e. 8192 .NET chars. + // We target a value comfortably below that so the JSON envelope, the eventsStartIndex field, and + // small estimation differences (e.g. the real serializer vs. our per-element estimate) cannot push + // the payload over the hard limit. + private const int LiveStatusTargetChars = 7600; + + // Fixed headroom reserved for the JSON envelope (property names, brackets, eventsStartIndex digits). + private const int LiveStatusEnvelopeBudgetChars = 256; + + // Per-pending-event overhead (JSON object braces, property names, quotes) added to the + // measured EventName/Input lengths when estimating the PendingEvents portion of the status. + private const int PendingEventOverheadChars = 64; + /// /// Initializes a new instance of the class. /// @@ -277,6 +290,13 @@ public SuperstepState(Workflow workflow, DurableEdgeMap edgeMap) /// public List AccumulatedEvents { get; } = []; + /// + /// Running estimate, in UTF-16 characters, of the serialized cost of + /// when written as a JSON string array. Used to decide cheaply whether the full event log still + /// fits within the custom status size budget before falling back to windowing. + /// + public int AccumulatedEventsCharCost { get; set; } + /// /// Workflow status published via SetCustomStatus so external clients can poll for streaming events and pending HITL requests. /// @@ -385,6 +405,10 @@ private static bool ProcessSuperstepResults( // Accumulate events for the durable workflow status (streaming) state.AccumulatedEvents.AddRange(resultInfo.Events); + foreach (string serializedEvent in resultInfo.Events) + { + state.AccumulatedEventsCharCost += SerializedElementCost(serializedEvent); + } // Check for halt request haltRequested |= resultInfo.HaltRequested; @@ -473,21 +497,116 @@ private static void ApplyClearedScopes(Dictionary shared, List for live streaming. /// /// - /// Custom status is the only orchestration state readable by external clients while - /// the orchestration is still running. It is cleared by the framework on completion, - /// so events are also included in for final retrieval. + /// + /// Custom status is the only orchestration state readable by external clients while the + /// orchestration is still running. Durable Functions caps it at 16 KB (UTF-16), so the + /// full cumulative event log cannot always be published live: a workflow with enough + /// executors and/or large typed outputs would otherwise overflow the cap and fail the + /// orchestration (see issue #5745). + /// + /// + /// To stay within the cap, only a bounded trailing window of the most recent events is + /// published, tagged with so the + /// consumer can map window positions to absolute indices. The complete, untrimmed log is + /// still returned in (the orchestration output is + /// not subject to the custom status cap) and is drained by the consumer at completion, so no + /// event is ever lost — events that scroll out of the live window are delivered at completion + /// instead of mid-run. + /// /// private static void PublishEventsToLiveStatus( TaskOrchestrationContext context, SuperstepState state) { - state.LiveStatus.Events = state.AccumulatedEvents; + (List windowEvents, int startIndex) = BuildEventWindow( + state.AccumulatedEvents, state.AccumulatedEventsCharCost, state.LiveStatus.PendingEvents); + state.LiveStatus.Events = windowEvents; + state.LiveStatus.EventsStartIndex = startIndex; // Pass the object directly — the framework's DataConverter handles serialization. // Pre-serializing would cause double-serialization (string wrapped in JSON quotes). context.SetCustomStatus(state.LiveStatus); } + /// + /// Selects the largest trailing window of whose + /// serialized size fits within the custom status budget, newest events first. + /// + /// + /// The window to publish and the absolute index of its first element. When even the single newest + /// event exceeds the budget, an empty window is returned with a start index equal to the event count, + /// so the oversized event is never written to custom status (it is still delivered via the output at + /// completion). + /// + internal static (List Window, int StartIndex) BuildEventWindow( + List accumulatedEvents, + int accumulatedEventsCharCost, + List pendingEvents) + { + List all = accumulatedEvents; + + // Reserve budget for the JSON envelope and the PendingEvents portion of the status. + int reserved = LiveStatusEnvelopeBudgetChars + EstimatePendingEventsCost(pendingEvents); + int eventsBudget = Math.Max(0, LiveStatusTargetChars - reserved); + + // Fast path: the entire event log still fits, so publish it from the beginning. + if (accumulatedEventsCharCost <= eventsBudget) + { + return (all, 0); + } + + // Otherwise include as many of the most recent events as fit, scanning newest to oldest. + int total = 0; + int start = all.Count; + for (int i = all.Count - 1; i >= 0; i--) + { + int cost = SerializedElementCost(all[i]); + if (total + cost > eventsBudget) + { + break; + } + + total += cost; + start = i; + } + + return start < all.Count + ? (all.GetRange(start, all.Count - start), start) + : ([], all.Count); + } + + /// + /// Estimates the serialized cost, in UTF-16 characters, of a single serialized event when written + /// as a JSON string element (escaped payload plus surrounding quotes and a separating comma). + /// + /// + /// Uses the default JSON escaping (matching the serializer that writes the custom status), so the + /// estimate is an upper bound on the actual contribution — never an underestimate that could lead to + /// an overflow. + /// + internal static int SerializedElementCost(string serializedEvent) + => JsonEncodedText.Encode(serializedEvent).Value.Length + 3; + + /// + /// Estimates the serialized cost, in UTF-16 characters, of the pending request ports carried in the + /// live status, so the event window leaves room for them. + /// + private static int EstimatePendingEventsCost(List pendingEvents) + { + if (pendingEvents.Count == 0) + { + return 0; + } + + int cost = 0; + foreach (PendingRequestPortStatus pending in pendingEvents) + { + cost += (pending.EventName?.Length ?? 0) + (pending.Input?.Length ?? 0) + PendingEventOverheadChars; + } + + return cost; + } + /// /// Routes executor output (explicit messages or return value) to successor executors. /// diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs index 404ed3496d6..5ca01511eb2 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs @@ -48,6 +48,12 @@ private static string SerializeCustomStatusWithPendingEvents( return JsonSerializer.Serialize(status, DurableSerialization.Options); } + private static string SerializeCustomStatusWindow(List events, int eventsStartIndex) + { + DurableWorkflowLiveStatus status = new() { Events = events, EventsStartIndex = eventsStartIndex }; + return JsonSerializer.Serialize(status, DurableSerialization.Options); + } + private static Workflow CreateTestWorkflowWithRequestPort(string requestPortId) { FunctionExecutor start = new("start", (_, _, _) => default); @@ -805,6 +811,141 @@ public void ExtractResult_CamelCaseSerializedObject_DeserializesToPascalCaseMemb #endregion + #region Windowed live status (issue #5745) + + [Fact] + public async Task WatchStreamAsync_SlidingWindow_MapsAbsoluteIndicesWithoutDuplicatesOrGapsAsync() + { + // Arrange — the producer publishes only a bounded trailing window of recent events, + // tagged with an absolute EventsStartIndex. A consumer that keeps up must still receive + // every event exactly once as the window slides forward. + List all = [.. Enumerable.Range(0, 6).Select(i => SerializeEvent(new DurableHaltRequestedEvent($"executor-{i}")))]; + + // Poll 1: window [0..2] @ start 0; Poll 2: window [2..4] @ start 2; Poll 3: window [4..5] @ start 4. + string window1 = SerializeCustomStatusWindow([all[0], all[1], all[2]], 0); + string window2 = SerializeCustomStatusWindow([all[2], all[3], all[4]], 2); + string window3 = SerializeCustomStatusWindow([all[4], all[5]], 4); + string serializedOutput = SerializeWorkflowResult("done", all); + + int callCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + callCount++; + return callCount switch + { + 1 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: window1), + 2 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: window2), + 3 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: window3), + _ => CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput), + }; + }); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert — all 6 halt events in order, exactly once, then completion. + Assert.Equal(7, events.Count); + for (int i = 0; i < 6; i++) + { + Assert.Equal($"executor-{i}", Assert.IsType(events[i]).ExecutorId); + } + + Assert.IsType(events[6]); + } + + [Fact] + public async Task WatchStreamAsync_ConsumerBehindWindow_RecoversAllEventsAtCompletionAsync() + { + // Arrange — the consumer's first read lands on a window that has already advanced past the + // earliest events (EventsStartIndex > 0). Those events are not in the window; they must be + // backfilled from the full output at completion, with nothing skipped. + List all = [.. Enumerable.Range(0, 6).Select(i => SerializeEvent(new DurableHaltRequestedEvent($"executor-{i}")))]; + + // Only the trailing window [3..5] is live; events 0..2 already scrolled out. + string window = SerializeCustomStatusWindow([all[3], all[4], all[5]], 3); + string serializedOutput = SerializeWorkflowResult("done", all); + + int callCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + callCount++; + return callCount == 1 + ? CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: window) + : CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput); + }); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert — every event delivered exactly once (deferred ones arrive at completion), in order. + Assert.Equal(7, events.Count); + for (int i = 0; i < 6; i++) + { + Assert.Equal($"executor-{i}", Assert.IsType(events[i]).ExecutorId); + } + + Assert.IsType(events[6]); + } + + [Fact] + public async Task WatchStreamAsync_EmptyWindowForOversizedEvent_DeliversAtCompletionAsync() + { + // Arrange — an event too large to fit the custom status budget is excluded from the live + // window entirely (empty Events, EventsStartIndex == event count), so the orchestration never + // overflows. The event must still be delivered via the output at completion. + List all = [.. Enumerable.Range(0, 3).Select(i => SerializeEvent(new DurableHaltRequestedEvent($"executor-{i}")))]; + + string emptyWindow = SerializeCustomStatusWindow([], all.Count); + string serializedOutput = SerializeWorkflowResult("done", all); + + int callCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + callCount++; + return callCount == 1 + ? CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: emptyWindow) + : CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput); + }); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Equal(4, events.Count); + for (int i = 0; i < 3; i++) + { + Assert.Equal($"executor-{i}", Assert.IsType(events[i]).ExecutorId); + } + + Assert.IsType(events[3]); + } + + #endregion + private sealed class TestPayload { public string? Name { get; set; } diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowRunnerEventWindowTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowRunnerEventWindowTests.cs new file mode 100644 index 00000000000..4dc70a9159e --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowRunnerEventWindowTests.cs @@ -0,0 +1,120 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; +using Microsoft.Agents.AI.DurableTask.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.UnitTests.Workflows; + +/// +/// Tests for the bounded live-status event window that keeps the orchestration custom status under the +/// Durable Functions 16 KB (UTF-16) cap. See issue #5745. +/// +public sealed class DurableWorkflowRunnerEventWindowTests +{ + // Durable Functions caps custom status at 16 KB UTF-16 == 8192 .NET chars. + private const int CustomStatusCharLimit = 8192; + + private static string MakeEvent(int index, int approxChars) + => $"{{\"index\":{index},\"data\":\"{new string('x', approxChars)}\"}}"; + + private static int CharCost(IEnumerable events) + => events.Sum(DurableWorkflowRunner.SerializedElementCost); + + private static int SerializedStatusLength(List window, int startIndex) + { + DurableWorkflowLiveStatus status = new() { Events = window, EventsStartIndex = startIndex }; + return JsonSerializer.Serialize(status, DurableSerialization.Options).Length; + } + + [Fact] + public void BuildEventWindow_WithinBudget_ReturnsFullListFromZero() + { + // Arrange — a handful of small events well under the budget. + List events = [.. Enumerable.Range(0, 5).Select(i => MakeEvent(i, 100))]; + + // Act + (List window, int startIndex) = DurableWorkflowRunner.BuildEventWindow(events, CharCost(events), []); + + // Assert — published in full, starting at absolute index 0. + Assert.Equal(0, startIndex); + Assert.Equal(events, window); + Assert.True(SerializedStatusLength(window, startIndex) <= CustomStatusCharLimit); + } + + [Fact] + public void BuildEventWindow_ExceedsBudget_TrimsOldestAndStaysUnderCap() + { + // Arrange — 40 events of ~1 KB each (~40 KB cumulative), far over the 16 KB custom status cap. + List events = [.. Enumerable.Range(0, 40).Select(i => MakeEvent(i, 1000))]; + + // Act + (List window, int startIndex) = DurableWorkflowRunner.BuildEventWindow(events, CharCost(events), []); + + // Assert — a non-empty trailing window that fits under the cap. + Assert.True(startIndex > 0, "Expected the oldest events to be trimmed."); + Assert.NotEmpty(window); + Assert.True( + SerializedStatusLength(window, startIndex) <= CustomStatusCharLimit, + "Published status must stay under the 16 KB custom status cap."); + + // The window is the contiguous tail ending at the most recent event. + Assert.Equal(events.GetRange(startIndex, events.Count - startIndex), window); + Assert.Equal(events[^1], window[^1]); + } + + [Fact] + public void BuildEventWindow_SingleOversizedEvent_ReturnsEmptyWindowAtEnd() + { + // Arrange — one event larger than the entire budget. It cannot be published live without + // overflowing the cap, so it must be excluded (delivered via the output at completion instead). + List events = [MakeEvent(0, 20_000)]; + + // Act + (List window, int startIndex) = DurableWorkflowRunner.BuildEventWindow(events, CharCost(events), []); + + // Assert — empty window, start index past the (excluded) event; status stays trivially under cap. + Assert.Empty(window); + Assert.Equal(events.Count, startIndex); + Assert.True(SerializedStatusLength(window, startIndex) <= CustomStatusCharLimit); + } + + [Fact] + public void BuildEventWindow_OversizedTailEventThenSmallEvents_ExcludesOversizedEvent() + { + // Arrange — a giant event in the middle followed by small ones. The window should include the + // recent small events but stop before the oversized one. + List events = + [ + MakeEvent(0, 100), + MakeEvent(1, 20_000), // oversized — cannot be carried live + MakeEvent(2, 100), + MakeEvent(3, 100), + ]; + + // Act + (List window, int startIndex) = DurableWorkflowRunner.BuildEventWindow(events, CharCost(events), []); + + // Assert — window starts after the oversized event. + Assert.Equal(2, startIndex); + Assert.Equal([events[2], events[3]], window); + Assert.True(SerializedStatusLength(window, startIndex) <= CustomStatusCharLimit); + } + + [Fact] + public void BuildEventWindow_LargePendingEvents_ShrinkWindowToLeaveRoom() + { + // Arrange — events that on their own would nearly fill the budget, plus a large pending request + // port input. The window must shrink so the combined status stays under the cap. + List events = [.. Enumerable.Range(0, 20).Select(i => MakeEvent(i, 500))]; + List pending = [new(EventName: "approval", Input: new string('p', 3000))]; + + // Act + (List window, int startIndex) = DurableWorkflowRunner.BuildEventWindow(events, CharCost(events), pending); + + // Assert — combined status (events window + pending events) stays under the cap. + DurableWorkflowLiveStatus status = new() { Events = window, EventsStartIndex = startIndex, PendingEvents = pending }; + int length = JsonSerializer.Serialize(status, DurableSerialization.Options).Length; + Assert.True(length <= CustomStatusCharLimit, $"Combined status length {length} exceeded cap."); + Assert.True(startIndex > 0, "Expected the window to shrink to make room for pending events."); + } +} From 159bae85a43ebcd8be7cb7c0f55eda38cd9975b9 Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Fri, 26 Jun 2026 12:12:18 -0700 Subject: [PATCH 2/3] Address PR review: fix format, conservative pending cost, request-port windowing - Add UTF-8 BOM to the new test file to satisfy the repo .editorconfig charset rule (fixes check-format). - EstimatePendingEventsCost now measures JSON-escaped length (JsonEncodedText.Encode) for EventName/Input so the reserve is conservative for quote/backslash-heavy payloads. - Re-window the live status on the RequestPort publish path (DurableExecutorDispatcher) via new DurableWorkflowRunner.TrimLiveStatusToBudget so adding a large pending input to a near-full window cannot overflow the 16 KB custom status cap. - Point the CHANGELOG bullet at the PR per the DurableTask area convention. Add covering tests for TrimLiveStatusToBudget. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../CHANGELOG.md | 2 +- .../Workflows/DurableExecutorDispatcher.cs | 5 +- .../Workflows/DurableWorkflowRunner.cs | 43 ++++++++++++++++- .../DurableWorkflowRunnerEventWindowTests.cs | 47 ++++++++++++++++++- 4 files changed, 92 insertions(+), 5 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md b/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md index b8e44109bb4..9459859a5ab 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md @@ -2,7 +2,7 @@ ## [Unreleased] -- Bound the live workflow status to a trailing event window so multi-executor workflows with large typed outputs no longer overflow the Durable Functions 16 KB custom status cap ([#5745](https://github.com/microsoft/agent-framework/issues/5745)) +- Bound the live workflow status to a trailing event window so multi-executor workflows with large typed outputs no longer overflow the Durable Functions 16 KB custom status cap ([#6775](https://github.com/microsoft/agent-framework/pull/6775)) - Fix issue with resuming checkpoint after package version upgrade ([#6670](https://github.com/microsoft/agent-framework/pull/6670)) - Bind MCP threadId to the current agent and guard cross-agent session dispatch ([#6531](https://github.com/microsoft/agent-framework/pull/6531)) - Added support for durable workflows ([#4436](https://github.com/microsoft/agent-framework/pull/4436)) diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs index b2440cfd831..645a365eacb 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs @@ -113,8 +113,11 @@ private static async Task ExecuteRequestPortAsync( logger.LogWaitingForExternalEvent(eventName); - // Publish pending request so external clients can discover what input is needed + // Publish pending request so external clients can discover what input is needed. + // Adding a pending event grows the reserved budget, so re-window the events first to keep this + // direct status write within the Durable Functions custom status cap (issue #5745). liveStatus.PendingEvents.Add(new PendingRequestPortStatus(EventName: eventName, Input: input)); + DurableWorkflowRunner.TrimLiveStatusToBudget(liveStatus); context.SetCustomStatus(liveStatus); // Wait until the external actor raises the event diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs index 398807e82e2..45b1ecf3aaf 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs @@ -575,6 +575,37 @@ internal static (List Window, int StartIndex) BuildEventWindow( : ([], all.Count); } + /// + /// Re-windows an already-published live status in place so its events fit the custom status budget + /// alongside the current . + /// + /// + /// Used by code paths that mutate after the last + /// call (e.g. a request port adding a pending input). Adding a + /// pending event grows the reserved budget, which can push a previously-fitting event window over the + /// 16 KB cap. Trimming the trailing window here keeps the write within the limit while advancing + /// so the consumer still maps window positions + /// to absolute indices. Trimmed events remain available via the orchestration output at completion, so + /// none are lost. + /// + internal static void TrimLiveStatusToBudget(DurableWorkflowLiveStatus liveStatus) + { + int cost = 0; + foreach (string serializedEvent in liveStatus.Events) + { + cost += SerializedElementCost(serializedEvent); + } + + (List window, int relativeStart) = BuildEventWindow(liveStatus.Events, cost, liveStatus.PendingEvents); + if (relativeStart == 0 && window.Count == liveStatus.Events.Count) + { + return; + } + + liveStatus.EventsStartIndex += relativeStart; + liveStatus.Events = window; + } + /// /// Estimates the serialized cost, in UTF-16 characters, of a single serialized event when written /// as a JSON string element (escaped payload plus surrounding quotes and a separating comma). @@ -591,7 +622,13 @@ internal static int SerializedElementCost(string serializedEvent) /// Estimates the serialized cost, in UTF-16 characters, of the pending request ports carried in the /// live status, so the event window leaves room for them. /// - private static int EstimatePendingEventsCost(List pendingEvents) + /// + /// The is serialized request data; quotes, backslashes, + /// and control characters in it are escaped again when the live status is serialized. The cost is + /// therefore measured on the JSON-escaped length (matching ) so the + /// reserve is conservative and never undercounts. + /// + internal static int EstimatePendingEventsCost(List pendingEvents) { if (pendingEvents.Count == 0) { @@ -601,7 +638,9 @@ private static int EstimatePendingEventsCost(List pend int cost = 0; foreach (PendingRequestPortStatus pending in pendingEvents) { - cost += (pending.EventName?.Length ?? 0) + (pending.Input?.Length ?? 0) + PendingEventOverheadChars; + cost += JsonEncodedText.Encode(pending.EventName ?? string.Empty).Value.Length + + JsonEncodedText.Encode(pending.Input ?? string.Empty).Value.Length + + PendingEventOverheadChars; } return cost; diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowRunnerEventWindowTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowRunnerEventWindowTests.cs index 4dc70a9159e..715e1a80e12 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowRunnerEventWindowTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowRunnerEventWindowTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System.Text.Json; using Microsoft.Agents.AI.DurableTask.Workflows; @@ -117,4 +117,49 @@ public void BuildEventWindow_LargePendingEvents_ShrinkWindowToLeaveRoom() Assert.True(length <= CustomStatusCharLimit, $"Combined status length {length} exceeded cap."); Assert.True(startIndex > 0, "Expected the window to shrink to make room for pending events."); } + + [Fact] + public void TrimLiveStatusToBudget_WithinBudget_LeavesStatusUnchanged() + { + // Arrange — an already-published small window with no pending events. + List events = [.. Enumerable.Range(0, 5).Select(i => MakeEvent(i, 100))]; + DurableWorkflowLiveStatus status = new() { Events = events, EventsStartIndex = 0 }; + + // Act + DurableWorkflowRunner.TrimLiveStatusToBudget(status); + + // Assert — nothing trimmed. + Assert.Equal(0, status.EventsStartIndex); + Assert.Equal(events, status.Events); + Assert.True(SerializedStatusLength(status.Events, status.EventsStartIndex) <= CustomStatusCharLimit); + } + + [Fact] + public void TrimLiveStatusToBudget_LargePendingAddedToPublishedWindow_ShrinksAndStaysUnderCap() + { + // Arrange — a previously-published trailing window that fit the budget on its own and is already + // offset (absolute indices start at 6), mirroring PublishEventsToLiveStatus output. A request port + // then adds a large pending input after that publish — the direct-write path from issue #5745. + const int InitialStart = 6; + List events = [.. Enumerable.Range(0, 20).Select(i => MakeEvent(InitialStart + i, 300))]; + DurableWorkflowLiveStatus status = new() + { + Events = events, + EventsStartIndex = InitialStart, + PendingEvents = [new(EventName: "approval", Input: new string('p', 4000))], + }; + + // Act + DurableWorkflowRunner.TrimLiveStatusToBudget(status); + + // Assert — the combined status (events window + pending) stays under the cap, the window shrank to + // make room, and the absolute start index advanced past where it began. + int length = JsonSerializer.Serialize(status, DurableSerialization.Options).Length; + Assert.True(length <= CustomStatusCharLimit, $"Combined status length {length} exceeded cap."); + Assert.True(status.EventsStartIndex > InitialStart, "Expected the window to shrink and advance the start index."); + Assert.NotEmpty(status.Events); + + // The retained events are still the contiguous tail ending at the most recent event. + Assert.Equal(events[^1], status.Events[^1]); + } } From 61ad23bf3b4bda06559bf3197767da021647d2a4 Mon Sep 17 00:00:00 2001 From: Shyju Krishnankutty Date: Fri, 26 Jun 2026 12:35:09 -0700 Subject: [PATCH 3/3] Clarify SerializedElementCost escaping doc (PR #6775 review) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Workflows/DurableWorkflowRunner.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs index 45b1ecf3aaf..0da6b9aab11 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs @@ -611,9 +611,11 @@ internal static void TrimLiveStatusToBudget(DurableWorkflowLiveStatus liveStatus /// as a JSON string element (escaped payload plus surrounding quotes and a separating comma). /// /// - /// Uses the default JSON escaping (matching the serializer that writes the custom status), so the - /// estimate is an upper bound on the actual contribution — never an underestimate that could lead to - /// an overflow. + /// Uses with the + /// default (strict) encoder. This is intentionally stricter than the serializer that actually writes the custom + /// status, which uses and so + /// escapes fewer characters. Because the strict encoder never produces a shorter result, the estimate is an upper + /// bound on the actual contribution — never an underestimate that could lead to an overflow. /// internal static int SerializedElementCost(string serializedEvent) => JsonEncodedText.Encode(serializedEvent).Value.Length + 3;