Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions examples/features/broker-connection/ha-with-mirroring/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.artemis.examples.broker-connection</groupId>
<artifactId>broker-connections</artifactId>
<version>2.51.0-SNAPSHOT</version>
</parent>

<artifactId>ha-with-mirroring</artifactId>
<packaging>jar</packaging>
<name>Apache Artemis Zookeeper Mirroring</name>

<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.artemis</groupId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.artemis</groupId>
<artifactId>artemis-commons</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.artemis</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
<instance>${basedir}/target/server0</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/artemis/server0</configuration>
</configuration>
</execution>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<instance>${basedir}/target/server1</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/artemis/server1</configuration>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.artemis.jms.example.HAWithMirroringExample</clientClass>
<args>
<param>${basedir}/target/server0</param>
<param>${basedir}/target/server1</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.artemis.examples.broker-connection</groupId>
<artifactId>ha-with-mirroring</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
121 changes: 121 additions & 0 deletions examples/features/broker-connection/ha-with-mirroring/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# High Availability with Mirroring and Distributed Locks

To run the example, simply type **mvn verify** from this directory.

This example demonstrates how to achieve high availability (HA) using mirroring combined with distributed locks from the Lock Coordinator feature. The distributed locks ensure that only one broker accepts client connections at a time, providing automatic failover without split-brain scenarios.

## Overview

This example configures two brokers (server0 and server1) that:
- Mirror all messaging operations to each other using broker connections
- Share a distributed file-based lock to coordinate which broker accepts client connections
- Automatically failover client connections when the active broker fails

## How It Works

### Mirroring Configuration

Both brokers are configured with bidirectional mirroring using AMQP broker connections. Each broker mirrors its data to the other:

**server0/broker.xml:**
```xml
<broker-connections>
<amqp-connection uri="tcp://localhost:61001" name="mirror" retry-interval="2000">
<mirror sync="true"/>
</amqp-connection>
</broker-connections>
```

**server1/broker.xml:**
```xml
<broker-connections>
<amqp-connection uri="tcp://localhost:61000" name="mirror" retry-interval="2000">
<mirror sync="false"/>
</amqp-connection>
</broker-connections>
```

This ensures that messages, queues, and other operations are replicated across both brokers.

### Lock Coordinator for HA

The key feature of this example is the use of **distributed locks** to control which broker accepts client connections. Both brokers are configured with a lock coordinator on their client acceptors:

```xml
<lock-coordinators>
<lock-coordinator name="clients-lock">
<type>file</type>
<lock-id>mirror-cluster-clients</lock-id>
<check-period>1000</check-period>
<properties>
<property key="locks-folder" value="/path/to/shared/locks"/>
</properties>
</lock-coordinator>
</lock-coordinators>

<acceptors>
<acceptor name="forClients" lock-coordinator="clients-lock">tcp://localhost:61616</acceptor>
</acceptors>
```

The lock coordinator ensures that:
- Only the broker holding the lock accepts client connections on that acceptor
- If the active broker fails, the lock is automatically released and acquired by the other broker
- The backup broker immediately starts accepting connections when it acquires the lock
- The shared lock file prevents split-brain scenarios

### Client Failover

Clients connect using a failover URL that includes both broker addresses:

```java
ConnectionFactory factory = new org.apache.qpid.jms.JmsConnectionFactory(
"failover:(amqp://localhost:61616,amqp://localhost:61617)?failover.maxReconnectAttempts=-1");
```

When the active broker (holding the lock) fails:
1. The lock is automatically released
2. The backup broker acquires the lock and starts accepting connections
3. The client automatically reconnects to the now-active broker
4. All messages are available due to mirroring

## Example Flow

1. Both brokers start with mirroring configured
2. One broker (typically server0) acquires the distributed lock and accepts client connections
3. The client connects and sends 30 messages to a queue
4. Server0 is killed (simulating a failure)
5. Server1 automatically acquires the lock and starts accepting connections
6. The client reconnects to server1 via failover
7. All 30 messages are consumed from server1 (due to mirroring)

## Configuration Notes

The lock coordinator supports different lock types (file-based, zookeeper). This example uses file-based locks where both brokers must have access to a shared filesystem location.

The `check-period` parameter (in milliseconds) controls how frequently the lock holder verifies it still owns the lock, affecting how quickly failover occurs when a broker crashes.

## Change the configuration to ZooKeeper

If you want to try ZooKeeper, you need to change the lock-coordinator configuration. The class-name for the Lock Manager and the connect-string must be provided on server0 and server1.

```xml
<lock-coordinators>
<lock-coordinator name="clients-lock">
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
<lock-id>mirror-cluster-clients</lock-id>
<check-period>1000</check-period> <!-- how often to check if the lock is still valid, in milliseconds -->

<properties>
<property key="connect-string" value="localhost:2181"/>
</properties>
</lock-coordinator>
</lock-coordinators>
```

And of course you need to have ZooKeeper running for the broker. You can do that with Podman or Docker:

```shell
# you can replace podman with docker if you prefer, using the same arguments...
podman run -d --name zookeeper-artemis-test -p 2181:2181 zookeeper:latest
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.artemis.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import java.io.File;

import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.FileUtil;

/**
* Example of live and replicating backup pair using mirroring and a distributed lock from the Lock Coordinator
*/
public class HAWithMirroringExample {

private static Process server0;

private static Process server1;

public static void main(final String[] args) throws Exception {
final int numMessages = 30;

// Configure the locks folder. The broker.xml needs to have the proper path in place.
// also the locks folder needs to be created before the server starts
configureLocksFolder(args);


try {

// Start the two servers
server0 = ServerUtil.startServer(args[0], HAWithMirroringExample.class.getSimpleName() + "-peer0", 0, 0);
Thread.sleep(2_000);
server1 = ServerUtil.startServer(args[1], HAWithMirroringExample.class.getSimpleName() + "-peer1", 1, 0);

// We connect to the broker holding the lock on the distributed lock
ConnectionFactory factory = new org.apache.qpid.jms.JmsConnectionFactory(
"failover:(amqp://localhost:61616,amqp://localhost:61617)?failover.maxReconnectAttempts=-1");


try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("exampleQueue");
MessageProducer producer = session.createProducer(queue);

// Send messages in one of the brokers
for (int i = 0; i < numMessages; i++) {
producer.send(session.createTextMessage("hello " + i));
}
session.commit();

// kill the server that was probably holding the lock:
ServerUtil.killServer(server0);


// now we consume messages after the broker is killed, the client should reconnect to the correct broker

MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < numMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
System.out.println("Received message " + message.getText());
}
session.commit();
}


} finally {
ServerUtil.killServer(server0);
ServerUtil.killServer(server1);
}
}

private static void configureLocksFolder(String[] args) throws Exception {
File lockFolder = new File("./target/locks");
lockFolder.mkdirs();
FileUtil.findReplace(new File(args[0] + "/etc/broker.xml"), "CHANGEME", lockFolder.getAbsolutePath());
FileUtil.findReplace(new File(args[1] + "/etc/broker.xml"), "CHANGEME", lockFolder.getAbsolutePath());
}
}
Loading