Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
*/
public class FullyQualifiedResourceId implements Comparable<FullyQualifiedResourceId>, Cloneable, Serializable {

private static final UUID LOWEST_UUID = new UUID(Long.MIN_VALUE, Long.MIN_VALUE);

private static final UUID HIGHEST_UUID = new UUID(Long.MAX_VALUE, Long.MAX_VALUE);

private static int compareNullable(@Nullable UUID contextId1, @Nullable UUID contextId2) {
if (contextId1 == null && contextId2 == null) {
return 0;
Expand Down Expand Up @@ -65,14 +61,6 @@ public FullyQualifiedResourceId(@Nullable UUID contextId, UUID resourceId) {
this.resourceId = resourceId;
}

public static FullyQualifiedResourceId lower(UUID contextId) {
return new FullyQualifiedResourceId(contextId, LOWEST_UUID);
}

public static FullyQualifiedResourceId upper(UUID contextId) {
return new FullyQualifiedResourceId(contextId, HIGHEST_UUID);
}

/**
* Gets global ID portion of this {@code FullyQualifiedResourceId}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSet;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Supplier;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand All @@ -41,12 +40,12 @@ public class RemotelyTriggeredResourceRegistry {
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(RemotelyTriggeredResourceRegistry.class);

/** Resources map. */
private final ConcurrentNavigableMap<FullyQualifiedResourceId, RemotelyTriggeredResource> resources = new ConcurrentSkipListMap<>();
/** Resources map: contextId -> (resourceId -> resource). */
private final ConcurrentHashMap<UUID, ConcurrentHashMap<UUID, RemotelyTriggeredResource>> resources =
new ConcurrentHashMap<>();

// TODO IGNITE-21633 We may get rid of this map.
/** Remote host inconsistent ids mapped to resources created by them. */
private final Map<UUID, Set<FullyQualifiedResourceId>> remoteHostsToResources = new ConcurrentHashMap<>();
/** Reverse index: remoteHostId -> set of contextIds registered by that host. */
private final ConcurrentHashMap<UUID, Set<UUID>> remoteHostToContextIds = new ConcurrentHashMap<>();

/**
* Register a resource.
Expand All @@ -61,9 +60,18 @@ public <T extends ManuallyCloseable> T register(
UUID remoteHostId,
Supplier<ManuallyCloseable> resourceProvider
) {
T r = (T) resources.computeIfAbsent(resourceId, k -> new RemotelyTriggeredResource(resourceProvider.get(), remoteHostId)).resource;
UUID contextId = Objects.requireNonNull(resourceId.contextId(), "contextId must not be null");

addRemoteHostResource(remoteHostId, resourceId);
ConcurrentHashMap<UUID, RemotelyTriggeredResource> innerMap =
resources.computeIfAbsent(contextId, k -> new ConcurrentHashMap<>());

@SuppressWarnings("unchecked")
T r = (T) innerMap.computeIfAbsent(
resourceId.resourceId(),
k -> new RemotelyTriggeredResource(resourceProvider.get(), remoteHostId)
).resource;

remoteHostToContextIds.computeIfAbsent(remoteHostId, k -> ConcurrentHashMap.newKeySet()).add(contextId);

return r;
}
Expand All @@ -74,18 +82,32 @@ public <T extends ManuallyCloseable> T register(
* @param resourceId Resource id.
*/
public void close(FullyQualifiedResourceId resourceId) throws ResourceCloseException {
RemotelyTriggeredResource remotelyTriggeredResource = resources.get(resourceId);
UUID contextId = resourceId.contextId();

if (remotelyTriggeredResource != null) {
try {
remotelyTriggeredResource.resource.close();
ConcurrentHashMap<UUID, RemotelyTriggeredResource> innerMap = resources.get(contextId);

if (innerMap == null) {
return;
}

resources.remove(resourceId);
RemotelyTriggeredResource resource = innerMap.get(resourceId.resourceId());

removeRemoteHostResource(remotelyTriggeredResource.remoteHostId(), resourceId);
} catch (Exception e) {
throw new ResourceCloseException(resourceId, remotelyTriggeredResource.remoteHostId(), e);
if (resource == null) {
return;
}

try {
resource.resource.close();

innerMap.remove(resourceId.resourceId());

if (innerMap.isEmpty()) {
resources.remove(contextId, innerMap);

removeRemoteHostContext(resource.remoteHostId(), contextId);
}
} catch (Exception e) {
throw new ResourceCloseException(resourceId, resource.remoteHostId(), e);
}
}

Expand All @@ -95,43 +117,54 @@ public void close(FullyQualifiedResourceId resourceId) throws ResourceCloseExcep
* @param contextId Context id of the resources.
*/
public void close(UUID contextId) throws ResourceCloseException {
Map<FullyQualifiedResourceId, RemotelyTriggeredResource> resourcesWithContext = resources(contextId);
ConcurrentHashMap<UUID, RemotelyTriggeredResource> innerMap = resources.get(contextId);

ResourceCloseException ex = null;
if (innerMap == null) {
return;
}

Set<FullyQualifiedResourceId> closedResources = new HashSet<>();
ResourceCloseException ex = null;
Set<UUID> closedResourceIds = new HashSet<>();

// We assume that the resources of the same context are triggered by the same remote host.
UUID remoteHostId = null;

for (Entry<FullyQualifiedResourceId, RemotelyTriggeredResource> entry : resourcesWithContext.entrySet()) {
for (Map.Entry<UUID, RemotelyTriggeredResource> entry : innerMap.entrySet()) {
try {
entry.getValue().resource.close();

closedResources.add(entry.getKey());
closedResourceIds.add(entry.getKey());

if (remoteHostId == null) {
remoteHostId = entry.getValue().remoteHostId();
}

assert remoteHostId.equals(entry.getValue().remoteHostId()) : "Resources of the same context triggered by different remote "
+ "hosts [" + remoteHostId + ", " + entry.getValue().remoteHostId() + "].";
if (!remoteHostId.equals(entry.getValue().remoteHostId())) {
throw new IllegalStateException("Resources of the same context triggered by different remote "
+ "hosts [" + remoteHostId + ", " + entry.getValue().remoteHostId() + "].");
}
} catch (Exception e) {
FullyQualifiedResourceId fqId = new FullyQualifiedResourceId(contextId, entry.getKey());

if (ex == null) {
ex = new ResourceCloseException(entry.getKey(), entry.getValue().remoteHostId(), e);
ex = new ResourceCloseException(fqId, entry.getValue().remoteHostId(), e);
} else {
ex.addSuppressed(e);
}
}
}

if (!closedResources.isEmpty()) {
assert remoteHostId != null : "Remote host is null, resources=" + resourcesWithContext;
if (!closedResourceIds.isEmpty()) {
assert remoteHostId != null : "Remote host is null, contextId=" + contextId;

for (UUID resourceId : closedResourceIds) {
innerMap.remove(resourceId);
}

for (FullyQualifiedResourceId resourceId : closedResources) {
resourcesWithContext.remove(resourceId);
if (innerMap.isEmpty()) {
resources.remove(contextId, innerMap);

removeRemoteHostResource(remoteHostId, resourceId);
removeRemoteHostContext(remoteHostId, contextId);
}
}

Expand All @@ -146,57 +179,44 @@ public void close(UUID contextId) throws ResourceCloseException {
* @param remoteHostId Remote host inconsistent id.
*/
public void closeByRemoteHostId(UUID remoteHostId) {
Set<FullyQualifiedResourceId> resourceIds = remoteHostsToResources.get(remoteHostId);
Set<UUID> contextIds = remoteHostToContextIds.remove(remoteHostId);

if (resourceIds == null) {
// Remote host resources were already closed, likely by a concurrent call of "removeRemoteHostResource" method.
if (contextIds == null) {
// Remote host resources were already closed or no resources were registered for this host.
return;
}

for (FullyQualifiedResourceId resourceId : resourceIds) {
for (UUID contextId : contextIds) {
try {
close(resourceId);
close(contextId);
} catch (Exception e) {
LOG.warn("Exception occurred during the orphan resource closing [resourceId={}].", e, resourceId);
LOG.warn("Exception occurred during the orphan resource closing [contextId={}].", e, contextId);
}
}
}

private void addRemoteHostResource(UUID remoteHostId, FullyQualifiedResourceId resourceId) {
remoteHostsToResources.compute(remoteHostId, (k, v) -> {
if (v == null) {
v = ConcurrentHashMap.newKeySet();
}

v.add(resourceId);

return v;
});
}

private void removeRemoteHostResource(UUID remoteHostId, FullyQualifiedResourceId resourceId) {
remoteHostsToResources.computeIfPresent(remoteHostId, (k, v) -> {
v.remove(resourceId);
private void removeRemoteHostContext(UUID remoteHostId, UUID contextId) {
remoteHostToContextIds.computeIfPresent(remoteHostId, (k, v) -> {
v.remove(contextId);

return v.isEmpty() ? null : v;
});
}

private Map<FullyQualifiedResourceId, RemotelyTriggeredResource> resources(UUID contextId) {
var lowResourceId = FullyQualifiedResourceId.lower(contextId);
var upperResourceId = FullyQualifiedResourceId.upper(contextId);

return resources.subMap(lowResourceId, true, upperResourceId, true);
}

/**
* Returns all resources.
* Returns all resources as a flat map.
*
* @return Remotely triggered resources.
*/
@TestOnly
public Map<FullyQualifiedResourceId, RemotelyTriggeredResource> resources() {
return unmodifiableMap(resources);
Map<FullyQualifiedResourceId, RemotelyTriggeredResource> flat = new HashMap<>();

resources.forEach((contextId, innerMap) ->
innerMap.forEach((resourceId, resource) ->
flat.put(new FullyQualifiedResourceId(contextId, resourceId), resource)));

return unmodifiableMap(flat);
}

/**
Expand All @@ -205,7 +225,7 @@ public Map<FullyQualifiedResourceId, RemotelyTriggeredResource> resources() {
* @return Remote host inconsistent ids.
*/
Set<UUID> registeredRemoteHosts() {
return unmodifiableSet(remoteHostsToResources.keySet());
return unmodifiableSet(remoteHostToContextIds.keySet());
}

/**
Expand Down
Loading