[WIP NO MERGE] Migrate embedded to kraft #17238

Closed
wants to merge 14 commits
1 change: 1 addition & 0 deletions build.gradle
@@ -2529,6 +2529,7 @@ project(':streams') {
// testCompileOnly prevents streams from exporting a dependency on test-utils, which would cause a dependency cycle
testCompileOnly project(':streams:test-utils')

testImplementation project(':metadata')
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server')
testImplementation project(':core')
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
@@ -393,6 +393,8 @@
</subpackage>

<subpackage name="integration">
<allow pkg="kafka.testkit"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="kafka.admin" />
<allow pkg="kafka.api" />
<allow pkg="kafka.cluster" />
@@ -150,7 +150,7 @@ private void prepareConfigs(final String appID) {

protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
protected static final int CLEANUP_CONSUMER_TIMEOUT = 2000;
protected static final int TIMEOUT_MULTIPLIER = 15;
protected static final int TIMEOUT_MULTIPLIER = 30;

void prepareTest(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
@@ -159,7 +159,7 @@ void prepareTest(final TestInfo testInfo) throws Exception {

waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT);

cluster.deleteAllTopicsAndWait(120000);
cluster.deleteAllTopics();
cluster.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

add10InputElements();
@@ -200,6 +200,7 @@ public void testResetWhenInternalTopicsAreSpecified(final TestInfo testInfo) thr
// RUN
streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig);
streams.start();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "KafkaStreams not running in time");
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

streams.close();
@@ -273,6 +274,7 @@ private void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(fina
// RUN
streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig);
streams.start();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "KafkaStreams not running in time");
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
// receive only first values to make sure intermediate user topic is not consumed completely
// => required to test "seekToEnd" for intermediate topics
@@ -302,6 +304,9 @@ private void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(fina

// RE-RUN
streams.start();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING,
"KafkaStreams not running in time");

final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
streams.close();
@@ -323,7 +328,7 @@ private void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(fina
cleanGlobal(!useRepartitioned, null, null, appID);

if (!useRepartitioned) {
cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
cluster.deleteTopic(INTERMEDIATE_USER_TOPIC);
}
}

@@ -420,7 +425,6 @@ protected void cleanGlobal(final boolean withIntermediateTopics,
}

protected void assertInternalTopicsGotDeleted(final String additionalExistingTopic) throws Exception {
// do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
if (additionalExistingTopic != null) {
cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
Topic.GROUP_METADATA_TOPIC_NAME, additionalExistingTopic);
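The changes above replace the timed deleteAllTopicsAndWait(...) / deleteTopicAndWait(...) helpers with plain deleteAllTopics() / deleteTopic() calls and add explicit waits for the RUNNING state. As a rough sketch of what a KRaft-friendly delete helper could look like on top of the public Admin client — the class and method below are illustrative assumptions, not the PR's actual EmbeddedKafkaCluster code:

import org.apache.kafka.clients.admin.Admin;

import java.util.Set;
import java.util.concurrent.ExecutionException;

// Hypothetical helper, assumed for illustration only: delete every topic via the
// Admin client and let the returned futures signal completion, instead of polling
// with a fixed timeout as the old ...AndWait variants did.
public final class TopicCleanup {
    private TopicCleanup() { }

    public static void deleteAllTopics(final Admin admin)
            throws ExecutionException, InterruptedException {
        final Set<String> topics = admin.listTopics().names().get();
        if (!topics.isEmpty()) {
            admin.deleteTopics(topics).all().get();
        }
    }
}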
@@ -107,7 +107,7 @@ public void setup(final TestInfo testInfo) {
builder = new StreamsBuilder();
builder.stream(inputTopic);

properties = mkObjectProperties(
properties = mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
@@ -162,8 +162,7 @@ public static void closeCluster() {
@BeforeEach
public void createTopics() throws Exception {
applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
CLUSTER.deleteTopicsAndWait(
60_000L,
CLUSTER.deleteTopics(
SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC,
SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
@@ -67,7 +67,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
@@ -81,19 +80,11 @@
@Tag("integration")
public class HighAvailabilityTaskAssignorIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3,
new Properties(),
asList(
new Properties() {{
setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_0);
}},
new Properties() {{
setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_1);
}},
new Properties() {{
setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_2);
}}
)
);
new Properties(), mkMap(
mkEntry(0, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_0))),
mkEntry(1, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_1))),
mkEntry(2, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_2)))
));

@BeforeAll
public static void startCluster() throws IOException {
@@ -258,7 +249,7 @@ public void onRestoreEnd(final TopicPartition topicPartition,

restoreCompleteLatch.await();
// We should finalize the restoration without having restored any records (because they're already in
// the store. Otherwise, we failed to properly re-use the state from the standby.
// the store). Otherwise, we failed to properly re-use the state from the standby.
assertThat(instance1TotalRestored.get(), is(0L));
// Belt-and-suspenders check that we never even attempt to restore any records.
assertThat(instance1NumRestored.get(), is(-1L));
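The constructor change above swaps a positional List of per-broker Properties for a Map keyed by broker id, which makes the rack assignment for each broker explicit. The helper below is only an assumed illustration of how such per-broker overrides might be merged with shared cluster defaults; the real testkit wiring in this PR may differ:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class BrokerConfigMerger {
    private BrokerConfigMerger() { }

    // Per-broker entries (e.g. broker.rack) win over the shared defaults.
    static Map<String, String> configFor(final int brokerId,
                                         final Properties clusterDefaults,
                                         final Map<Integer, Map<String, String>> perBrokerOverrides) {
        final Map<String, String> config = new HashMap<>();
        for (final String name : clusterDefaults.stringPropertyNames()) {
            config.put(name, clusterDefaults.getProperty(name));
        }
        config.putAll(perBrokerOverrides.getOrDefault(brokerId, Map.of()));
        return config;
    }
}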
@@ -126,7 +126,7 @@ public class IQv2StoreIntegrationTest {
private static final long SEED = new Random().nextLong();
private static final Random RANDOM = new Random(SEED);

private static final int NUM_BROKERS = 1;
private static final int NUM_BROKERS = 2;
public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
private static int port = 0;
private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -379,7 +379,7 @@ public static void before()
throws InterruptedException, IOException, ExecutionException, TimeoutException {

CLUSTER.start();
CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
CLUSTER.deleteAllTopics();
final int partitions = 2;
CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);

@@ -49,6 +49,7 @@

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -70,10 +71,11 @@
@Timeout(600)
public class JoinGracePeriodDurabilityIntegrationTest {

private static final long NOW = Instant.now().toEpochMilli();

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
3,
mkProperties(mkMap()),
0L
new Properties()
);

@BeforeAll
@@ -218,7 +220,7 @@ private void verifyOutput(final String topic, final List<KeyValueTimestamp<Strin
* just to exercise that everything works properly in the presence of commits.
*/
private long scaledTime(final long unscaledTime) {
return COMMIT_INTERVAL * 2 * unscaledTime;
return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
}

private static void produceSynchronouslyToPartitionZero(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
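The scaledTime() change above shifts every synthetic event time by NOW = Instant.now().toEpochMilli(), presumably because the embedded KRaft cluster no longer runs on a mock clock pinned to zero, so records stamped near the epoch would look ancient to the brokers. A small worked example, with an assumed COMMIT_INTERVAL of 100 ms (the real constant is defined elsewhere in the test):

import java.time.Instant;

public class ScaledTimeExample {
    static final long COMMIT_INTERVAL = 100L;              // assumed value for illustration
    static final long NOW = Instant.now().toEpochMilli();

    static long scaledTime(final long unscaledTime) {
        return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
    }

    public static void main(final String[] args) {
        // scaledTime(0) == NOW, scaledTime(1) == NOW + 200, scaledTime(2) == NOW + 400, ...
        for (long t = 0; t <= 2; t++) {
            System.out.println("scaledTime(" + t + ") = " + scaledTime(t));
        }
    }
}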
@@ -97,7 +97,7 @@ public void prepareTopology() throws InterruptedException {

@AfterEach
public void cleanup() throws InterruptedException, IOException {
CLUSTER.deleteAllTopicsAndWait(120000);
CLUSTER.deleteAllTopics();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}

@@ -84,7 +84,7 @@ public void prepareTopology() throws InterruptedException {

@AfterEach
public void cleanup() throws InterruptedException, IOException {
CLUSTER.deleteAllTopicsAndWait(120000);
CLUSTER.deleteAllTopics();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}

@@ -120,9 +120,7 @@ public void after() throws Exception {
@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
try {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();

streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);
produceKeyValues("a", "b", "c");

assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
@@ -132,6 +130,8 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDi
// the state restore listener will append one record to the log
streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
streams.start();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING,
"KafkaStreams not running in time");

produceKeyValues("f", "g", "h");

@@ -149,8 +149,7 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosV2
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

try {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();
streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);

produceKeyValues("a", "b", "c");

@@ -161,6 +160,7 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosV2
// the state restore listener will append one record to the log
streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
streams.start();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "KafkaStreams not running in time");

produceKeyValues("f", "g", "h");

@@ -176,16 +176,14 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosV2
@Test
public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
try {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();
streams = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, streamsBuilder, false);

produceKeyValues("a", "b", "c");

assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");

streams.close();
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();
streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false);

produceKeyValues("f", "g", "h");

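In the hunks above, the manual construct-and-start sequence is replaced by IntegrationTestUtils.getRunningStreams(...) / getStartedStreams(...), which presumably bundle KafkaStreams.start() with a wait for the RUNNING state. The sketch below shows the general shape of such a helper under that assumption; the actual signatures (including the boolean parameter) are not confirmed by this diff:

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.test.TestUtils;

final class StreamsStartUtil {
    private StreamsStartUtil() { }

    // Assumed helper: start the instance and block until it reports RUNNING, so
    // the test never races the rebalance that follows start().
    static KafkaStreams startAndWaitForRunning(final StreamsBuilder builder,
                                               final Properties config) throws InterruptedException {
        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
        TestUtils.waitForCondition(
            () -> streams.state() == KafkaStreams.State.RUNNING,
            "KafkaStreams not running in time");
        return streams;
    }
}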
@@ -135,7 +135,7 @@ public void before(final TestInfo testName) throws Exception {
adminClient = Admin.create(commonClientConfig);
}

CLUSTER.deleteAllTopicsAndWait(120_000L);
CLUSTER.deleteAllTopics();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);

@@ -97,7 +97,7 @@
@Timeout(600)
@Tag("integration")
public class NamedTopologyIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

private static final String TOPOLOGY_1 = "topology-1";
private static final String TOPOLOGY_2 = "topology-2";
@@ -243,14 +243,14 @@ public void shutdown() throws Exception {
CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
try {
assertThat("topic was not decorated", t.contains(TOPIC_PREFIX));
CLUSTER.deleteTopicsAndWait(t);
} catch (final InterruptedException e) {
CLUSTER.deleteTopics(t);
} catch (final RuntimeException e) {
e.printStackTrace();
}
});

CLUSTER.deleteTopicsAndWait(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}

@Test
@@ -518,8 +518,8 @@ public void shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMul

CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog")).forEach(t -> {
try {
CLUSTER.deleteTopicAndWait(t);
} catch (final InterruptedException e) {
CLUSTER.deleteTopic(t);
} catch (final RuntimeException e) {
e.printStackTrace();
}
});
@@ -570,7 +570,7 @@ public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throw
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
}
}
@@ -624,8 +624,8 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop

CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("changelog")).forEach(t -> {
try {
CLUSTER.deleteTopicAndWait(t);
} catch (final InterruptedException e) {
CLUSTER.deleteTopic(t);
} catch (final RuntimeException e) {
e.printStackTrace();
}
});
@@ -640,7 +640,7 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}
}

@@ -662,8 +662,8 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop

CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
try {
CLUSTER.deleteTopicsAndWait(t);
} catch (final InterruptedException e) {
CLUSTER.deleteTopics(t);
} catch (final RuntimeException e) {
e.printStackTrace();
}
});
@@ -678,7 +678,7 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));

CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}

/**
@@ -102,7 +102,7 @@
public class PositionRestartIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(PositionRestartIntegrationTest.class);
private static final long SEED = new Random().nextLong();
private static final int NUM_BROKERS = 1;
private static final int NUM_BROKERS = 3;
public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
private static int port = 0;
private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -274,7 +274,7 @@ public static void before()
throws InterruptedException, IOException, ExecutionException, TimeoutException {

CLUSTER.start();
CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
CLUSTER.deleteAllTopics();
final int partitions = 2;
CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);

@@ -242,7 +242,7 @@ public void shutdown() throws Exception {
kafkaStreams.close(ofSeconds(30));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
CLUSTER.deleteAllTopicsAndWait(0L);
CLUSTER.deleteAllTopics();
}

/**
@@ -153,7 +153,7 @@ public void setupTopics() throws Exception {

@AfterEach
public void cleanup() throws InterruptedException {
CLUSTER.deleteAllTopicsAndWait(120000);
CLUSTER.deleteAllTopics();
}

@ParameterizedTest