diff --git a/src/extensions/BotNexus.Extensions.Channels.SignalR/GatewayHub.cs b/src/extensions/BotNexus.Extensions.Channels.SignalR/GatewayHub.cs index 70dbdf2f..2836130c 100644 --- a/src/extensions/BotNexus.Extensions.Channels.SignalR/GatewayHub.cs +++ b/src/extensions/BotNexus.Extensions.Channels.SignalR/GatewayHub.cs @@ -43,6 +43,7 @@ public sealed class GatewayHub : Hub private readonly IAskUserResponseRegistry? _askUserResponseRegistry; private readonly IOptionsMonitor _compactionOptions; private readonly ISessionEndMemoryFlusher? _sessionEndFlusher; + private readonly ISubAgentManager? _subAgentManager; private readonly ILogger _logger; public GatewayHub( @@ -59,7 +60,8 @@ public GatewayHub( ILogger logger, IConversationStore? conversationStore = null, IAskUserResponseRegistry? askUserResponseRegistry = null, - ISessionEndMemoryFlusher? sessionEndFlusher = null) + ISessionEndMemoryFlusher? sessionEndFlusher = null, + ISubAgentManager? subAgentManager = null) { _supervisor = supervisor; _registry = registry; @@ -75,6 +77,7 @@ public GatewayHub( _conversationStore = conversationStore; _askUserResponseRegistry = askUserResponseRegistry; _sessionEndFlusher = sessionEndFlusher; + _subAgentManager = subAgentManager; } /// @@ -480,6 +483,24 @@ public async Task ResetSession(AgentId agentId, SessionId sessionId) var typedSessionId = NormalizeSessionId(sessionId); await _supervisor.StopAsync(typedAgentId, typedSessionId, CancellationToken.None); + // Clean up any orphaned sub-agent sessions spawned by this session + if (_subAgentManager is not null) + { + try + { + var cleaned = await _subAgentManager.CleanupChildSessionsAsync(typedSessionId, CancellationToken.None); + if (cleaned > 0) + _logger.LogInformation( + "Cleaned up {Count} orphaned sub-agent session(s) for parent session '{SessionId}'.", + cleaned, + typedSessionId); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Sub-agent cleanup failed for session '{SessionId}', reset will proceed.", typedSessionId); + } + } + // Phase 2: session-end memory flush before archiving if (_sessionEndFlusher is not null) { diff --git a/src/gateway/BotNexus.Gateway.Api/Controllers/SessionsController.cs b/src/gateway/BotNexus.Gateway.Api/Controllers/SessionsController.cs index a3d058f0..47b4a46f 100644 --- a/src/gateway/BotNexus.Gateway.Api/Controllers/SessionsController.cs +++ b/src/gateway/BotNexus.Gateway.Api/Controllers/SessionsController.cs @@ -415,5 +415,14 @@ public Task KillAsync(string subAgentId, SessionId requestingSessionId, Ca /// The on completed async result. public Task OnCompletedAsync(string subAgentId, string resultSummary, CancellationToken ct = default) => Task.CompletedTask; + + /// + /// Executes cleanup child sessions async. + /// + /// The parent session id. + /// The ct. + /// Always returns 0 (no-op). + public Task CleanupChildSessionsAsync(SessionId parentSessionId, CancellationToken ct = default) + => Task.FromResult(0); } } diff --git a/src/gateway/BotNexus.Gateway.Contracts/Agents/ISubAgentManager.cs b/src/gateway/BotNexus.Gateway.Contracts/Agents/ISubAgentManager.cs index b3ca3b04..929a3bd0 100644 --- a/src/gateway/BotNexus.Gateway.Contracts/Agents/ISubAgentManager.cs +++ b/src/gateway/BotNexus.Gateway.Contracts/Agents/ISubAgentManager.cs @@ -50,4 +50,13 @@ public interface ISubAgentManager /// A cancellation token that can be used to cancel the operation. /// A task representing the asynchronous operation. Task OnCompletedAsync(string subAgentId, string resultSummary, CancellationToken ct = default); + + /// + /// Kills all running sub-agents for the specified parent session and archives their sessions. + /// Called when a parent session is reset to prevent orphaned sub-agent sessions. + /// + /// The parent session whose sub-agents should be cleaned up. + /// A cancellation token that can be used to cancel the operation. + /// The number of sub-agent sessions that were cleaned up. + Task CleanupChildSessionsAsync(SessionId parentSessionId, CancellationToken ct = default); } diff --git a/src/gateway/BotNexus.Gateway/Agents/DefaultSubAgentManager.cs b/src/gateway/BotNexus.Gateway/Agents/DefaultSubAgentManager.cs index efd443db..a519cb52 100644 --- a/src/gateway/BotNexus.Gateway/Agents/DefaultSubAgentManager.cs +++ b/src/gateway/BotNexus.Gateway/Agents/DefaultSubAgentManager.cs @@ -386,6 +386,69 @@ await _dispatcher.DispatchAsync(new InboundMessage await CleanupChildAgentAsync(subAgentId, updated.ChildSessionId, CancellationToken.None); } } + /// + public async Task CleanupChildSessionsAsync(SessionId parentSessionId, CancellationToken ct = default) + { + if (!_parentChildren.TryGetValue(parentSessionId, out var subAgentIds)) + return 0; + + var cleaned = 0; + foreach (var subAgentId in subAgentIds.ToArray()) + { + if (!_subAgents.TryGetValue(subAgentId, out var info)) + continue; + + // Cancel the timeout CTS to stop a running sub-agent + if (_timeouts.TryRemove(subAgentId, out var timeoutCts)) + { + try { timeoutCts.Cancel(); } + catch (ObjectDisposedException) { } + timeoutCts.Dispose(); + } + + // Clean up agent resources + await CleanupChildAgentAsync(subAgentId, info.ChildSessionId, ct); + + // Mark as killed if still running + TryUpdateSubAgent( + subAgentId, + current => current.Status is SubAgentStatus.Running + ? current with + { + Status = SubAgentStatus.Killed, + CompletedAt = DateTimeOffset.UtcNow, + ResultSummary = "Sub-agent killed: parent session was reset." + } + : current); + + // Archive the child session in the session store + if (_sessionStore is not null) + { + try + { + await _sessionStore.ArchiveAsync(info.ChildSessionId, ct); + } + catch (Exception ex) + { + _logger.LogWarning( + ex, + "Failed archiving orphaned child session '{ChildSessionId}' for sub-agent '{SubAgentId}'.", + info.ChildSessionId, + subAgentId); + } + } + + cleaned++; + _logger.LogInformation( + "Cleaned up orphaned sub-agent '{SubAgentId}' (child session '{ChildSessionId}') after parent session '{ParentSessionId}' reset.", + subAgentId, + info.ChildSessionId, + parentSessionId); + } + + return cleaned; + } + private async Task RunSubAgentAsync(string subAgentId, IAgentHandle handle, string task, int timeoutSeconds, string? inheritedConversationId = null) { if (!_timeouts.TryGetValue(subAgentId, out var timeoutCts)) diff --git a/tests/gateway/BotNexus.Gateway.Tests/Agents/DefaultSubAgentManagerTests.cs b/tests/gateway/BotNexus.Gateway.Tests/Agents/DefaultSubAgentManagerTests.cs index b8f696e2..0ce9f8b7 100644 --- a/tests/gateway/BotNexus.Gateway.Tests/Agents/DefaultSubAgentManagerTests.cs +++ b/tests/gateway/BotNexus.Gateway.Tests/Agents/DefaultSubAgentManagerTests.cs @@ -381,6 +381,26 @@ public async Task OnCompletedAsync(string subAgentId, string resultSummary, Canc await parentHandle.FollowUpAsync($"Sub-agent {subAgentId} completed: {resultSummary}", ct); } + public Task CleanupChildSessionsAsync(SessionId parentSessionId, CancellationToken ct = default) + { + var killed = entries.Values + .Where(e => e.Info.ParentSessionId.Value.Equals(parentSessionId.Value, StringComparison.Ordinal)) + .ToList(); + + foreach (var runtime in killed) + { + runtime.LifetimeCts.Cancel(); + UpdateInfo(runtime.Info.SubAgentId, current => current with + { + Status = SubAgentStatus.Killed, + CompletedAt = DateTimeOffset.UtcNow, + ResultSummary = "Sub-agent killed: parent session was reset." + }); + } + + return Task.FromResult(killed.Count); + } + private async Task MonitorRunAsync(RuntimeEntry runtime, string task, int timeoutSeconds) { using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); diff --git a/tests/gateway/BotNexus.Gateway.Tests/Agents/SubAgentCleanupTests.cs b/tests/gateway/BotNexus.Gateway.Tests/Agents/SubAgentCleanupTests.cs new file mode 100644 index 00000000..96d4d977 --- /dev/null +++ b/tests/gateway/BotNexus.Gateway.Tests/Agents/SubAgentCleanupTests.cs @@ -0,0 +1,186 @@ +using BotNexus.Domain.Primitives; +using BotNexus.Gateway.Abstractions.Agents; +using BotNexus.Gateway.Abstractions.Models; +using BotNexus.Gateway.Abstractions.Sessions; +using BotNexus.Gateway.Agents; +using BotNexus.Gateway.Configuration; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; + +namespace BotNexus.Gateway.Tests.Agents; + +/// +/// Tests for DefaultSubAgentManager.CleanupChildSessionsAsync -- prevents orphaned sub-agent +/// sessions when a parent session is reset. +/// +public sealed class SubAgentCleanupTests +{ + [Fact] + public async Task CleanupChildSessionsAsync_ReturnsZero_WhenNoChildrenExist() + { + var manager = CreateManager(); + + var count = await manager.CleanupChildSessionsAsync(SessionId.From("orphan-session")); + + count.ShouldBe(0); + } + + [Fact] + public async Task CleanupChildSessionsAsync_ArchivesChildSession_WhenChildExists() + { + var sessionStore = new Mock(); + sessionStore.Setup(s => s.ArchiveAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var childHandle = CreateInstantHandle(); + var supervisor = new Mock(); + supervisor + .Setup(s => s.GetOrCreateAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(childHandle.Object); + supervisor + .Setup(s => s.StopAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var manager = CreateManager(supervisor.Object, sessionStore: sessionStore.Object); + + var parentSessionId = SessionId.From("parent-session"); + await manager.SpawnAsync(new SubAgentSpawnRequest + { + ParentAgentId = AgentId.From("parent-agent"), + ParentSessionId = parentSessionId, + Task = "do something", + TimeoutSeconds = 600 + }); + + // Wait briefly for the background task to start + await Task.Delay(50); + + var count = await manager.CleanupChildSessionsAsync(parentSessionId); + + count.ShouldBe(1); + sessionStore.Verify(s => s.ArchiveAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce); + } + + [Fact] + public async Task CleanupChildSessionsAsync_IsNonFatal_WhenSessionArchiveFails() + { + var sessionStore = new Mock(); + sessionStore.Setup(s => s.ArchiveAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("session store unavailable")); + + var childHandle = CreateInstantHandle(); + var supervisor = new Mock(); + supervisor + .Setup(s => s.GetOrCreateAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(childHandle.Object); + supervisor + .Setup(s => s.StopAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var manager = CreateManager(supervisor.Object, sessionStore: sessionStore.Object); + + var parentSessionId = SessionId.From("parent-session"); + await manager.SpawnAsync(new SubAgentSpawnRequest + { + ParentAgentId = AgentId.From("parent-agent"), + ParentSessionId = parentSessionId, + Task = "do something", + TimeoutSeconds = 600 + }); + + await Task.Delay(50); + + // Should not throw even if archive fails + var count = await manager.CleanupChildSessionsAsync(parentSessionId); + count.ShouldBe(1); + } + + [Fact] + public async Task CleanupChildSessionsAsync_DoesNotCleanOtherParentSessions() + { + var sessionStore = new Mock(); + sessionStore.Setup(s => s.ArchiveAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var childHandle = CreateInstantHandle(); + var supervisor = new Mock(); + supervisor + .Setup(s => s.GetOrCreateAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(childHandle.Object); + supervisor + .Setup(s => s.StopAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var manager = CreateManager(supervisor.Object, sessionStore: sessionStore.Object); + + var parentSessionA = SessionId.From("parent-session-a"); + var parentSessionB = SessionId.From("parent-session-b"); + + await manager.SpawnAsync(new SubAgentSpawnRequest + { + ParentAgentId = AgentId.From("parent-agent"), + ParentSessionId = parentSessionA, + Task = "task A", + TimeoutSeconds = 600 + }); + await manager.SpawnAsync(new SubAgentSpawnRequest + { + ParentAgentId = AgentId.From("parent-agent"), + ParentSessionId = parentSessionB, + Task = "task B", + TimeoutSeconds = 600 + }); + + await Task.Delay(50); + + // Only clean up session A + var count = await manager.CleanupChildSessionsAsync(parentSessionA); + + count.ShouldBe(1); + + // Session B's child should still be trackable + var remaining = await manager.ListAsync(parentSessionB); + remaining.ShouldNotBeEmpty(); + } + + private static DefaultSubAgentManager CreateManager( + IAgentSupervisor? supervisor = null, + ISessionStore? sessionStore = null) + { + var registry = new Mock(); + registry.Setup(r => r.Get(It.IsAny())).Returns(new AgentDescriptor + { + AgentId = AgentId.From("parent-agent"), + DisplayName = "Parent Agent", + ModelId = "gpt-4.1", + ApiProvider = "copilot" + }); + registry.Setup(r => r.Contains(It.IsAny())).Returns(false); + registry.Setup(r => r.Register(It.IsAny())); + registry.Setup(r => r.Unregister(It.IsAny())); + + var usedSupervisor = supervisor ?? Mock.Of(); + var activity = Mock.Of(); + var dispatcher = Mock.Of(); + + return new DefaultSubAgentManager( + usedSupervisor, + registry.Object, + activity, + dispatcher, + new TestOptionsMonitor(new GatewayOptions()), + NullLogger.Instance, + sessionStore: sessionStore); + } + + private static Mock CreateInstantHandle() + { + var handle = new Mock(); + handle.SetupGet(h => h.AgentId).Returns(AgentId.From("child-agent")); + handle.SetupGet(h => h.SessionId).Returns(SessionId.From("child-session")); + handle.SetupGet(h => h.IsRunning).Returns(false); + handle.Setup(h => h.PromptAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(new AgentResponse { Content = "done" }); + return handle; + } +} diff --git a/tests/gateway/BotNexus.Gateway.Tests/SignalRHubTests.cs b/tests/gateway/BotNexus.Gateway.Tests/SignalRHubTests.cs index d078539e..a56b17ed 100644 --- a/tests/gateway/BotNexus.Gateway.Tests/SignalRHubTests.cs +++ b/tests/gateway/BotNexus.Gateway.Tests/SignalRHubTests.cs @@ -452,6 +452,62 @@ public async Task ResetSession_ArchivesInsteadOfDeleting() sessions.Verify(value => value.DeleteAsync(It.IsAny(), It.IsAny()), Times.Never); } + [Fact] + public async Task ResetSession_CallsSubAgentCleanup_WhenManagerProvided() + { + var caller = new Mock(); + caller.Setup(proxy => proxy.SessionReset(It.IsAny())) + .Returns(Task.CompletedTask); + var clients = new Mock>(); + clients.SetupGet(value => value.Caller).Returns(caller.Object); + + var sessions = new Mock(); + sessions.Setup(value => value.ArchiveAsync("session-1", CancellationToken.None)).Returns(Task.CompletedTask); + + var supervisor = new Mock(); + supervisor.Setup(value => value.StopAsync(BotNexus.Domain.Primitives.AgentId.From("agent-a"), BotNexus.Domain.Primitives.SessionId.From("session-1"), CancellationToken.None)).Returns(Task.CompletedTask); + + var subAgentManager = new Mock(); + subAgentManager + .Setup(m => m.CleanupChildSessionsAsync(BotNexus.Domain.Primitives.SessionId.From("session-1"), It.IsAny())) + .ReturnsAsync(2); + + var hub = CreateHub( + clients: clients.Object, + sessions: sessions.Object, + supervisor: supervisor.Object, + subAgentManager: subAgentManager.Object); + + await hub.ResetSession("agent-a", "session-1"); + + subAgentManager.Verify( + m => m.CleanupChildSessionsAsync(BotNexus.Domain.Primitives.SessionId.From("session-1"), It.IsAny()), + Times.Once); + } + + [Fact] + public async Task ResetSession_ProceedsNormally_WhenNoSubAgentManager() + { + var caller = new Mock(); + caller.Setup(proxy => proxy.SessionReset(It.IsAny())) + .Returns(Task.CompletedTask); + var clients = new Mock>(); + clients.SetupGet(value => value.Caller).Returns(caller.Object); + + var sessions = new Mock(); + sessions.Setup(value => value.ArchiveAsync("session-1", CancellationToken.None)).Returns(Task.CompletedTask); + + var supervisor = new Mock(); + supervisor.Setup(value => value.StopAsync(BotNexus.Domain.Primitives.AgentId.From("agent-a"), BotNexus.Domain.Primitives.SessionId.From("session-1"), CancellationToken.None)).Returns(Task.CompletedTask); + + // No subAgentManager passed -- optional param + var hub = CreateHub(clients: clients.Object, sessions: sessions.Object, supervisor: supervisor.Object); + + await Should.NotThrowAsync(() => hub.ResetSession("agent-a", "session-1")); + + sessions.Verify(value => value.ArchiveAsync("session-1", CancellationToken.None), Times.Once); + } + [Fact] public async Task CompactSession_Hub_ReturnsCompactionStats() { @@ -511,6 +567,7 @@ private static GatewayHub CreateHub( IConversationDispatcher? conversationDispatcher = null, IConversationStore? conversationStore = null, IAskUserResponseRegistry? askUserResponseRegistry = null, + ISubAgentManager? subAgentManager = null, string connectionId = "conn-test") { var sessionStore = sessions ?? new InMemorySessionStore(); @@ -534,7 +591,8 @@ private static GatewayHub CreateHub( compactionOptions ?? new TestOptionsMonitor(new CompactionOptions()), NullLogger.Instance, convStore, - askUserResponseRegistry) + askUserResponseRegistry, + subAgentManager: subAgentManager) { Clients = clients ?? Mock.Of>(), Groups = groups ?? Mock.Of(),