Skip to content

Commit

Permalink
changes to tests based on comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Karajgikar <[email protected]>
  • Loading branch information
Rahul Karajgikar committed Sep 19, 2024
1 parent 83b16de commit fc170ee
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@
package org.opensearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -53,6 +49,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.test.TestLogsAppender;
import org.opensearch.test.store.MockFSIndexStore;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
Expand All @@ -64,12 +61,11 @@
import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
import static org.hamcrest.Matchers.is;
Expand All @@ -81,7 +77,7 @@
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class NodeJoinLeftIT extends OpenSearchIntegTestCase {

private TestAppender testAppender;
private TestLogsAppender testLogsAppender;
private String clusterManager;
private String redNodeName;
private LoggerContext loggerContext;
Expand All @@ -108,11 +104,13 @@ protected void beforeIndexDeletion() throws Exception {
@Before
public void setUp() throws Exception {
super.setUp();
testAppender = new TestAppender();
// Add any other specific messages you want to capture
List<String> messagesToCapture = Arrays.asList("failed to join", "IllegalStateException");
testLogsAppender = new TestLogsAppender(messagesToCapture);
loggerContext = (LoggerContext) LogManager.getContext(false);
Configuration config = loggerContext.getConfiguration();
LoggerConfig loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName());
loggerConfig.addAppender(testAppender, null, null);
loggerConfig.addAppender(testLogsAppender, null, null);
loggerContext.updateLoggers();

String indexName = "test";
Expand Down Expand Up @@ -148,10 +146,11 @@ public void setUp() throws Exception {

/**
 * Per-test cleanup: discards the log lines captured during the test, detaches the
 * capturing appender(s) from the {@code ClusterConnectionManager} logger, and then
 * delegates to the superclass teardown.
 *
 * @throws Exception propagated from {@code super.tearDown()}
 */
@After
public void tearDown() throws Exception {
// drop anything captured so far so it cannot be observed after this test
testLogsAppender.clearCapturedLogs();
// re-fetch the logger context and look up the same logger config that setUp attached the appender to
loggerContext = (LoggerContext) LogManager.getContext(false);
Configuration config = loggerContext.getConfiguration();
LoggerConfig loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName());
// NOTE(review): both testAppender and testLogsAppender are removed here; this view appears to
// interleave the pre- and post-change lines of a diff -- confirm which removal is live.
loggerConfig.removeAppender(testAppender.getName());
loggerConfig.removeAppender(testLogsAppender.getName());
// ask the context to refresh its loggers after the appender removal
loggerContext.updateLoggers();
// superclass teardown runs last, after the logging configuration has been restored
super.tearDown();
}
Expand Down Expand Up @@ -190,9 +189,9 @@ public void testClusterStabilityWhenJoinRequestHappensDuringNodeLeftTask() throw
);
redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, simulatedFailureBehaviour);

// Loop runs 10 times to ensure race condition gets reproduced
for (int i = 0; i < 10; i++) {
// Fail followerchecker by force to trigger node disconnect and node left
// Loop runs 5 times to ensure race condition gets reproduced
testLogsAppender.clearCapturedLogs();
for (int i = 0; i < 5; i++) {
logger.info("--> simulating followerchecker failure to trigger node-left");
succeedFollowerChecker.set(false);
ClusterHealthResponse response1 = client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
Expand All @@ -214,12 +213,14 @@ public void testClusterStabilityWhenJoinRequestHappensDuringNodeLeftTask() throw
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response.isTimedOut(), is(false));

// assert that the right exception message showed up in logs
assertTrue(
"Expected IllegalStateException was not logged",
testAppender.containsExceptionMessage("IllegalStateException[cannot make a new connection as disconnect to node")
);

// assert that join requests fail with the right exception
boolean logFound = testLogsAppender.waitForLog("failed to join", 30, TimeUnit.SECONDS)
&& testLogsAppender.waitForLog(
"IllegalStateException[cannot make a new connection as disconnect to node",
30,
TimeUnit.SECONDS
);
assertTrue("Expected log was not found within the timeout period", logFound);
}

public void testClusterStabilityWhenDisconnectDuringSlowNodeLeftTask() throws Exception {
Expand Down Expand Up @@ -259,8 +260,9 @@ public void testClusterStabilityWhenDisconnectDuringSlowNodeLeftTask() throws Ex
);
redTransportService.addRequestHandlingBehavior(FOLLOWER_CHECK_ACTION_NAME, simulatedFailureBehaviour);

// Loop runs 10 times to ensure race condition gets reproduced
for (int i = 0; i < 10; i++) {
// Loop runs 5 times to ensure race condition gets reproduced
testLogsAppender.clearCapturedLogs();
for (int i = 0; i < 5; i++) {
// Fail followerchecker by force to trigger node disconnect and node left
logger.info("--> simulating followerchecker failure to trigger node-left");
succeedFollowerChecker.set(false);
Expand Down Expand Up @@ -291,11 +293,15 @@ public void testClusterStabilityWhenDisconnectDuringSlowNodeLeftTask() throws Ex
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response.isTimedOut(), is(false));

// assert that the right exception message showed up in logs
assertTrue(
"Expected IllegalStateException was not logged",
testAppender.containsExceptionMessage("IllegalStateException[cannot make a new connection as disconnect to node")
// assert that join requests fail with the right exception
boolean logFound = testLogsAppender.waitForLog("failed to join", 30, TimeUnit.SECONDS);
assertTrue("Expected log was not found within the timeout period", logFound);
logFound = testLogsAppender.waitForLog(
"IllegalStateException[cannot make a new connection as disconnect to node",
30,
TimeUnit.SECONDS
);
assertTrue("Expected log was not found within the timeout period", logFound);
}

public void testRestartDataNode() throws Exception {
Expand Down Expand Up @@ -346,29 +352,4 @@ public void messageReceived(
handler.messageReceived(request, channel, task);
}
}

/**
 * Log4j appender that records, as plain strings, every log event it receives so that tests
 * can later assert that a given message or exception text was logged.
 * <p>
 * For each event the formatted message is stored; if the event carries a throwable, its
 * {@code toString()} and each of its stack trace elements are stored as additional entries.
 * <p>
 * All access to the captured lines is guarded by a lock on the backing list. The appender is
 * registered on a shared logger, so {@link #append(LogEvent)} is presumably invoked from
 * threads other than the one calling {@link #containsExceptionMessage(String)}; an unguarded
 * {@code ArrayList} could then throw {@code ConcurrentModificationException} or lose entries.
 */
private static class TestAppender extends AbstractAppender {
    // Guarded by synchronized (logs).
    private final List<String> logs = new ArrayList<>();

    /**
     * Creates the appender under the name {@code "TestAppender"} with the default pattern
     * layout and starts it immediately so it can be attached to a logger config right away.
     */
    TestAppender() {
        super("TestAppender", null, PatternLayout.createDefaultLayout(), false, Property.EMPTY_ARRAY);
        start();
    }

    /**
     * Records the formatted message of {@code event} and, when present, the throwable's
     * string form followed by every stack trace element.
     *
     * @param event the log event delivered by log4j
     */
    @Override
    public void append(LogEvent event) {
        // Build this event's lines outside the lock, then publish them in one step so that
        // lines from concurrently appended events are never interleaved.
        final List<String> captured = new ArrayList<>();
        captured.add(event.getMessage().getFormattedMessage());
        final Throwable thrown = event.getThrown();
        if (thrown != null) {
            captured.add(thrown.toString());
            for (StackTraceElement element : thrown.getStackTrace()) {
                captured.add(element.toString());
            }
        }
        synchronized (logs) {
            logs.addAll(captured);
        }
    }

    /**
     * Reports whether any captured line contains {@code exceptionMessage}.
     * The text is matched literally (it is quoted before compilation) and case-insensitively.
     *
     * @param exceptionMessage literal text to look for
     * @return {@code true} if at least one captured line contains the text
     */
    boolean containsExceptionMessage(String exceptionMessage) {
        final Pattern pattern = Pattern.compile(Pattern.quote(exceptionMessage), Pattern.CASE_INSENSITIVE);
        // Hold the lock while streaming so a concurrent append cannot modify the list mid-iteration.
        synchronized (logs) {
            return logs.stream().anyMatch(log -> pattern.matcher(log).find());
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.Version;
import org.opensearch.action.support.PlainActionFuture;
Expand All @@ -53,9 +56,11 @@
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.TestLogsAppender;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.ConnectionProfile;
import org.opensearch.transport.Transport;
Expand All @@ -69,6 +74,7 @@
import org.junit.Before;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -94,6 +100,8 @@ public class NodeConnectionsServiceTests extends OpenSearchTestCase {
private ThreadPool threadPool;
private TransportService transportService;
private Map<DiscoveryNode, CheckedRunnable<Exception>> nodeConnectionBlocks;
private TestLogsAppender testLogsAppender;
private LoggerContext loggerContext;

private List<DiscoveryNode> generateNodes() {
List<DiscoveryNode> nodes = new ArrayList<>();
Expand Down Expand Up @@ -516,7 +524,7 @@ public void testConnectionCheckerRetriesIfPendingDisconnection() throws Interrup

// setup the connections
final DiscoveryNode node = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
;

final DiscoveryNodes nodes = DiscoveryNodes.builder().add(node).build();

final AtomicBoolean connectionCompleted = new AtomicBoolean();
Expand All @@ -526,13 +534,15 @@ public void testConnectionCheckerRetriesIfPendingDisconnection() throws Interrup

// now trigger a disconnect, and then set pending disconnections to true to fail any new connections
final long maxDisconnectionTime = 1000;
final long disconnectionTime = 100;
deterministicTaskQueue.scheduleAt(disconnectionTime, new Runnable() {
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
transportService.disconnectFromNode(node);
logger.info("--> setting pending disconnections to fail next connection attempts");
service.setPendingDisconnections(new HashSet<>(Collections.singleton(node)));
// we reset the connection count during the first disconnection
// we also clear the captured logs as we want to assert for exceptions that show up after this
testLogsAppender.clearCapturedLogs();
transportService.resetConnectToNodeCallCount();
}

Expand All @@ -541,20 +551,39 @@ public String toString() {
return "scheduled disconnection of " + node;
}
});
final long maxReconnectionTime = 2000;
final int expectedReconnectionAttempts = 5;

// ensure the disconnect task completes, give extra time also for connection checker tasks
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime);

// verify that connectionchecker is trying to call connectToNode multiple times
// ensure the disconnect task completes, and run for additional time to check for reconnections
// exit early if we see enough reconnection attempts
logger.info("--> verifying connectionchecker is trying to reconnect");
runTasksUntilExpectedReconnectionAttempts(
deterministicTaskQueue,
maxDisconnectionTime + maxReconnectionTime,
transportService,
expectedReconnectionAttempts
);

// assert that we saw at least the required number of reconnection attempts, and the exceptions that showed up are as expected
logger.info("--> number of reconnection attempts: {}", transportService.getConnectToNodeCallCount());
assertThat("ConnectToNode should be called multiple times", transportService.getConnectToNodeCallCount(), greaterThan(5));
assertThat(
"Did not see enough reconnection attempts from connection checker",
transportService.getConnectToNodeCallCount(),
greaterThan(expectedReconnectionAttempts)
);
boolean logFound = testLogsAppender.waitForLog("failed to connect", 1, TimeUnit.SECONDS)
&& testLogsAppender.waitForLog(
"IllegalStateException: cannot make a new connection as disconnect to node",
1,
TimeUnit.SECONDS
);
assertTrue("Expected log for reconnection failure was not found in the required time period", logFound);
assertFalse("connected to " + node, transportService.nodeConnected(node));

// clear the pending disconnections and ensure the connection gets re-established automatically by connectionchecker
logger.info("--> clearing pending disconnections to allow connections to re-establish");
service.clearPendingDisconnections();
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + 2 * reconnectIntervalMillis);
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + maxReconnectionTime + 2 * reconnectIntervalMillis);
assertConnectedExactlyToNodes(transportService, nodes);
}

Expand All @@ -569,6 +598,24 @@ private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long e
deterministicTaskQueue.runAllRunnableTasks();
}

/**
 * Drives the deterministic task queue until either the queue's current time reaches
 * {@code endTimeMillis} or {@code transportService.getConnectToNodeCallCount()} exceeds
 * {@code expectedReconnectionAttempts}, whichever happens first, and then runs every task
 * that is still runnable.
 * <p>
 * On each iteration a random runnable task is executed (chosen by a coin flip when runnable
 * tasks exist); otherwise time is advanced to the next deferred task. If the queue has
 * neither runnable nor deferred tasks the loop stops, because neither the clock nor the
 * call count could change any more and the loop would otherwise spin forever.
 *
 * @param deterministicTaskQueue       the queue whose tasks and clock are driven
 * @param endTimeMillis                queue time (in milliseconds) at which to stop
 * @param transportService             transport service whose connect-call count is polled
 * @param expectedReconnectionAttempts the loop exits once the call count is greater than this
 */
private void runTasksUntilExpectedReconnectionAttempts(
    DeterministicTaskQueue deterministicTaskQueue,
    long endTimeMillis,
    TestTransportService transportService,
    int expectedReconnectionAttempts
) {
    // stop on timeout, on seeing enough reconnection attempts, or when the queue is drained
    while ((deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis)
        && (transportService.getConnectToNodeCallCount() <= expectedReconnectionAttempts)) {
        final boolean hasRunnableTasks = deterministicTaskQueue.hasRunnableTasks();
        if (hasRunnableTasks && randomBoolean()) {
            deterministicTaskQueue.runRandomTask();
        } else if (deterministicTaskQueue.hasDeferredTasks()) {
            deterministicTaskQueue.advanceTime();
        } else if (hasRunnableTasks == false) {
            // nothing runnable and nothing deferred: no further progress is possible, so bail
            // out and let the caller's assertions fail instead of hanging the test
            break;
        }
        // else: runnable tasks exist but the coin flip skipped them; retry on the next iteration
    }
    deterministicTaskQueue.runAllRunnableTasks();
}

private void ensureConnections(NodeConnectionsService service) {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
service.ensureConnections(() -> future.onResponse(null));
Expand All @@ -594,6 +641,16 @@ private void assertConnected(TransportService transportService, Iterable<Discove
@Before
public void setUp() throws Exception {
super.setUp();
// Add any other specific messages you want to capture
List<String> messagesToCapture = Arrays.asList("failed to connect", "IllegalStateException");
testLogsAppender = new TestLogsAppender(messagesToCapture);
loggerContext = (LoggerContext) LogManager.getContext(false);
Configuration config = loggerContext.getConfiguration();
LoggerConfig loggerConfig = config.getLoggerConfig(NodeConnectionsService.class.getName());
loggerConfig.addAppender(testLogsAppender, null, null);
loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName());
loggerConfig.addAppender(testLogsAppender, null, null);
loggerContext.updateLoggers();
ThreadPool threadPool = new TestThreadPool(getClass().getName());
this.threadPool = threadPool;
nodeConnectionBlocks = newConcurrentMap();
Expand All @@ -605,6 +662,14 @@ public void setUp() throws Exception {
@Override
@After
public void tearDown() throws Exception {
testLogsAppender.clearCapturedLogs();
loggerContext = (LoggerContext) LogManager.getContext(false);
Configuration config = loggerContext.getConfiguration();
LoggerConfig loggerConfig = config.getLoggerConfig(NodeConnectionsService.class.getName());
loggerConfig.removeAppender(testLogsAppender.getName());
loggerConfig = config.getLoggerConfig(ClusterConnectionManager.class.getName());
loggerConfig.removeAppender(testLogsAppender.getName());
loggerContext.updateLoggers();
transportService.stop();
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
Expand Down
Loading

0 comments on commit fc170ee

Please sign in to comment.