Commit 74dc218

Set time for embedded cluster to current time and set low timestamps to current time + timestamp
bbejeck committed Sep 19, 2024
1 parent f3aa2fd commit 74dc218
Showing 5 changed files with 31 additions and 25 deletions.
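All five diffs below apply one pattern: each test class captures the wall-clock epoch once in a NOW constant and offsets every record timestamp from it, instead of stamping records at or near epoch 0. A minimal, self-contained sketch of the pattern (COMMIT_INTERVAL is a stand-in value here; the real constant is defined elsewhere in each test):

```java
import java.time.Instant;

public class TimestampAnchoringSketch {
    // Captured once per test class, so every timestamp in a run shares one anchor.
    private static final long NOW = Instant.now().toEpochMilli();
    private static final long COMMIT_INTERVAL = 100L; // stand-in value

    // Mirrors the tests' scaledTime() helper: logical ticks are spread out
    // relative to NOW rather than relative to epoch 0.
    static long scaledTime(final long unscaledTime) {
        return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
    }

    public static void main(final String[] args) {
        // Before this commit scaledTime(5) returned 1000; now it lands just
        // after the current wall-clock time.
        System.out.println(scaledTime(5));
    }
}
```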
JoinGracePeriodDurabilityIntegrationTest.java

@@ -38,7 +38,6 @@
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
-
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
@@ -49,6 +48,7 @@
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -70,10 +70,11 @@
 @Timeout(600)
 public class JoinGracePeriodDurabilityIntegrationTest {
 
+    private static final long NOW = Instant.now().toEpochMilli();
+
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
         3,
-        new Properties(),
-        0L
+        new Properties()
     );
 
     @BeforeAll
@@ -219,7 +220,7 @@ private void verifyOutput(final String topic, final List<KeyValueTimestamp<Strin
      * just to exercise that everything works properly in the presence of commits.
      */
     private long scaledTime(final long unscaledTime) {
-        return COMMIT_INTERVAL * 2 * unscaledTime;
+        return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
     private static void produceSynchronouslyToPartitionZero(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
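The constructor change above drops the trailing 0L argument, which pinned the embedded cluster's mock clock to epoch 0; per the commit title, the two-argument form presumably anchors the cluster to the current time instead. A hedged sketch of that defaulting idiom (a hypothetical stand-in, not the actual EmbeddedKafkaCluster source):

```java
import java.util.Properties;

// Hypothetical stand-in for EmbeddedKafkaCluster's constructor overloads.
final class ClusterTimeSketch {
    private final long mockTimeMillisStart;

    // Overload without an explicit start time: default to the wall clock,
    // matching "set time for embedded cluster to current time".
    ClusterTimeSketch(final int numBrokers, final Properties brokerConfig) {
        this(numBrokers, brokerConfig, System.currentTimeMillis());
    }

    ClusterTimeSketch(final int numBrokers, final Properties brokerConfig, final long mockTimeMillisStart) {
        this.mockTimeMillisStart = mockTimeMillisStart;
    }
}
```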
ResetPartitionTimeIntegrationTest.java

@@ -44,6 +44,7 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -63,14 +64,15 @@
 @Timeout(600)
 public class ResetPartitionTimeIntegrationTest {
     private static final int NUM_BROKERS = 1;
+    private static final long NOW = Instant.now().toEpochMilli();
     private static final Properties BROKER_CONFIG;
     static {
         BROKER_CONFIG = new Properties();
         BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
         BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
     }
     public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG, 0L);
+        new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
 
     @BeforeAll
     public static void startCluster() throws IOException {
@@ -118,13 +120,13 @@ public void shouldPreservePartitionTimeOnKafkaStreamRestart(final String process
         produceSynchronouslyToPartitionZero(
             input,
             Collections.singletonList(
-                new KeyValueTimestamp<>("k3", "v3", 5000)
+                new KeyValueTimestamp<>("k3", "v3", NOW + 5000)
             )
         );
         verifyOutput(
             outputRaw,
             Collections.singletonList(
-                new KeyValueTimestamp<>("k3", "v3", 5000)
+                new KeyValueTimestamp<>("k3", "v3", NOW + 5000)
             )
         );
         assertThat(lastRecordedTimestamp, is(-1L));
@@ -139,16 +141,16 @@ public void shouldPreservePartitionTimeOnKafkaStreamRestart(final String process
         produceSynchronouslyToPartitionZero(
             input,
             Collections.singletonList(
-                new KeyValueTimestamp<>("k5", "v5", 4999)
+                new KeyValueTimestamp<>("k5", "v5", NOW + 4999)
             )
         );
         verifyOutput(
             outputRaw,
             Collections.singletonList(
-                new KeyValueTimestamp<>("k5", "v5", 4999)
+                new KeyValueTimestamp<>("k5", "v5", NOW + 4999)
             )
         );
-        assertThat(lastRecordedTimestamp, is(5000L));
+        assertThat(lastRecordedTimestamp, is(NOW + 5000L));
     } finally {
         kafkaStreams.close();
         quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);
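The updated assertions keep the invariant this test exercises: partition time is the maximum record timestamp seen so far and must survive a restart, so the late NOW + 4999 record still observes a partition time of NOW + 5000. A small illustration of that max-so-far rule (my gloss, not the test's own code):

```java
public class PartitionTimeSketch {
    public static void main(final String[] args) {
        final long now = System.currentTimeMillis();
        long partitionTime = -1L; // the sentinel the first assertion expects

        // The first record advances partition time to now + 5000 ...
        partitionTime = Math.max(partitionTime, now + 5000);
        // ... and the later, smaller timestamp must not move it backwards.
        partitionTime = Math.max(partitionTime, now + 4999);

        System.out.println(partitionTime == now + 5000); // true
    }
}
```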
StreamsUncaughtExceptionHandlerIntegrationTest.java

@@ -54,6 +54,7 @@
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -81,7 +82,8 @@
 @Timeout(600)
 public class StreamsUncaughtExceptionHandlerIntegrationTest {
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyMap(), 0L, 0L);
+    private static final long NOW = Instant.now().toEpochMilli();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyMap());
 
     @BeforeAll
     public static void startCluster() throws IOException {
@@ -146,7 +148,7 @@ public void shouldShutdownThreadUsingOldHandler() throws Exception {
         kafkaStreams.setUncaughtExceptionHandler((t, e) -> counter.incrementAndGet());
 
         startApplicationAndWaitUntilRunning(kafkaStreams);
-        produceMessages(0L, inputTopic, "A");
+        produceMessages(NOW, inputTopic, "A");
 
         // should call the UncaughtExceptionHandler in current thread
         TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time");
@@ -168,7 +170,7 @@ public void shouldShutdownClient() throws Exception {
 
         startApplicationAndWaitUntilRunning(kafkaStreams);
 
-        produceMessages(0L, inputTopic, "A");
+        produceMessages(NOW, inputTopic, "A");
         waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
 
         assertThat(processorValueCollector.size(), equalTo(1));
@@ -252,7 +254,7 @@ public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throw
 
         startApplicationAndWaitUntilRunning(kafkaStreams);
 
-        produceMessages(0L, inputTopic2, "A");
+        produceMessages(NOW, inputTopic2, "A");
         waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
 
         assertThat(processorValueCollector.size(), equalTo(1));
@@ -297,7 +299,7 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                 IntegerSerializer.class,
                 StringSerializer.class,
                 new Properties()),
-            0L);
+            NOW);
 
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             inputTopic2,
@@ -310,7 +312,7 @@
                 IntegerSerializer.class,
                 StringSerializer.class,
                 new Properties()),
-            0L);
+            NOW);
 
         IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
             TestUtils.consumerConfig(
@@ -365,7 +367,7 @@ private void testShutdownApplication(final int numThreads) throws Exception {
 
         startApplicationAndWaitUntilRunning(asList(kafkaStreams1, kafkaStreams2));
 
-        produceMessages(0L, inputTopic, "A");
+        produceMessages(NOW, inputTopic, "A");
         waitForApplicationState(asList(kafkaStreams1, kafkaStreams2), KafkaStreams.State.ERROR, DEFAULT_DURATION);
 
         assertThat(processorValueCollector.size(), equalTo(1));
@@ -386,7 +388,7 @@ private void testReplaceThreads(final int numThreads) throws Exception {
         });
         startApplicationAndWaitUntilRunning(kafkaStreams);
 
-        produceMessages(0L, inputTopic, "A");
+        produceMessages(NOW, inputTopic, "A");
         TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
         TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
         kafkaStreams.close();
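Every produceMessages(0L, ...) call becomes produceMessages(NOW, ...), so test input is produced with wall-clock-based timestamps rather than 0. For reference, a Kafka producer pins a record's timestamp through the ProducerRecord constructor; a minimal sketch, with the topic name and broker address as illustrative assumptions:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExplicitTimestampProducerSketch {
    public static void main(final String[] args) {
        final long now = System.currentTimeMillis();

        final Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092"); // illustrative address
        config.put("key.serializer", StringSerializer.class.getName());
        config.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            // (topic, partition, timestamp, key, value): the timestamp argument
            // is what these tests switched from 0L to NOW.
            producer.send(new ProducerRecord<>("input-topic", 0, now, "A", "A"));
        }
    }
}
```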
SuppressionDurabilityIntegrationTest.java

@@ -55,6 +55,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -80,11 +81,10 @@
 @Tag("integration")
 @Timeout(600)
 public class SuppressionDurabilityIntegrationTest {
-
+    private static final long NOW = Instant.now().toEpochMilli();
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
         3,
-        mkProperties(mkMap()),
-        0L
+        mkProperties(mkMap())
     );
 
     @BeforeAll
@@ -307,7 +307,7 @@ private void verifyOutput(final String topic, final List<KeyValueTimestamp<Strin
      * just to exercise that everything works properly in the presence of commits.
      */
     private long scaledTime(final long unscaledTime) {
-        return COMMIT_INTERVAL * 2 * unscaledTime;
+        return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
     private static void produceSynchronouslyToPartitionZero(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
SuppressionIntegrationTest.java

@@ -52,6 +52,7 @@
 import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -83,11 +84,11 @@
 @Tag("integration")
 @Timeout(600)
 public class SuppressionIntegrationTest {
+    private static final long NOW = Instant.now().toEpochMilli();
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
         1,
-        mkProperties(mkMap()),
-        0L
+        mkProperties(mkMap())
     );
 
     @BeforeAll
@@ -525,7 +526,7 @@ private static Properties getStreamsConfig(final String appId) {
      * just to exercise that everything works properly in the presence of commits.
      */
     private static long scaledTime(final long unscaledTime) {
-        return COMMIT_INTERVAL * 2 * unscaledTime;
+        return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
    }
 
     private static void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
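As in the other four files, scaledTime now returns wall-clock-anchored values. One plausible motivation, hedged since the commit message does not spell it out: brokers judge record age by record timestamp, so records stamped near epoch 0 look decades old and can fall afoul of time-based retention, while NOW + offset timestamps stay comfortably recent:

```java
public class RecordAgeSketch {
    public static void main(final String[] args) {
        final long retentionMs = 7L * 24 * 60 * 60 * 1000; // Kafka's default retention (7 days)
        final long now = System.currentTimeMillis();

        final long oldStyle = 1000L;       // 0L-anchored scheme: scaledTime(5) with COMMIT_INTERVAL = 100
        final long newStyle = now + 1000L; // NOW-anchored scheme

        System.out.println(now - oldStyle > retentionMs); // true: looks long expired
        System.out.println(now - newStyle > retentionMs); // false: looks fresh
    }
}
```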
