diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
index 7196c5a39b7..ab6819353d5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
@@ -45,6 +45,7 @@ public class ServerLocatorConfig {
public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
+ public int failbackAttempts = ActiveMQClient.DEFAULT_FAILBACK_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
@@ -83,6 +84,7 @@ public ServerLocatorConfig(final ServerLocatorConfig locator) {
reconnectAttempts = locator.reconnectAttempts;
initialConnectAttempts = locator.initialConnectAttempts;
failoverAttempts = locator.failoverAttempts;
+ failbackAttempts = locator.failbackAttempts;
initialMessagePacketSize = locator.initialMessagePacketSize;
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
compressionLevel = locator.compressionLevel;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index 0b911a2665f..21738d5f2a3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -117,6 +117,8 @@ public final class ActiveMQClient {
public static final int DEFAULT_FAILOVER_ATTEMPTS = 0;
+ public static final int DEFAULT_FAILBACK_ATTEMPTS = 0;
+
@Deprecated
public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
index b7e6083f458..f75ead61bee 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
@@ -668,6 +668,21 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
*/
int getFailoverAttempts();
+ /**
+ * Sets the maximum number of failback attempts to establish a new conection to the original broker after a failover.
+ *
+ * Value must be -1 (to try infinitely), 0 (to never atempt failback) or greater than 0.
+ *
+ * @param attempts maximum number of failback attempts after a successful failover
+ * @return this ServerLocator
+ */
+ ServerLocator setFailbackAttempts(int attempts);
+
+ /**
+ * @return the number of failback attempts after a successful failover
+ */
+ int getFailbackAttempts();
+
/**
* Returns true if the client will automatically attempt to connect to the backup server if the initial
* connection to the live server fails
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 86d0b9465d8..7a30c1d6a2e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -77,6 +77,8 @@
import java.lang.invoke.MethodHandles;
import java.util.function.BiPredicate;
+import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.DISCONNECTED;
+
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -93,6 +95,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private volatile TransportConfiguration backupConnectorConfig;
+ private TransportConfiguration failbackConnectorConfig;
+
private ConnectorFactory connectorFactory;
private final long callTimeout;
@@ -135,6 +139,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private int failoverAttempts;
+ private int failbackAttempts;
+
private final Set listeners = new ConcurrentHashSet<>();
private final Set failoverListeners = new ConcurrentHashSet<>();
@@ -144,6 +150,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private Future> pingerFuture;
private PingRunnable pingRunnable;
+ private FailbackRunnable failbackRunnable;
+
private final List incomingInterceptors;
private final List outgoingInterceptors;
@@ -244,6 +252,8 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
this.failoverAttempts = locatorConfig.failoverAttempts;
+ this.failbackAttempts = locatorConfig.failbackAttempts;
+
this.scheduledThreadPool = scheduledThreadPool;
this.threadPool = threadPool;
@@ -722,6 +732,12 @@ private void failoverOrReconnect(final Object connectionID,
int connectorsCount = 0;
int failoverRetries = 0;
long failoverRetryInterval = retryInterval;
+
+ //Save current connector config for failback purposes
+ if (failbackAttempts != 0 && failbackConnectorConfig == null) {
+ failbackConnectorConfig = connectorConfig;
+ }
+
Pair connectorPair;
BiPredicate failoverRetryPredicate =
(reconnected, retries) -> clientProtocolManager.isAlive() &&
@@ -815,6 +831,128 @@ private void failoverOrReconnect(final Object connectionID,
}
}
+ private void failback(final ActiveMQException me,
+ final TransportConfiguration previousConnectorConfig) {
+
+ logger.debug("Original node has come back online, performing failback now");
+
+ for (ClientSessionInternal session : sessions) {
+ SessionContext context = session.getSessionContext();
+ if (context instanceof ActiveMQSessionContext) {
+ ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
+ if (sessionContext.isKilled()) {
+ setReconnectAttempts(0);
+ }
+ }
+ }
+
+ Set sessionsToClose = null;
+ if (!clientProtocolManager.isAlive()) {
+ return;
+ }
+
+ Lock localFailoverLock = lockFailover();
+
+ try {
+
+ callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
+ callSessionFailureListeners(me, false, false, null);
+
+ if (clientProtocolManager.cleanupBeforeFailover(me)) {
+
+ RemotingConnection oldConnection = connection;
+
+ connection = null;
+
+ Connector localConnector = connector;
+ if (localConnector != null) {
+ try {
+ localConnector.close();
+ } catch (Exception ignore) {
+ // no-op
+ }
+ }
+
+ cancelScheduledTasks();
+
+ connector = null;
+
+ HashSet sessionsToFailover;
+ synchronized (sessions) {
+ sessionsToFailover = new HashSet<>(sessions);
+ }
+
+ // Notify sessions before failover.
+ for (ClientSessionInternal session : sessionsToFailover) {
+ session.preHandleFailover(connection);
+ }
+
+ boolean sessionsReconnected = false;
+
+ connectorConfig = previousConnectorConfig;
+ currentConnectorConfig = previousConnectorConfig;
+
+ getConnection();
+
+ if (connection != null) {
+ sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);
+
+ if (!sessionsReconnected) {
+ if (oldConnection != null) {
+ oldConnection.destroy();
+ }
+
+ oldConnection = connection;
+ connection = null;
+ }
+ }
+
+ // Notify sessions after failover.
+ for (ClientSessionInternal session : sessionsToFailover) {
+ session.postHandleFailover(connection, sessionsReconnected);
+ }
+
+ if (oldConnection != null) {
+ oldConnection.destroy();
+ }
+
+ if (connection != null) {
+ callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
+
+ }
+ }
+
+ if (connection == null) {
+ synchronized (sessions) {
+ sessionsToClose = new HashSet<>(sessions);
+ }
+ callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
+ callSessionFailureListeners(me, true, false, null);
+ }
+ } finally {
+ localFailoverLock.unlock();
+ }
+
+ // This needs to be outside the failover lock to prevent deadlock
+ if (connection != null) {
+ callSessionFailureListeners(me, true, true);
+ }
+
+ if (sessionsToClose != null) {
+ // If connection is null it means we didn't succeed in failing over or reconnecting
+ // so we close all the sessions, so they will throw exceptions when attempted to be used
+
+ for (ClientSessionInternal session : sessionsToClose) {
+ try {
+ session.cleanUp(true);
+ } catch (Exception cause) {
+ ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
+ }
+ }
+ }
+
+ }
+
private ClientSession createSessionInternal(final String rawUsername,
final String rawPassword,
final boolean xa,
@@ -1018,6 +1156,10 @@ public boolean waitForRetry(long interval) {
return false;
}
+ private long getRetryInterval() {
+ return retryInterval;
+ }
+
private void cancelScheduledTasks() {
Future> pingerFutureLocal = pingerFuture;
if (pingerFutureLocal != null) {
@@ -1027,8 +1169,13 @@ private void cancelScheduledTasks() {
if (pingRunnableLocal != null) {
pingRunnableLocal.cancel();
}
+ FailbackRunnable failbackRunnableLocal = failbackRunnable;
+ if (failbackRunnableLocal != null) {
+ failbackRunnableLocal.cancel();
+ }
pingerFuture = null;
pingRunnable = null;
+ failbackRunnable = null;
}
private void checkCloseConnection() {
@@ -1492,6 +1639,68 @@ public synchronized void cancel() {
}
}
+ private void attemptFailback() {
+ if (failbackRunnable == null) {
+ failbackRunnable = new FailbackRunnable();
+ }
+ threadPool.execute(failbackRunnable);
+ }
+
+ private class FailbackRunnable implements Runnable {
+ private boolean first = true;
+ private boolean cancelled;
+
+ @Override
+ public synchronized void run() {
+
+ if (!first) {
+ return;
+ }
+
+ first = false;
+
+ logger.debug("Attempting failback. Trying to reach {} for failback", failbackConnectorConfig.toString());
+
+ int attempts = 0;
+ long failbackRetryInterval = getRetryInterval();
+
+ ConnectorFactory transportConnectorFactory;
+ Connector transportConnector;
+ Connection transportConnection;
+
+ while (!cancelled && (failbackAttempts == -1 || attempts++ < failbackAttempts)) {
+
+ waitForRetry(failbackRetryInterval);
+ failbackRetryInterval = getNextRetryInterval(failbackRetryInterval);
+
+ transportConnectorFactory = instantiateConnectorFactory(failbackConnectorConfig.getFactoryClassName());
+ transportConnector = createConnector(transportConnectorFactory, failbackConnectorConfig);
+ transportConnection = openTransportConnection(transportConnector);
+
+ if (transportConnection != null) {
+ transportConnector.close();
+ transportConnection.close();
+ ActiveMQException exception = new ActiveMQException("Failing back to original broker: " + failbackConnectorConfig.toString(), DISCONNECTED);
+ failback(exception, failbackConnectorConfig);
+ break;
+ }
+
+ }
+
+ if (failbackConnectorConfig.equals(currentConnectorConfig)) {
+ failbackConnectorConfig = null;
+ }
+
+ first = true;
+
+ }
+
+ public synchronized void cancel() {
+ cancelled = true;
+ }
+
+ }
+
protected RemotingConnection establishNewConnection() {
Connection transportConnection = createTransportConnection();
@@ -1572,6 +1781,13 @@ public void notifyNodeUp(long uniqueEventID,
boolean isLast) {
try {
+
+ if (failbackConnectorConfig != null && connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), failbackConnectorConfig)) {
+ if (!currentConnectorConfig.equals(failbackConnectorConfig) && failbackRunnable == null) {
+ attemptFailback();
+ }
+ }
+
// if it is our connector then set the live id used for failover
if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), currentConnectorConfig)) {
liveNodeID = nodeID;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index af7b304c86f..980d3df0cf6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -1199,6 +1199,18 @@ public int getFailoverAttempts() {
return config.failoverAttempts;
}
+ @Override
+ public ServerLocatorImpl setFailbackAttempts(int attempts) {
+ checkWrite();
+ this.config.failbackAttempts = attempts;
+ return this;
+ }
+
+ @Override
+ public int getFailbackAttempts() {
+ return config.failbackAttempts;
+ }
+
@Deprecated
@Override
public boolean isFailoverOnInitialConnection() {
diff --git a/docs/user-manual/client-failover.adoc b/docs/user-manual/client-failover.adoc
index 9fe40551209..c23b5b9c5ce 100644
--- a/docs/user-manual/client-failover.adoc
+++ b/docs/user-manual/client-failover.adoc
@@ -48,6 +48,16 @@ Set `failoverAttempts` to any non-zero value to reconnect to other live servers,
If `reconnectAttempts` value is not zero then the client will try to reconnect to other live servers only after all attempts to <> or <> fail.
+== Failing back after a successful failover
+
+It is also possile to have the client keep trying to reconnect to the original live server after a successful failover. In this case the failover connection will get disconnected and moved back to the original server if it comes back online at some later time.
+
+This is controlled by the property `failbackAttempts`
+
+If set to a non-zero value then following a failover the client will try to reach the original server continiously according to the configured `retryInterval` until it reaches the number of configured `failbackAttempts`.
+
+Setting this parameter to `-1` means try forever. Default value is `0`
+
== Session reconnection
When clients <> after a restart, <> or <> any sessions will no longer exist on the server and it won't be possible to 100% transparently re-attach to them.
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
index e86998fe08a..191208b14ee 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
@@ -37,6 +37,7 @@
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -212,6 +213,117 @@ public void testConsumerAfterFailoverWithRedistribution() throws Exception {
}
}
+ @Test
+ public void testConsumerAfterFailoverAndFailbackWithRedistribution() throws Exception {
+ setupCluster();
+
+ AddressSettings testAddressSettings = new AddressSettings().setRedistributionDelay(0);
+ for (int i : getServerIDs()) {
+ getServer(i).getAddressSettingsRepository().addMatch(QUEUES_TESTADDRESS, testAddressSettings);
+ }
+
+ startServers(getLiveServerIDs());
+ startServers(getBackupServerIDs());
+
+ for (int i : getLiveServerIDs()) {
+ waitForTopology(servers[i], 3, 3);
+ }
+
+ for (int i : getBackupServerIDs()) {
+ waitForFailoverTopology(i, 0, 1, 2);
+ }
+
+ for (int i : getLiveServerIDs()) {
+ setupSessionFactory(i, i + 3, isNetty(), false);
+ createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+ }
+
+ List transportConfigList = new ArrayList<>();
+ for (int i : getLiveServerIDs()) {
+ Map params = generateParams(i, isNetty());
+ TransportConfiguration serverToTC = createTransportConfiguration("node" + i, isNetty(), false, params);
+ serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM);
+ transportConfigList.add(serverToTC);
+ }
+ TransportConfiguration[] transportConfigs = transportConfigList.toArray(new TransportConfiguration[transportConfigList.size()]);
+
+ try (ServerLocator serverLocator = new ServerLocatorImpl(false, transportConfigs)) {
+ serverLocator.setFailoverAttempts(3);
+ serverLocator.setFailbackAttempts(10);
+ serverLocator.setReconnectAttempts(0);
+ serverLocator.setUseTopologyForLoadBalancing(false);
+
+ try (ClientSessionFactory sessionFactory = serverLocator.createSessionFactory()) {
+ try (ClientSession clientSession = sessionFactory.createSession()) {
+ clientSession.start();
+
+ int serverIdBeforeCrash = Integer.parseInt(sessionFactory.
+ getConnectorConfiguration().getName().substring(4));
+
+ QueueControl testQueueControlBeforeCrash = (QueueControl)getServer(serverIdBeforeCrash).
+ getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
+
+ Assert.assertEquals(0, testQueueControlBeforeCrash.getMessageCount());
+
+ try (ClientProducer clientProducer = clientSession.createProducer(QUEUES_TESTADDRESS)) {
+ clientProducer.send(clientSession.createMessage(true));
+ clientProducer.send(clientSession.createMessage(true));
+ clientProducer.send(clientSession.createMessage(true));
+ }
+
+ Assert.assertEquals(3, testQueueControlBeforeCrash.getMessageCount());
+
+ try (ClientConsumer clientConsumer = clientSession.createConsumer(QUEUE_NAME)) {
+ ClientMessage messageBeforeCrash = clientConsumer.receive(3000);
+ Assert.assertNotNull(messageBeforeCrash);
+ messageBeforeCrash.acknowledge();
+ clientSession.commit();
+
+ Assert.assertEquals(2, testQueueControlBeforeCrash.getMessageCount());
+
+ ActiveMQServer serv = getServer(serverIdBeforeCrash);
+ serv.stop();
+ waitForServerToStop(serv);
+
+ Assert.assertEquals(TEST_PARAM, sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
+
+ int serverIdAfterCrash = Integer.parseInt(sessionFactory.
+ getConnectorConfiguration().getName().substring(4));
+ Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash);
+
+ Assert.assertTrue(isLiveServerID(serverIdAfterCrash));
+
+ QueueControl testQueueControlAfterCrash = (QueueControl)getServer(serverIdAfterCrash).
+ getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
+
+ Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() == 2, 3000);
+
+ ClientMessage messageAfterCrash = clientConsumer.receive(3000);
+ Assert.assertNotNull(messageAfterCrash);
+ messageAfterCrash.acknowledge();
+ clientSession.commit();
+
+ serv.start();
+ waitForServerToStart(serv);
+
+ QueueControl testQueueControlAfterFailback = (QueueControl)getServer(serverIdBeforeCrash).
+ getManagementService().getResource(ResourceNames.QUEUE + QUEUE_NAME);
+ Wait.waitFor(() -> testQueueControlAfterFailback.getMessageCount() == 1, 3000);
+
+ int serverIdAfterFailback = Integer.parseInt(sessionFactory.
+ getConnectorConfiguration().getName().substring(4));
+
+ Assert.assertEquals(serverIdBeforeCrash, serverIdAfterFailback);
+ Assert.assertTrue(isLiveServerID(serverIdAfterFailback));
+ Assert.assertNotNull(clientConsumer.receive());
+
+ }
+ clientSession.stop();
+ }
+ }
+ }
+ }
+
@Test
public void testAutoCreatedQueueAfterFailoverWithoutHA() throws Exception {
setupCluster();