Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased]

- Bound the live workflow status to a trailing event window so multi-executor workflows with large typed outputs no longer overflow the Durable Functions 16 KB custom status cap ([#6775](https://github.com/microsoft/agent-framework/pull/6775))
- Fix issue with resuming checkpoint after package version upgrade ([#6670](https://github.com/microsoft/agent-framework/pull/6670))
- Bind MCP threadId to the current agent and guard cross-agent session dispatch ([#6531](https://github.com/microsoft/agent-framework/pull/6531))
- Added support for durable workflows ([#4436](https://github.com/microsoft/agent-framework/pull/4436))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,11 @@ private static async Task<string> ExecuteRequestPortAsync(

logger.LogWaitingForExternalEvent(eventName);

// Publish pending request so external clients can discover what input is needed.
// Adding a pending event grows the reserved budget, so re-window the events first to keep this
// direct status write within the Durable Functions custom status cap (issue #5745).
liveStatus.PendingEvents.Add(new PendingRequestPortStatus(EventName: eventName, Input: input));
DurableWorkflowRunner.TrimLiveStatusToBudget(liveStatus);
context.SetCustomStatus(liveStatus);

// Wait until the external actor raises the event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private async IAsyncEnumerable<WorkflowEvent> WatchStreamAsync(
{
if (DurableWorkflowLiveStatus.TryParse(metadata.SerializedCustomStatus, out DurableWorkflowLiveStatus liveStatus))
{
(List<WorkflowEvent> events, lastReadEventIndex) = DrainNewEvents(liveStatus.Events, lastReadEventIndex);
(List<WorkflowEvent> events, lastReadEventIndex) = DrainNewEvents(liveStatus.Events, liveStatus.EventsStartIndex, lastReadEventIndex);
foreach (WorkflowEvent evt in events)
{
hasNewEvents = true;
Expand Down Expand Up @@ -175,7 +175,9 @@ private async IAsyncEnumerable<WorkflowEvent> WatchStreamAsync(
// SerializedOutput as a DurableWorkflowResult wrapper.
if (TryParseWorkflowResult(metadata.SerializedOutput, out DurableWorkflowResult? outputResult))
{
(List<WorkflowEvent> events, _) = DrainNewEvents(outputResult.Events, lastReadEventIndex);
// The output carries the full, untrimmed event log starting at absolute index 0,
// so any events that scrolled out of the live window are backfilled here.
(List<WorkflowEvent> events, _) = DrainNewEvents(outputResult.Events, windowStartIndex: 0, lastReadEventIndex);
foreach (WorkflowEvent evt in events)
{
yield return evt;
Expand Down Expand Up @@ -285,14 +287,30 @@ await this._client.RaiseEventAsync(
}

/// <summary>
/// Deserializes and returns any events not yet read, given a published window that begins at
/// absolute index <paramref name="windowStartIndex"/> and the consumer's absolute
/// <paramref name="lastReadIndex"/>.
/// </summary>
private static (List<WorkflowEvent> Events, int UpdatedIndex) DrainNewEvents(List<string> serializedEvents, int lastReadIndex)
/// <remarks>
/// When the consumer has fallen behind the window (its <paramref name="lastReadIndex"/> is older than
/// the window's first element), the missing events are not in this window. They are not skipped: the
/// index is left unchanged so they are recovered from the full workflow output at completion.
/// </remarks>
private static (List<WorkflowEvent> Events, int UpdatedIndex) DrainNewEvents(List<string> windowEvents, int windowStartIndex, int lastReadIndex)
{
List<WorkflowEvent> events = [];
while (lastReadIndex < serializedEvents.Count)

// Consumer is behind the published window — defer the gap to the completion backfill.
if (lastReadIndex < windowStartIndex)
{
return (events, lastReadIndex);
}

int relativeIndex = lastReadIndex - windowStartIndex;
while (relativeIndex < windowEvents.Count)
{
string serializedEvent = serializedEvents[lastReadIndex];
string serializedEvent = windowEvents[relativeIndex];
relativeIndex++;
lastReadIndex++;

WorkflowEvent? workflowEvent = TryDeserializeEvent(serializedEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,29 @@ internal sealed class DurableWorkflowLiveStatus
public List<PendingRequestPortStatus> PendingEvents { get; set; } = [];

/// <summary>
/// Gets or sets the serialized workflow events currently published in the live status.
/// </summary>
/// <remarks>
/// This may be a bounded trailing window of the workflow's full event sequence rather than
/// every event emitted so far. Durable Functions caps custom status at 16&#160;KB (UTF-16), so
/// older events are omitted once the cumulative size would exceed that ceiling. <see cref="EventsStartIndex"/>
/// gives the absolute position of the first element here, and the complete, untrimmed event log
/// is always available from the workflow output (<see cref="DurableWorkflowResult.Events"/>) at completion.
/// </remarks>
public List<string> Events { get; set; } = [];

/// <summary>
/// Gets or sets the absolute index, within the workflow's full event sequence, of the first
/// element of <see cref="Events"/>.
/// </summary>
/// <remarks>
/// This is <c>0</c> when <see cref="Events"/> carries the full log from the beginning. It is greater
/// than zero when only a trailing window is published (older events trimmed to respect the custom
/// status size limit). Consumers use it to map window positions back to absolute event indices so
/// no event is delivered twice or skipped.
/// </remarks>
public int EventsStartIndex { get; set; }

/// <summary>
/// Attempts to deserialize a serialized custom status string into a <see cref="DurableWorkflowLiveStatus"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ internal sealed class DurableWorkflowRunner
{
private const int MaxSupersteps = 100;

// Durable Functions caps orchestration custom status at 16 KB (UTF-16), i.e. 8192 .NET chars.
// We target a value comfortably below that so the JSON envelope, the eventsStartIndex field, and
// small estimation differences (e.g. the real serializer vs. our per-element estimate) cannot push
// the payload over the hard limit.
private const int LiveStatusTargetChars = 7600;

// Fixed headroom reserved for the JSON envelope (property names, brackets, eventsStartIndex digits).
private const int LiveStatusEnvelopeBudgetChars = 256;

// Per-pending-event overhead (JSON object braces, property names, quotes) added to the
// measured EventName/Input lengths when estimating the PendingEvents portion of the status.
private const int PendingEventOverheadChars = 64;

/// <summary>
/// Initializes a new instance of the <see cref="DurableWorkflowRunner"/> class.
/// </summary>
Expand Down Expand Up @@ -277,6 +290,13 @@ public SuperstepState(Workflow workflow, DurableEdgeMap edgeMap)
/// </summary>
public List<string> AccumulatedEvents { get; } = [];

/// <summary>
/// Running estimate, in UTF-16 characters, of the serialized cost of <see cref="AccumulatedEvents"/>
/// when written as a JSON string array. Used to decide cheaply whether the full event log still
/// fits within the custom status size budget before falling back to windowing.
/// </summary>
public int AccumulatedEventsCharCost { get; set; }

/// <summary>
/// Workflow status published via <c>SetCustomStatus</c> so external clients can poll for streaming events and pending HITL requests.
/// </summary>
Expand Down Expand Up @@ -385,6 +405,10 @@ private static bool ProcessSuperstepResults(

// Accumulate events for the durable workflow status (streaming)
state.AccumulatedEvents.AddRange(resultInfo.Events);
foreach (string serializedEvent in resultInfo.Events)
{
state.AccumulatedEventsCharCost += SerializedElementCost(serializedEvent);
}

// Check for halt request
haltRequested |= resultInfo.HaltRequested;
Expand Down Expand Up @@ -473,21 +497,157 @@ private static void ApplyClearedScopes(Dictionary<string, string> shared, List<s
/// making them available to <see cref="DurableStreamingWorkflowRun"/> for live streaming.
/// </summary>
/// <remarks>
/// Custom status is the only orchestration state readable by external clients while
/// the orchestration is still running. It is cleared by the framework on completion,
/// so events are also included in <see cref="DurableWorkflowResult"/> for final retrieval.
/// <para>
/// Custom status is the only orchestration state readable by external clients while the
/// orchestration is still running. Durable Functions caps it at 16&#160;KB (UTF-16), so the
/// full cumulative event log cannot always be published live: a workflow with enough
/// executors and/or large typed outputs would otherwise overflow the cap and fail the
/// orchestration (see issue #5745).
/// </para>
/// <para>
/// To stay within the cap, only a bounded trailing window of the most recent events is
/// published, tagged with <see cref="DurableWorkflowLiveStatus.EventsStartIndex"/> so the
/// consumer can map window positions to absolute indices. The complete, untrimmed log is
/// still returned in <see cref="DurableWorkflowResult.Events"/> (the orchestration output is
/// not subject to the custom status cap) and is drained by the consumer at completion, so no
/// event is ever lost — events that scroll out of the live window are delivered at completion
/// instead of mid-run.
/// </para>
/// </remarks>
private static void PublishEventsToLiveStatus(
    TaskOrchestrationContext context,
    SuperstepState state)
{
    // Compute the trailing window of accumulated events that fits the status budget, together
    // with the absolute index of its first element, and store both on the live status.
    (List<string> windowEvents, int startIndex) = BuildEventWindow(
        state.AccumulatedEvents, state.AccumulatedEventsCharCost, state.LiveStatus.PendingEvents);
    state.LiveStatus.Events = windowEvents;
    state.LiveStatus.EventsStartIndex = startIndex;

    // Pass the object directly — the framework's DataConverter handles serialization.
    // Pre-serializing would cause double-serialization (string wrapped in JSON quotes).
    context.SetCustomStatus(state.LiveStatus);
}

/// <summary>
/// Chooses the longest contiguous run of trailing entries from <see cref="SuperstepState.AccumulatedEvents"/>
/// whose estimated serialized size fits the custom status budget. Candidates are examined from the
/// newest event backwards; the returned window keeps the original (oldest-to-newest) order.
/// </summary>
/// <param name="accumulatedEvents">The serialized events to window.</param>
/// <param name="accumulatedEventsCharCost">
/// The caller's estimate of the total cost of <paramref name="accumulatedEvents"/>; when it does not
/// exceed the events budget the list is returned as-is without re-measuring each element.
/// </param>
/// <param name="pendingEvents">Pending request ports whose estimated cost is reserved ahead of the events.</param>
/// <returns>
/// The window to publish and the absolute index of its first element. If not even the newest event
/// fits, the window is empty and the start index equals the event count, so the oversized event is
/// never written to custom status (it is still delivered via the output at completion).
/// </returns>
internal static (List<string> Window, int StartIndex) BuildEventWindow(
    List<string> accumulatedEvents,
    int accumulatedEventsCharCost,
    List<PendingRequestPortStatus> pendingEvents)
{
    // Whatever the JSON envelope and the pending request ports do not consume is left for events.
    int reservedChars = LiveStatusEnvelopeBudgetChars + EstimatePendingEventsCost(pendingEvents);
    int eventsBudget = Math.Max(0, LiveStatusTargetChars - reservedChars);

    // Everything fits: hand back the caller's list itself, starting at absolute index 0.
    if (accumulatedEventsCharCost <= eventsBudget)
    {
        return (accumulatedEvents, 0);
    }

    // Walk backwards from the newest event, extending the window until the next event would not fit.
    int eventCount = accumulatedEvents.Count;
    int windowStart = eventCount;
    int usedChars = 0;
    while (windowStart > 0)
    {
        int candidateCost = SerializedElementCost(accumulatedEvents[windowStart - 1]);
        if (usedChars + candidateCost > eventsBudget)
        {
            break;
        }

        usedChars += candidateCost;
        windowStart--;
    }

    // Nothing fit, not even the newest event: publish an empty window positioned at the end.
    if (windowStart >= eventCount)
    {
        return ([], eventCount);
    }

    return (accumulatedEvents.GetRange(windowStart, eventCount - windowStart), windowStart);
}

/// <summary>
/// Shrinks the events of an already-published live status, in place, so that they fit the custom
/// status budget next to the current <see cref="DurableWorkflowLiveStatus.PendingEvents"/>.
/// </summary>
/// <remarks>
/// Intended for code paths that change <see cref="DurableWorkflowLiveStatus.PendingEvents"/> after the most
/// recent <see cref="PublishEventsToLiveStatus"/> call (e.g. a request port adding a pending input). A new
/// pending event enlarges the reserved budget, which can push a window that previously fit over the
/// 16&#160;KB cap. When events are dropped from the front of the window,
/// <see cref="DurableWorkflowLiveStatus.EventsStartIndex"/> is advanced by the same amount so the consumer
/// can still map window positions to absolute indices. Dropped events remain available via the
/// orchestration output at completion, so none are lost.
/// </remarks>
/// <param name="liveStatus">The live status whose event window is re-evaluated and possibly replaced.</param>
internal static void TrimLiveStatusToBudget(DurableWorkflowLiveStatus liveStatus)
{
    List<string> published = liveStatus.Events;

    // Measure the currently published window from scratch.
    int publishedCost = 0;
    for (int i = 0; i < published.Count; i++)
    {
        publishedCost += SerializedElementCost(published[i]);
    }

    (List<string> trimmed, int droppedCount) = BuildEventWindow(published, publishedCost, liveStatus.PendingEvents);

    // Only touch the status when the window actually changed.
    bool windowUnchanged = droppedCount == 0 && trimmed.Count == published.Count;
    if (!windowUnchanged)
    {
        liveStatus.EventsStartIndex += droppedCount;
        liveStatus.Events = trimmed;
    }
}

/// <summary>
/// Estimates how many UTF-16 characters one serialized event occupies when emitted as an element of a
/// JSON string array: the escaped payload, its two surrounding quotes, and one separating comma.
/// </summary>
/// <remarks>
/// The payload is measured with <see cref="JsonEncodedText.Encode(string, System.Text.Encodings.Web.JavaScriptEncoder?)"/>
/// using the default (strict) encoder. The serializer that actually writes the custom status uses
/// <see cref="System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping"/>, which escapes fewer
/// characters, so the strict measurement is never shorter than the real output: the estimate is an upper
/// bound and cannot undercount in a way that would lead to an overflow.
/// </remarks>
/// <param name="serializedEvent">The serialized event text to measure.</param>
/// <returns>The escaped length of <paramref name="serializedEvent"/> plus three characters of framing.</returns>
internal static int SerializedElementCost(string serializedEvent)
{
    // Opening quote + closing quote + separating comma.
    const int FramingChars = 3;

    JsonEncodedText escaped = JsonEncodedText.Encode(serializedEvent);
    return escaped.Value.Length + FramingChars;
}

/// <summary>
/// Estimates the serialized cost, in UTF-16 characters, of the pending request ports carried in the
/// live status, so the event window leaves room for them.
/// </summary>
/// <remarks>
/// The <see cref="PendingRequestPortStatus.Input"/> is serialized request data; quotes, backslashes,
/// and control characters in it are escaped again when the live status is serialized. The cost is
/// therefore measured on the JSON-escaped length (matching <see cref="SerializedElementCost"/>) so the
/// reserve is conservative and never undercounts.
/// </remarks>
/// <param name="pendingEvents">The pending request ports currently held by the live status.</param>
/// <returns>
/// The sum, over all pending entries, of the escaped event name length, the escaped input length, and a
/// fixed per-entry overhead; <c>0</c> when the list is empty.
/// </returns>
internal static int EstimatePendingEventsCost(List<PendingRequestPortStatus> pendingEvents)
{
    // An empty list simply skips the loop and yields 0, so no separate guard is needed.
    int cost = 0;
    foreach (PendingRequestPortStatus pending in pendingEvents)
    {
        // A missing name or input is measured as an empty string.
        cost += JsonEncodedText.Encode(pending.EventName ?? string.Empty).Value.Length
            + JsonEncodedText.Encode(pending.Input ?? string.Empty).Value.Length
            + PendingEventOverheadChars;
    }

    return cost;
}

/// <summary>
/// Routes executor output (explicit messages or return value) to successor executors.
/// </summary>
Expand Down
Loading
Loading