Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/ClaudeNest.Agent/Services/SessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ public async Task HealthCheckAsync()
}
catch
{
// Process no longer exists
// Process no longer exists — dispose handle immediately to free native resources
session.State = SessionState.Crashed;
session.EndedAt = DateTime.UtcNow;
session.Process?.Dispose();
Comment thread
GordonBeeming marked this conversation as resolved.
session.Process = null;
await NotifyStatusChangedAsync(session);
}
}
Expand Down
36 changes: 19 additions & 17 deletions src/ClaudeNest.Agent/Services/SignalRConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public sealed class SignalRConnectionManager : IAsyncDisposable
private HubConnection? _connection;
private readonly AgentCredentials _credentials;
private readonly ILogger<SignalRConnectionManager> _logger;
private CancellationToken _stoppingToken;

public event Func<string, string, Task>? OnListDirectories;
public event Func<Guid, string, string, Task>? OnStartSession;
Expand All @@ -31,8 +32,9 @@ public SignalRConnectionManager(AgentCredentials credentials, ILogger<SignalRCon
_logger = logger;
}

public async Task ConnectAsync(CancellationToken cancellationToken)
public async Task ConnectAsync(CancellationToken stoppingToken)
{
_stoppingToken = stoppingToken;
var hubUrl = $"{_credentials.BackendUrl.TrimEnd('/')}/hubs/nest";

var jsonOptions = new JsonSerializerOptions
Expand Down Expand Up @@ -105,12 +107,12 @@ public async Task ConnectAsync(CancellationToken cancellationToken)
_logger.LogWarning(error, "SignalR connection closed. Starting manual reconnection...");
var delay = TimeSpan.FromSeconds(5);
var maxDelay = TimeSpan.FromMinutes(5);
while (true)
while (!_stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(delay);
await _connection.StartAsync();
await Task.Delay(delay, _stoppingToken);
await _connection.StartAsync(_stoppingToken);
Comment thread
GordonBeeming marked this conversation as resolved.
_logger.LogInformation("Manual reconnection successful");
if (OnReconnected is not null)
await OnReconnected(_connection.ConnectionId);
Expand All @@ -124,45 +126,45 @@ public async Task ConnectAsync(CancellationToken cancellationToken)
}
};

await _connection.StartAsync(cancellationToken);
await _connection.StartAsync(stoppingToken);
_logger.LogInformation("Connected to SignalR hub at {Url}", hubUrl);
}

public async Task<AgentRegistrationResult?> RegisterAgentAsync(AgentInfo agentInfo)
public async Task<AgentRegistrationResult?> RegisterAgentAsync(AgentInfo agentInfo, CancellationToken ct = default)
{
if (_connection is not null)
return await _connection.InvokeAsync<AgentRegistrationResult>("RegisterAgent", agentInfo);
return await _connection.InvokeAsync<AgentRegistrationResult>("RegisterAgent", agentInfo, ct);
return null;
}

public async Task SendSessionStatusAsync(SessionStatusUpdate update)
public async Task SendSessionStatusAsync(SessionStatusUpdate update, CancellationToken ct = default)
{
if (_connection is not null)
await _connection.InvokeAsync("SessionStatusChanged", update);
await _connection.SendAsync("SessionStatusChanged", update, ct);
}

public async Task SendDirectoryListingAsync(DirectoryListingResponse response)
public async Task SendDirectoryListingAsync(DirectoryListingResponse response, CancellationToken ct = default)
{
if (_connection is not null)
await _connection.InvokeAsync("DirectoryListing", response);
await _connection.SendAsync("DirectoryListing", response, ct);
}

public async Task ReportAllSessionsAsync(Guid agentId, List<SessionStatusUpdate> sessions)
public async Task ReportAllSessionsAsync(Guid agentId, List<SessionStatusUpdate> sessions, CancellationToken ct = default)
{
if (_connection is not null)
await _connection.InvokeAsync("ReportAllSessions", agentId, sessions);
await _connection.SendAsync("ReportAllSessions", agentId, sessions, ct);
}

public async Task SendHeartbeatAsync()
public async Task SendHeartbeatAsync(CancellationToken ct = default)
{
if (_connection is not null)
await _connection.InvokeAsync("Heartbeat");
await _connection.SendAsync("Heartbeat", ct);
}

public async Task SendUpdateStatusAsync(UpdateStatusReport report)
public async Task SendUpdateStatusAsync(UpdateStatusReport report, CancellationToken ct = default)
{
if (_connection is not null)
await _connection.InvokeAsync("UpdateStatus", report);
await _connection.SendAsync("UpdateStatus", report, ct);
}

public async ValueTask DisposeAsync()
Expand Down
34 changes: 17 additions & 17 deletions src/ClaudeNest.Agent/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Version = typeof(AgentWorker).Assembly.GetName().Version?.ToString() ?? "0.0.0",
Architecture = RuntimeInformation.ProcessArchitecture.ToString(),
AllowedPaths = config.AllowedPaths
});
}, stoppingToken);
logger.LogInformation("Agent re-registered after reconnection");

// Report current sessions on reconnect
if (_sessionManager is not null)
{
var currentSessions = _sessionManager.GetAllSessions();
if (currentSessions.Count > 0)
await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, currentSessions);
await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, currentSessions, stoppingToken);
}
}
catch (Exception ex)
Expand All @@ -82,7 +82,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
await _connectionManager.SendSessionStatusAsync(update);
await _connectionManager.SendSessionStatusAsync(update, stoppingToken);
}
catch (Exception ex)
{
Expand All @@ -99,7 +99,7 @@ await _connectionManager.SendDirectoryListingAsync(new DirectoryListingResponse
RequestId = requestId,
Path = path,
Directories = directories
});
}, stoppingToken);
};

_connectionManager.OnStartSession += async (sessionId, path, permissionMode) =>
Expand All @@ -117,7 +117,7 @@ await _connectionManager.SendSessionStatusAsync(new SessionStatusUpdate
State = SessionState.Crashed,
StartedAt = DateTime.UtcNow,
EndedAt = DateTime.UtcNow
});
}, stoppingToken);
}
};

Expand All @@ -129,7 +129,7 @@ await _connectionManager.SendSessionStatusAsync(new SessionStatusUpdate
_connectionManager.OnGetSessions += async () =>
{
var sessions = _sessionManager.GetAllSessions();
await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, sessions);
await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, sessions, stoppingToken);
};

_connectionManager.OnDeregister += () =>
Expand Down Expand Up @@ -181,7 +181,7 @@ await _connectionManager.SendSessionStatusAsync(new SessionStatusUpdate
Version = typeof(AgentWorker).Assembly.GetName().Version?.ToString() ?? "0.0.0",
Architecture = RuntimeInformation.ProcessArchitecture.ToString(),
AllowedPaths = config.AllowedPaths
});
}, stoppingToken);
}
catch (Exception ex)
{
Expand All @@ -208,7 +208,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
AgentId = credentials.AgentId,
Status = "completed",
NewVersion = currentVersion
});
}, stoppingToken);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -244,7 +244,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
{
try
{
await _connectionManager.SendSessionStatusAsync(update);
await _connectionManager.SendSessionStatusAsync(update, stoppingToken);
}
catch (Exception ex)
{
Expand All @@ -257,7 +257,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
var currentSessions = _sessionManager.GetAllSessions();
if (currentSessions.Count > 0)
{
await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, currentSessions);
await _connectionManager.ReportAllSessionsAsync(credentials.AgentId, currentSessions, stoppingToken);
}

logger.LogInformation("Agent registered and connected. Waiting for commands...");
Expand All @@ -270,7 +270,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
{
await heartbeatTimer.WaitForNextTickAsync(stoppingToken);
await _sessionManager.HealthCheckAsync();
await _connectionManager.SendHeartbeatAsync();
await _connectionManager.SendHeartbeatAsync(stoppingToken);

// Check if a deferred update can now be applied
await TryApplyPendingUpdateAsync(stoppingToken);
Expand All @@ -297,7 +297,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
AgentId = agentId,
Status = "downloading",
NewVersion = notification.LatestVersion
});
}, ct);

// Download the binary first
var binaryPath = await _updater.DownloadAsync(
Expand All @@ -321,7 +321,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
AgentId = agentId,
Status = "waiting_for_sessions",
NewVersion = notification.LatestVersion
});
}, ct);
return;
}

Expand All @@ -331,7 +331,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
AgentId = agentId,
Status = "restarting",
NewVersion = notification.LatestVersion
});
}, ct);

await _updater.ApplyAsync(
binaryPath,
Expand All @@ -350,7 +350,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
Status = "failed",
Error = ex.Message,
NewVersion = notification.LatestVersion
});
}, ct);
}
catch
{
Expand Down Expand Up @@ -386,7 +386,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
AgentId = agentId,
Status = "restarting",
NewVersion = notification.LatestVersion
});
}, ct);

await _updater.ApplyAsync(
binaryPath,
Expand All @@ -405,7 +405,7 @@ await _connectionManager.SendUpdateStatusAsync(new UpdateStatusReport
Status = "failed",
Error = ex.Message,
NewVersion = notification.LatestVersion
});
}, ct);
}
catch
{
Expand Down
Loading