From 940e76800d9ca0d80f5c9afe813b38d45761166c Mon Sep 17 00:00:00 2001 From: Jiawen Wang <74594733+summeriiii@users.noreply.github.com> Date: Fri, 14 Feb 2025 17:56:45 +0800 Subject: [PATCH] [fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs value when maxUnackedMessagesPerConsumer is 1 (#23796) (cherry picked from commit 5443c69d84818cb4a49704f7ab7dbccf65b2179a) (cherry picked from commit d6a255489a386b307b64dfaeba67ebed4562b920) --- .../pulsar/broker/service/Consumer.java | 7 +++- .../broker/service/BrokerServiceTest.java | 41 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 37ab5c21993d7..7247a8978a829 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -612,6 +612,7 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map // consumer can start again consuming messages int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer); @@ -1107,7 +1113,6 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position) ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false; flowConsumerBlockedPermits(ackOwnedConsumer); } - return true; } public PendingAcksMap getPendingAcks() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 89727014be99e..fa76fdd5bf45c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1790,6 +1790,47 @@ public void testDuplicateAcknowledgement() throws Exception { .get("sub-1").getUnackedMessages(), 0); } + @Test + public void testBlockedConsumerOnUnackedMsgs() throws Exception { + final String ns = "prop/ns-test"; + admin.namespaces().createNamespace(ns, 2); + admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, 1); + + final String topicName = "persistent://prop/ns-test/testBlockedConsumerOnUnackedMsgs"; + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-test") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(true) + .receiverQueueSize(0) + .subscribe(); + + producer.send("1".getBytes(StandardCharsets.UTF_8)); + producer.send("2".getBytes(StandardCharsets.UTF_8)); + + // 1. receive message + Message message = consumer.receive(); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + SubscriptionStats subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test"); + assertEquals(subscriptionStats.getUnackedMessages(), 1); + assertTrue(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs()); + + // 2、ack this message + consumer.acknowledge(message); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test"); + assertEquals(subscriptionStats.getUnackedMessages(), 0); + assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs()); + } + @Test public void testUnsubscribeNonDurableSub() throws Exception { final String ns = "prop/ns-test";