|
19 | 19 | package org.apache.pulsar.client.impl;
|
20 | 20 |
|
21 | 21 | import static java.nio.charset.StandardCharsets.UTF_8;
|
| 22 | +import static org.testng.Assert.assertTrue; |
22 | 23 | import static org.testng.Assert.fail;
|
23 | 24 |
|
24 | 25 | import com.google.common.collect.Sets;
|
25 | 26 |
|
| 27 | +import java.lang.reflect.Field; |
| 28 | +import java.util.Optional; |
| 29 | +import java.util.concurrent.CompletableFuture; |
26 | 30 | import java.util.ArrayList;
|
27 | 31 | import java.util.HashMap;
|
28 | 32 | import java.util.List;
|
29 | 33 | import java.util.Map;
|
30 | 34 | import java.util.concurrent.TimeUnit;
|
| 35 | + |
31 | 36 | import lombok.Cleanup;
|
32 | 37 | import lombok.extern.slf4j.Slf4j;
|
| 38 | + |
| 39 | +import org.apache.bookkeeper.mledger.Position; |
| 40 | +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; |
| 41 | +import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| 42 | +import org.apache.pulsar.broker.service.BrokerService; |
| 43 | +import org.apache.pulsar.broker.service.Topic; |
| 44 | +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; |
33 | 45 | import org.apache.pulsar.broker.transaction.TransactionTestBase;
|
34 | 46 | import org.apache.pulsar.client.api.Consumer;
|
35 | 47 | import org.apache.pulsar.client.api.Message;
|
|
51 | 63 | import org.apache.pulsar.common.policies.data.ClusterData;
|
52 | 64 | import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
|
53 | 65 | import org.apache.pulsar.common.policies.data.TenantInfo;
|
| 66 | +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; |
54 | 67 | import org.testng.Assert;
|
55 | 68 | import org.testng.annotations.AfterMethod;
|
56 | 69 | import org.testng.annotations.BeforeMethod;
|
@@ -317,7 +330,7 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
|
317 | 330 |
|
318 | 331 | @Test
|
319 | 332 | public void txnMessageAckTest() throws Exception {
|
320 |
| - final String topic = TOPIC_MESSAGE_ACK_TEST; |
| 333 | + String topic = TOPIC_MESSAGE_ACK_TEST; |
321 | 334 | final String subName = "test";
|
322 | 335 | @Cleanup
|
323 | 336 | Consumer<byte[]> consumer = pulsarClient
|
@@ -382,8 +395,46 @@ public void txnMessageAckTest() throws Exception {
|
382 | 395 |
|
383 | 396 | message = consumer.receive(2, TimeUnit.SECONDS);
|
384 | 397 | Assert.assertNull(message);
|
385 |
| - |
386 |
| - markDeletePositionCheck(topic, subName, true); |
| 398 | + for (int partition = 0; partition < TOPIC_PARTITION; partition ++) { |
| 399 | + topic = TopicName.get(topic).getPartition(partition).toString(); |
| 400 | + boolean exist = false; |
| 401 | + for (int i = 0; i < getPulsarServiceList().size(); i++) { |
| 402 | + |
| 403 | + Field field = BrokerService.class.getDeclaredField("topics"); |
| 404 | + field.setAccessible(true); |
| 405 | + ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = |
| 406 | + (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field |
| 407 | + .get(getPulsarServiceList().get(i).getBrokerService()); |
| 408 | + CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic); |
| 409 | + |
| 410 | + if (topicFuture != null) { |
| 411 | + Optional<Topic> topicOptional = topicFuture.get(); |
| 412 | + if (topicOptional.isPresent()) { |
| 413 | + PersistentSubscription persistentSubscription = |
| 414 | + (PersistentSubscription) topicOptional.get().getSubscription(subName); |
| 415 | + Position markDeletePosition = persistentSubscription.getCursor().getMarkDeletedPosition(); |
| 416 | + Position lastConfirmedEntry = persistentSubscription.getCursor() |
| 417 | + .getManagedLedger().getLastConfirmedEntry(); |
| 418 | + exist = true; |
| 419 | + if (!markDeletePosition.equals(lastConfirmedEntry)) { |
| 420 | + //this because of the transaction commit marker have't delete |
| 421 | + //delete commit marker after ack position |
| 422 | + //when delete commit marker operation is processing, next delete operation will not do again |
| 423 | + //when delete commit marker operation finish, it can run next delete commit marker operation |
| 424 | + //so this test may not delete all the position in this manageLedger. |
| 425 | + Position markerPosition = ((ManagedLedgerImpl) persistentSubscription.getCursor() |
| 426 | + .getManagedLedger()).getNextValidPosition((PositionImpl) markDeletePosition); |
| 427 | + //marker is the lastConfirmedEntry, after commit the marker will only be write in |
| 428 | + if (!markerPosition.equals(lastConfirmedEntry)) { |
| 429 | + log.error("Mark delete position is not commit marker position!"); |
| 430 | + fail(); |
| 431 | + } |
| 432 | + } |
| 433 | + } |
| 434 | + } |
| 435 | + } |
| 436 | + assertTrue(exist); |
| 437 | + } |
387 | 438 |
|
388 | 439 | log.info("receive transaction messages count: {}", receiveCnt);
|
389 | 440 | }
|
|
0 commit comments