[Enhancement] Optimize delayed message delivery by reducing unnecessary reads to storage in InMemoryDelayedDeliveryTracker #23912

Open
1 of 2 tasks
lhotari opened this issue Jan 30, 2025 · 0 comments
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

lhotari commented Jan 30, 2025

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

The current delayed message delivery implementation reads entries from storage more often than necessary, and there is an opportunity to reduce these reads.

Pulsar has two implementations of the delayed delivery tracker: InMemoryDelayedDeliveryTracker and BucketDelayedDeliveryTracker.
The implementation is selected with the delayedDeliveryTrackerFactoryClassName configuration key. The default setting selects the in-memory implementation:

pulsar/conf/broker.conf

Lines 614 to 617 in b02d52c

# Class name of the factory that implements the delayed deliver tracker.
# If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
# will create bucket based delayed message index tracker.
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory

When the BucketDelayedDeliveryTracker is in use, the dispatcher applies an optimization that skips reading messages the tracker has already "indexed":

protected Predicate<Position> createReadEntriesSkipConditionForNormalRead() {
    Predicate<Position> skipCondition = null;
    // Filter out and skip read delayed messages exist in DelayedDeliveryTracker
    if (delayedDeliveryTracker.isPresent()) {
        final DelayedDeliveryTracker deliveryTracker = delayedDeliveryTracker.get();
        if (deliveryTracker instanceof BucketDelayedDeliveryTracker) {
            skipCondition = position -> ((BucketDelayedDeliveryTracker) deliveryTracker)
                    .containsMessage(position.getLedgerId(), position.getEntryId());
        }
    }
    return skipCondition;
}

This already reduces reads when BucketDelayedDeliveryTracker is in use.
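
To illustrate what such a skip condition buys, here is a minimal, self-contained sketch. It is not Pulsar code: EntryPosition and SkipConditionSketch are hypothetical stand-ins, and the "indexed" set plays the role of the tracker's containsMessage check. It only shows how a nullable Predicate can filter the positions that still need to be read.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for Pulsar's Position; only the two ids are modeled.
record EntryPosition(long ledgerId, long entryId) {}

class SkipConditionSketch {
    // Returns the positions that would still have to be read from storage.
    // A null skip condition means "read everything", as in the snippet above.
    static List<EntryPosition> positionsToRead(List<EntryPosition> candidates,
                                               Predicate<EntryPosition> skipCondition) {
        List<EntryPosition> toRead = new ArrayList<>();
        for (EntryPosition p : candidates) {
            if (skipCondition == null || !skipCondition.test(p)) {
                toRead.add(p);
            }
        }
        return toRead;
    }

    public static void main(String[] args) {
        // Positions the tracker already knows about (assumed for illustration).
        Set<EntryPosition> indexed = new HashSet<>();
        indexed.add(new EntryPosition(1, 1));
        indexed.add(new EntryPosition(1, 2));

        List<EntryPosition> candidates = List.of(
                new EntryPosition(1, 0), new EntryPosition(1, 1),
                new EntryPosition(1, 2), new EntryPosition(1, 3));

        // With a skip condition only the two unindexed entries remain.
        System.out.println(positionsToRead(candidates, indexed::contains).size());
        // Without one, all four entries are read.
        System.out.println(positionsToRead(candidates, null).size());
    }
}
```

In this toy run, two of the four candidate entries are never requested from storage because the tracker already holds their delivery information.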

The state of the InMemoryDelayedDeliveryTracker gets cleared after all consumers have disconnected:

if (consumerList.isEmpty()) {
    if (havePendingRead || havePendingReplayRead) {
        // There is a pending read from previous run. We must wait for it to complete and then rewind
        shouldRewindBeforeReadingOrReplaying = true;
    } else {
        cursor.rewind();
        shouldRewindBeforeReadingOrReplaying = false;
    }
    redeliveryMessages.clear();
    delayedDeliveryTracker.ifPresent(tracker -> {
        // Don't clean up BucketDelayedDeliveryTracker, otherwise we will lose the bucket snapshot
        if (tracker instanceof InMemoryDelayedDeliveryTracker) {
            tracker.clear();
        }
    });
}

Solution

It would be useful for the InMemoryDelayedDeliveryTracker to also keep its state after all consumers have disconnected, and for the dispatcher to skip reading a delayed message when the tracker already knows the delivery time of that entry.
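
A rough sketch of the idea follows, under stated assumptions. RetainingInMemoryTrackerSketch and all of its methods are hypothetical and do not mirror the real InMemoryDelayedDeliveryTracker API or its memory layout. The sketch only shows the two properties proposed here: the index survives a "no consumers" event, and a containsMessage-style lookup is available to build a skip condition.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch, not the real InMemoryDelayedDeliveryTracker.
class RetainingInMemoryTrackerSketch {
    // ledgerId -> (entryId -> deliverAt in epoch millis)
    private final Map<Long, Map<Long, Long>> index = new HashMap<>();

    void addMessage(long ledgerId, long entryId, long deliverAtMillis) {
        index.computeIfAbsent(ledgerId, k -> new HashMap<>()).put(entryId, deliverAtMillis);
    }

    // Lookup a dispatcher could use as a read skip condition.
    boolean containsMessage(long ledgerId, long entryId) {
        Map<Long, Long> entries = index.get(ledgerId);
        return entries != null && entries.containsKey(entryId);
    }

    // Drops entries due at or before nowMillis and returns how many were dropped.
    // Once dropped, they are no longer skipped and can be read for delivery.
    int removeDue(long nowMillis) {
        int removed = 0;
        Iterator<Map<Long, Long>> ledgers = index.values().iterator();
        while (ledgers.hasNext()) {
            Map<Long, Long> entries = ledgers.next();
            int before = entries.size();
            entries.values().removeIf(deliverAt -> deliverAt <= nowMillis);
            removed += before - entries.size();
            if (entries.isEmpty()) {
                ledgers.remove();
            }
        }
        return removed;
    }

    // Assumption of this sketch: nothing is cleared when the last consumer leaves.
    void onAllConsumersDisconnected() {
        // intentionally empty
    }

    public static void main(String[] args) {
        RetainingInMemoryTrackerSketch tracker = new RetainingInMemoryTrackerSketch();
        tracker.addMessage(1, 5, 1_000);
        tracker.addMessage(1, 6, 2_000);

        tracker.onAllConsumersDisconnected();
        // The state is retained, so a re-read after a cursor rewind could skip 1:5.
        System.out.println(tracker.containsMessage(1, 5));

        tracker.removeDue(1_500);
        // 1:5 is now due and no longer skipped; 1:6 is still delayed.
        System.out.println(tracker.containsMessage(1, 5));
        System.out.println(tracker.containsMessage(1, 6));
    }
}
```

The linear scan in removeDue is chosen for brevity only. A real implementation would need a time-ordered structure and would have to weigh the memory cost of retaining state for subscriptions without consumers.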

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
lhotari added the type/enhancement label on Jan 30, 2025