Skip to content

Commit ca3f6c1

Browse files
committed
Revert "[improve] [broker] Make the estimated entry size more accurate (apache#23931)"
This reverts commit 42aab41.
1 parent 261ead1 commit ca3f6c1

File tree

7 files changed

+50
-166
lines changed

7 files changed

+50
-166
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

+16-42
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
2525
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
2626
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
27-
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
2827
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
2928
import com.google.common.annotations.VisibleForTesting;
3029
import com.google.common.base.MoreObjects;
@@ -3711,51 +3710,26 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
37113710
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
37123711
return maxEntries;
37133712
}
3714-
int maxEntriesBasedOnSize =
3715-
Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue();
3716-
return Math.min(maxEntriesBasedOnSize, maxEntries);
3717-
}
37183713

3719-
static long estimateEntryCountBySize(long bytesSize, PositionImpl readPosition, ManagedLedgerImpl ml) {
3720-
Position posToRead = readPosition;
3721-
if (!ml.isValidPosition(readPosition)) {
3722-
posToRead = ml.getNextValidPosition(readPosition);
3714+
double avgEntrySize = ledger.getStats().getEntrySizeAverage();
3715+
if (!Double.isFinite(avgEntrySize)) {
3716+
// We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats
3717+
avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
37233718
}
3724-
long result = 0;
3725-
long remainingBytesSize = bytesSize;
37263719

3727-
while (remainingBytesSize > 0) {
3728-
// Last ledger.
3729-
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
3730-
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
3731-
// Only read 1 entry if no entries to read.
3732-
return 1;
3733-
}
3734-
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
3735-
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3736-
result += remainingBytesSize / avg;
3737-
break;
3738-
}
3739-
// Skip empty ledger.
3740-
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
3741-
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
3742-
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
3743-
continue;
3744-
}
3745-
// Calculate entries by average of ledgers.
3746-
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3747-
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
3748-
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
3749-
result += remainingBytesSize / avg;
3750-
break;
3751-
} else {
3752-
// Calculate for the next ledger.
3753-
result += remainEntriesOfLedger;
3754-
remainingBytesSize -= remainEntriesOfLedger * avg;
3755-
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
3756-
}
3720+
if (!Double.isFinite(avgEntrySize)) {
3721+
// If we still don't have any information, it means this is the first time we attempt reading
3722+
// and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
3723+
return 1;
3724+
}
3725+
3726+
int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
3727+
if (maxEntriesBasedOnSize < 1) {
3728+
// We need to read at least one entry
3729+
return 1;
37573730
}
3758-
return Math.max(result, 1);
3731+
3732+
return Math.min(maxEntriesBasedOnSize, maxEntries);
37593733
}
37603734

37613735
@Override

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

-2
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
218218
private final CallbackMutex offloadMutex = new CallbackMutex();
219219
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
220220
.completedFuture(PositionImpl.LATEST);
221-
@VisibleForTesting
222-
@Getter
223221
protected volatile LedgerHandle currentLedger;
224222
protected volatile long currentLedgerEntries = 0;
225223
protected volatile long currentLedgerSize = 0;

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Posit
302302
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry,
303303
originalCallback, ctx);
304304
} else {
305-
long estimatedEntrySize = getEstimatedEntrySize(lh);
305+
long estimatedEntrySize = getEstimatedEntrySize();
306306
long estimatedReadSize = numberOfEntries * estimatedEntrySize;
307307
if (log.isDebugEnabled()) {
308308
log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size",
@@ -418,12 +418,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Pos
418418
}
419419

420420
@VisibleForTesting
421-
public long getEstimatedEntrySize(ReadHandle lh) {
422-
if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
423-
// No entries stored.
424-
return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
421+
public long getEstimatedEntrySize() {
422+
long estimatedEntrySize = getAvgEntrySize();
423+
if (estimatedEntrySize == 0) {
424+
estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
425425
}
426-
return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
426+
return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
427427
}
428428

429429
private long getAvgEntrySize() {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,10 @@ public void testPreciseLimitation(String missingCase) throws Exception {
141141
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
142142
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
143143
cb0.entries.join();
144-
int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
144+
Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
145+
Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
145146
Awaitility.await().untilAsserted(() -> {
146-
long remainingBytes = limiter.getRemainingBytes();
147+
long remainingBytes =limiter.getRemainingBytes();
147148
Assert.assertEquals(remainingBytes, totalCapacity);
148149
});
149150
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
@@ -164,7 +165,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
164165
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
165166
}).start();
166167

167-
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry);
168+
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
168169
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
169170
log.info("acquired : {}", bytesAcquired1);
170171
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
@@ -177,7 +178,9 @@ public void testPreciseLimitation(String missingCase) throws Exception {
177178
Thread.sleep(3000);
178179
readCompleteSignal1.countDown();
179180
cb1.entries.join();
180-
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry);
181+
Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
182+
Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
183+
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
181184
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
182185
log.info("acquired : {}", bytesAcquired2);
183186
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
@@ -188,6 +191,8 @@ public void testPreciseLimitation(String missingCase) throws Exception {
188191

189192
readCompleteSignal2.countDown();
190193
cb2.entries.join();
194+
Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
195+
Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
191196
Awaitility.await().untilAsserted(() -> {
192197
long remainingBytes = limiter.getRemainingBytes();
193198
log.info("remainingBytes 2: {}", remainingBytes);
@@ -199,7 +204,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
199204
}
200205

201206
private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
202-
return entriesCount * perEntrySize;
207+
return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
203208
}
204209

205210
class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

+8-90
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.bookkeeper.mledger.impl;
2020

21-
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
2221
import static org.mockito.ArgumentMatchers.anyInt;
2322
import static org.mockito.Mockito.any;
2423
import static org.mockito.Mockito.doAnswer;
@@ -682,15 +681,13 @@ void testAsyncReadWithMaxSizeByte() throws Exception {
682681
ManagedCursor cursor = ledger.openCursor("c1");
683682

684683
for (int i = 0; i < 100; i++) {
685-
ledger.addEntry(new byte[(int) (1024)]);
684+
ledger.addEntry(new byte[1024]);
686685
}
687686

688-
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
689-
// will get more messages than before(it only receives 1 messages at the first delivery),
690-
int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
691-
readAndCheck(cursor, 10, 3 * avg, 3);
687+
// First time, since we don't have info, we'll get 1 single entry
688+
readAndCheck(cursor, 10, 3 * 1024, 1);
692689
// We should only return 3 entries, based on the max size
693-
readAndCheck(cursor, 20, 3 * avg, 3);
690+
readAndCheck(cursor, 20, 3 * 1024, 3);
694691
// If maxSize is < avg, we should get 1 entry
695692
readAndCheck(cursor, 10, 500, 1);
696693
}
@@ -3888,15 +3885,13 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
38883885
ledger.addEntry(new byte[1024]);
38893886
}
38903887

3891-
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
3892-
// will get more messages than before(it only receives 1 messages at the first delivery),
3893-
int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
3894-
List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
3895-
assertEquals(entries.size(), 3);
3888+
// First time, since we don't have info, we'll get 1 single entry
3889+
List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
3890+
assertEquals(entries.size(), 1);
38963891
entries.forEach(Entry::release);
38973892

38983893
// We should only return 3 entries, based on the max size
3899-
entries = c.readEntriesOrWait(10, 3 * avg);
3894+
entries = c.readEntriesOrWait(10, 3 * 1024);
39003895
assertEquals(entries.size(), 3);
39013896
entries.forEach(Entry::release);
39023897

@@ -4803,82 +4798,5 @@ public void operationFailed(ManagedLedgerException exception) {
48034798
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
48044799
}
48054800

4806-
@Test
4807-
public void testEstimateEntryCountBySize() throws Exception {
4808-
final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", "");
4809-
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
4810-
long entryCount0 =
4811-
ManagedCursorImpl.estimateEntryCountBySize(16, PositionImpl.get(ml.getCurrentLedger().getId(), 0), ml);
4812-
assertEquals(entryCount0, 1);
4813-
// Avoid trimming ledgers.
4814-
ml.openCursor("c1");
4815-
4816-
// Build data.
4817-
for (int i = 0; i < 100; i++) {
4818-
ml.addEntry(new byte[]{1});
4819-
}
4820-
long ledger1 = ml.getCurrentLedger().getId();
4821-
ml.getCurrentLedger().close();
4822-
ml.ledgerClosed(ml.getCurrentLedger());
4823-
for (int i = 0; i < 100; i++) {
4824-
ml.addEntry(new byte[]{1, 2});
4825-
}
4826-
long ledger2 = ml.getCurrentLedger().getId();
4827-
ml.getCurrentLedger().close();
4828-
ml.ledgerClosed(ml.getCurrentLedger());
4829-
for (int i = 0; i < 100; i++) {
4830-
ml.addEntry(new byte[]{1, 2, 3, 4});
4831-
}
4832-
long ledger3 = ml.getCurrentLedger().getId();
4833-
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1);
4834-
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2);
4835-
long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
4836-
long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
4837-
long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
4838-
assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
4839-
assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
4840-
assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
4841-
4842-
// Test: the individual ledgers.
4843-
long entryCount1 =
4844-
ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionImpl.get(ledger1, 0), ml);
4845-
assertEquals(entryCount1, 16);
4846-
long entryCount2 =
4847-
ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionImpl.get(ledger2, 0), ml);
4848-
assertEquals(entryCount2, 8);
4849-
long entryCount3 =
4850-
ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionImpl.get(ledger3, 0), ml);
4851-
assertEquals(entryCount3, 4);
4852-
4853-
// Test: across ledgers.
4854-
long entryCount4 =
4855-
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionImpl.get(ledger1, 0), ml);
4856-
assertEquals(entryCount4, 108);
4857-
long entryCount5 =
4858-
ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionImpl.get(ledger2, 0), ml);
4859-
assertEquals(entryCount5, 104);
4860-
long entryCount6 =
4861-
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 0), ml);
4862-
assertEquals(entryCount6, 204);
4863-
4864-
long entryCount7 =
4865-
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionImpl.get(ledger1, 80), ml);
4866-
assertEquals(entryCount7, 28);
4867-
long entryCount8 =
4868-
ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionImpl.get(ledger2, 80), ml);
4869-
assertEquals(entryCount8, 24);
4870-
long entryCount9 =
4871-
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 80), ml);
4872-
assertEquals(entryCount9, 124);
4873-
4874-
// Test: read more than entries written.
4875-
long entryCount10 =
4876-
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionImpl.get(ledger1, 0), ml);
4877-
assertEquals(entryCount10, 304);
4878-
4879-
// cleanup.
4880-
ml.delete();
4881-
}
4882-
48834801
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
48844802
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void testBatchMessageAck() {
8585
.newConsumer()
8686
.topic(topicName)
8787
.subscriptionName(subscriptionName)
88-
.receiverQueueSize(50)
88+
.receiverQueueSize(10)
8989
.subscriptionType(SubscriptionType.Shared)
9090
.enableBatchIndexAcknowledgment(true)
9191
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
@@ -114,29 +114,27 @@ public void testBatchMessageAck() {
114114
consumer.acknowledge(receive1);
115115
consumer.acknowledge(receive2);
116116
Awaitility.await().untilAsserted(() -> {
117-
// Since https://github.com/apache/pulsar/pull/23931 improved the mechanism of estimate average entry size,
118-
// broker will deliver much messages than before. So edit 18 -> 38 here.
119-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38);
117+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
120118
});
121119
Message<byte[]> receive3 = consumer.receive();
122120
Message<byte[]> receive4 = consumer.receive();
123121
consumer.acknowledge(receive3);
124122
consumer.acknowledge(receive4);
125123
Awaitility.await().untilAsserted(() -> {
126-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
124+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
127125
});
128126
// Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436.
129127
consumer.pause();
130128
Message<byte[]> receive5 = consumer.receive();
131129
consumer.negativeAcknowledge(receive5);
132130
Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> {
133-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
131+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
134132
});
135133
// Unblock cmd-flow.
136134
consumer.resume();
137135
consumer.receive();
138136
Awaitility.await().untilAsserted(() -> {
139-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
137+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
140138
});
141139
}
142140

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java

+5-14
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,8 @@ public void testAvgMessagesPerEntry() throws Exception {
388388
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
389389
.batchingMaxBytes(Integer.MAX_VALUE)
390390
.create();
391-
// The first messages deliver: 20 msgs.
392-
// Average of "messages per batch" is "1".
393-
for (int i = 0; i < 20; i++) {
394-
producer.send("first-message");
395-
}
396-
// The second messages deliver: 20 msgs.
397-
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".
391+
392+
producer.send("first-message");
398393
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
399394
for (int i = 0; i < 20; i++) {
400395
futures.add(producer.sendAsync("message"));
@@ -428,7 +423,6 @@ public void testAvgMessagesPerEntry() throws Exception {
428423
metadataConsumer.put("matchValueReschedule", "producer2");
429424
@Cleanup
430425
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
431-
.receiverQueueSize(20)
432426
.subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
433427

434428
int counter = 0;
@@ -443,17 +437,14 @@ public void testAvgMessagesPerEntry() throws Exception {
443437
}
444438
}
445439

446-
assertEquals(40, counter);
440+
assertEquals(21, counter);
447441

448442
ConsumerStats consumerStats =
449443
admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);
450444

451-
assertEquals(40, consumerStats.getMsgOutCounter());
445+
assertEquals(21, consumerStats.getMsgOutCounter());
452446

453-
// The first messages deliver: 20 msgs.
454-
// Average of "messages per batch" is "1".
455-
// The second messages deliver: 20 msgs.
456-
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".
447+
// Math.round(1 * 0.9 + 0.1 * (20 / 1))
457448
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
458449
assertEquals(3, avgMessagesPerEntry);
459450
}

0 commit comments

Comments
 (0)