diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index eb6f25025c5..4a852344172 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -51,6 +51,8 @@ public class TransportConfiguration implements Serializable { private String name; + private String lockCoordinator; + private String factoryClassName = "null"; private Map params; @@ -413,6 +415,15 @@ public void decode(final ActiveMQBuffer buffer) { } } + public String getLockCoordinator() { + return lockCoordinator; + } + + public TransportConfiguration setLockCoordinator(String lockCoordinator) { + this.lockCoordinator = lockCoordinator; + return this; + } + private static String replaceWildcardChars(final String str) { return str.replace('.', '-'); } diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLock.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLock.java index 11f73ebdd23..a624c0004b5 100644 --- a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLock.java +++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLock.java @@ -24,6 +24,7 @@ public interface DistributedLock extends AutoCloseable { String getLockId(); + // TODO: A better name for this method would be isLockValid boolean isHeldByCaller() throws UnavailableStateException; boolean tryLock() throws UnavailableStateException, InterruptedException; diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java index d42f8e985fa..dafdf32216c 100644 --- a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java +++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java @@ -21,16 +21,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.activemq.artemis.utils.ClassloadingUtil; - public interface DistributedLockManager extends AutoCloseable { static DistributedLockManager newInstanceOf(String className, Map properties) throws Exception { - return (DistributedLockManager) ClassloadingUtil.getInstanceForParamsWithTypeCheck(className, - DistributedLockManager.class, - DistributedLockManager.class.getClassLoader(), - new Class[]{Map.class}, - properties); + DistributedLockManagerFactory factory = Registry.getInstance().getFactoryWithClassName(className); + if (factory == null) { + throw new IllegalArgumentException(className + " not found"); + } + return factory.build(properties); } @FunctionalInterface diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManagerFactory.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManagerFactory.java new file mode 100644 index 00000000000..b9fb4de039e --- /dev/null +++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManagerFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.lockmanager; + +import java.util.Map; +import java.util.Set; + +public interface DistributedLockManagerFactory { + DistributedLockManager build(Map properties); + + String getName(); + + String getImplName(); + + default Map validateParameters(Map config) { + config.forEach((parameterName, ignore) -> validateParameter(parameterName)); + return config; + } + + default String getParameterListAsString() { + return String.join(", ", getValidParametersList()); + } + + Set getValidParametersList(); + + default void validateParameter(String parameterName) { + Set validList = getValidParametersList(); + if (!validList.contains(parameterName)) { + throw new IllegalArgumentException("Invalid parameter '" + parameterName + "'. Accepted parameters: " + String.join(", ", validList)); + } + } +} diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/Registry.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/Registry.java new file mode 100644 index 00000000000..af479d4f18b --- /dev/null +++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/Registry.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.lockmanager; + +import java.util.HashMap; +import java.util.ServiceLoader; + +public class Registry { + + private final HashMap factories = new HashMap<>(); + private final HashMap factoriesWithImpl = new HashMap<>(); + + private volatile boolean serviceLoaded = false; + + private static final Registry INSTANCE = new Registry(); + + private Registry() { + } + + public static Registry getInstance() { + return INSTANCE; + } + + public synchronized void register(DistributedLockManagerFactory factory) { + factories.put(factory.getName().toLowerCase(), factory); + factoriesWithImpl.put(factory.getImplName(), factory); + } + + public synchronized void unregisterWithType(String type) { + unregister(factories.get(type.toLowerCase())); + } + + public synchronized void unregisterWithClassName(String name) { + unregister(factoriesWithImpl.get(name)); + } + + private void unregister(DistributedLockManagerFactory factory) { + if (factory != null) { + factories.remove(factory.getName()); + factoriesWithImpl.remove(factory.getImplName()); + } + } + + public synchronized DistributedLockManagerFactory getFactoryWithClassName(String className) { + checkService(); + DistributedLockManagerFactory factory = factoriesWithImpl.get(className); + if (factory == null) { + throw new IllegalArgumentException("factory " + className + " not found"); + } + return factory; + } + + public synchronized DistributedLockManagerFactory getFactory(String type) { + checkService(); + return factories.get(type.toLowerCase()); + } + + public synchronized void checkService() { + if (serviceLoaded) { + return; + } + ServiceLoader services = ServiceLoader.load(DistributedLockManagerFactory.class); + services.forEach(this::register); + serviceLoaded = true; + } + +} diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java index dcd5b19d887..52005a7861c 100644 --- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java @@ -31,21 +31,13 @@ import org.apache.activemq.artemis.lockmanager.MutableLong; import org.apache.activemq.artemis.lockmanager.UnavailableStateException; -/** - * This is an implementation suitable to be used just on unit tests and it won't attempt to manage nor purge existing - * stale locks files. It's part of the tests life-cycle to properly set-up and tear-down the environment. - */ public class FileBasedLockManager implements DistributedLockManager { private final File locksFolder; private final Map locks; private boolean started; - public FileBasedLockManager(Map args) { - this(new File(args.get("locks-folder"))); - } - - public FileBasedLockManager(File locksFolder) { + FileBasedLockManager(File locksFolder) { Objects.requireNonNull(locksFolder); if (!locksFolder.exists()) { throw new IllegalStateException(locksFolder + " is supposed to already exists"); diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManagerFactory.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManagerFactory.java new file mode 100644 index 00000000000..c68e55d7ca3 --- /dev/null +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManagerFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.lockmanager.file; + +import java.io.File; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.lockmanager.DistributedLockManager; +import org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory; + +/** + * Factory for creating file-based distributed lock managers. + *

+ * This implementation uses the file system to manage distributed locks + *

+ * Valid configuration parameters: + *

    + *
  • locks-folder (required): Path to the directory where lock files will be created and managed. + * The directory must be created in advance before using this lock manager.
  • + *
+ */ +public class FileBasedLockManagerFactory implements DistributedLockManagerFactory { + + private static final String LOCK_FOLDER = "locks-folder"; + + private static final Set VALID_PARAMS = Set.of(LOCK_FOLDER); + + @Override + public String getName() { + return "file"; + } + + @Override + public DistributedLockManager build(Map config) { + config = validateParameters(config); + String folder = config.get(LOCK_FOLDER); + if (folder == null) { + throw new IllegalArgumentException("folder not passed as a parameter"); + } + return new FileBasedLockManager(new File(folder)); + } + + @Override + public Set getValidParametersList() { + return VALID_PARAMS; + } + + @Override + public String getImplName() { + return FileBasedLockManager.class.getName(); + } +} diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileDistributedLock.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileDistributedLock.java index 60ac64f235a..9f57f736443 100644 --- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileDistributedLock.java +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileDistributedLock.java @@ -65,7 +65,7 @@ public boolean isHeldByCaller() { } @Override - public boolean tryLock() { + public synchronized boolean tryLock() { checkNotClosed(); final FileLock fileLock = this.fileLock; if (fileLock != null) { @@ -88,7 +88,7 @@ public boolean tryLock() { } @Override - public void unlock() { + public synchronized void unlock() { checkNotClosed(); final FileLock fileLock = this.fileLock; if (fileLock != null) { diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java index 6b5fd5f2104..c92864a1a01 100644 --- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java @@ -20,13 +20,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.activemq.artemis.lockmanager.DistributedLock; import org.apache.activemq.artemis.lockmanager.DistributedLockManager; @@ -42,7 +39,6 @@ import org.apache.curator.utils.DebugUtils; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.joining; public class CuratorDistributedLockManager implements DistributedLockManager, ConnectionStateListener { @@ -97,41 +93,6 @@ public int hashCode() { } } - private static final String CONNECT_STRING_PARAM = "connect-string"; - private static final String NAMESPACE_PARAM = "namespace"; - private static final String SESSION_MS_PARAM = "session-ms"; - private static final String SESSION_PERCENT_PARAM = "session-percent"; - private static final String CONNECTION_MS_PARAM = "connection-ms"; - private static final String RETRIES_PARAM = "retries"; - private static final String RETRIES_MS_PARAM = "retries-ms"; - private static final Set VALID_PARAMS = Stream.of( - CONNECT_STRING_PARAM, - NAMESPACE_PARAM, - SESSION_MS_PARAM, - SESSION_PERCENT_PARAM, - CONNECTION_MS_PARAM, - RETRIES_PARAM, - RETRIES_MS_PARAM).collect(Collectors.toSet()); - private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(",")); - // It's 9 times the default ZK tick time ie 2000 ms - private static final String DEFAULT_SESSION_TIMEOUT_MS = Integer.toString(18_000); - private static final String DEFAULT_CONNECTION_TIMEOUT_MS = Integer.toString(8_000); - private static final String DEFAULT_RETRIES = Integer.toString(1); - private static final String DEFAULT_RETRIES_MS = Integer.toString(1000); - // why 1/3 of the session? https://cwiki.apache.org/confluence/display/CURATOR/TN14 - private static final String DEFAULT_SESSION_PERCENT = Integer.toString(33); - - private static Map validateParameters(Map config) { - config.forEach((parameterName, ignore) -> validateParameter(parameterName)); - return config; - } - - private static void validateParameter(String parameterName) { - if (!VALID_PARAMS.contains(parameterName)) { - throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + VALID_PARAMS_ON_ERROR); - } - } - private CuratorFramework client; private final Map primitives; private List listeners; @@ -146,21 +107,7 @@ private static void validateParameter(String parameterName) { } } - public CuratorDistributedLockManager(Map config) { - this(validateParameters(config), true); - } - - private CuratorDistributedLockManager(Map config, boolean ignore) { - this(config.get(CONNECT_STRING_PARAM), - config.get(NAMESPACE_PARAM), - Integer.parseInt(config.getOrDefault(SESSION_MS_PARAM, DEFAULT_SESSION_TIMEOUT_MS)), - Integer.parseInt(config.getOrDefault(SESSION_PERCENT_PARAM, DEFAULT_SESSION_PERCENT)), - Integer.parseInt(config.getOrDefault(CONNECTION_MS_PARAM, DEFAULT_CONNECTION_TIMEOUT_MS)), - Integer.parseInt(config.getOrDefault(RETRIES_PARAM, DEFAULT_RETRIES)), - Integer.parseInt(config.getOrDefault(RETRIES_MS_PARAM, DEFAULT_RETRIES_MS))); - } - - private CuratorDistributedLockManager(String connectString, + CuratorDistributedLockManager(String connectString, String namespace, int sessionMs, int sessionPercent, diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManagerFactory.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManagerFactory.java new file mode 100644 index 00000000000..4dc07155eaf --- /dev/null +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManagerFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.lockmanager.zookeeper; + +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.lockmanager.DistributedLockManager; +import org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory; + +/** + * Factory for creating ZooKeeper-based distributed lock managers using Apache Curator. + *

+ * Valid configuration parameters: + *

    + *
  • connect-string (required): ZooKeeper connection string (e.g., "localhost:2181" or "host1:2181,host2:2181,host3:2181")
  • + *
  • namespace (required): Namespace prefix for all ZooKeeper paths to isolate lock manager data
  • + *
  • session-ms (optional, default: 18000): Session timeout in milliseconds
  • + *
  • session-percent (optional, default: 33): Percentage of session timeout to use for lock operations
  • + *
  • connection-ms (optional, default: 8000): Connection timeout in milliseconds
  • + *
  • retries (optional, default: 1): Number of retry attempts for failed operations
  • + *
  • retries-ms (optional, default: 1000): Delay in milliseconds between retry attempts
  • + *
+ */ +public class CuratorDistributedLockManagerFactory implements DistributedLockManagerFactory { + + private static final String CONNECT_STRING_PARAM = "connect-string"; + private static final String NAMESPACE_PARAM = "namespace"; + private static final String SESSION_MS_PARAM = "session-ms"; + private static final String SESSION_PERCENT_PARAM = "session-percent"; + private static final String CONNECTION_MS_PARAM = "connection-ms"; + private static final String RETRIES_PARAM = "retries"; + private static final String RETRIES_MS_PARAM = "retries-ms"; + private static final Set VALID_PARAMS = Set.of(CONNECT_STRING_PARAM, NAMESPACE_PARAM, SESSION_MS_PARAM, SESSION_PERCENT_PARAM, CONNECTION_MS_PARAM, RETRIES_PARAM, RETRIES_MS_PARAM); + + // It's 9 times the default ZK tick time ie 2000 ms + private static final String DEFAULT_SESSION_TIMEOUT_MS = Integer.toString(18_000); + private static final String DEFAULT_CONNECTION_TIMEOUT_MS = Integer.toString(8_000); + private static final String DEFAULT_RETRIES = Integer.toString(1); + private static final String DEFAULT_RETRIES_MS = Integer.toString(1000); + // why 1/3 of the session? https://cwiki.apache.org/confluence/display/CURATOR/TN14 + private static final String DEFAULT_SESSION_PERCENT = Integer.toString(33); + + @Override + public Set getValidParametersList() { + return VALID_PARAMS; + } + + @Override + public DistributedLockManager build(Map config) { + validateParameters(config); + return new CuratorDistributedLockManager(config.get(CONNECT_STRING_PARAM), + config.get(NAMESPACE_PARAM), + Integer.parseInt(config.getOrDefault(SESSION_MS_PARAM, DEFAULT_SESSION_TIMEOUT_MS)), + Integer.parseInt(config.getOrDefault(SESSION_PERCENT_PARAM, DEFAULT_SESSION_PERCENT)), + Integer.parseInt(config.getOrDefault(CONNECTION_MS_PARAM, DEFAULT_CONNECTION_TIMEOUT_MS)), + Integer.parseInt(config.getOrDefault(RETRIES_PARAM, DEFAULT_RETRIES)), + Integer.parseInt(config.getOrDefault(RETRIES_MS_PARAM, DEFAULT_RETRIES_MS))); + } + + @Override + public String getName() { + return "ZK"; + } + + @Override + public String getImplName() { + return CuratorDistributedLockManager.class.getName(); + } +} diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/quorum/zookeeper/CuratorDistributedPrimitiveManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/quorum/zookeeper/CuratorDistributedPrimitiveManager.java deleted file mode 100644 index b4f008313ae..00000000000 --- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/quorum/zookeeper/CuratorDistributedPrimitiveManager.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.quorum.zookeeper; - -import java.util.Map; - -import org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager; - -/** - * This is for backwards compatibility - */ -@Deprecated(forRemoval = true) -public class CuratorDistributedPrimitiveManager extends CuratorDistributedLockManager { - public CuratorDistributedPrimitiveManager(Map config) { - super(config); - } -} diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/resources/META-INF/services/org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory b/artemis-lockmanager/artemis-lockmanager-ri/src/main/resources/META-INF/services/org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory new file mode 100644 index 00000000000..587525afa2e --- /dev/null +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/resources/META-INF/services/org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory @@ -0,0 +1,2 @@ +org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManagerFactory +org.apache.activemq.artemis.lockmanager.file.FileBasedLockManagerFactory diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/test/java/org/apache/activemq/artemis/lockmanager/RegistryTest.java b/artemis-lockmanager/artemis-lockmanager-ri/src/test/java/org/apache/activemq/artemis/lockmanager/RegistryTest.java new file mode 100644 index 00000000000..667b149b0a8 --- /dev/null +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/test/java/org/apache/activemq/artemis/lockmanager/RegistryTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.lockmanager; + +import java.lang.invoke.MethodHandles; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager; +import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManagerFactory; +import org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager; +import org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManagerFactory; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RegistryTest { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Test + public void testDiscovery() { + + DistributedLockManagerFactory factory; + + factory = Registry.getInstance().getFactory("file"); + assertTrue(factory instanceof FileBasedLockManagerFactory); + assertEquals(FileBasedLockManager.class.getName(), factory.getImplName()); + + factory = Registry.getInstance().getFactory("FILE"); + assertTrue(factory instanceof FileBasedLockManagerFactory); + assertEquals(FileBasedLockManager.class.getName(), factory.getImplName()); + + factory = Registry.getInstance().getFactoryWithClassName(FileBasedLockManager.class.getName()); + assertTrue(factory instanceof FileBasedLockManagerFactory); + assertEquals(FileBasedLockManager.class.getName(), factory.getImplName()); + + factory = Registry.getInstance().getFactory("ZK"); + assertTrue(factory instanceof CuratorDistributedLockManagerFactory); + assertEquals(CuratorDistributedLockManager.class.getName(), factory.getImplName()); + + factory = Registry.getInstance().getFactory("zk"); + assertTrue(factory instanceof CuratorDistributedLockManagerFactory); + assertEquals(CuratorDistributedLockManager.class.getName(), factory.getImplName()); + + factory = Registry.getInstance().getFactoryWithClassName(CuratorDistributedLockManager.class.getName()); + assertTrue(factory instanceof CuratorDistributedLockManagerFactory); + assertEquals(CuratorDistributedLockManager.class.getName(), factory.getImplName()); + } + + @Test + public void testUnregister() { + Registry.getInstance().register(new FakeDistributedLockManagerFactory()); + assertInstanceOf(FakeDistributedLockManagerFactory.class, Registry.getInstance().getFactory("fake")); + assertInstanceOf(FakeDistributedLockManagerFactory.class, Registry.getInstance().getFactoryWithClassName("Fake")); + Registry.getInstance().unregisterWithType("fake"); + assertNull(Registry.getInstance().getFactory("fake")); + assertThrows(IllegalArgumentException.class, () -> Registry.getInstance().getFactoryWithClassName("Fake")); + Registry.getInstance().register(new FakeDistributedLockManagerFactory()); + assertInstanceOf(FakeDistributedLockManagerFactory.class, Registry.getInstance().getFactory("fake")); + assertInstanceOf(FakeDistributedLockManagerFactory.class, Registry.getInstance().getFactoryWithClassName("Fake")); + Registry.getInstance().unregisterWithClassName("Fake"); + assertNull(Registry.getInstance().getFactory("fake")); + assertNull(Registry.getInstance().getFactory("Fake")); + assertThrows(IllegalArgumentException.class, () -> Registry.getInstance().getFactoryWithClassName("Fake")); + assertDoesNotThrow(() -> Registry.getInstance().unregisterWithType("dontExist")); + assertDoesNotThrow(() -> Registry.getInstance().unregisterWithClassName("dontExist")); + } + + public static class FakeDistributedLockManagerFactory implements DistributedLockManagerFactory { + + @Override + public DistributedLockManager build(Map properties) { + return null; + } + + @Override + public String getName() { + return "fake"; + } + + @Override + public String getImplName() { + return "Fake"; + } + + @Override + public Set getValidParametersList() { + return Set.of(); + } + } + + +} diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/test/java/org/apache/activemq/artemis/lockmanager/file/FileDistributedLockTest.java b/artemis-lockmanager/artemis-lockmanager-ri/src/test/java/org/apache/activemq/artemis/lockmanager/file/FileDistributedLockTest.java index bcddde55158..60ea70d5068 100644 --- a/artemis-lockmanager/artemis-lockmanager-ri/src/test/java/org/apache/activemq/artemis/lockmanager/file/FileDistributedLockTest.java +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/test/java/org/apache/activemq/artemis/lockmanager/file/FileDistributedLockTest.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.util.Collections; import java.util.Map; @@ -63,14 +62,14 @@ public void reflectiveManagerCreation() throws Exception { @Test public void reflectiveManagerCreationFailWithoutLocksFolder() throws Exception { - assertThrows(InvocationTargetException.class, () -> { + assertThrows(IllegalArgumentException.class, () -> { DistributedLockManager.newInstanceOf(managerClassName(), Collections.emptyMap()); }); } @Test public void reflectiveManagerCreationFailIfLocksFolderIsNotFolder() throws Exception { - assertThrows(InvocationTargetException.class, () -> { + assertThrows(IllegalStateException.class, () -> { DistributedLockManager.newInstanceOf(managerClassName(), Collections.singletonMap("locks-folder", File.createTempFile("junit", null, tmpFolder).toString())); }); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index dfbc499c081..440abab3767 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -18,6 +18,7 @@ import java.io.File; import java.net.URL; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; @@ -1368,6 +1369,10 @@ default boolean isJDBC() { void unRegisterBrokerPlugin(ActiveMQServerBasePlugin plugin); + Collection getLockCoordinatorConfigurations(); + + void addLockCoordinatorConfiguration(LockCoordinatorConfiguration configuration); + List getBrokerPlugins(); List getBrokerConnectionPlugins(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java new file mode 100644 index 00000000000..7c31e96fb5b --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/LockCoordinatorConfiguration.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.config; + +import java.util.HashMap; +import java.util.Map; + +public class LockCoordinatorConfiguration { + + String name; + String lockId; + String type; + int checkPeriod; + Map properties; + + public LockCoordinatorConfiguration() { + properties = new HashMap<>(); + } + + public LockCoordinatorConfiguration(Map properties) { + this.properties = properties; + } + + public String getName() { + return name; + } + + public LockCoordinatorConfiguration setName(String name) { + if (name == null || name.trim().isEmpty()) { + throw new IllegalArgumentException("LockCoordinator name cannot be null or empty"); + } + this.name = name; + return this; + } + + public String getLockId() { + return lockId; + } + + public LockCoordinatorConfiguration setLockId(String lockId) { + if (lockId == null || lockId.trim().isEmpty()) { + throw new IllegalArgumentException("LockCoordinator lockId cannot be null or empty"); + } + this.lockId = lockId; + return this; + } + + public String getLockType() { + return type; + } + + public LockCoordinatorConfiguration setLockType(String type) { + if (type == null || type.trim().isEmpty()) { + throw new IllegalArgumentException("LockCoordinator type cannot be null or empty"); + } + this.type = type; + return this; + } + + public int getCheckPeriod() { + return checkPeriod; + } + + public LockCoordinatorConfiguration setCheckPeriod(int checkPeriod) { + if (checkPeriod <= 0) { + throw new IllegalArgumentException("LockCoordinator checkPeriod must be positive, got: " + checkPeriod); + } + this.checkPeriod = checkPeriod; + return this; + } + + public Map getProperties() { + return properties; + } + + @Override + public String toString() { + return "LockCoordinatorConfiguration{" + "name='" + name + '\'' + ", lockId='" + lockId + '\'' + ", type='" + type + '\'' + ", checkPeriod=" + checkPeriod + ", properties=" + properties + '}'; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index b74f6596dfa..4bb27eeef8e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -88,6 +88,7 @@ import org.apache.activemq.artemis.core.config.FederationConfiguration; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; import org.apache.activemq.artemis.core.config.JaasAppConfiguration; +import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration; import org.apache.activemq.artemis.core.config.MetricsConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration; @@ -227,6 +228,8 @@ public class ConfigurationImpl extends javax.security.auth.login.Configuration i private boolean persistIDCache = ActiveMQDefaultConfiguration.isDefaultPersistIdCache(); + private Set lockCoordinatorConfigurations = new HashSet<>(); + private List incomingInterceptorClassNames = new ArrayList<>(); private List outgoingInterceptorClassNames = new ArrayList<>(); @@ -3509,6 +3512,16 @@ public Configuration setMirrorPageTransaction(boolean ignorePageTransactions) { return this; } + @Override + public Set getLockCoordinatorConfigurations() { + return lockCoordinatorConfigurations; + } + + @Override + public void addLockCoordinatorConfiguration(LockCoordinatorConfiguration configuration) { + lockCoordinatorConfigurations.add(configuration); + } + // extend property utils with ability to auto-fill and locate from collections // collection entries are identified by the name() property private static class CollectionAutoFillPropertiesUtil extends PropertyUtilsBean { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index f6ad71f36d2..25210b0de1c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration; import org.apache.activemq.artemis.core.config.MetricsConfiguration; import org.apache.activemq.artemis.core.config.ScaleDownConfiguration; import org.apache.activemq.artemis.core.config.TransformerConfiguration; @@ -97,6 +98,7 @@ import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.artemis.core.server.lock.LockCoordinator; import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.server.routing.KeyType; @@ -737,6 +739,22 @@ public void parseMainConfig(final Element e, final Configuration config) throws } } + NodeList lockCoordinators = e.getElementsByTagName("lock-coordinators"); + + if (lockCoordinators != null) { + for (int i = 0; i < lockCoordinators.getLength(); i++) { + Element lockCoordinatorElement = (Element) lockCoordinators.item(i); + + for (int j = 0; j < lockCoordinatorElement.getChildNodes().getLength(); ++j) { + Node node = lockCoordinatorElement.getChildNodes().item(j); + + if (node.getNodeName().equalsIgnoreCase("lock-coordinator")) { + parseLockCoordinator((Element) node, config); + } + } + } + } + // Persistence config config.setLargeMessagesDirectory(getString(e, "large-messages-directory", config.getLargeMessagesDirectory(), NOT_NULL_OR_EMPTY)); @@ -916,6 +934,31 @@ public void parseMainConfig(final Element e, final Configuration config) throws } } + private void parseLockCoordinator(final Element lockCoordinatorElement, final Configuration mainConfig) throws Exception { + String name = lockCoordinatorElement.getAttribute("name"); + String lockId = getString(lockCoordinatorElement, "lock-id", name, NO_CHECK); + String lockType = getString(lockCoordinatorElement, "type", null, NOT_NULL_OR_EMPTY); + int checkPeriod = getInteger(lockCoordinatorElement, "check-period", LockCoordinator.DEFAULT_CHECK_PERIOD, NO_CHECK); + + HashMap properties = new HashMap<>(); + + if (parameterExists(lockCoordinatorElement, "properties")) { + final NodeList propertyNodeList = lockCoordinatorElement.getElementsByTagName("property"); + final int propertiesCount = propertyNodeList.getLength(); + properties = new HashMap<>(propertiesCount); + for (int i = 0; i < propertiesCount; i++) { + final Element propertyNode = (Element) propertyNodeList.item(i); + final String propertyName = propertyNode.getAttributeNode("key").getValue(); + final String propertyValue = propertyNode.getAttributeNode("value").getValue(); + properties.put(propertyName, propertyValue); + } + } + + LockCoordinatorConfiguration lockCoordinatorConfiguration = new LockCoordinatorConfiguration(properties).setName(name).setLockId(lockId).setLockType(lockType).setCheckPeriod(checkPeriod); + mainConfig.addLockCoordinatorConfiguration(lockCoordinatorConfiguration); + } + + private void parseJournalRetention(final Element e, final Configuration config) { NodeList retention = e.getElementsByTagName("journal-retention-directory"); @@ -1621,13 +1664,21 @@ private TransportConfiguration parseAcceptorTransportConfiguration(final Element final Configuration mainConfig) throws Exception { Node nameNode = e.getAttributes().getNamedItem("name"); + String lockCoordinator = e.getAttribute("lock-coordinator"); + + String name = nameNode != null ? nameNode.getNodeValue() : null; String uri = e.getChildNodes().item(0).getNodeValue(); List configurations = ConfigurationUtils.parseAcceptorURI(name, uri); + TransportConfiguration transportConfiguration = configurations.get(0); - Map params = configurations.get(0).getParams(); + Map params = transportConfiguration.getParams(); + + if (lockCoordinator != null) { + transportConfiguration.setLockCoordinator(lockCoordinator); + } if (mainConfig.isMaskPassword() != null) { params.put(ActiveMQDefaultConfiguration.getPropMaskPassword(), mainConfig.isMaskPassword()); @@ -1637,7 +1688,7 @@ private TransportConfiguration parseAcceptorTransportConfiguration(final Element } } - return configurations.get(0); + return transportConfiguration; } private TransportConfiguration parseConnectorTransportConfiguration(final Element e, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index e4fb9fd2188..a0d48cc2a3b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -85,6 +85,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.lock.LockCoordinator; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; @@ -125,6 +126,8 @@ public class NettyAcceptor extends AbstractAcceptor { } } + LockCoordinator lockCoordinator; + //just for debug private final String protocolsString; @@ -441,6 +444,17 @@ private Object loadSSLContext() { } } + @Override + public LockCoordinator getLockCoordinator() { + return lockCoordinator; + } + + @Override + public NettyAcceptor setLockCoordinator(LockCoordinator lockCoordinator) { + this.lockCoordinator = lockCoordinator; + return this; + } + public int getTcpReceiveBufferSize() { return tcpReceiveBufferSize; } @@ -451,6 +465,15 @@ public SSLContextConfig getSSLContextConfig() { @Override public synchronized void start() throws Exception { + if (lockCoordinator == null) { + internalStart(); + } else { + lockCoordinator.onLockAcquired(this::internalStart); + lockCoordinator.onLockReleased(this::internalStop); + } + } + + private void internalStart() throws Exception { if (channelClazz != null) { // Already started return; @@ -770,6 +793,14 @@ public Map getConfiguration() { @Override public void stop() throws Exception { + if (lockCoordinator != null) { + lockCoordinator.stop(); + } else { + internalStop(); + } + } + + private void internalStop() throws Exception { CountDownLatch latch = new CountDownLatch(1); asyncStop(latch::countDown); latch.await(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index c4ec118c1c8..932a5fb2567 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.server.ServiceRegistry; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; +import org.apache.activemq.artemis.core.server.lock.LockCoordinator; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.logs.AuditLogger; @@ -284,6 +285,14 @@ public Acceptor createAcceptor(TransportConfiguration info) { } acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols, server.getThreadGroupName("remoting-" + info.getName()), server.getMetricsManager()); + if (info.getLockCoordinator() != null) { + LockCoordinator lockCoordinator = server.getLockCoordinator(info.getLockCoordinator()); + if (lockCoordinator == null) { + ActiveMQServerLogger.LOGGER.lockCoordinatorNotFoundOnAcceptor(info.getLockCoordinator(), acceptor.getName()); + } else { + acceptor.setLockCoordinator(lockCoordinator); + } + } if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) { acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index d6b0662a7b7..3158483591a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ConnectorsService; +import org.apache.activemq.artemis.core.server.lock.LockCoordinator; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.mirror.MirrorController; @@ -177,6 +178,8 @@ enum SERVER_STATE { CriticalAnalyzer getCriticalAnalyzer(); + LockCoordinator getLockCoordinator(String name); + void updateStatus(String component, String statusJson); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index fa75e685d40..f30452053b4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1518,4 +1518,16 @@ void slowConsumerDetected(String sessionID, @LogMessage(id = 224153, value = "Unable to find page {} on Address {} while reloading ACKNOWLEDGE_CURSOR, deleting record {}.", level = LogMessage.Level.INFO) void cannotFindPageFileDuringPageAckReload(long pageNr, Object address, long id); + + @LogMessage(id = 224154, value = "Invalid type configured on LockCoordinator {}, type {} does not exist", level = LogMessage.Level.WARN) + void invalidTypeLockCoordinator(String name, String type); + + @LogMessage(id = 224155, value = "LockCoordinator {} not found on acceptor {}", level = LogMessage.Level.WARN) + void lockCoordinatorNotFoundOnAcceptor(String lockName, String acceptorName); + + @LogMessage(id = 224156, value = "LockCoordinator {} starting with type={} and lockID={} with checkPeriod={} milliseconds", level = LogMessage.Level.INFO) + void lockCoordinatorStarting(String lockName, String type, String lockID, int checkPeriod); + + @LogMessage(id = 224157, value = "At least one of the components failed to start under the lockCoordinator {}. A retry will be executed", level = LogMessage.Level.INFO) + void retryLockCoordinator(String name); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 0d434133239..6a83ff7242e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.FederationConfiguration; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; +import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationBrokerPlugin; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.config.impl.LegacyJMSConfiguration; @@ -161,6 +162,7 @@ import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler; import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; +import org.apache.activemq.artemis.core.server.lock.LockCoordinator; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames; @@ -195,6 +197,9 @@ import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl; import org.apache.activemq.artemis.core.version.Version; +import org.apache.activemq.artemis.lockmanager.DistributedLockManager; +import org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory; +import org.apache.activemq.artemis.lockmanager.Registry; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -291,6 +296,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { private ReplayManager replayManager; + private ConcurrentHashMap lockCoordinators = new ConcurrentHashMap<>(); + /** * Certain management operations shouldn't use more than one thread. this semaphore is used to guarantee a single * thread used. @@ -552,6 +559,10 @@ public boolean isRebuildCounters() { return this.rebuildCounters; } + @Override + public LockCoordinator getLockCoordinator(String name) { + return lockCoordinators.get(name); + } @Override public void replay(Date start, Date end, String address, String target, String filter) throws Exception { @@ -735,6 +746,8 @@ private void internalStart() throws Exception { ActiveMQServerLogger.LOGGER.serverStarting((haPolicy.isBackup() ? "Backup" : "Primary"), configuration); + startLockCoordinators(); + final boolean wasPrimary = !haPolicy.isBackup(); if (!haPolicy.isBackup()) { activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener); @@ -793,6 +806,33 @@ private void internalStart() throws Exception { } } + private void startLockCoordinators() { + for (LockCoordinatorConfiguration lockCoordinatorConfiguration : configuration.getLockCoordinatorConfigurations()) { + String lockType = lockCoordinatorConfiguration.getLockType(); + String name = lockCoordinatorConfiguration.getName(); + String lockId = lockCoordinatorConfiguration.getLockId(); + int checkPeriod = lockCoordinatorConfiguration.getCheckPeriod(); + + DistributedLockManagerFactory distributedLockManagerFactory = Registry.getInstance().getFactory(lockType); + if (distributedLockManagerFactory == null) { + ActiveMQServerLogger.LOGGER.invalidTypeLockCoordinator(lockCoordinatorConfiguration.getName(), lockType); + continue; + } + + DistributedLockManager lockManager = distributedLockManagerFactory.build(lockCoordinatorConfiguration.getProperties()); + LockCoordinator lockCoordinator = new LockCoordinator(scheduledPool, executorFactory.getExecutor(), checkPeriod, lockManager, lockId, name); + lockCoordinators.put(name, lockCoordinator); + ActiveMQServerLogger.LOGGER.lockCoordinatorStarting(name, lockType, lockId, checkPeriod); + lockCoordinator.start(); + } + } + + private void stopLockCoordinators() { + if (lockCoordinators != null) { + lockCoordinators.values().forEach(LockCoordinator::stop); + lockCoordinators.clear(); + } + } private void takingLongToStart(Object criticalComponent) { ActiveMQServerLogger.LOGGER.tooLongToStart(criticalComponent); @@ -1381,6 +1421,8 @@ private void stop(boolean failoverOnServerShutdown, ActiveMQServerLogger.LOGGER.errorStoppingComponent(remotingService.getClass().getName(), t); } + stopLockCoordinators(); + stopComponent(pagingManager); if (!criticalIOError && pagingManager != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java new file mode 100644 index 00000000000..b614cb6ce81 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.lock; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.lockmanager.DistributedLock; +import org.apache.activemq.artemis.lockmanager.DistributedLockManager; +import org.apache.activemq.artemis.utils.RunnableEx; +import org.apache.activemq.artemis.utils.SimpleFutureImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages distributed locks a pluggable distributed lock mechanism. + *

+ * The LockMonitor periodically attempts to acquire a distributed lock. When the lock + * is acquired, registered "acquired" callbacks are executed. When the lock is lost + * or released, "released" callbacks are executed. + * + * @see org.apache.activemq.artemis.lockmanager.DistributedLockManager + */ +public class LockCoordinator extends ActiveMQScheduledComponent { + + /** Default period (in milliseconds) for checking lock status */ + public static final int DEFAULT_CHECK_PERIOD = 5000; + + String debugInfo; + + public String getDebugInfo() { + return debugInfo; + } + + public LockCoordinator setDebugInfo(String debugInfo) { + this.debugInfo = debugInfo; + return this; + } + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final ArrayList lockAcquiredCallback = new ArrayList<>(); + private final ArrayList lockReleasedCallback = new ArrayList<>(); + private final long checkPeriod; + private final String name; + private final String lockID; + + DistributedLockManager lockManager; + DistributedLock distributedLock; + volatile boolean locked; + + public DistributedLockManager getLockManager() { + return lockManager; + } + + /** + * Registers a callback to be executed when lock is acquired. + * If the lock is already held when this method is called, the callback + * will be executed immediately (on the executor thread). + * + * Also In case the runnable throws any exceptions, the lock will be released, any previously added callback will be called for stop + * and the monitor will retry the locks + * + * @param runnable the callback to execute when lock is acquired + */ + public void onLockAcquired(RunnableEx runnable) { + this.lockAcquiredCallback.add(runnable); + // if it's locked we run the runnable being added, + // however we must check this inside the executor + // or within a global locking + executor.execute(() -> runIfLocked(runnable)); + } + + /** + * Registers a callback to be executed when lock is released or lost. + * + * @param runnable the callback to execute when lock is released + */ + public void onLockReleased(RunnableEx runnable) { + this.lockReleasedCallback.add(runnable); + } + + /** + * Stops the lock coordinator, releasing any held locks and cleaning up resources. + * This method blocks until all cleanup is complete. + */ + @Override + public void stop() { + super.stop(); + SimpleFutureImpl simpleFuture = new SimpleFutureImpl<>(); + executor.execute(() -> { + if (locked) { + fireLockChanged(false); + } + if (distributedLock != null) { + try { + distributedLock.unlock(); + } catch (Exception e) { + logger.debug("Error unlocking during stop", e); + } + try { + distributedLock.close(); + } catch (Exception e) { + logger.debug("Error closing lock during stop", e); + } + distributedLock = null; + } + if (lockManager != null) { + try { + lockManager.stop(); + } catch (Exception e) { + logger.debug("Error stopping lock manager during stop", e); + } + lockManager = null; + } + simpleFuture.set(null); + }); + try { + simpleFuture.get(); + } catch (Exception e) { + logger.debug("Error waiting for stop to complete", e); + } + } + + /** + * Returns whether this instance currently holds the lock. + * + * @return true if lock is currently held, false otherwise + */ + public boolean isLocked() { + return locked; + } + + /** + * Constructs a new LockCoordinator. + * + * @param scheduledExecutor the executor for scheduling periodic lock checks + * @param executor the executor for running callbacks + * @param checkPeriod how often to check lock status (in milliseconds) + * @param lockManager the distributed lock manager implementation to use + * @param lockID the unique identifier for the lock + * @param name a descriptive name for this lock coordinator + */ + public LockCoordinator(ScheduledExecutorService scheduledExecutor, Executor executor, long checkPeriod, DistributedLockManager lockManager, String lockID, String name) { + super(scheduledExecutor, executor, checkPeriod, checkPeriod, TimeUnit.MILLISECONDS, false); + assert executor != null; + this.lockManager = lockManager; + this.checkPeriod = checkPeriod; + this.lockID = lockID; + this.name = name; + } + + private void fireLockChanged(boolean locked) { + this.locked = locked; + if (locked) { + AtomicBoolean treatErrors = new AtomicBoolean(false); + lockAcquiredCallback.forEach(r -> doRunTreatingErrors(r, treatErrors)); + if (treatErrors.get()) { + retryLock(); + } + } else { + lockReleasedCallback.forEach(this::doRunWithLogException); + } + } + + private void retryLock() { + ActiveMQServerLogger.LOGGER.retryLockCoordinator(name); + // Release lock and retry on next scheduled run if callbacks failed + executor.execute(this::executeRetryLock); + } + + // to be used as a runnable on the executor + private void executeRetryLock() { + if (locked) { + logger.debug("Unlocking to retry the callback"); + fireLockChanged(false); + if (distributedLock != null) { + try { + distributedLock.unlock(); + distributedLock.close(); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + distributedLock = null; + } + if (lockManager != null) { + try { + lockManager.stop(); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + } + } + + private void runIfLocked(RunnableEx checkBeingAdded) { + if (locked) { + try { + doRun(checkBeingAdded); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + retryLock(); + } + } + } + + private void doRunTreatingErrors(RunnableEx r, AtomicBoolean errorOnStart) { + try { + r.run(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errorOnStart.set(true); + } + } + + private void doRun(RunnableEx r) throws Exception { + r.run(); + } + + private void doRunWithLogException(RunnableEx r) { + try { + r.run(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + } + + @Override + public void run() { + try { + if (!locked) { + if (!lockManager.isStarted()) { + lockManager.start(); + } + DistributedLock lock = lockManager.getDistributedLock(lockID); + if (lock.tryLock(1, TimeUnit.SECONDS)) { + logger.debug("Succeeded on locking {}, lockID={}", name, lockID); + this.distributedLock = lock; + fireLockChanged(true); + } else { + logger.debug("Not able to lock {}, lockID={}", name, lockID); + lock.close(); + lockManager.stop(); + } + } else { + if (!distributedLock.isHeldByCaller()) { + fireLockChanged(false); + distributedLock.close(); + distributedLock = null; + lockManager.stop(); + } + } + } catch (Exception e) { + fireLockChanged(false); + if (distributedLock != null) { + try { + distributedLock.close(); + } catch (Exception closeEx) { + logger.debug("Error closing lock", closeEx); + } + distributedLock = null; + } + if (lockManager != null) { + try { + lockManager.stop(); + } catch (Exception stopEx) { + logger.debug("Error stopping lock manager", stopEx); + } + } + logger.warn(e.getMessage(), e); + } + } +} + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java index 5914c8c59e7..baa6fd7b223 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.lock.LockCoordinator; import org.apache.activemq.artemis.core.server.management.NotificationService; import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_AUTO_START; @@ -39,6 +40,14 @@ public interface Acceptor extends ActiveMQComponent { */ String getName(); + default Acceptor setLockCoordinator(LockCoordinator lockCoordinator) { + return this; + } + + default LockCoordinator getLockCoordinator() { + return null; + } + /** * Pause the acceptor and stop it from receiving client requests. */ diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 0cf7e94fa3a..795cada91d9 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -551,6 +551,8 @@ + + @@ -3224,6 +3226,78 @@ + + + + + + + The lockID the implementation will use to reach the lock provider. + This is different from the lock-coordinator name, but if lock-id is omitted, we will use name of the lock-coordinator as a value. + Notice this feature is in tech preview and its configuration is subjected to changes. + + + + + + + The type of lock being used for coordination. Options: file, ZK. + Notice this is a pluggable functionality so a provider may introduce additional options. + + + + + + + A period used to verify if the lock still valid and renew it if needed. + + + + + + + A list of options for the distributed-primitive-manager + + + + + + + + A key-value pair option for the distributed-primitive-manager + + + + + + + + + + + + unique name for the lock coordinator + + + + + + + + + + a list of lock coordinators + + + + + + + + + + + @@ -4901,6 +4975,8 @@ + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 8987868d028..a0bbe97a41d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -29,8 +29,10 @@ import static org.junit.jupiter.api.Assertions.fail; import java.beans.PropertyDescriptor; +import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.PrintWriter; import java.io.StringReader; @@ -62,6 +64,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ConfigurationUtils; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; +import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration; import org.apache.activemq.artemis.core.config.ScaleDownConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeAddressPolicyElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBridgeBrokerConnectionElement; @@ -3044,6 +3047,79 @@ public void testExportInvalidPropertyOnAcceptor() throws Exception { assertDoesNotThrow(() -> configuration.exportAsProperties(fileOutput)); } + /** + * Verifies the lock coordinator configuration parsing and export process: + *

    + *
  • Creates a configuration from broker properties
  • + *
  • Validates the configuration output
  • + *
  • Exports the configuration back to a new {@link java.util.Properties}
  • + *
  • Verifies that the new output contains all initially specified properties
  • + *
+ */ + @Test + public void testParseLockCoordinator() throws Exception { + Properties properties = new Properties(); + + properties.put("lockCoordinatorConfigurations.hello.checkPeriod", "123"); + properties.put("lockCoordinatorConfigurations.hello.lockId", "lock-id"); + properties.put("lockCoordinatorConfigurations.hello.name", "hello"); + properties.put("lockCoordinatorConfigurations.hello.lockType", "someLock"); + for (int i = 0; i < 10; i++) { + properties.put("lockCoordinatorConfigurations.hello.properties.k" + i, "v" + i); + } + + properties.put("acceptorConfigurations.netty.factoryClassName", "netty"); + properties.put("acceptorConfigurations.netty.lockCoordinator", "hello"); + properties.put("acceptorConfigurations.netty.name", "netty"); + properties.put("acceptorConfigurations.netty.params.port", "8888"); + properties.put("acceptorConfigurations.netty.params.host", "localhost"); + + ConfigurationImpl configuration = new ConfigurationImpl(); + configuration.parsePrefixedProperties(properties, null); + + assertEquals(1, configuration.getAcceptorConfigurations().size()); + TransportConfiguration acceptorConfig = null; + for (TransportConfiguration t :configuration.getAcceptorConfigurations()) { + acceptorConfig = t; + } + // I am not going to validate all the parameters from netty since this is already tested elsewhere + assertEquals("hello", acceptorConfig.getLockCoordinator()); + assertEquals("netty", acceptorConfig.getFactoryClassName()); + + assertEquals(1, configuration.getLockCoordinatorConfigurations().size()); + + LockCoordinatorConfiguration lockCoordinatorConfiguration = null; + + for (LockCoordinatorConfiguration t : configuration.getLockCoordinatorConfigurations()) { + lockCoordinatorConfiguration = t; + } + Map lockProperties = lockCoordinatorConfiguration.getProperties(); + for (int i = 0; i < 10; i++) { + assertEquals("v" + i, lockProperties.get("k" + i)); + } + + assertEquals(123, lockCoordinatorConfiguration.getCheckPeriod()); + assertEquals("lock-id", lockCoordinatorConfiguration.getLockId()); + assertEquals("hello", lockCoordinatorConfiguration.getName()); + assertEquals("someLock", lockCoordinatorConfiguration.getLockType()); + + File outputProperty = new File(getTestDirfile(), "broker.properties"); + configuration.exportAsProperties(outputProperty); + + Properties brokerProperties = new Properties(); + + try (FileInputStream is = new FileInputStream(outputProperty)) { + BufferedInputStream bis = new BufferedInputStream(is); + brokerProperties.load(bis); + } + + properties.forEach((k, v) -> { + logger.debug("Validating {} = {}", k, v); + assertEquals(v, brokerProperties.get(k)); + }); + + } + /** * To test ARTEMIS-926 */ diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 53b07421d76..315a77d93c9 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -26,7 +26,9 @@ import java.io.ByteArrayInputStream; import java.io.PrintStream; +import java.lang.invoke.MethodHandles; import java.nio.charset.StandardCharsets; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,6 +42,7 @@ import org.apache.activemq.artemis.core.config.FederationConfiguration; import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; +import org.apache.activemq.artemis.core.config.LockCoordinatorConfiguration; import org.apache.activemq.artemis.core.config.ScaleDownConfiguration; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration; @@ -55,10 +58,14 @@ import org.apache.activemq.artemis.utils.PasswordMaskingUtil; import org.apache.activemq.artemis.utils.StringPrintStream; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.xml.sax.SAXParseException; public class FileConfigurationParserTest extends ServerTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String PURGE_FOLDER_FALSE = """ @@ -90,6 +97,7 @@ public class FileConfigurationParserTest extends ServerTestBase { tcp://localhost:5545 tcp://localhost:5545 vm://0 + tcp://localhost:5545 @@ -130,6 +138,21 @@ public class FileConfigurationParserTest extends ServerTestBase { """; + + private static final String LOCK_COORDINATOR_PART = """ + + + sausage-factory + etcd + 333 + + + + + + """; + + /** * These "InvalidConfigurationTest*.xml" files are modified copies of {@literal ConfigurationTest-full-config.xml}, * so just diff it for changes, e.g. @@ -423,6 +446,27 @@ public void testParsingDefaultServerConfigWithENCMaskedPwd() throws Exception { assertEquals("helloworld", bconfig.getPassword()); } + @Test + public void testLockCoordinatorParse() throws Exception { + FileConfigurationParser parser = new FileConfigurationParser(); + String configStr = FIRST_PART + LOCK_COORDINATOR_PART + LAST_PART; + Configuration configuration = parser.parseMainConfig(new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8))); + + Collection lockConfigurations = configuration.getLockCoordinatorConfigurations(); + lockConfigurations.forEach(f -> logger.info("lockConfiguration={}", f)); + assertEquals(1, lockConfigurations.size()); + for (LockCoordinatorConfiguration lockConfiguration : lockConfigurations) { + assertEquals("my-lock", lockConfiguration.getName()); + assertEquals("sausage-factory", lockConfiguration.getLockId()); + assertEquals("etcd", lockConfiguration.getLockType()); + Map properties = lockConfiguration.getProperties(); + assertEquals(2, properties.size()); + assertEquals("value1", properties.get("test1")); + assertEquals("value2", properties.get("test2")); + } + configuration.getAcceptorConfigurations().stream().filter(f -> f.getName().equals("netty-with-lock")).forEach(f -> assertEquals("my-lock", f.getLockCoordinator())); + } + @Test public void testDefaultBridgeProducerWindowSize() throws Exception { FileConfigurationParser parser = new FileConfigurationParser(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java index 16a82a64c6f..5d42df0378e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/HAPolicyConfigurationTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -57,14 +58,28 @@ import org.apache.activemq.artemis.core.server.impl.SharedStorePrimaryActivation; import org.apache.activemq.artemis.lockmanager.DistributedLock; import org.apache.activemq.artemis.lockmanager.DistributedLockManager; +import org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory; import org.apache.activemq.artemis.lockmanager.MutableLong; +import org.apache.activemq.artemis.lockmanager.Registry; import org.apache.activemq.artemis.lockmanager.UnavailableStateException; import org.apache.activemq.artemis.tests.util.ServerTestBase; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; public class HAPolicyConfigurationTest extends ServerTestBase { + @BeforeAll + public static void register() { + Registry.getInstance().register(new FakeDistributedLockManagerFactory()); + } + + @AfterAll + public static void unregister() { + Registry.getInstance().unregisterWithType("fake"); + } + @Override @AfterEach public void tearDown() throws Exception { @@ -166,6 +181,29 @@ public void liveOnlyTest() throws Exception { } } + public static class FakeDistributedLockManagerFactory implements DistributedLockManagerFactory { + + @Override + public DistributedLockManager build(Map properties) { + return new FakeDistributedLockManager(properties); + } + + @Override + public String getName() { + return "fake"; + } + + @Override + public String getImplName() { + return FakeDistributedLockManager.class.getName(); + } + + @Override + public Set getValidParametersList() { + return Set.of(); + } + } + public static class FakeDistributedLockManager implements DistributedLockManager { private final Map config; diff --git a/docs/user-manual/_book.adoc b/docs/user-manual/_book.adoc index d32db5faa83..bf048d4c29e 100644 --- a/docs/user-manual/_book.adoc +++ b/docs/user-manual/_book.adoc @@ -65,6 +65,7 @@ include::network-isolation.adoc[leveloffset=1] include::restart-sequence.adoc[leveloffset=1] include::activation-tools.adoc[leveloffset=1] include::amqp-broker-connections.adoc[leveloffset=1] +include::lock-coordination.adoc[leveloffset=1] include::federation.adoc[leveloffset=1] include::federation-address.adoc[leveloffset=1] include::federation-queue.adoc[leveloffset=1] diff --git a/docs/user-manual/_diagrams/lock-coordination-example.odg b/docs/user-manual/_diagrams/lock-coordination-example.odg new file mode 100644 index 00000000000..d82f5b5c729 Binary files /dev/null and b/docs/user-manual/_diagrams/lock-coordination-example.odg differ diff --git a/docs/user-manual/images/lock-coordination-example.png b/docs/user-manual/images/lock-coordination-example.png new file mode 100644 index 00000000000..752c2909d39 Binary files /dev/null and b/docs/user-manual/images/lock-coordination-example.png differ diff --git a/docs/user-manual/lock-coordination.adoc b/docs/user-manual/lock-coordination.adoc new file mode 100644 index 00000000000..bbae74f4d01 --- /dev/null +++ b/docs/user-manual/lock-coordination.adoc @@ -0,0 +1,163 @@ += Lock Coordination +:idprefix: +:idseparator: - +:docinfo: shared + +The Lock Coordinator provides pluggable distributed lock mechanism monitoring. +It allows multiple broker instances to coordinate the activation of specific configuration elements, ensuring that only one broker instance activates a particular element at any given time. + +When a broker acquires a lock through a distributed lock, the associated configuration elements are activated. +If the lock is lost or released, those elements are deactivated. + +In the current version, the Lock Coordinator can be applied to control the startup and shutdown of acceptors. +When an acceptor is associated with a lock coordinator, it will only start accepting connections when the broker successfully acquires the distributed lock. +If lock is lost for any reason, the acceptor automatically stops accepting new connections. + +The same pattern used on acceptors may eventually be applied to other configuration elements. +If you have ideas for additional use cases where this pattern could be applied, please file a JIRA issue. + +WARNING: This feature is in technical preview and its configuration elements are subject to possible modifications. + +== Configuration + +It is possible to specify multiple lock-coordinators and associate them with other broker elements. + +The broker element associated with a lock-coordinator (e.g., an acceptor) will only be started if the distributed lock can be acquired. +If the lock cannot be acquired or is lost, the associated element will be stopped. + +This pattern can be used to ensure clients connect to only one of your mirrored brokers at a time, preventing split-brain scenarios and duplicate message processing. + +Depending on the provider selector, multiple configuration options can be provided. +Please consult the javadoc for your lock implementation. +A simple table will be provided in this chapter for the two reference implementations we provide, but this could be a plugin being added to your broker. + +In this next example, we configure a broker with: + +* Two acceptors: one for mirroring traffic (`for-mirroring-only`) and one for client connections (`for-clients-only`) +* A File-based lock-coordinator named `clients-lock` +* The client acceptor associated with the lock-coordinator, so it only activates when the distributed lock is acquired +* A mirror connection to another broker for data replication + +[,xml] +---- + + tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + + + file + mirror-cluster-clients + 1000 + + + + + + + + + + + + + + + +---- + +In the previous configuration, the broker will use a file lock, and the acceptor will only be active if it can hold the distributed lock between the mirrored brokers. + +image:images/lock-coordination-example.png[HA with mirroring] + +You can find a https://github.com/apache/artemis-examples/tree/main/examples/features/broker-connection/ha-with-mirroring[working example] on how to run HA with Mirroring. + +== Configuration Options + +=== Common Configuration + +The following elements are configured on lock-coordinator + +[cols="1,1,1,3"] +|=== +|Element |Required |Default |Description + +|name +|Yes +|None +|Unique identifier for this lock-coordinator instance, used to reference it from other configuration elements + +|type +|Yes +|None +|The lock provider type (e.g., "FILE" or "ZK") + +|lock-id +|Yes +|None +|Unique identifier for the distributed lock. All brokers competing for the same distributed lock must use the same lock-id + +|check-period +|No +|5000 +|How often to check if the lock is still valid, in milliseconds +|=== + +=== File + +The file-based lock uses the file system to manage distributed locks. + +[cols="1,1,1,3"] +|=== +|Property |Required |Default |Description + +|locks-folder +|Yes +|None +|Path to the directory where lock files will be created and managed. The directory must be created in advance before using this lock. +|=== + +=== ZooKeeper + +The ZooKeeper-based lock uses Apache Curator to manage distributed locks via ZooKeeper. + +[cols="1,1,1,3"] +|=== +|Property |Required |Default |Description + +|connect-string +|Yes +|None +|ZooKeeper connection string (e.g., "localhost:2181" or "host1:2181,host2:2181,host3:2181") + +|namespace +|Yes +|None +|Namespace prefix for all ZooKeeper paths to isolate data + +|session-ms +|No +|18000 +|Session timeout in milliseconds + +|session-percent +|No +|33 +|Percentage of session timeout to use for lock operations + +|connection-ms +|No +|8000 +|Connection timeout in milliseconds + +|retries +|No +|1 +|Number of retry attempts for failed operations + +|retries-ms +|No +|1000 +|Delay in milliseconds between retry attempts +|=== diff --git a/docs/user-manual/restart-sequence.adoc b/docs/user-manual/restart-sequence.adoc index fa535aee76e..dc1f90e8d77 100644 --- a/docs/user-manual/restart-sequence.adoc +++ b/docs/user-manual/restart-sequence.adoc @@ -1,11 +1,15 @@ -= Restart Sequence += Restart Sequence if using Journal replication :idprefix: :idseparator: - :docinfo: shared -{project-name-full} ships with 2 architectures for providing HA features. -The primary and backup brokers can be configured either using network replication or using shared storage. -This document will share restart sequences for the brokers under various circumstances when the client applications are connected to it. +{project-name-full} ships with 3 possibilities for providing HA: + +- Shared storage +- Network Journal Replication +- AMQP Broker Connection Mirroring + +This page will cover steps to restart the broker while using journal replication. == Restarting 1 broker at a time diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/lockmanager/LockManagerNettyNoGroupNameReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/lockmanager/LockManagerNettyNoGroupNameReplicatedFailoverTest.java index 92830e25127..f89f54e9023 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/lockmanager/LockManagerNettyNoGroupNameReplicatedFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/lockmanager/LockManagerNettyNoGroupNameReplicatedFailoverTest.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.dto.WebServerDTO; import org.apache.activemq.artemis.lockmanager.MutableLong; import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager; +import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManagerFactory; import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.tests.util.Wait; @@ -226,7 +227,8 @@ public static void doDecrementActivationSequenceForForceRestartOf(Logger log, No nodeManager.start(); long localActivation = nodeManager.readNodeActivationSequence(); // file based - FileBasedLockManager fileBasedPrimitiveManager = new FileBasedLockManager(DistributedLockManagerConfiguration.getProperties()); + FileBasedLockManagerFactory factory = new FileBasedLockManagerFactory(); + FileBasedLockManager fileBasedPrimitiveManager = (FileBasedLockManager) factory.build(DistributedLockManagerConfiguration.getProperties()); fileBasedPrimitiveManager.start(); try { MutableLong mutableLong = fileBasedPrimitiveManager.getMutableLong(nodeManager.getNodeId().toString()); diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml new file mode 100644 index 00000000000..08580a0eea8 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml @@ -0,0 +1,205 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + false + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + + + + + + + + tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + + + ZK + fail + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ + + + + +
+
+ +
+
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml new file mode 100644 index 00000000000..c515bf7bbdc --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml @@ -0,0 +1,205 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + false + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + + + + + + + + tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + + + ZK + fail + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ + + + + +
+
+ +
+
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml new file mode 100644 index 00000000000..f5f50f78360 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml @@ -0,0 +1,186 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + false + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ + + + + +
+
+ +
+
diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml new file mode 100644 index 00000000000..8b9b054733a --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml @@ -0,0 +1,187 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + false + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ + + + + +
+
+ +
+
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java new file mode 100644 index 00000000000..d05bde63686 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.activemq.artemis.tests.smoke.lockmanager; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.File; +import java.io.FileOutputStream; +import java.lang.invoke.MethodHandles; +import java.util.Properties; +import java.util.function.Consumer; + +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class DualMirrorSingleAcceptorRunningTest extends SmokeTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String SERVER_NAME_WITH_ZK_A = "lockmanager/dualMirrorSingleAcceptor/ZK/A"; + public static final String SERVER_NAME_WITH_ZK_B = "lockmanager/dualMirrorSingleAcceptor/ZK/B"; + + public static final String SERVER_NAME_WITH_FILE_A = "lockmanager/dualMirrorSingleAcceptor/file/A"; + public static final String SERVER_NAME_WITH_FILE_B = "lockmanager/dualMirrorSingleAcceptor/file/B"; + + // Test constants + private static final int ALTERNATING_TEST_ITERATIONS = 2; + private static final int MESSAGES_SENT_PER_ITERATION = 100; + private static final int MESSAGES_CONSUMED_PER_ITERATION = 17; + private static final int MESSAGES_REMAINING_PER_ITERATION = MESSAGES_SENT_PER_ITERATION - MESSAGES_CONSUMED_PER_ITERATION; + private static final int EXPECTED_FINAL_MESSAGE_COUNT = ALTERNATING_TEST_ITERATIONS * MESSAGES_REMAINING_PER_ITERATION; + + private static final int ZK_BASE_PORT = 2181; + + Process processA; + Process processB; + + private static void customizeFileServer(File serverLocation, File fileLock) { + try { + FileUtil.findReplace(new File(serverLocation, "/etc/broker.xml"), "CHANGEME", fileLock.getAbsolutePath()); + } catch (Throwable e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + private static void createServerPair(String serverNameA, String serverNameB, + String configPathA, String configPathB, + Consumer customizeServer) throws Exception { + File serverLocationA = getFileServerLocation(serverNameA); + File serverLocationB = getFileServerLocation(serverNameB); + deleteDirectory(serverLocationB); + deleteDirectory(serverLocationA); + + createSingleServer(serverLocationA, configPathA, "A", customizeServer); + createSingleServer(serverLocationB, configPathB, "B", customizeServer); + } + + private static void createSingleServer(File serverLocation, String configPath, + String userAndPassword, Consumer customizeServer) throws Exception { + HelperCreate cliCreateServer = helperCreate(); + cliCreateServer.setAllowAnonymous(true) + .setUser(userAndPassword) + .setPassword(userAndPassword) + .setNoWeb(true) + .setConfiguration(configPath) + .setArtemisInstance(serverLocation); + cliCreateServer.createServer(); + + if (customizeServer != null) { + customizeServer.accept(serverLocation); + } + } + + @BeforeEach + public void prepareServers() throws Exception { + + } + + @Test + public void testAlternatingZK() throws Throwable { + { + createServerPair(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B, + "./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A", + "./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B", + null); + + cleanupData(SERVER_NAME_WITH_ZK_A); + cleanupData(SERVER_NAME_WITH_ZK_B); + } + + // starting zookeeper + ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1, ZK_BASE_PORT, 100); + zkCluster.start(); + runAfter(zkCluster::stop); + + testAlternating(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B, null, null); + } + + @Test + public void testAlternatingFile() throws Throwable { + File fileLock = new File("./target/serverLock"); + fileLock.mkdirs(); + + { + createServerPair(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B, + "./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A", + "./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B", + s -> customizeFileServer(s, fileLock)); + + cleanupData(SERVER_NAME_WITH_FILE_A); + cleanupData(SERVER_NAME_WITH_FILE_B); + } + + Properties properties = new Properties(); + + properties.put("acceptorConfigurations.artemis.extraParams.amqpCredits", "1000"); + properties.put("acceptorConfigurations.artemis.extraParams.amqpLowCredits", "300"); + properties.put("acceptorConfigurations.artemis.factoryClassName", "org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory"); + properties.put("acceptorConfigurations.artemis.lockCoordinator", "failover"); + properties.put("acceptorConfigurations.artemis.name", "artemis"); + properties.put("acceptorConfigurations.artemis.params.scheme", "tcp"); + properties.put("acceptorConfigurations.artemis.params.tcpReceiveBufferSize", "1048576"); + properties.put("acceptorConfigurations.artemis.params.port", "61616"); + properties.put("acceptorConfigurations.artemis.params.host", "localhost"); + properties.put("acceptorConfigurations.artemis.params.protocols", "CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE"); + properties.put("acceptorConfigurations.artemis.params.useEpoll", "true"); + properties.put("acceptorConfigurations.artemis.params.tcpSendBufferSize", "1048576"); + + properties.put("lockCoordinatorConfigurations.failover.checkPeriod", "5000"); + properties.put("lockCoordinatorConfigurations.failover.lockType", "file"); + properties.put("lockCoordinatorConfigurations.failover.lockId", "fail"); + properties.put("lockCoordinatorConfigurations.failover.name", "failover"); + properties.put("lockCoordinatorConfigurations.failover.properties.locks-folder", fileLock.getAbsolutePath()); + + try (FileOutputStream fileOutputStream = new FileOutputStream(new File(getServerLocation(SERVER_NAME_WITH_FILE_A), "broker.properties"))) { + properties.store(fileOutputStream, null); + } + + try (FileOutputStream fileOutputStream = new FileOutputStream(new File(getServerLocation(SERVER_NAME_WITH_FILE_B), "broker.properties"))) { + properties.store(fileOutputStream, null); + } + + // I'm using broker properties in one of the tests, to help validating it + File propertiesA = new File(getServerLocation(SERVER_NAME_WITH_FILE_A), "broker.properties"); + File propertiesB = new File(getServerLocation(SERVER_NAME_WITH_FILE_B), "broker.properties"); + + testAlternating(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B, propertiesA, propertiesB); + } + + public void testAlternating(String nameServerA, String nameServerB, File brokerPropertiesA, File brokerPropertiesB) throws Throwable { + processA = startServer(nameServerA, 0, -1, brokerPropertiesA); + waitForXToStart(); + processB = startServer(nameServerB, 0, -1, brokerPropertiesB); + ConnectionFactory cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); + + for (int i = 0; i < ALTERNATING_TEST_ITERATIONS; i++) { + logger.info("Iteration {}: Server {} active", i, (i % 2 == 0) ? "A" : "B"); + + if (i % 2 == 0) { + // Even iteration: Server A active, kill Server B + killServer(processB); + waitForXToStart(); + } else { + // Odd iteration: Server B active, kill Server A + killServer(processA); + waitForXToStart(); + } + + // Send messages through the shared acceptor + cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); + sendMessages(cfX, MESSAGES_SENT_PER_ITERATION); + + // Consume some messages + receiveMessages(cfX, MESSAGES_CONSUMED_PER_ITERATION); + + // Restart the killed server + if (i % 2 == 0) { + processB = startServer(nameServerB, 0, -1, brokerPropertiesB); + } else { + processA = startServer(nameServerA, 0, -1, brokerPropertiesA); + } + } + + // Verify they both have the expected message count (iterations × (sent - consumed)) + assertMessageCount("tcp://localhost:61000", "myQueue", EXPECTED_FINAL_MESSAGE_COUNT); + assertMessageCount("tcp://localhost:61001", "myQueue", EXPECTED_FINAL_MESSAGE_COUNT); + } + + private static void sendMessages(ConnectionFactory cfX, int nmessages) throws JMSException { + try (Connection connectionX = cfX.createConnection("A", "A")) { + Session sessionX = connectionX.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = sessionX.createQueue("myQueue"); + MessageProducer producerX = sessionX.createProducer(queue); + for (int i = 0; i < nmessages; i++) { + producerX.send(sessionX.createTextMessage("hello " + i)); + } + sessionX.commit(); + } + } + + private static void receiveMessages(ConnectionFactory cfX, int nmessages) throws JMSException { + try (Connection connectionX = cfX.createConnection("A", "A")) { + connectionX.start(); + Session sessionX = connectionX.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = sessionX.createQueue("myQueue"); + MessageConsumer consumerX = sessionX.createConsumer(queue); + for (int i = 0; i < nmessages; i++) { + TextMessage message = (TextMessage) consumerX.receive(5000); + assertNotNull(message, "Expected message " + i + " but got null"); + } + sessionX.commit(); + } + } + + private void waitForXToStart() { + for (int i = 0; i < 20; i++) { + try { + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616"); + Connection connection = factory.createConnection(); + connection.close(); + return; + } catch (Throwable e) { + logger.debug(e.getMessage(), e); + try { + Thread.sleep(500); + } catch (Throwable ignored) { + } + } + } + } + + protected void assertMessageCount(String uri, String queueName, int count) throws Exception { + SimpleManagement simpleManagement = new SimpleManagement(uri, null, null); + Wait.assertEquals(count, () -> { + try { + return simpleManagement.getMessageCountOnQueue(queueName); + } catch (Throwable e) { + return -1; + } + }); + } + +} \ No newline at end of file diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java new file mode 100644 index 00000000000..6937c01b6cd --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.smoke.lockmanager; + +import java.io.File; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.activemq.artemis.core.server.lock.LockCoordinator; +import org.apache.activemq.artemis.lockmanager.DistributedLockManager; +import org.apache.activemq.artemis.lockmanager.DistributedLockManagerFactory; +import org.apache.activemq.artemis.lockmanager.MutableLong; +import org.apache.activemq.artemis.lockmanager.Registry; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * This test needs external dependencies. It follows the same pattern described at {@link DualMirrorSingleAcceptorRunningTest}. + * please read the documentation from that test for more detail on how to run this test. + */ +public class LockCoordinatorTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int ZK_BASE_PORT = 2181; + private static final String ZK_ENDPOINTS = "127.0.0.1:2181"; + private static final long KEEP_ALIVE_INTERVAL_MS = 200; + private static final int NUM_THREADS = 10; + + private ExecutorService executorService; + private ScheduledExecutorService scheduledExecutor; + private AtomicInteger lockHolderCount; + private AtomicInteger lockChanged; + private OrderedExecutorFactory executorFactory; + + @BeforeEach + @Override + public void setUp() { + disableCheckThread(); + scheduledExecutor = Executors.newScheduledThreadPool(NUM_THREADS); + executorService = Executors.newFixedThreadPool(NUM_THREADS * 2); + executorFactory = new OrderedExecutorFactory(executorService); + lockHolderCount = new AtomicInteger(0); + lockChanged = new AtomicInteger(0); + } + + @AfterEach + @Override + public void tearDown() { + scheduledExecutor.shutdownNow(); + executorService.shutdownNow(); + } + + @Test + public void testWithFile() throws Exception { + internalTest(i -> getFileCoordinators(i)); + } + + @Test + public void testWithZK() throws Exception { + ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1, ZK_BASE_PORT, 100); + zkCluster.start(); + runAfter(zkCluster::stop); + assertEquals(ZK_ENDPOINTS, zkCluster.getConnectString()); + internalTest(i -> getZKCoordinators(i, zkCluster.getConnectString())); + } + + private void internalTest(Function> lockCoordinatorSupplier) throws Exception { + testOnlyOneLockHolderAtATime(lockCoordinatorSupplier.apply(NUM_THREADS)); + testAddAfterLocked(lockCoordinatorSupplier.apply(1).get(0)); + testRetryAfterError(lockCoordinatorSupplier.apply(1).get(0)); + testRetryAfterErrorWithDelayAdd(lockCoordinatorSupplier.apply(1).get(0)); + + { + List list = lockCoordinatorSupplier.apply(2); + testNoRetryWhileNotAcquired(list.get(0), list.get(1)); + } + } + + private void testAddAfterLocked(LockCoordinator lockCoordinator) throws Exception { + lockHolderCount.set(0); + lockChanged.set(0); + + try { + lockCoordinator.start(); + Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100); + + AtomicInteger afterRunning = new AtomicInteger(0); + assertTrue(lockCoordinator.isLocked()); + lockCoordinator.onLockAcquired(afterRunning::incrementAndGet); + + Wait.assertEquals(1, afterRunning::get); + + assertEquals(1, lockHolderCount.get()); + } finally { + lockCoordinator.stop(); + } + } + + private void testRetryAfterError(LockCoordinator lockCoordinator) throws Exception { + lockHolderCount.set(0); + lockChanged.set(0); + + AtomicBoolean succeeded = new AtomicBoolean(false); + AtomicInteger numberOfTries = new AtomicInteger(0); + try { + lockCoordinator.onLockAcquired(() -> { + if (numberOfTries.incrementAndGet() < 5) { + throw new IOException("please retry"); + } + succeeded.set(true); + }); + lockCoordinator.start(); + + Wait.assertTrue(succeeded::get, 5000, 100); + Wait.assertEquals(1, lockHolderCount::get); + } finally { + lockCoordinator.stop(); + } + } + + private void testRetryAfterErrorWithDelayAdd(LockCoordinator lockCoordinator) throws Exception { + lockHolderCount.set(0); + lockChanged.set(0); + + AtomicBoolean succeeded = new AtomicBoolean(false); + AtomicInteger numberOfTries = new AtomicInteger(0); + try { + lockCoordinator.start(); + Wait.assertEquals(1, lockHolderCount::get); + + lockCoordinator.onLockAcquired(() -> { + if (numberOfTries.incrementAndGet() < 5) { + throw new RuntimeException("please retry"); + } + succeeded.set(true); + }); + + Wait.assertTrue(succeeded::get, 5000, 100); + Wait.assertEquals(1, lockHolderCount::get); + } finally { + lockCoordinator.stop(); + } + } + + // validate that no retry would happen since the lock wasn't held in the secondLock + private void testNoRetryWhileNotAcquired(LockCoordinator firstLock, LockCoordinator secondLock) throws Exception { + lockHolderCount.set(0); + lockChanged.set(0); + AtomicBoolean throwError = new AtomicBoolean(true); + AtomicBoolean errorHappened = new AtomicBoolean(false); + + AtomicBoolean succeeded = new AtomicBoolean(false); + try { + firstLock.start(); + Wait.assertEquals(1, lockHolderCount::get); + assertTrue(firstLock.isLocked()); + secondLock.start(); + assertFalse(secondLock.isLocked()); + + secondLock.onLockAcquired(() -> { + if (throwError.get()) { + errorHappened.set(true); + throw new RuntimeException("please retry"); + } + succeeded.set(true); + }); + + assertFalse(succeeded.get()); + assertFalse(errorHappened.get()); + firstLock.stop(); + Wait.assertTrue(errorHappened::get, 5000, 100); + throwError.set(false); + Wait.assertTrue(succeeded::get, 5000, 100); + Wait.assertEquals(1, lockHolderCount::get); + } finally { + firstLock.stop(); + secondLock.stop(); + } + } + + + private void testOnlyOneLockHolderAtATime(List lockCoordinators) throws Exception { + try { + + lockCoordinators.forEach(LockCoordinator::start); + + Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100); + + long value = RandomUtil.randomPositiveLong(); + + boolean first = true; + + for (LockCoordinator lockCoordinator : lockCoordinators) { + MutableLong mutableLong = lockCoordinator.getLockManager().getMutableLong("mutableLong"); + if (first) { + mutableLong.set(value); + first = false; + } else { + assertEquals(value, mutableLong.get()); + } + mutableLong.close(); + } + + logger.info("Stopping ********************************************************************************"); + + // We keep stopping lockManager that is holding the lock + // we do this until we stop every one of the locks + while (!lockCoordinators.isEmpty()) { + if (!Wait.waitFor(() -> lockHolderCount.get() == 1, 15000, 100)) { + for (LockCoordinator lock : lockCoordinators) { + logger.info("lock {} is holdingLock={}", lock.getDebugInfo(), lock.isLocked()); + } + } + Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100); + for (LockCoordinator lock : lockCoordinators) { + if (lock.isLocked()) { + long changed = lockChanged.get(); + lock.stop(); + lockCoordinators.remove(lock); + //Wait.assertTrue(() -> lockChanged.get() != changed, 5000, 100); + break; + } + } + } + + // Verify that no locks are held after stopping + Wait.assertEquals(0, () -> lockHolderCount.get(), 15000, 100); + } finally { + try { + lockCoordinators.forEach(LockCoordinator::stop); + } catch (Throwable ignored) { + } + } + } + + private List getFileCoordinators(int numberOfCoordinators) { + File file = new File(getTemporaryDir() + "/lockFolder"); + file.mkdirs(); + HashMap parameters = new HashMap<>(); + parameters.put("locks-folder", file.getAbsolutePath()); + return getLockCoordinators(numberOfCoordinators, "file", parameters); + } + + private List getZKCoordinators(int numberOfCoordinators, String connectString) { + HashMap parameters = new HashMap<>(); + parameters.put("connect-string", connectString); + return getLockCoordinators(numberOfCoordinators, "ZK", parameters); + } + + private List getLockCoordinators(int numberOfCoordinators, String factoryName, HashMap parameters) { + return getLockCoordinators(numberOfCoordinators, () -> { + DistributedLockManagerFactory factory = Registry.getInstance().getFactory(factoryName); + return factory.build(parameters); + }); + } + + private List getLockCoordinators(int numberOfCoordinators, Supplier lockManagerSupplier) { + List locks = new ArrayList<>(); + String lockName = "lock-test-" + RandomUtil.randomUUIDString(); + for (int i = 0; i < numberOfCoordinators; i++) { + DistributedLockManager lockManager = lockManagerSupplier.get(); + + LockCoordinator lockCoordinator = new LockCoordinator(scheduledExecutor, executorFactory.getExecutor(), KEEP_ALIVE_INTERVAL_MS, lockManager, lockName, lockName); + lockCoordinator.onLockAcquired(() -> lock(lockCoordinator)); + lockCoordinator.onLockReleased(() -> unlock(lockCoordinator)); + lockCoordinator.onLockReleased(() -> lockChanged.incrementAndGet()); + lockCoordinator.onLockAcquired(() -> lockChanged.incrementAndGet()); + lockCoordinator.setDebugInfo("ID" + i); + locks.add(lockCoordinator); + } + return locks; + } + + private void lock(LockCoordinator lockCoordinator) { + logger.info("++Lock {} lock", lockCoordinator.getDebugInfo()); + lockHolderCount.incrementAndGet(); + } + + private void unlock(LockCoordinator lockCoordinator) { + logger.info("--Lock {} unlocking", lockCoordinator.getDebugInfo()); + lockHolderCount.decrementAndGet(); + } + +} diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java new file mode 100644 index 00000000000..2634b06060d --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperCluster.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.lockmanager; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.activemq.artemis.tests.extensions.ThreadLeakCheckExtension; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.TestingZooKeeperServer; + +/** + * This is encapsulating Zookeeper instances for tests + * */ +public class ZookeeperCluster { + private TestingCluster testingServer; + private InstanceSpec[] clusterSpecs; + private int nodes; + private final File root; + + public ZookeeperCluster(File root, int nodes, int basePort, int serverTickMS) throws IOException { + this.root = root; + this.nodes = nodes; + clusterSpecs = new InstanceSpec[nodes]; + for (int i = 0; i < nodes; i++) { + clusterSpecs[i] = new InstanceSpec(newFolder(root, "node" + i), basePort + i, -1, -1, true, -1, serverTickMS, -1); + } + testingServer = new TestingCluster(clusterSpecs); + } + + public void start() throws Exception { + testingServer.start(); + } + + public void stop() throws Exception { + ThreadLeakCheckExtension.addKownThread("ListenerHandler-"); + testingServer.stop(); + } + + private static File newFolder(File root, String... subDirs) throws IOException { + String subFolder = String.join("/", subDirs); + File result = new File(root, subFolder); + if (!result.mkdirs()) { + throw new IOException("Couldn't create folders " + root); + } + return result; + } + + public String getConnectString() { + return testingServer.getConnectString(); + } + + public List getServers() { + return testingServer.getServers(); + } + + public int getNodes() { + return nodes; + } +} diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java index 634c6a80f0f..1fea82cda4b 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/ZookeeperLockManagerSinglePairTest.java @@ -22,10 +22,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.tests.extensions.ThreadLeakCheckExtension; import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; -import org.apache.curator.test.InstanceSpec; -import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingZooKeeperServer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -33,8 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.lang.invoke.MethodHandles; //Parameters set in super class @@ -46,34 +41,27 @@ public class ZookeeperLockManagerSinglePairTest extends LockManagerSinglePairTes // Beware: the server tick must be small enough that to let the session to be correctly expired private static final int SERVER_TICK_MS = 100; - private TestingCluster testingServer; - private InstanceSpec[] clusterSpecs; - private int nodes; + ZookeeperCluster zookeeperCluster; @BeforeEach @Override public void setup() throws Exception { super.setup(); - nodes = 3; - clusterSpecs = new InstanceSpec[nodes]; - for (int i = 0; i < nodes; i++) { - clusterSpecs[i] = new InstanceSpec(newFolder(temporaryFolder, "node" + i), BASE_SERVER_PORT + i, -1, -1, true, -1, SERVER_TICK_MS, -1); - } - testingServer = new TestingCluster(clusterSpecs); - testingServer.start(); - assertEquals("127.0.0.1:6666,127.0.0.1:6667,127.0.0.1:6668", testingServer.getConnectString()); - logger.info("Cluster of {} nodes on: {}", 3, testingServer.getConnectString()); + + zookeeperCluster = new ZookeeperCluster(temporaryFolder, 3, BASE_SERVER_PORT, SERVER_TICK_MS); + zookeeperCluster.start(); + assertEquals("127.0.0.1:6666,127.0.0.1:6667,127.0.0.1:6668", zookeeperCluster.getConnectString()); + logger.info("Cluster of {} nodes on: {}", 3, zookeeperCluster.getConnectString()); } @Override @AfterEach public void after() throws Exception { // zk bits that leak from servers - ThreadLeakCheckExtension.addKownThread("ListenerHandler-"); try { super.after(); } finally { - testingServer.close(); + zookeeperCluster.stop(); } } @@ -88,8 +76,8 @@ protected boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) { @Override protected int[] stopMajority() throws Exception { - List followers = testingServer.getServers(); - final int quorum = (nodes / 2) + 1; + List followers = zookeeperCluster.getServers(); + final int quorum = (zookeeperCluster.getNodes() / 2) + 1; final int[] stopped = new int[quorum]; for (int i = 0; i < quorum; i++) { followers.get(i).stop(); @@ -100,18 +88,9 @@ protected int[] stopMajority() throws Exception { @Override protected void restart(int[] nodes) throws Exception { - List servers = testingServer.getServers(); + List servers = zookeeperCluster.getServers(); for (int nodeIndex : nodes) { servers.get(nodeIndex).restart(); } } - - private static File newFolder(File root, String... subDirs) throws IOException { - String subFolder = String.join("/", subDirs); - File result = new File(root, subFolder); - if (!result.mkdirs()) { - throw new IOException("Couldn't create folders " + root); - } - return result; - } }