Skip to content

Commit

Permalink
[fix][broker] Fix incorrect blockedConsumerOnUnackedMsgs value when m…
Browse files Browse the repository at this point in the history
…axUnackedMessagesPerConsumer is 1 (apache#23796)

(cherry picked from commit 5443c69)
(cherry picked from commit b098772)
  • Loading branch information
summeriiii authored and srinath-ctds committed Feb 24, 2025
1 parent 314ecfe commit c915c9d
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
}
}

Expand Down Expand Up @@ -990,6 +991,11 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, PositionImpl positi
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
updateBlockedConsumerOnUnackedMsgs(ackOwnedConsumer);
return true;
}

public void updateBlockedConsumerOnUnackedMsgs(Consumer ackOwnedConsumer) {
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
Expand All @@ -999,7 +1005,6 @@ private boolean removePendingAcks(Consumer ackOwnedConsumer, PositionImpl positi
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}

public ConcurrentLongLongPairHashMap getPendingAcks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,107 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception
});
}

@Test
public void testIsSystemTopicAllowAutoTopicCreationAsync() throws Exception {
BrokerService brokerService = pulsar.getBrokerService();
assertFalse(brokerService.isAllowAutoTopicCreationAsync(
ServiceUnitStateChannelImpl.TOPIC).get());
assertTrue(brokerService.isAllowAutoTopicCreationAsync(
"persistent://pulsar/system/my-system-topic").get());
}

@Test
public void testDuplicateAcknowledgement() throws Exception {
final String ns = "prop/ns-test";

admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/duplicated-acknowledgement-test";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub-1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true)
.subscribe();
producer.send("1".getBytes(StandardCharsets.UTF_8));
Message<byte[]> message = consumer1.receive();
consumer1.acknowledge(message);
consumer1.acknowledge(message);
assertEquals(admin.topics().getStats(topicName).getSubscriptions()
.get("sub-1").getUnackedMessages(), 0);
}

@Test
public void testBlockedConsumerOnUnackedMsgs() throws Exception {
final String ns = "prop/ns-test";
admin.namespaces().createNamespace(ns, 2);
admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, 1);

final String topicName = "persistent://prop/ns-test/testBlockedConsumerOnUnackedMsgs";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub-test")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true)
.receiverQueueSize(0)
.subscribe();

producer.send("1".getBytes(StandardCharsets.UTF_8));
producer.send("2".getBytes(StandardCharsets.UTF_8));

// 1. receive message
Message<byte[]> message = consumer.receive();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

SubscriptionStats subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
assertEquals(subscriptionStats.getUnackedMessages(), 1);
assertTrue(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());

// 2、ack this message
consumer.acknowledge(message);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

subscriptionStats = admin.topics().getStats(topicName).getSubscriptions().get("sub-test");
assertEquals(subscriptionStats.getUnackedMessages(), 0);
assertFalse(subscriptionStats.getConsumers().get(0).isBlockedConsumerOnUnackedMsgs());
}

@Test
public void testUnsubscribeNonDurableSub() throws Exception {
final String ns = "prop/ns-test";
final String topic = ns + "/testUnsubscribeNonDurableSub";

admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);

pulsarClient.newProducer(Schema.STRING).topic(topic).create().close();
@Cleanup
Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("sub1")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
try {
consumer.unsubscribe();
} catch (Exception ex) {
fail("Unsubscribe failed");
}
}

// this test is disabled since it is flaky
@Test(enabled = false)
public void testMetricsPersistentTopicLoadFails() throws Exception {
Expand Down

0 comments on commit c915c9d

Please sign in to comment.