Skip to content

Commit

Permalink
ARTEMIS-4325 - Ability to failback after failover
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonRoskvist committed Jun 22, 2023
1 parent dc4bf95 commit 0dde73c
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ServerLocatorConfig {
public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
public int failbackAttempts = ActiveMQClient.DEFAULT_FAILBACK_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
Expand Down Expand Up @@ -83,6 +84,7 @@ public ServerLocatorConfig(final ServerLocatorConfig locator) {
reconnectAttempts = locator.reconnectAttempts;
initialConnectAttempts = locator.initialConnectAttempts;
failoverAttempts = locator.failoverAttempts;
failbackAttempts = locator.failbackAttempts;
initialMessagePacketSize = locator.initialMessagePacketSize;
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
compressionLevel = locator.compressionLevel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public final class ActiveMQClient {

public static final int DEFAULT_FAILOVER_ATTEMPTS = 0;

public static final int DEFAULT_FAILBACK_ATTEMPTS = 0;

@Deprecated
public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,21 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
*/
int getFailoverAttempts();

/**
* Sets the maximum number of failback attempts to establish a new conection to the original broker after a failover.
* <p>
* Value must be -1 (to try infinitely), 0 (to never atempt failback) or greater than 0.
*
* @param attempts maximum number of failback attempts after a successful failover
* @return this ServerLocator
*/
ServerLocator setFailbackAttempts(int attempts);

/**
* @return the number of failback attempts after a successful failover
*/
int getFailbackAttempts();

/**
* Returns true if the client will automatically attempt to connect to the backup server if the initial
* connection to the live server fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
import java.lang.invoke.MethodHandles;
import java.util.function.BiPredicate;

import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.DISCONNECTED;

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand All @@ -93,6 +95,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

private volatile TransportConfiguration backupConnectorConfig;

private Pair<TransportConfiguration, TransportConfiguration> failbackConnectorPair;

private ConnectorFactory connectorFactory;

private final long callTimeout;
Expand Down Expand Up @@ -135,6 +139,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

private int failoverAttempts;

private int failbackAttempts;

private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<>();

private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<>();
Expand All @@ -144,6 +150,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private Future<?> pingerFuture;
private PingRunnable pingRunnable;

private FailbackCheck failbackChecker;

private final List<Interceptor> incomingInterceptors;

private final List<Interceptor> outgoingInterceptors;
Expand Down Expand Up @@ -244,6 +252,8 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,

this.failoverAttempts = locatorConfig.failoverAttempts;

this.failbackAttempts = locatorConfig.failbackAttempts;

this.scheduledThreadPool = scheduledThreadPool;

this.threadPool = threadPool;
Expand Down Expand Up @@ -722,6 +732,12 @@ private void failoverOrReconnect(final Object connectionID,
int connectorsCount = 0;
int failoverRetries = 0;
long failoverRetryInterval = retryInterval;

//Save current connector config for failback purposes
if (failbackConnectorPair == null) {
failbackConnectorPair = new Pair<>(connectorConfig, backupConnectorConfig);
}

Pair<TransportConfiguration, TransportConfiguration> connectorPair;
BiPredicate<Boolean, Integer> failoverRetryPredicate =
(reconnected, retries) -> clientProtocolManager.isAlive() &&
Expand Down Expand Up @@ -752,6 +768,9 @@ private void failoverOrReconnect(final Object connectionID,
oldConnection = connection;
connection = null;
}
if (failbackAttempts != 0 && !connection.getTransportConnection().getConnectorConfig().equals(failbackConnectorPair.getA())) {
runFailBackCheck();
}
}

if (connectorsCount >= serverLocator.getConnectorsSize()) {
Expand Down Expand Up @@ -815,6 +834,130 @@ private void failoverOrReconnect(final Object connectionID,
}
}

private void failback(final ActiveMQException me,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair) {

logger.debug("Original node has come back online, performing failback now");

for (ClientSessionInternal session : sessions) {
SessionContext context = session.getSessionContext();
if (context instanceof ActiveMQSessionContext) {
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
if (sessionContext.isKilled()) {
setReconnectAttempts(0);
}
}
}

Set<ClientSessionInternal> sessionsToClose = null;
if (!clientProtocolManager.isAlive()) {
return;
}

Lock localFailoverLock = lockFailover();

try {

callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
callSessionFailureListeners(me, false, false, null);

if (clientProtocolManager.cleanupBeforeFailover(me)) {

RemotingConnection oldConnection = connection;

connection = null;

Connector localConnector = connector;
if (localConnector != null) {
try {
localConnector.close();
} catch (Exception ignore) {
// no-op
}
}

cancelScheduledTasks();

connector = null;

HashSet<ClientSessionInternal> sessionsToFailover;
synchronized (sessions) {
sessionsToFailover = new HashSet<>(sessions);
}

// Notify sessions before failover.
for (ClientSessionInternal session : sessionsToFailover) {
session.preHandleFailover(connection);
}

boolean sessionsReconnected = false;

connectorConfig = connectorPair.getA();
currentConnectorConfig = connectorPair.getA();
if (connectorPair.getB() != null) {
backupConnectorConfig = connectorPair.getB();
}

getConnection();

if (connection != null) {
sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);

if (!sessionsReconnected) {
if (oldConnection != null) {
oldConnection.destroy();
}

oldConnection = connection;
connection = null;
}
}

// Notify sessions after failover.
for (ClientSessionInternal session : sessionsToFailover) {
session.postHandleFailover(connection, sessionsReconnected);
}

if (oldConnection != null) {
oldConnection.destroy();
}

if (connection != null) {
callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
}
}

if (connection == null) {
synchronized (sessions) {
sessionsToClose = new HashSet<>(sessions);
}
callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
callSessionFailureListeners(me, true, false, null);
}
} finally {
localFailoverLock.unlock();
}

// This needs to be outside the failover lock to prevent deadlock
if (connection != null) {
callSessionFailureListeners(me, true, true);
}

if (sessionsToClose != null) {
// If connection is null it means we didn't succeed in failing over or reconnecting
// so we close all the sessions, so they will throw exceptions when attempted to be used

for (ClientSessionInternal session : sessionsToClose) {
try {
session.cleanUp(true);
} catch (Exception cause) {
ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
}
}
}

}

private ClientSession createSessionInternal(final String rawUsername,
final String rawPassword,
final boolean xa,
Expand Down Expand Up @@ -995,6 +1138,10 @@ private int getConnectionWithRetry(final int reconnectAttempts, RemotingConnecti
return count;
}

private long getRetryInterval() {
return retryInterval;
}

private long getNextRetryInterval(long retryInterval) {
// Exponential back-off
long nextRetryInterval = (long) (retryInterval * retryIntervalMultiplier);
Expand Down Expand Up @@ -1492,6 +1639,59 @@ public synchronized void cancel() {
}
}

private void runFailBackCheck() {
if (failbackChecker == null) {
failbackChecker = new FailbackCheck();
}
threadPool.execute(failbackChecker);
}

private class FailbackCheck implements Runnable {
private boolean first = true;

@Override
public synchronized void run() {

if (!first) {
return;
}

first = false;

logger.debug("failbackChecker is running, trying to reach {} for failback", failbackConnectorPair.getA().toString());

boolean run = true;
int attempts = 0;
long failbackRetryInterval = getRetryInterval();

ConnectorFactory transportConnectorFactory;
Connector transportConnector;
Connection transportConnection;

while (failbackAttempts == -1 || attempts++ < failbackAttempts) {

waitForRetry(failbackRetryInterval);
failbackRetryInterval = getNextRetryInterval(failbackRetryInterval);

transportConnectorFactory = instantiateConnectorFactory(failbackConnectorPair.getA().getFactoryClassName());
transportConnector = createConnector(transportConnectorFactory, failbackConnectorPair.getA());
transportConnection = openTransportConnection(transportConnector);

if (transportConnection != null) {
transportConnector.close();
transportConnection.close();
ActiveMQException exception = new ActiveMQException("Failing back to original broker: " + failbackConnectorPair.getA().toString(), DISCONNECTED);
failback(exception, failbackConnectorPair);
break;
}

}

first = true;
}

}

protected RemotingConnection establishNewConnection() {
Connection transportConnection = createTransportConnection();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,18 @@ public int getFailoverAttempts() {
return config.failoverAttempts;
}

@Override
public ServerLocatorImpl setFailbackAttempts(int attempts) {
checkWrite();
this.config.failbackAttempts = attempts;
return this;
}

@Override
public int getFailbackAttempts() {
return config.failbackAttempts;
}

@Deprecated
@Override
public boolean isFailoverOnInitialConnection() {
Expand Down
14 changes: 14 additions & 0 deletions docs/user-manual/en/client-failover.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ to other live servers only after all attempts to
[reconnect to the same server](#reconnect-to-the-same-server) or
[reconnect to the backup server](#reconnect-to-the-backup-server) fail.

## Failing back after a successful failover
It is also possile to have the client keep trying to reconnect to the original
live server after a successful failover. In this case the failover connection
will get disconnected and moved back to the original server if it comes
back online at some later time.

This is controlled by the property `failbackAttempts`

If set to a non-zero value then following a failover the client will try to reach
the original server continiously according to the configured `retryInterval`
until it reaches the number of configured `failbackAttempts`.

Setting this parameter to `-1` means try forever. Default value is `0`

## Session reconnection

When clients [reconnect to the same server](#reconnect-to-the-same-server)
Expand Down
Loading

0 comments on commit 0dde73c

Please sign in to comment.