Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions src/Orleans.Core/Configuration/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,76 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;

namespace Orleans.Configuration.Internal
{
/// <summary>
/// A <see cref="ServiceDescriptor"/> subclass that tracks the underlying implementation type
/// used when registering a service via <see cref="ServiceCollectionExtensions.AddFromExisting"/>.
/// This allows service registrations to be identified and removed later based on their implementation type.
/// </summary>
internal sealed class TaggedServiceDescriptor : ServiceDescriptor
{
/// <summary>
/// Initializes a new instance of the <see cref="TaggedServiceDescriptor"/> class.
/// </summary>
/// <param name="serviceType">The type of the service.</param>
/// <param name="factory">The factory used for creating service instances.</param>
/// <param name="lifetime">The lifetime of the service.</param>
/// <param name="implementationType">The underlying implementation type this registration was created from.</param>
public TaggedServiceDescriptor(
Type serviceType,
Func<IServiceProvider, object> factory,
ServiceLifetime lifetime,
Type implementationType)
: base(serviceType, factory, lifetime)
{
SourceImplementationType = implementationType;
}

/// <summary>
/// Gets the underlying implementation type that this service registration was created from.
/// </summary>
public Type SourceImplementationType { get; }

/// <summary>
/// Removes all service descriptors from the collection that were registered from the specified implementation type.
/// </summary>
/// <typeparam name="TImplementation">The implementation type to remove registrations for.</typeparam>
/// <param name="services">The service collection to remove from.</param>
public static void RemoveAllForImplementation<TImplementation>(IServiceCollection services)
{
RemoveAllForImplementation(services, typeof(TImplementation));
}

/// <summary>
/// Removes all service descriptors from the collection that were registered from the specified implementation type.
/// </summary>
/// <param name="services">The service collection to remove from.</param>
/// <param name="implementationType">The implementation type to remove registrations for.</param>
public static void RemoveAllForImplementation(IServiceCollection services, Type implementationType)
{
var toRemove = new List<ServiceDescriptor>();
foreach (var descriptor in services)
{
if (descriptor is TaggedServiceDescriptor tagged && tagged.SourceImplementationType == implementationType)
{
toRemove.Add(descriptor);
}
else if (descriptor.ServiceType == implementationType || descriptor.ImplementationType == implementationType)
{
toRemove.Add(descriptor);
}
}

foreach (var descriptor in toRemove)
{
services.Remove(descriptor);
}
}
}

/// <summary>
/// Extension methods for configuring dependency injection.
/// </summary>
Expand Down Expand Up @@ -43,10 +110,11 @@ public static void AddFromExisting(this IServiceCollection services, Type servic
throw new ArgumentNullException(nameof(implementation), $"Unable to find previously registered ServiceType of '{implementation.FullName}'");
}

var newRegistration = new ServiceDescriptor(
var newRegistration = new TaggedServiceDescriptor(
service,
sp => sp.GetRequiredService(implementation),
registration.Lifetime);
registration.Lifetime,
implementation);
services.Add(newRegistration);
}

Expand Down
104 changes: 3 additions & 101 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@

namespace Orleans.Runtime
{
internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant<ISiloLifecycle>
internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant<ISiloLifecycle>, ISiloStatusListener
{
private readonly SiloAddress _siloAddress;
private readonly ActivationCollector activationCollector;
private readonly GrainDirectoryResolver grainDirectoryResolver;
private readonly ActivationDirectory activations;
private readonly IServiceProvider serviceProvider;
private readonly ILogger logger;
Expand All @@ -27,8 +25,6 @@ internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecyclePartic
private readonly object[] _locks = new object[LockCount];

public Catalog(
ILocalSiloDetails localSiloDetails,
GrainDirectoryResolver grainDirectoryResolver,
ActivationDirectory activationDirectory,
ActivationCollector activationCollector,
IServiceProvider serviceProvider,
Expand All @@ -37,8 +33,6 @@ public Catalog(
SystemTargetShared shared)
: base(Constants.CatalogType, shared)
{
this._siloAddress = localSiloDetails.SiloAddress;
this.grainDirectoryResolver = grainDirectoryResolver;
this.activations = activationDirectory;
this.serviceProvider = serviceProvider;
this.grainActivator = grainActivator;
Expand Down Expand Up @@ -98,19 +92,6 @@ public void UnregisterMessageTarget(IGrainContext activation)
}
}

/// <summary>
/// FOR TESTING PURPOSES ONLY!!
/// </summary>
/// <param name="grain"></param>
internal int UnregisterGrainForTesting(GrainId grain)
{
var activation = activations.FindTarget(grain);
if (activation is null) return 0;

UnregisterMessageTarget(activation);
return 1;
}

/// <summary>
/// If activation already exists, return it.
/// Otherwise, creates a new activation, begins rehydrating it and activating it, then returns it.
Expand Down Expand Up @@ -302,99 +283,21 @@ await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) =>
});
}

// TODO move this logic in the LocalGrainDirectory
internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status)
void ISiloStatusListener.SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus status)
{
// ignore joining events and also events on myself.
if (updatedSilo.Equals(_siloAddress)) return;

// We deactivate those activations when silo goes either of ShuttingDown/Stopping/Dead states,
// since this is what Directory is doing as well. Directory removes a silo based on all those 3 statuses,
// thus it will only deliver a "remove" notification for a given silo once to us. Therefore, we need to react the fist time we are notified.
// We may review the directory behavior in the future and treat ShuttingDown differently ("drain only") and then this code will have to change a well.
if (!status.IsTerminating()) return;
if (status == SiloStatus.Dead)
{
this.RuntimeClient.BreakOutstandingMessagesToSilo(updatedSilo);
}

var activationsToShutdown = new List<IGrainContext>();
try
{
// scan all activations in activation directory and deactivate the ones that the removed silo is their primary partition owner.
// Note: No lock needed here since ActivationDirectory uses ConcurrentDictionary which provides thread-safe enumeration
foreach (var activation in activations)
{
try
{
var activationData = activation.Value;
var placementStrategy = activationData.GetComponent<PlacementStrategy>();
var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true };
if (!isUsingGrainDirectory || !grainDirectoryResolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) continue;
if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.GrainId))) continue;

activationsToShutdown.Add(activationData);
}
catch (Exception exc)
{
LogErrorCatalogSiloStatusChangeNotification(new(updatedSilo), exc);
}
}

if (activationsToShutdown.Count > 0)
{
LogInfoCatalogSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo));
}
}
finally
{
// outside the lock.
if (activationsToShutdown.Count > 0)
{
var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration.";
var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText);
StartDeactivatingActivations(reason, activationsToShutdown, CancellationToken.None);
}
}

void StartDeactivatingActivations(DeactivationReason reason, List<IGrainContext> list, CancellationToken cancellationToken)
{
if (list == null || list.Count == 0) return;

LogDebugDeactivateActivations(list.Count);

foreach (var activation in list)
{
activation.Deactivate(reason, cancellationToken);
}
}
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
// Do nothing, just ensure that this instance is created so that it can register itself in the activation directory.
_siloStatusOracle = serviceProvider.GetRequiredService<ISiloStatusOracle>();
_siloStatusOracle.SubscribeToSiloStatusEvents(this);
}

private readonly struct SiloAddressLogValue(SiloAddress silo)
{
public override string ToString() => silo.ToStringWithHashCode();
}

[LoggerMessage(
Level = LogLevel.Error,
EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception,
Message = "Catalog has thrown an exception while handling removal of silo {Silo}"
)]
private partial void LogErrorCatalogSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception);

[LoggerMessage(
Level = LogLevel.Information,
EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification,
Message = "Catalog is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids."
)]
private partial void LogInfoCatalogSiloStatusChangeNotification(int count, SiloAddressLogValue silo);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Unregistered activation {Activation}")]
Expand Down Expand Up @@ -430,6 +333,5 @@ private readonly struct SiloAddressLogValue(SiloAddress silo)
Message = "Failed to unregister non-existent activation {Address}"
)]
private partial void LogFailedToUnregisterNonExistingActivation(GrainAddress address, Exception exception);

}
}
2 changes: 0 additions & 2 deletions src/Orleans.Runtime/Core/InternalGrainRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ internal class InternalGrainRuntime(
GrainLocator grainLocator,
CompatibilityDirectorManager compatibilityDirectorManager,
IOptions<GrainCollectionOptions> collectionOptions,
ILocalGrainDirectory localGrainDirectory,
IActivationWorkingSet activationWorkingSet)
{
public InsideRuntimeClient RuntimeClient { get; } = catalog.RuntimeClient;
Expand All @@ -29,7 +28,6 @@ internal class InternalGrainRuntime(
public CompatibilityDirectorManager CompatibilityDirectorManager { get; } = compatibilityDirectorManager;
public GrainLocator GrainLocator { get; } = grainLocator;
public IOptions<GrainCollectionOptions> CollectionOptions { get; } = collectionOptions;
public ILocalGrainDirectory LocalGrainDirectory { get; } = localGrainDirectory;
public IActivationWorkingSet ActivationWorkingSet { get; } = activationWorkingSet;
}
}
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Orleans.Runtime.GrainDirectory
/// <summary>
/// Implementation of <see cref="IGrainLocator"/> that uses <see cref="IGrainDirectory"/> stores.
/// </summary>
internal class CachedGrainLocator : IGrainLocator, ILifecycleParticipant<ISiloLifecycle>, CachedGrainLocator.ITestAccessor
internal sealed class CachedGrainLocator : IGrainLocator, ILifecycleParticipant<ISiloLifecycle>, CachedGrainLocator.ITestAccessor
{
private readonly GrainDirectoryResolver grainDirectoryResolver;
private readonly IGrainDirectoryCache cache;
Expand Down
Loading
Loading