[improve][pip] PIP-406: Introduce pulsar_subscription_dispatch_throttled_msgs and bytes metrics
Showing 1 changed file with 75 additions and 0 deletions.
@@ -0,0 +1,75 @@
# PIP-406: Introduce pulsar_subscription_dispatch_throttled_msgs and bytes metrics

# Background knowledge

## Motivation

Currently, users can monitor subscription backlogs using the `pulsar_subscription_back_log_no_delayed` metric.
However, if [dispatch throttling](https://pulsar.apache.org/docs/next/concepts-throttling/) is configured at the broker/topic/subscription level,
this metric alone cannot show whether a backlog stems from insufficient consumer capacity or from dispatch throttling.

## Goals

Introduce metrics that report the number of messages and bytes throttled for a subscription. This allows users to write PromQL queries that identify subscriptions with high backlogs but little or no throttling, pinpointing backlogs caused by insufficient consumer capacity.
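
The triage this goal describes can be sketched in Java. This is only an illustration of the query logic, not part of the proposal: the `Sample` class, the `consumerBound` helper, and both thresholds are hypothetical, and a real deployment would express the same condition in PromQL over `pulsar_subscription_back_log_no_delayed` and the new throttling counters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the triage logic; not part of Pulsar.
class BacklogTriage {

    // One observation per subscription. All field names are assumptions.
    static final class Sample {
        final String subscription;
        final long backlog;            // e.g. the value of pulsar_subscription_back_log_no_delayed
        final long throttledMsgsDelta; // growth of the throttled-messages counter over some window

        Sample(String subscription, long backlog, long throttledMsgsDelta) {
            this.subscription = subscription;
            this.backlog = backlog;
            this.throttledMsgsDelta = throttledMsgsDelta;
        }
    }

    // Returns the subscriptions whose backlog is high while throttling is low or absent,
    // i.e. the ones whose backlog most likely comes from slow consumers.
    static List<String> consumerBound(List<Sample> samples,
                                      long backlogThreshold,
                                      long throttleThreshold) {
        List<String> result = new ArrayList<>();
        for (Sample s : samples) {
            if (s.backlog > backlogThreshold && s.throttledMsgsDelta <= throttleThreshold) {
                result.add(s.subscription);
            }
        }
        return result;
    }
}
```

With a backlog threshold of 1000 and a throttle threshold of 100, a subscription with a backlog of 50000 and no throttled messages is reported, while one with the same backlog and 20000 throttled messages is not.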

## In Scope
- Introduce the metric `pulsar_subscription_dispatch_throttled_msgs` to represent the total number of messages throttled for a subscription.
- Introduce the metric `pulsar_subscription_dispatch_throttled_bytes` to represent the total number of bytes throttled for a subscription.
- Add `dispatchThrottledMsgs` and `dispatchThrottledBytes` fields to topic subscription stats.

## Out of Scope
- These counters are not persisted and reset when the subscription reconnects.

# High Level Design
1. Maintain `dispatchThrottledMsgs` and `dispatchThrottledBytes` in `AbstractBaseDispatcher`. Increase these values whenever the number of messages or bytes to read is reduced during `calculateToRead`.
2. Output these fields when retrieving topic stats and metrics.
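
A minimal, self-contained sketch of this design follows. It does not use Pulsar classes: `ThrottleCounterSketch` and `limitRead` are hypothetical stand-ins for `AbstractBaseDispatcher` and its read-limit calculation, and the clamping rule (take the smaller of the requested and available amounts) is a simplifying assumption. For brevity it uses `LongAdder` for both counters and omits overflow handling.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the dispatcher; not the Pulsar implementation.
class ThrottleCounterSketch {
    private final LongAdder dispatchThrottledMsgs = new LongAdder();
    private final LongAdder dispatchThrottledBytes = new LongAdder();

    // Clamps the requested read size to the available rate-limit budget and
    // records how much was cut. Returns {allowedMsgs, allowedBytes}.
    // Assumes availableMsgs and availableBytes are non-negative.
    long[] limitRead(int messagesToRead, long bytesToRead,
                     long availableMsgs, long availableBytes) {
        long allowedMsgs = Math.min(messagesToRead, availableMsgs);
        long allowedBytes = Math.min(bytesToRead, availableBytes);
        if (allowedMsgs < messagesToRead) {
            dispatchThrottledMsgs.add(messagesToRead - allowedMsgs);
        }
        if (allowedBytes < bytesToRead) {
            dispatchThrottledBytes.add(bytesToRead - allowedBytes);
        }
        return new long[] {allowedMsgs, allowedBytes};
    }

    // Values that would be exposed through stats and metrics (step 2).
    long getDispatchThrottledMsgs() {
        return dispatchThrottledMsgs.sum();
    }

    long getDispatchThrottledBytes() {
        return dispatchThrottledBytes.sum();
    }
}
```

A read that fits within the available budget leaves both counters unchanged, so the counters grow only while throttling is actually reducing dispatch.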

# Detailed Design

## Design & Implementation Details
1. Maintain `dispatchThrottledMsgs` and `dispatchThrottledBytes` in `AbstractBaseDispatcher`:
```java
// Total messages withheld by dispatch throttling.
private final LongAdder dispatchThrottledMsgs = new LongAdder();
// Total bytes withheld by dispatch throttling; updated with an overflow check (see below).
private final AtomicLong dispatchThrottledBytes = new AtomicLong();
```

2. During each [calculateToRead](https://github.com/apache/pulsar/blob/411f6973e85b0a6213e992386e1704f93d0aae42/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L371-L377),
if the number of messages or bytes to read is reduced, increase these fields by the amount of the reduction.

- `dispatchThrottledBytes` may overflow in extreme cases, so when an addition would overflow, the counter restarts from the current increment instead:
```diff
 protected Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter dispatchRateLimiter,
         int messagesToRead, long bytesToRead) {
     // update messagesToRead according to available dispatch rate limit.
-    return computeReadLimits(messagesToRead,
+    Pair<Integer, Long> result = computeReadLimits(messagesToRead,
             (int) dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg(),
             bytesToRead, dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
+    if (result.getLeft() < messagesToRead) {
+        dispatchThrottledMsgs.add(messagesToRead - result.getLeft());
+    }
+    if (result.getRight() < bytesToRead) {
+        long increment = bytesToRead - result.getRight();
+        dispatchThrottledBytes.updateAndGet(current -> {
+            // Check if adding the increment would cause an overflow
+            if (Long.MAX_VALUE - current < increment) {
+                return increment;
+            }
+            return current + increment;
+        });
+    }
+    return result;
 }
```
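
The overflow guard in the diff can be exercised in isolation. The sketch below reproduces the same `updateAndGet` lambda in a hypothetical `OverflowSafeCounter` class (not a Pulsar type) and assumes that both the current value and the increment are non-negative, as they are for throttled byte counts. The check-then-restart step may also explain why the bytes field is an `AtomicLong`: `LongAdder` offers no atomic read-modify-write operation comparable to `updateAndGet`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wrapper used only to demonstrate the overflow guard.
class OverflowSafeCounter {
    private final AtomicLong value;

    OverflowSafeCounter(long initial) {
        this.value = new AtomicLong(initial);
    }

    // Adds a non-negative increment. If the sum would exceed Long.MAX_VALUE,
    // the counter restarts from the increment instead of wrapping to a negative value.
    long add(long increment) {
        return value.updateAndGet(current -> {
            if (Long.MAX_VALUE - current < increment) {
                return increment;
            }
            return current + increment;
        });
    }

    long get() {
        return value.get();
    }
}
```

Starting from `Long.MAX_VALUE - 5`, adding 5 reaches `Long.MAX_VALUE` exactly. Adding 1 more would overflow, so the counter restarts at 1.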

## Public-facing Changes
- None

### Configuration
- None

# Backward & Forward Compatibility
- Full Compatibility