diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FullyQualifiedResourceId.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FullyQualifiedResourceId.java index daab07fdafc..f0f1b52b663 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FullyQualifiedResourceId.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FullyQualifiedResourceId.java @@ -27,10 +27,6 @@ */ public class FullyQualifiedResourceId implements Comparable, Cloneable, Serializable { - private static final UUID LOWEST_UUID = new UUID(Long.MIN_VALUE, Long.MIN_VALUE); - - private static final UUID HIGHEST_UUID = new UUID(Long.MAX_VALUE, Long.MAX_VALUE); - private static int compareNullable(@Nullable UUID contextId1, @Nullable UUID contextId2) { if (contextId1 == null && contextId2 == null) { return 0; @@ -65,14 +61,6 @@ public FullyQualifiedResourceId(@Nullable UUID contextId, UUID resourceId) { this.resourceId = resourceId; } - public static FullyQualifiedResourceId lower(UUID contextId) { - return new FullyQualifiedResourceId(contextId, LOWEST_UUID); - } - - public static FullyQualifiedResourceId upper(UUID contextId) { - return new FullyQualifiedResourceId(contextId, HIGHEST_UUID); - } - /** * Gets global ID portion of this {@code FullyQualifiedResourceId}. * diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java index b7d8064fbc1..6bd04999948 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java @@ -20,14 +20,13 @@ import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableSet; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.Supplier; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.logger.IgniteLogger; @@ -41,12 +40,12 @@ public class RemotelyTriggeredResourceRegistry { /** The logger. */ private static final IgniteLogger LOG = Loggers.forClass(RemotelyTriggeredResourceRegistry.class); - /** Resources map. */ - private final ConcurrentNavigableMap resources = new ConcurrentSkipListMap<>(); + /** Resources map: contextId -> (resourceId -> resource). */ + private final ConcurrentHashMap> resources = + new ConcurrentHashMap<>(); - // TODO IGNITE-21633 We may get rid of this map. - /** Remote host inconsistent ids mapped to resources created by them. */ - private final Map> remoteHostsToResources = new ConcurrentHashMap<>(); + /** Reverse index: remoteHostId -> set of contextIds registered by that host. */ + private final ConcurrentHashMap> remoteHostToContextIds = new ConcurrentHashMap<>(); /** * Register a resource. @@ -61,9 +60,18 @@ public T register( UUID remoteHostId, Supplier resourceProvider ) { - T r = (T) resources.computeIfAbsent(resourceId, k -> new RemotelyTriggeredResource(resourceProvider.get(), remoteHostId)).resource; + UUID contextId = Objects.requireNonNull(resourceId.contextId(), "contextId must not be null"); - addRemoteHostResource(remoteHostId, resourceId); + ConcurrentHashMap innerMap = + resources.computeIfAbsent(contextId, k -> new ConcurrentHashMap<>()); + + @SuppressWarnings("unchecked") + T r = (T) innerMap.computeIfAbsent( + resourceId.resourceId(), + k -> new RemotelyTriggeredResource(resourceProvider.get(), remoteHostId) + ).resource; + + remoteHostToContextIds.computeIfAbsent(remoteHostId, k -> ConcurrentHashMap.newKeySet()).add(contextId); return r; } @@ -74,18 +82,32 @@ public T register( * @param resourceId Resource id. */ public void close(FullyQualifiedResourceId resourceId) throws ResourceCloseException { - RemotelyTriggeredResource remotelyTriggeredResource = resources.get(resourceId); + UUID contextId = resourceId.contextId(); - if (remotelyTriggeredResource != null) { - try { - remotelyTriggeredResource.resource.close(); + ConcurrentHashMap innerMap = resources.get(contextId); + + if (innerMap == null) { + return; + } - resources.remove(resourceId); + RemotelyTriggeredResource resource = innerMap.get(resourceId.resourceId()); - removeRemoteHostResource(remotelyTriggeredResource.remoteHostId(), resourceId); - } catch (Exception e) { - throw new ResourceCloseException(resourceId, remotelyTriggeredResource.remoteHostId(), e); + if (resource == null) { + return; + } + + try { + resource.resource.close(); + + innerMap.remove(resourceId.resourceId()); + + if (innerMap.isEmpty()) { + resources.remove(contextId, innerMap); + + removeRemoteHostContext(resource.remoteHostId(), contextId); } + } catch (Exception e) { + throw new ResourceCloseException(resourceId, resource.remoteHostId(), e); } } @@ -95,43 +117,54 @@ public void close(FullyQualifiedResourceId resourceId) throws ResourceCloseExcep * @param contextId Context id of the resources. */ public void close(UUID contextId) throws ResourceCloseException { - Map resourcesWithContext = resources(contextId); + ConcurrentHashMap innerMap = resources.get(contextId); - ResourceCloseException ex = null; + if (innerMap == null) { + return; + } - Set closedResources = new HashSet<>(); + ResourceCloseException ex = null; + Set closedResourceIds = new HashSet<>(); // We assume that the resources of the same context are triggered by the same remote host. UUID remoteHostId = null; - for (Entry entry : resourcesWithContext.entrySet()) { + for (Map.Entry entry : innerMap.entrySet()) { try { entry.getValue().resource.close(); - closedResources.add(entry.getKey()); + closedResourceIds.add(entry.getKey()); if (remoteHostId == null) { remoteHostId = entry.getValue().remoteHostId(); } - assert remoteHostId.equals(entry.getValue().remoteHostId()) : "Resources of the same context triggered by different remote " - + "hosts [" + remoteHostId + ", " + entry.getValue().remoteHostId() + "]."; + if (!remoteHostId.equals(entry.getValue().remoteHostId())) { + throw new IllegalStateException("Resources of the same context triggered by different remote " + + "hosts [" + remoteHostId + ", " + entry.getValue().remoteHostId() + "]."); + } } catch (Exception e) { + FullyQualifiedResourceId fqId = new FullyQualifiedResourceId(contextId, entry.getKey()); + if (ex == null) { - ex = new ResourceCloseException(entry.getKey(), entry.getValue().remoteHostId(), e); + ex = new ResourceCloseException(fqId, entry.getValue().remoteHostId(), e); } else { ex.addSuppressed(e); } } } - if (!closedResources.isEmpty()) { - assert remoteHostId != null : "Remote host is null, resources=" + resourcesWithContext; + if (!closedResourceIds.isEmpty()) { + assert remoteHostId != null : "Remote host is null, contextId=" + contextId; + + for (UUID resourceId : closedResourceIds) { + innerMap.remove(resourceId); + } - for (FullyQualifiedResourceId resourceId : closedResources) { - resourcesWithContext.remove(resourceId); + if (innerMap.isEmpty()) { + resources.remove(contextId, innerMap); - removeRemoteHostResource(remoteHostId, resourceId); + removeRemoteHostContext(remoteHostId, contextId); } } @@ -146,57 +179,44 @@ public void close(UUID contextId) throws ResourceCloseException { * @param remoteHostId Remote host inconsistent id. */ public void closeByRemoteHostId(UUID remoteHostId) { - Set resourceIds = remoteHostsToResources.get(remoteHostId); + Set contextIds = remoteHostToContextIds.remove(remoteHostId); - if (resourceIds == null) { - // Remote host resources were already closed, likely by a concurrent call of "removeRemoteHostResource" method. + if (contextIds == null) { + // Remote host resources were already closed or no resources were registered for this host. return; } - for (FullyQualifiedResourceId resourceId : resourceIds) { + for (UUID contextId : contextIds) { try { - close(resourceId); + close(contextId); } catch (Exception e) { - LOG.warn("Exception occurred during the orphan resource closing [resourceId={}].", e, resourceId); + LOG.warn("Exception occurred during the orphan resource closing [contextId={}].", e, contextId); } } } - private void addRemoteHostResource(UUID remoteHostId, FullyQualifiedResourceId resourceId) { - remoteHostsToResources.compute(remoteHostId, (k, v) -> { - if (v == null) { - v = ConcurrentHashMap.newKeySet(); - } - - v.add(resourceId); - - return v; - }); - } - - private void removeRemoteHostResource(UUID remoteHostId, FullyQualifiedResourceId resourceId) { - remoteHostsToResources.computeIfPresent(remoteHostId, (k, v) -> { - v.remove(resourceId); + private void removeRemoteHostContext(UUID remoteHostId, UUID contextId) { + remoteHostToContextIds.computeIfPresent(remoteHostId, (k, v) -> { + v.remove(contextId); return v.isEmpty() ? null : v; }); } - private Map resources(UUID contextId) { - var lowResourceId = FullyQualifiedResourceId.lower(contextId); - var upperResourceId = FullyQualifiedResourceId.upper(contextId); - - return resources.subMap(lowResourceId, true, upperResourceId, true); - } - /** - * Returns all resources. + * Returns all resources as a flat map. * * @return Remotely triggered resources. */ @TestOnly public Map resources() { - return unmodifiableMap(resources); + Map flat = new HashMap<>(); + + resources.forEach((contextId, innerMap) -> + innerMap.forEach((resourceId, resource) -> + flat.put(new FullyQualifiedResourceId(contextId, resourceId), resource))); + + return unmodifiableMap(flat); } /** @@ -205,7 +225,7 @@ public Map resources() { * @return Remote host inconsistent ids. */ Set registeredRemoteHosts() { - return unmodifiableSet(remoteHostsToResources.keySet()); + return unmodifiableSet(remoteHostToContextIds.keySet()); } /** diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistryTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistryTest.java new file mode 100644 index 00000000000..a76ba0d341d --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistryTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; + +class RemotelyTriggeredResourceRegistryTest extends BaseIgniteAbstractTest { + + private final RemotelyTriggeredResourceRegistry registry = new RemotelyTriggeredResourceRegistry(); + + @Test + void registerAndCloseById() throws Exception { + UUID contextId = UUID.randomUUID(); + UUID remoteHostId = UUID.randomUUID(); + FullyQualifiedResourceId id = new FullyQualifiedResourceId(contextId, UUID.randomUUID()); + + AtomicInteger closedCount = new AtomicInteger(); + registry.register(id, remoteHostId, () -> () -> closedCount.incrementAndGet()); + + assertEquals(1, registry.resources().size()); + + registry.close(id); + + assertEquals(1, closedCount.get()); + assertEquals(0, registry.resources().size()); + } + + @Test + void registerReturnsSameResourceOnDuplicateCall() throws Exception { + UUID contextId = UUID.randomUUID(); + UUID remoteHostId = UUID.randomUUID(); + FullyQualifiedResourceId id = new FullyQualifiedResourceId(contextId, UUID.randomUUID()); + + AtomicInteger creationCount = new AtomicInteger(); + registry.register(id, remoteHostId, () -> { + creationCount.incrementAndGet(); + return () -> {}; + }); + registry.register(id, remoteHostId, () -> { + creationCount.incrementAndGet(); + return () -> {}; + }); + + assertEquals(1, creationCount.get()); + assertEquals(1, registry.resources().size()); + } + + @Test + void closeByContextIdClosesAllResourcesInContext() throws Exception { + UUID contextId = UUID.randomUUID(); + UUID remoteHostId = UUID.randomUUID(); + + AtomicInteger closedCount = new AtomicInteger(); + for (int i = 0; i < 5; i++) { + FullyQualifiedResourceId id = new FullyQualifiedResourceId(contextId, UUID.randomUUID()); + registry.register(id, remoteHostId, () -> () -> closedCount.incrementAndGet()); + } + + assertEquals(5, registry.resources().size()); + registry.close(contextId); + + assertEquals(5, closedCount.get()); + assertEquals(0, registry.resources().size()); + } + + @Test + void closeByRemoteHostIdClosesAllResourcesFromThatHost() throws Exception { + UUID remoteHostId = UUID.randomUUID(); + AtomicInteger closedCount = new AtomicInteger(); + + for (int i = 0; i < 3; i++) { + FullyQualifiedResourceId id = new FullyQualifiedResourceId(UUID.randomUUID(), UUID.randomUUID()); + registry.register(id, remoteHostId, () -> () -> closedCount.incrementAndGet()); + } + + assertEquals(3, registry.resources().size()); + registry.closeByRemoteHostId(remoteHostId); + + assertEquals(3, closedCount.get()); + assertEquals(0, registry.resources().size()); + } + + @Test + void closeByRemoteHostIdIsNoOpForUnknownHost() throws Exception { + assertDoesNotThrow(() -> registry.closeByRemoteHostId(UUID.randomUUID())); + assertEquals(0, registry.resources().size()); + } + + @Test + void closeByContextIdAfterCloseByRemoteHostIdIsNoOp() throws Exception { + UUID contextId = UUID.randomUUID(); + UUID remoteHostId = UUID.randomUUID(); + AtomicInteger closedCount = new AtomicInteger(); + + FullyQualifiedResourceId id = new FullyQualifiedResourceId(contextId, UUID.randomUUID()); + registry.register(id, remoteHostId, () -> () -> closedCount.incrementAndGet()); + + registry.closeByRemoteHostId(remoteHostId); + registry.close(contextId); // must be a no-op + + assertEquals(1, closedCount.get()); + assertEquals(0, registry.resources().size()); + } + + @Test + void closeByIdIsNoOpForUnknownResource() throws Exception { + FullyQualifiedResourceId id = new FullyQualifiedResourceId(UUID.randomUUID(), UUID.randomUUID()); + assertDoesNotThrow(() -> registry.close(id)); + assertEquals(0, registry.resources().size()); + } + + @Test + void closeByContextIdIsNoOpForUnknownContext() throws Exception { + assertDoesNotThrow(() -> registry.close(UUID.randomUUID())); + assertEquals(0, registry.resources().size()); + } + + @Test + void multipleContextsSameRemoteHost() throws Exception { + UUID remoteHostId = UUID.randomUUID(); + UUID contextId1 = UUID.randomUUID(); + UUID contextId2 = UUID.randomUUID(); + + AtomicInteger closedCount = new AtomicInteger(); + registry.register(new FullyQualifiedResourceId(contextId1, UUID.randomUUID()), remoteHostId, + () -> () -> closedCount.incrementAndGet()); + registry.register(new FullyQualifiedResourceId(contextId2, UUID.randomUUID()), remoteHostId, + () -> () -> closedCount.incrementAndGet()); + + registry.close(contextId1); // closes one context, host entry must survive + + assertEquals(1, closedCount.get()); + assertEquals(1, registry.resources().size()); + assertEquals(1, registry.registeredRemoteHosts().size()); // host still has contextId2 + + registry.close(contextId2); + + assertEquals(2, closedCount.get()); + assertEquals(0, registry.resources().size()); + assertEquals(0, registry.registeredRemoteHosts().size()); // host entry cleaned up + } + + @Test + void concurrentRegistrationWithSameContextAndHost() throws Exception { + UUID contextId = UUID.randomUUID(); + UUID remoteHostId = UUID.randomUUID(); + int threadCount = 32; + + List ids = new ArrayList<>(); + for (int i = 0; i < threadCount; i++) { + ids.add(new FullyQualifiedResourceId(contextId, UUID.randomUUID())); + } + + CountDownLatch start = new CountDownLatch(1); + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + List> futures = new ArrayList<>(); + try { + for (FullyQualifiedResourceId id : ids) { + futures.add(executor.submit(() -> { + try { + start.await(); + registry.register(id, remoteHostId, () -> () -> {}); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + } + start.countDown(); + for (Future future : futures) { + future.get(5, TimeUnit.SECONDS); + } + } finally { + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + } + + assertEquals(threadCount, registry.resources().size()); + assertEquals(1, registry.registeredRemoteHosts().size()); + } +}