Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public sealed class GatewayHub : Hub<IGatewayHubClient>
private readonly IAskUserResponseRegistry? _askUserResponseRegistry;
private readonly IOptionsMonitor<CompactionOptions> _compactionOptions;
private readonly ISessionEndMemoryFlusher? _sessionEndFlusher;
private readonly ISubAgentManager? _subAgentManager;
private readonly ILogger<GatewayHub> _logger;

public GatewayHub(
Expand All @@ -59,7 +60,8 @@ public GatewayHub(
ILogger<GatewayHub> logger,
IConversationStore? conversationStore = null,
IAskUserResponseRegistry? askUserResponseRegistry = null,
ISessionEndMemoryFlusher? sessionEndFlusher = null)
ISessionEndMemoryFlusher? sessionEndFlusher = null,
ISubAgentManager? subAgentManager = null)
{
_supervisor = supervisor;
_registry = registry;
Expand All @@ -75,6 +77,7 @@ public GatewayHub(
_conversationStore = conversationStore;
_askUserResponseRegistry = askUserResponseRegistry;
_sessionEndFlusher = sessionEndFlusher;
_subAgentManager = subAgentManager;
}

/// <summary>
Expand Down Expand Up @@ -480,6 +483,24 @@ public async Task ResetSession(AgentId agentId, SessionId sessionId)
var typedSessionId = NormalizeSessionId(sessionId);
await _supervisor.StopAsync(typedAgentId, typedSessionId, CancellationToken.None);

// Clean up any orphaned sub-agent sessions spawned by this session
if (_subAgentManager is not null)
{
try
{
var cleaned = await _subAgentManager.CleanupChildSessionsAsync(typedSessionId, CancellationToken.None);
if (cleaned > 0)
_logger.LogInformation(
"Cleaned up {Count} orphaned sub-agent session(s) for parent session '{SessionId}'.",
cleaned,
typedSessionId);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Sub-agent cleanup failed for session '{SessionId}', reset will proceed.", typedSessionId);
}
}

// Phase 2: session-end memory flush before archiving
if (_sessionEndFlusher is not null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,5 +415,14 @@ public Task<bool> KillAsync(string subAgentId, SessionId requestingSessionId, Ca
/// <returns>The on completed async result.</returns>
public Task OnCompletedAsync(string subAgentId, string resultSummary, CancellationToken ct = default)
=> Task.CompletedTask;

/// <summary>
/// Executes cleanup child sessions async.
/// </summary>
/// <param name="parentSessionId">The parent session id.</param>
/// <param name="ct">The ct.</param>
/// <returns>Always returns 0 (no-op).</returns>
public Task<int> CleanupChildSessionsAsync(SessionId parentSessionId, CancellationToken ct = default)
=> Task.FromResult(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,13 @@ public interface ISubAgentManager
/// <param name="ct">A cancellation token that can be used to cancel the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task OnCompletedAsync(string subAgentId, string resultSummary, CancellationToken ct = default);

/// <summary>
/// Kills all running sub-agents for the specified parent session and archives their sessions.
/// Called when a parent session is reset to prevent orphaned sub-agent sessions.
/// </summary>
/// <param name="parentSessionId">The parent session whose sub-agents should be cleaned up.</param>
/// <param name="ct">A cancellation token that can be used to cancel the operation.</param>
/// <returns>The number of sub-agent sessions that were cleaned up.</returns>
Task<int> CleanupChildSessionsAsync(SessionId parentSessionId, CancellationToken ct = default);
}
63 changes: 63 additions & 0 deletions src/gateway/BotNexus.Gateway/Agents/DefaultSubAgentManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,69 @@ await _dispatcher.DispatchAsync(new InboundMessage
await CleanupChildAgentAsync(subAgentId, updated.ChildSessionId, CancellationToken.None);
}
}
/// <inheritdoc />
public async Task<int> CleanupChildSessionsAsync(SessionId parentSessionId, CancellationToken ct = default)
{
if (!_parentChildren.TryGetValue(parentSessionId, out var subAgentIds))
return 0;

var cleaned = 0;
foreach (var subAgentId in subAgentIds.ToArray())
{
if (!_subAgents.TryGetValue(subAgentId, out var info))
continue;

// Cancel the timeout CTS to stop a running sub-agent
if (_timeouts.TryRemove(subAgentId, out var timeoutCts))
{
try { timeoutCts.Cancel(); }
catch (ObjectDisposedException) { }
timeoutCts.Dispose();
}

// Clean up agent resources
await CleanupChildAgentAsync(subAgentId, info.ChildSessionId, ct);

// Mark as killed if still running
TryUpdateSubAgent(
subAgentId,
current => current.Status is SubAgentStatus.Running
? current with
{
Status = SubAgentStatus.Killed,
CompletedAt = DateTimeOffset.UtcNow,
ResultSummary = "Sub-agent killed: parent session was reset."
}
: current);

// Archive the child session in the session store
if (_sessionStore is not null)
{
try
{
await _sessionStore.ArchiveAsync(info.ChildSessionId, ct);
}
catch (Exception ex)
{
_logger.LogWarning(
ex,
"Failed archiving orphaned child session '{ChildSessionId}' for sub-agent '{SubAgentId}'.",
info.ChildSessionId,
subAgentId);
}
}

cleaned++;
_logger.LogInformation(
"Cleaned up orphaned sub-agent '{SubAgentId}' (child session '{ChildSessionId}') after parent session '{ParentSessionId}' reset.",
subAgentId,
info.ChildSessionId,
parentSessionId);
}

return cleaned;
}

private async Task RunSubAgentAsync(string subAgentId, IAgentHandle handle, string task, int timeoutSeconds, string? inheritedConversationId = null)
{
if (!_timeouts.TryGetValue(subAgentId, out var timeoutCts))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,26 @@ public async Task OnCompletedAsync(string subAgentId, string resultSummary, Canc
await parentHandle.FollowUpAsync($"Sub-agent {subAgentId} completed: {resultSummary}", ct);
}

public Task<int> CleanupChildSessionsAsync(SessionId parentSessionId, CancellationToken ct = default)
{
var killed = entries.Values
.Where(e => e.Info.ParentSessionId.Value.Equals(parentSessionId.Value, StringComparison.Ordinal))
.ToList();

foreach (var runtime in killed)
{
runtime.LifetimeCts.Cancel();
UpdateInfo(runtime.Info.SubAgentId, current => current with
{
Status = SubAgentStatus.Killed,
CompletedAt = DateTimeOffset.UtcNow,
ResultSummary = "Sub-agent killed: parent session was reset."
});
}

return Task.FromResult(killed.Count);
}

private async Task MonitorRunAsync(RuntimeEntry runtime, string task, int timeoutSeconds)
{
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
Expand Down
186 changes: 186 additions & 0 deletions tests/gateway/BotNexus.Gateway.Tests/Agents/SubAgentCleanupTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
using BotNexus.Domain.Primitives;
using BotNexus.Gateway.Abstractions.Agents;
using BotNexus.Gateway.Abstractions.Models;
using BotNexus.Gateway.Abstractions.Sessions;
using BotNexus.Gateway.Agents;
using BotNexus.Gateway.Configuration;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;

namespace BotNexus.Gateway.Tests.Agents;

/// <summary>
/// Tests for DefaultSubAgentManager.CleanupChildSessionsAsync -- prevents orphaned sub-agent
/// sessions when a parent session is reset.
/// </summary>
public sealed class SubAgentCleanupTests
{
[Fact]
public async Task CleanupChildSessionsAsync_ReturnsZero_WhenNoChildrenExist()
{
var manager = CreateManager();

var count = await manager.CleanupChildSessionsAsync(SessionId.From("orphan-session"));

count.ShouldBe(0);
}

[Fact]
public async Task CleanupChildSessionsAsync_ArchivesChildSession_WhenChildExists()
{
var sessionStore = new Mock<ISessionStore>();
sessionStore.Setup(s => s.ArchiveAsync(It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);

var childHandle = CreateInstantHandle();
var supervisor = new Mock<IAgentSupervisor>();
supervisor
.Setup(s => s.GetOrCreateAsync(It.IsAny<AgentId>(), It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(childHandle.Object);
supervisor
.Setup(s => s.StopAsync(It.IsAny<AgentId>(), It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);

var manager = CreateManager(supervisor.Object, sessionStore: sessionStore.Object);

var parentSessionId = SessionId.From("parent-session");
await manager.SpawnAsync(new SubAgentSpawnRequest
{
ParentAgentId = AgentId.From("parent-agent"),
ParentSessionId = parentSessionId,
Task = "do something",
TimeoutSeconds = 600
});

// Wait briefly for the background task to start
await Task.Delay(50);

var count = await manager.CleanupChildSessionsAsync(parentSessionId);

count.ShouldBe(1);
sessionStore.Verify(s => s.ArchiveAsync(It.IsAny<SessionId>(), It.IsAny<CancellationToken>()), Times.AtLeastOnce);
}

[Fact]
public async Task CleanupChildSessionsAsync_IsNonFatal_WhenSessionArchiveFails()
{
var sessionStore = new Mock<ISessionStore>();
sessionStore.Setup(s => s.ArchiveAsync(It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.ThrowsAsync(new InvalidOperationException("session store unavailable"));

var childHandle = CreateInstantHandle();
var supervisor = new Mock<IAgentSupervisor>();
supervisor
.Setup(s => s.GetOrCreateAsync(It.IsAny<AgentId>(), It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(childHandle.Object);
supervisor
.Setup(s => s.StopAsync(It.IsAny<AgentId>(), It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);

var manager = CreateManager(supervisor.Object, sessionStore: sessionStore.Object);

var parentSessionId = SessionId.From("parent-session");
await manager.SpawnAsync(new SubAgentSpawnRequest
{
ParentAgentId = AgentId.From("parent-agent"),
ParentSessionId = parentSessionId,
Task = "do something",
TimeoutSeconds = 600
});

await Task.Delay(50);

// Should not throw even if archive fails
var count = await manager.CleanupChildSessionsAsync(parentSessionId);
count.ShouldBe(1);
}

[Fact]
public async Task CleanupChildSessionsAsync_DoesNotCleanOtherParentSessions()
{
var sessionStore = new Mock<ISessionStore>();
sessionStore.Setup(s => s.ArchiveAsync(It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);

var childHandle = CreateInstantHandle();
var supervisor = new Mock<IAgentSupervisor>();
supervisor
.Setup(s => s.GetOrCreateAsync(It.IsAny<AgentId>(), It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(childHandle.Object);
supervisor
.Setup(s => s.StopAsync(It.IsAny<AgentId>(), It.IsAny<SessionId>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);

var manager = CreateManager(supervisor.Object, sessionStore: sessionStore.Object);

var parentSessionA = SessionId.From("parent-session-a");
var parentSessionB = SessionId.From("parent-session-b");

await manager.SpawnAsync(new SubAgentSpawnRequest
{
ParentAgentId = AgentId.From("parent-agent"),
ParentSessionId = parentSessionA,
Task = "task A",
TimeoutSeconds = 600
});
await manager.SpawnAsync(new SubAgentSpawnRequest
{
ParentAgentId = AgentId.From("parent-agent"),
ParentSessionId = parentSessionB,
Task = "task B",
TimeoutSeconds = 600
});

await Task.Delay(50);

// Only clean up session A
var count = await manager.CleanupChildSessionsAsync(parentSessionA);

count.ShouldBe(1);

// Session B's child should still be trackable
var remaining = await manager.ListAsync(parentSessionB);
remaining.ShouldNotBeEmpty();
}

private static DefaultSubAgentManager CreateManager(
IAgentSupervisor? supervisor = null,
ISessionStore? sessionStore = null)
{
var registry = new Mock<IAgentRegistry>();
registry.Setup(r => r.Get(It.IsAny<AgentId>())).Returns(new AgentDescriptor
{
AgentId = AgentId.From("parent-agent"),
DisplayName = "Parent Agent",
ModelId = "gpt-4.1",
ApiProvider = "copilot"
});
registry.Setup(r => r.Contains(It.IsAny<AgentId>())).Returns(false);
registry.Setup(r => r.Register(It.IsAny<AgentDescriptor>()));
registry.Setup(r => r.Unregister(It.IsAny<AgentId>()));

var usedSupervisor = supervisor ?? Mock.Of<IAgentSupervisor>();
var activity = Mock.Of<BotNexus.Gateway.Abstractions.Activity.IActivityBroadcaster>();
var dispatcher = Mock.Of<BotNexus.Gateway.Abstractions.Channels.IChannelDispatcher>();

return new DefaultSubAgentManager(
usedSupervisor,
registry.Object,
activity,
dispatcher,
new TestOptionsMonitor<GatewayOptions>(new GatewayOptions()),
NullLogger<DefaultSubAgentManager>.Instance,
sessionStore: sessionStore);
}

private static Mock<IAgentHandle> CreateInstantHandle()
{
var handle = new Mock<IAgentHandle>();
handle.SetupGet(h => h.AgentId).Returns(AgentId.From("child-agent"));
handle.SetupGet(h => h.SessionId).Returns(SessionId.From("child-session"));
handle.SetupGet(h => h.IsRunning).Returns(false);
handle.Setup(h => h.PromptAsync(It.IsAny<string>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new AgentResponse { Content = "done" });
return handle;
}
}
Loading
Loading