From 920d4072f97b4e648ba8545f1e7c1022fd3316a5 Mon Sep 17 00:00:00 2001 From: Gordon Beeming Date: Thu, 26 Mar 2026 16:08:16 +1000 Subject: [PATCH] Fix agent memory leak: switch SignalR calls from InvokeAsync to SendAsync The agent was leaking ~74GB over 21 hours via 296K unreleased 256K VM_ALLOCATE blocks (GC heap segments). Root cause: all SignalR hub calls used InvokeAsync (two-way RPC) which allocates pending invocation tracking state for each call, even for one-way methods like Heartbeat. Combined with NativeAOT, this pending state accumulated native memory that the GC never released back to the OS. Changes: - Switch all one-way SignalR calls (Heartbeat, SessionStatusChanged, DirectoryListing, ReportAllSessions, UpdateStatus) from InvokeAsync to SendAsync. Only RegisterAgent (which returns a result) keeps InvokeAsync. - Add CancellationToken parameter to all SignalR methods and thread stoppingToken through from Worker.cs callers. - Bound the Closed event reconnection loop with the stopping token instead of while(true), preventing unbounded retries during shutdown. - Dispose Process objects immediately when HealthCheck detects a crashed session, rather than waiting for the 1-hour cleanup cycle. Co-authored-by: Claude Opus 4.6 (1M context) Co-authored-by: GitButler --- .../Services/SessionManager.cs | 4 ++- .../Services/SignalRConnectionManager.cs | 36 ++++++++++--------- src/ClaudeNest.Agent/Worker.cs | 34 +++++++++--------- 3 files changed, 39 insertions(+), 35 deletions(-) diff --git a/src/ClaudeNest.Agent/Services/SessionManager.cs b/src/ClaudeNest.Agent/Services/SessionManager.cs index edceef80..b90a5d0e 100644 --- a/src/ClaudeNest.Agent/Services/SessionManager.cs +++ b/src/ClaudeNest.Agent/Services/SessionManager.cs @@ -200,9 +200,11 @@ public async Task HealthCheckAsync() } catch { - // Process no longer exists + // Process no longer exists — dispose handle immediately to free native resources session.State = SessionState.Crashed; session.EndedAt = DateTime.UtcNow; + session.Process?.Dispose(); + session.Process = null; await NotifyStatusChangedAsync(session); } } diff --git a/src/ClaudeNest.Agent/Services/SignalRConnectionManager.cs b/src/ClaudeNest.Agent/Services/SignalRConnectionManager.cs index 3976ad08..64b73d07 100644 --- a/src/ClaudeNest.Agent/Services/SignalRConnectionManager.cs +++ b/src/ClaudeNest.Agent/Services/SignalRConnectionManager.cs @@ -15,6 +15,7 @@ public sealed class SignalRConnectionManager : IAsyncDisposable private HubConnection? _connection; private readonly AgentCredentials _credentials; private readonly ILogger _logger; + private CancellationToken _stoppingToken; public event Func? OnListDirectories; public event Func? OnStartSession; @@ -31,8 +32,9 @@ public SignalRConnectionManager(AgentCredentials credentials, ILogger RegisterAgentAsync(AgentInfo agentInfo) + public async Task RegisterAgentAsync(AgentInfo agentInfo, CancellationToken ct = default) { if (_connection is not null) - return await _connection.InvokeAsync("RegisterAgent", agentInfo); + return await _connection.InvokeAsync("RegisterAgent", agentInfo, ct); return null; } - public async Task SendSessionStatusAsync(SessionStatusUpdate update) + public async Task SendSessionStatusAsync(SessionStatusUpdate update, CancellationToken ct = default) { if (_connection is not null) - await _connection.InvokeAsync("SessionStatusChanged", update); + await _connection.SendAsync("SessionStatusChanged", update, ct); } - public async Task SendDirectoryListingAsync(DirectoryListingResponse response) + public async Task SendDirectoryListingAsync(DirectoryListingResponse response, CancellationToken ct = default) { if (_connection is not null) - await _connection.InvokeAsync("DirectoryListing", response); + await _connection.SendAsync("DirectoryListing", response, ct); } - public async Task ReportAllSessionsAsync(Guid agentId, List sessions) + public async Task ReportAllSessionsAsync(Guid agentId, List sessions, CancellationToken ct = default) { if (_connection is not null) - await _connection.InvokeAsync("ReportAllSessions", agentId, sessions); + await _connection.SendAsync("ReportAllSessions", agentId, sessions, ct); } - public async Task SendHeartbeatAsync() + public async Task SendHeartbeatAsync(CancellationToken ct = default) { if (_connection is not null) - await _connection.InvokeAsync("Heartbeat"); + await _connection.SendAsync("Heartbeat", ct); } - public async Task SendUpdateStatusAsync(UpdateStatusReport report) + public async Task SendUpdateStatusAsync(UpdateStatusReport report, CancellationToken ct = default) { if (_connection is not null) - await _connection.InvokeAsync("UpdateStatus", report); + await _connection.SendAsync("UpdateStatus", report, ct); } public async ValueTask DisposeAsync() diff --git a/src/ClaudeNest.Agent/Worker.cs b/src/ClaudeNest.Agent/Worker.cs index c47994a9..648fb4e2 100644 --- a/src/ClaudeNest.Agent/Worker.cs +++ b/src/ClaudeNest.Agent/Worker.cs @@ -60,7 +60,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) Version = typeof(AgentWorker).Assembly.GetName().Version?.ToString() ?? "0.0.0", Architecture = RuntimeInformation.ProcessArchitecture.ToString(), AllowedPaths = config.AllowedPaths - }); + }, stoppingToken); logger.LogInformation("Agent re-registered after reconnection"); // Report current sessions on reconnect @@ -68,7 +68,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var currentSessions = _sessionManager.GetAllSessions(); if (currentSessions.Count > 0) - await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, currentSessions); + await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, currentSessions, stoppingToken); } } catch (Exception ex) @@ -82,7 +82,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - await _connectionManager.SendSessionStatusAsync(update); + await _connectionManager.SendSessionStatusAsync(update, stoppingToken); } catch (Exception ex) { @@ -99,7 +99,7 @@ await _connectionManager.SendDirectoryListingAsync(new DirectoryListingResponse RequestId = requestId, Path = path, Directories = directories - }); + }, stoppingToken); }; _connectionManager.OnStartSession += async (sessionId, path, permissionMode) => @@ -117,7 +117,7 @@ await _connectionManager.SendSessionStatusAsync(new SessionStatusUpdate State = SessionState.Crashed, StartedAt = DateTime.UtcNow, EndedAt = DateTime.UtcNow - }); + }, stoppingToken); } }; @@ -129,7 +129,7 @@ await _connectionManager.SendSessionStatusAsync(new SessionStatusUpdate _connectionManager.OnGetSessions += async () => { var sessions = _sessionManager.GetAllSessions(); - await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, sessions); + await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, sessions, stoppingToken); }; _connectionManager.OnDeregister += () => @@ -181,7 +181,7 @@ await _connectionManager.SendSessionStatusAsync(new SessionStatusUpdate Version = typeof(AgentWorker).Assembly.GetName().Version?.ToString() ?? "0.0.0", Architecture = RuntimeInformation.ProcessArchitecture.ToString(), AllowedPaths = config.AllowedPaths - }); + }, stoppingToken); } catch (Exception ex) { @@ -208,7 +208,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport AgentId = credentials.AgentId, Status = "completed", NewVersion = currentVersion - }); + }, stoppingToken); } catch (Exception ex) { @@ -244,7 +244,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport { try { - await _connectionManager.SendSessionStatusAsync(update); + await _connectionManager.SendSessionStatusAsync(update, stoppingToken); } catch (Exception ex) { @@ -257,7 +257,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport var currentSessions = _sessionManager.GetAllSessions(); if (currentSessions.Count > 0) { - await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, currentSessions); + await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, currentSessions, stoppingToken); } logger.LogInformation("Agent registered and connected. Waiting for commands..."); @@ -270,7 +270,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport { await heartbeatTimer.WaitForNextTickAsync(stoppingToken); await _sessionManager.HealthCheckAsync(); - await _connectionManager.SendHeartbeatAsync(); + await _connectionManager.SendHeartbeatAsync(stoppingToken); // Check if a deferred update can now be applied await TryApplyPendingUpdateAsync(stoppingToken); @@ -297,7 +297,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport AgentId = agentId, Status = "downloading", NewVersion = notification.LatestVersion - }); + }, ct); // Download the binary first var binaryPath = await _updater.DownloadAsync( @@ -321,7 +321,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport AgentId = agentId, Status = "waiting_for_sessions", NewVersion = notification.LatestVersion - }); + }, ct); return; } @@ -331,7 +331,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport AgentId = agentId, Status = "restarting", NewVersion = notification.LatestVersion - }); + }, ct); await _updater.ApplyAsync( binaryPath, @@ -350,7 +350,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport Status = "failed", Error = ex.Message, NewVersion = notification.LatestVersion - }); + }, ct); } catch { @@ -386,7 +386,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport AgentId = agentId, Status = "restarting", NewVersion = notification.LatestVersion - }); + }, ct); await _updater.ApplyAsync( binaryPath, @@ -405,7 +405,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport Status = "failed", Error = ex.Message, NewVersion = notification.LatestVersion - }); + }, ct); } catch {