Skip to content

Commit 35a1676

Browse files
authoredFeb 25, 2025··
[improve] [broker] Make the estimated entry size more accurate (#23931)
1 parent 35c9fec commit 35a1676

File tree

7 files changed

+166
-50
lines changed

7 files changed

+166
-50
lines changed
 

‎managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

+42-16
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
2525
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
2626
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
27+
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
2728
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
2829
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.base.MoreObjects;
@@ -3810,26 +3811,51 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
38103811
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
38113812
return maxEntries;
38123813
}
3814+
int maxEntriesBasedOnSize =
3815+
Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue();
3816+
return Math.min(maxEntriesBasedOnSize, maxEntries);
3817+
}
38133818

3814-
double avgEntrySize = ledger.getStats().getEntrySizeAverage();
3815-
if (!Double.isFinite(avgEntrySize)) {
3816-
// We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats
3817-
avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
3818-
}
3819-
3820-
if (!Double.isFinite(avgEntrySize)) {
3821-
// If we still don't have any information, it means this is the first time we attempt reading
3822-
// and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
3823-
return 1;
3819+
static long estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) {
3820+
Position posToRead = readPosition;
3821+
if (!ml.isValidPosition(readPosition)) {
3822+
posToRead = ml.getNextValidPosition(readPosition);
38243823
}
3824+
long result = 0;
3825+
long remainingBytesSize = bytesSize;
38253826

3826-
int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
3827-
if (maxEntriesBasedOnSize < 1) {
3828-
// We need to read at least one entry
3829-
return 1;
3827+
while (remainingBytesSize > 0) {
3828+
// Last ledger.
3829+
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
3830+
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
3831+
// Only read 1 entry if no entries to read.
3832+
return 1;
3833+
}
3834+
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
3835+
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3836+
result += remainingBytesSize / avg;
3837+
break;
3838+
}
3839+
// Skip empty ledger.
3840+
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
3841+
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
3842+
posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
3843+
continue;
3844+
}
3845+
// Calculate entries by average of ledgers.
3846+
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3847+
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
3848+
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
3849+
result += remainingBytesSize / avg;
3850+
break;
3851+
} else {
3852+
// Calculate for the next ledger.
3853+
result += remainEntriesOfLedger;
3854+
remainingBytesSize -= remainEntriesOfLedger * avg;
3855+
posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
3856+
}
38303857
}
3831-
3832-
return Math.min(maxEntriesBasedOnSize, maxEntries);
3858+
return Math.max(result, 1);
38333859
}
38343860

38353861
@Override

‎managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
224224
private final CallbackMutex offloadMutex = new CallbackMutex();
225225
public static final CompletableFuture<Position> NULL_OFFLOAD_PROMISE = CompletableFuture
226226
.completedFuture(PositionFactory.LATEST);
227+
@VisibleForTesting
228+
@Getter
227229
protected volatile LedgerHandle currentLedger;
228230
protected volatile long currentLedgerEntries = 0;
229231
protected volatile long currentLedgerSize = 0;

‎managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position
303303
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry,
304304
originalCallback, ctx);
305305
} else {
306-
long estimatedEntrySize = getEstimatedEntrySize();
306+
long estimatedEntrySize = getEstimatedEntrySize(lh);
307307
long estimatedReadSize = numberOfEntries * estimatedEntrySize;
308308
if (log.isDebugEnabled()) {
309309
log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size",
@@ -419,12 +419,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio
419419
}
420420

421421
@VisibleForTesting
422-
public long getEstimatedEntrySize() {
423-
long estimatedEntrySize = getAvgEntrySize();
424-
if (estimatedEntrySize == 0) {
425-
estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
422+
public long getEstimatedEntrySize(ReadHandle lh) {
423+
if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
424+
// No entries stored.
425+
return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
426426
}
427-
return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
427+
return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
428428
}
429429

430430
private long getAvgEntrySize() {

‎managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java

+5-10
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,9 @@ public void testPreciseLimitation(String missingCase) throws Exception {
141141
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
142142
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
143143
cb0.entries.join();
144-
Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
145-
Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
144+
int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
146145
Awaitility.await().untilAsserted(() -> {
147-
long remainingBytes =limiter.getRemainingBytes();
146+
long remainingBytes = limiter.getRemainingBytes();
148147
Assert.assertEquals(remainingBytes, totalCapacity);
149148
});
150149
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
@@ -165,7 +164,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
165164
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
166165
}).start();
167166

168-
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
167+
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry);
169168
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
170169
log.info("acquired : {}", bytesAcquired1);
171170
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
@@ -178,9 +177,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
178177
Thread.sleep(3000);
179178
readCompleteSignal1.countDown();
180179
cb1.entries.join();
181-
Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
182-
Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
183-
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
180+
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry);
184181
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
185182
log.info("acquired : {}", bytesAcquired2);
186183
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
@@ -191,8 +188,6 @@ public void testPreciseLimitation(String missingCase) throws Exception {
191188

192189
readCompleteSignal2.countDown();
193190
cb2.entries.join();
194-
Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
195-
Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
196191
Awaitility.await().untilAsserted(() -> {
197192
long remainingBytes = limiter.getRemainingBytes();
198193
log.info("remainingBytes 2: {}", remainingBytes);
@@ -204,7 +199,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
204199
}
205200

206201
private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
207-
return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
202+
return entriesCount * perEntrySize;
208203
}
209204

210205
class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {

‎managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

+90-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.bookkeeper.mledger.impl;
2020

21+
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
2122
import static org.mockito.ArgumentMatchers.anyInt;
2223
import static org.mockito.Mockito.any;
2324
import static org.mockito.Mockito.doAnswer;
@@ -686,13 +687,15 @@ void testAsyncReadWithMaxSizeByte() throws Exception {
686687
ManagedCursor cursor = ledger.openCursor("c1");
687688

688689
for (int i = 0; i < 100; i++) {
689-
ledger.addEntry(new byte[1024]);
690+
ledger.addEntry(new byte[(int) (1024)]);
690691
}
691692

692-
// First time, since we don't have info, we'll get 1 single entry
693-
readAndCheck(cursor, 10, 3 * 1024, 1);
693+
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
694+
// will get more messages than before(it only receives 1 messages at the first delivery),
695+
int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
696+
readAndCheck(cursor, 10, 3 * avg, 3);
694697
// We should only return 3 entries, based on the max size
695-
readAndCheck(cursor, 20, 3 * 1024, 3);
698+
readAndCheck(cursor, 20, 3 * avg, 3);
696699
// If maxSize is < avg, we should get 1 entry
697700
readAndCheck(cursor, 10, 500, 1);
698701
}
@@ -3914,13 +3917,15 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
39143917
ledger.addEntry(new byte[1024]);
39153918
}
39163919

3917-
// First time, since we don't have info, we'll get 1 single entry
3918-
List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
3919-
assertEquals(entries.size(), 1);
3920+
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
3921+
// will get more messages than before(it only receives 1 messages at the first delivery),
3922+
int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
3923+
List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
3924+
assertEquals(entries.size(), 3);
39203925
entries.forEach(Entry::release);
39213926

39223927
// We should only return 3 entries, based on the max size
3923-
entries = c.readEntriesOrWait(10, 3 * 1024);
3928+
entries = c.readEntriesOrWait(10, 3 * avg);
39243929
assertEquals(entries.size(), 3);
39253930
entries.forEach(Entry::release);
39263931

@@ -5164,6 +5169,83 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
51645169
assertEquals(positionRef4.get(), position4);
51655170
}
51665171

5172+
@Test
5173+
public void testEstimateEntryCountBySize() throws Exception {
5174+
final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", "");
5175+
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
5176+
long entryCount0 =
5177+
ManagedCursorImpl.estimateEntryCountBySize(16, PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml);
5178+
assertEquals(entryCount0, 1);
5179+
// Avoid trimming ledgers.
5180+
ml.openCursor("c1");
5181+
5182+
// Build data.
5183+
for (int i = 0; i < 100; i++) {
5184+
ml.addEntry(new byte[]{1});
5185+
}
5186+
long ledger1 = ml.getCurrentLedger().getId();
5187+
ml.getCurrentLedger().close();
5188+
ml.ledgerClosed(ml.getCurrentLedger());
5189+
for (int i = 0; i < 100; i++) {
5190+
ml.addEntry(new byte[]{1, 2});
5191+
}
5192+
long ledger2 = ml.getCurrentLedger().getId();
5193+
ml.getCurrentLedger().close();
5194+
ml.ledgerClosed(ml.getCurrentLedger());
5195+
for (int i = 0; i < 100; i++) {
5196+
ml.addEntry(new byte[]{1, 2, 3, 4});
5197+
}
5198+
long ledger3 = ml.getCurrentLedger().getId();
5199+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1);
5200+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2);
5201+
long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
5202+
long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
5203+
long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
5204+
assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
5205+
assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
5206+
assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
5207+
5208+
// Test: the individual ledgers.
5209+
long entryCount1 =
5210+
ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionFactory.create(ledger1, 0), ml);
5211+
assertEquals(entryCount1, 16);
5212+
long entryCount2 =
5213+
ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionFactory.create(ledger2, 0), ml);
5214+
assertEquals(entryCount2, 8);
5215+
long entryCount3 =
5216+
ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionFactory.create(ledger3, 0), ml);
5217+
assertEquals(entryCount3, 4);
5218+
5219+
// Test: across ledgers.
5220+
long entryCount4 =
5221+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionFactory.create(ledger1, 0), ml);
5222+
assertEquals(entryCount4, 108);
5223+
long entryCount5 =
5224+
ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionFactory.create(ledger2, 0), ml);
5225+
assertEquals(entryCount5, 104);
5226+
long entryCount6 =
5227+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 0), ml);
5228+
assertEquals(entryCount6, 204);
5229+
5230+
long entryCount7 =
5231+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionFactory.create(ledger1, 80), ml);
5232+
assertEquals(entryCount7, 28);
5233+
long entryCount8 =
5234+
ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionFactory.create(ledger2, 80), ml);
5235+
assertEquals(entryCount8, 24);
5236+
long entryCount9 =
5237+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionFactory.create(ledger1, 80), ml);
5238+
assertEquals(entryCount9, 124);
5239+
5240+
// Test: read more than entries written.
5241+
long entryCount10 =
5242+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionFactory.create(ledger1, 0), ml);
5243+
assertEquals(entryCount10, 304);
5244+
5245+
// cleanup.
5246+
ml.delete();
5247+
}
5248+
51675249
@Test
51685250
void testForceCursorRecovery() throws Exception {
51695251
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);

‎pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void testBatchMessageAck() {
8585
.newConsumer()
8686
.topic(topicName)
8787
.subscriptionName(subscriptionName)
88-
.receiverQueueSize(10)
88+
.receiverQueueSize(50)
8989
.subscriptionType(SubscriptionType.Shared)
9090
.enableBatchIndexAcknowledgment(true)
9191
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
@@ -114,27 +114,29 @@ public void testBatchMessageAck() {
114114
consumer.acknowledge(receive1);
115115
consumer.acknowledge(receive2);
116116
Awaitility.await().untilAsserted(() -> {
117-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
117+
// Since https://github.com/apache/pulsar/pull/23931 improved the mechanism of estimate average entry size,
118+
// broker will deliver much messages than before. So edit 18 -> 38 here.
119+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38);
118120
});
119121
Message<byte[]> receive3 = consumer.receive();
120122
Message<byte[]> receive4 = consumer.receive();
121123
consumer.acknowledge(receive3);
122124
consumer.acknowledge(receive4);
123125
Awaitility.await().untilAsserted(() -> {
124-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
126+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
125127
});
126128
// Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436.
127129
consumer.pause();
128130
Message<byte[]> receive5 = consumer.receive();
129131
consumer.negativeAcknowledge(receive5);
130132
Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> {
131-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
133+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
132134
});
133135
// Unblock cmd-flow.
134136
consumer.resume();
135137
consumer.receive();
136138
Awaitility.await().untilAsserted(() -> {
137-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
139+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
138140
});
139141
}
140142

‎pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,13 @@ public void testAvgMessagesPerEntry() throws Exception {
445445
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
446446
.batchingMaxBytes(Integer.MAX_VALUE)
447447
.create();
448-
449-
producer.send("first-message");
448+
// The first messages deliver: 20 msgs.
449+
// Average of "messages per batch" is "1".
450+
for (int i = 0; i < 20; i++) {
451+
producer.send("first-message");
452+
}
453+
// The second messages deliver: 20 msgs.
454+
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".
450455
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
451456
for (int i = 0; i < 20; i++) {
452457
futures.add(producer.sendAsync("message"));
@@ -480,6 +485,7 @@ public void testAvgMessagesPerEntry() throws Exception {
480485
metadataConsumer.put("matchValueReschedule", "producer2");
481486
@Cleanup
482487
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
488+
.receiverQueueSize(20)
483489
.subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
484490

485491
int counter = 0;
@@ -494,14 +500,17 @@ public void testAvgMessagesPerEntry() throws Exception {
494500
}
495501
}
496502

497-
assertEquals(21, counter);
503+
assertEquals(40, counter);
498504

499505
ConsumerStats consumerStats =
500506
admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);
501507

502-
assertEquals(21, consumerStats.getMsgOutCounter());
508+
assertEquals(40, consumerStats.getMsgOutCounter());
503509

504-
// Math.round(1 * 0.9 + 0.1 * (20 / 1))
510+
// The first messages deliver: 20 msgs.
511+
// Average of "messages per batch" is "1".
512+
// The second messages deliver: 20 msgs.
513+
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".
505514
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
506515
assertEquals(3, avgMessagesPerEntry);
507516
}

0 commit comments

Comments
 (0)
Please sign in to comment.