
Commit bdc92fd

MINOR: Cleanup zk condition in TransactionsTest, QuorumTestHarness and PlaintextConsumerAssignorsTest (apache#18639)
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent a783dc6 commit bdc92fd

3 files changed: +12 -36 lines changed

core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignorsTest.scala (-2)
@@ -308,8 +308,6 @@ class PlaintextConsumerAssignorsTest extends AbstractConsumerTest {
   // Only the classic group protocol supports client-side assignors
   @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.assignmentStrategy={2}")
   @CsvSource(Array(
-    "zk, classic, org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
-    "zk, classic, org.apache.kafka.clients.consumer.RangeAssignor",
     "kraft, classic, org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
     "kraft, classic, org.apache.kafka.clients.consumer.RangeAssignor"
   ))
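For reference, the parameter block as it should read after this change, reconstructed from the kept lines of the hunk above (indentation is approximate and the test method itself is omitted):

  // Only the classic group protocol supports client-side assignors
  @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.assignmentStrategy={2}")
  @CsvSource(Array(
    "kraft, classic, org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
    "kraft, classic, org.apache.kafka.clients.consumer.RangeAssignor"
  ))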

core/src/test/scala/integration/kafka/api/TransactionsTest.scala (+11 -29)
@@ -587,14 +587,9 @@ class TransactionsTest extends IntegrationTestHarness {
       fail("Should not be able to send messages from a fenced producer.")
     } catch {
       case _: InvalidProducerEpochException =>
-      case e: ExecutionException => {
-        if (quorum == "zk") {
-          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
-        } else {
-          // In kraft mode, transactionV2 is used.
-          assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
-        }
-      }
+      case e: ExecutionException =>
+        // In kraft mode, transactionV2 is used.
+        assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
       case e: Exception =>
         throw new AssertionError("Got an unexpected exception from a fenced producer.", e)
     }
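For readability, the fenced-producer catch block as it reads after the cleanup, reconstructed from the kept and added lines of the hunk above (indentation is approximate):

      fail("Should not be able to send messages from a fenced producer.")
    } catch {
      case _: InvalidProducerEpochException =>
      case e: ExecutionException =>
        // In kraft mode, transactionV2 is used.
        assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
      case e: Exception =>
        throw new AssertionError("Got an unexpected exception from a fenced producer.", e)
    }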
@@ -622,27 +617,14 @@
     // Wait for the expiration cycle to kick in.
     Thread.sleep(600)
 
-    if (quorum == "zk") {
-      // In zk mode, transaction v1 is used.
-      try {
-        // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
-        producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
-        fail("should have raised a ProducerFencedException since the transaction has expired")
-      } catch {
-        case _: ProducerFencedException =>
-        case e: ExecutionException =>
-          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
-      }
-    } else {
-      try {
-        // Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
-        producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
-        fail("should have raised a InvalidProducerEpochException since the transaction has expired")
-      } catch {
-        case _: InvalidProducerEpochException =>
-        case e: ExecutionException =>
-          assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
-      }
+    try {
+      // Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
+      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
+      fail("should have raised a InvalidProducerEpochException since the transaction has expired")
+    } catch {
+      case _: InvalidProducerEpochException =>
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
     }
 
     // Verify that the first message was aborted and the second one was never written at all.
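Likewise, the expired-transaction check keeps only the kraft expectation; the resulting block, reconstructed from the kept and added lines of the hunk above (indentation is approximate):

    // Wait for the expiration cycle to kick in.
    Thread.sleep(600)

    try {
      // Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException.
      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
      fail("should have raised a InvalidProducerEpochException since the transaction has expired")
    } catch {
      case _: InvalidProducerEpochException =>
      case e: ExecutionException =>
        assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
    }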

core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala (+1 -5)
@@ -270,7 +270,6 @@ abstract class QuorumTestHarness extends Logging {
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:0,$listeners")
     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, s"CONTROLLER,$listenerNames")
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:0")
-    // Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent.
     props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000")
     val config = new KafkaConfig(props)

@@ -369,7 +368,7 @@ object QuorumTestHarness {
 
   /**
    * Verify that a previous test that doesn't use QuorumTestHarness hasn't left behind an unexpected thread.
-   * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
+   * This assumes that brokers, admin clients, producers and consumers are not created in another @BeforeClass,
    * which is true for core tests where this harness is used.
    */
   @BeforeAll
@@ -437,9 +436,6 @@
     )
   }
 
-  // The following is for tests that only work with the classic group protocol because of relying on Zookeeper
-  def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))
-
   // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
   // implementation that would otherwise cause tests to fail.
   def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
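The removed provider supplied only the ("zk", classic) parameter pair. For comparison, a minimal sketch of what a kraft counterpart with the same Arguments-stream shape would look like; the method name is hypothetical and this sketch is not part of the commit:

  // Hypothetical sketch (not in this commit): the kraft analogue of the removed ZK-only provider.
  def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KRaft_sketch: java.util.stream.Stream[Arguments] =
    stream.Stream.of(Arguments.of("kraft", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))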
