Skip to content

Commit 739c41a

Browse files
poorbarcodenikhil-ctds
authored andcommitted
[improve] [broker] Make the estimated entry size more accurate (apache#23931)
(cherry picked from commit 35a1676) (cherry picked from commit d0e95db)
1 parent 262597d commit 739c41a

File tree

7 files changed

+166
-50
lines changed

7 files changed

+166
-50
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

+42-16
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
2525
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
2626
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
27+
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
2728
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
2829
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.base.MoreObjects;
@@ -3710,26 +3711,51 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
37103711
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
37113712
return maxEntries;
37123713
}
3714+
int maxEntriesBasedOnSize =
3715+
Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue();
3716+
return Math.min(maxEntriesBasedOnSize, maxEntries);
3717+
}
37133718

3714-
double avgEntrySize = ledger.getStats().getEntrySizeAverage();
3715-
if (!Double.isFinite(avgEntrySize)) {
3716-
// We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats
3717-
avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
3718-
}
3719-
3720-
if (!Double.isFinite(avgEntrySize)) {
3721-
// If we still don't have any information, it means this is the first time we attempt reading
3722-
// and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
3723-
return 1;
3719+
static long estimateEntryCountBySize(long bytesSize, PositionImpl readPosition, ManagedLedgerImpl ml) {
3720+
Position posToRead = readPosition;
3721+
if (!ml.isValidPosition(readPosition)) {
3722+
posToRead = ml.getNextValidPosition(readPosition);
37243723
}
3724+
long result = 0;
3725+
long remainingBytesSize = bytesSize;
37253726

3726-
int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
3727-
if (maxEntriesBasedOnSize < 1) {
3728-
// We need to read at least one entry
3729-
return 1;
3727+
while (remainingBytesSize > 0) {
3728+
// Last ledger.
3729+
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
3730+
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
3731+
// Only read 1 entry if no entries to read.
3732+
return 1;
3733+
}
3734+
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
3735+
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3736+
result += remainingBytesSize / avg;
3737+
break;
3738+
}
3739+
// Skip empty ledger.
3740+
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
3741+
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
3742+
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
3743+
continue;
3744+
}
3745+
// Calculate entries by average of ledgers.
3746+
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
3747+
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
3748+
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
3749+
result += remainingBytesSize / avg;
3750+
break;
3751+
} else {
3752+
// Calculate for the next ledger.
3753+
result += remainEntriesOfLedger;
3754+
remainingBytesSize -= remainEntriesOfLedger * avg;
3755+
posToRead = ml.getNextValidPosition(PositionImpl.get(posToRead.getLedgerId(), Long.MAX_VALUE));
3756+
}
37303757
}
3731-
3732-
return Math.min(maxEntriesBasedOnSize, maxEntries);
3758+
return Math.max(result, 1);
37333759
}
37343760

37353761
@Override

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
218218
private final CallbackMutex offloadMutex = new CallbackMutex();
219219
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
220220
.completedFuture(PositionImpl.LATEST);
221+
@VisibleForTesting
222+
@Getter
221223
protected volatile LedgerHandle currentLedger;
222224
protected volatile long currentLedgerEntries = 0;
223225
protected volatile long currentLedgerSize = 0;

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Posit
302302
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry,
303303
originalCallback, ctx);
304304
} else {
305-
long estimatedEntrySize = getEstimatedEntrySize();
305+
long estimatedEntrySize = getEstimatedEntrySize(lh);
306306
long estimatedReadSize = numberOfEntries * estimatedEntrySize;
307307
if (log.isDebugEnabled()) {
308308
log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size",
@@ -418,12 +418,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, PositionImpl firstPosition, Pos
418418
}
419419

420420
@VisibleForTesting
421-
public long getEstimatedEntrySize() {
422-
long estimatedEntrySize = getAvgEntrySize();
423-
if (estimatedEntrySize == 0) {
424-
estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
421+
public long getEstimatedEntrySize(ReadHandle lh) {
422+
if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
423+
// No entries stored.
424+
return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
425425
}
426-
return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
426+
return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
427427
}
428428

429429
private long getAvgEntrySize() {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/InflightReadsLimiterIntegrationTest.java

+5-10
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,9 @@ public void testPreciseLimitation(String missingCase) throws Exception {
141141
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
142142
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
143143
cb0.entries.join();
144-
Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
145-
Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
144+
int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
146145
Awaitility.await().untilAsserted(() -> {
147-
long remainingBytes =limiter.getRemainingBytes();
146+
long remainingBytes = limiter.getRemainingBytes();
148147
Assert.assertEquals(remainingBytes, totalCapacity);
149148
});
150149
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
@@ -165,7 +164,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
165164
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
166165
}).start();
167166

168-
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
167+
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry);
169168
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
170169
log.info("acquired : {}", bytesAcquired1);
171170
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
@@ -178,9 +177,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
178177
Thread.sleep(3000);
179178
readCompleteSignal1.countDown();
180179
cb1.entries.join();
181-
Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
182-
Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
183-
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
180+
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry);
184181
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
185182
log.info("acquired : {}", bytesAcquired2);
186183
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
@@ -191,8 +188,6 @@ public void testPreciseLimitation(String missingCase) throws Exception {
191188

192189
readCompleteSignal2.countDown();
193190
cb2.entries.join();
194-
Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
195-
Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
196191
Awaitility.await().untilAsserted(() -> {
197192
long remainingBytes = limiter.getRemainingBytes();
198193
log.info("remainingBytes 2: {}", remainingBytes);
@@ -204,7 +199,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
204199
}
205200

206201
private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
207-
return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
202+
return entriesCount * perEntrySize;
208203
}
209204

210205
class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

+90-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.bookkeeper.mledger.impl;
2020

21+
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
2122
import static org.mockito.ArgumentMatchers.anyInt;
2223
import static org.mockito.Mockito.any;
2324
import static org.mockito.Mockito.doAnswer;
@@ -681,13 +682,15 @@ void testAsyncReadWithMaxSizeByte() throws Exception {
681682
ManagedCursor cursor = ledger.openCursor("c1");
682683

683684
for (int i = 0; i < 100; i++) {
684-
ledger.addEntry(new byte[1024]);
685+
ledger.addEntry(new byte[(int) (1024)]);
685686
}
686687

687-
// First time, since we don't have info, we'll get 1 single entry
688-
readAndCheck(cursor, 10, 3 * 1024, 1);
688+
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
689+
// will get more messages than before(it only receives 1 messages at the first delivery),
690+
int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
691+
readAndCheck(cursor, 10, 3 * avg, 3);
689692
// We should only return 3 entries, based on the max size
690-
readAndCheck(cursor, 20, 3 * 1024, 3);
693+
readAndCheck(cursor, 20, 3 * avg, 3);
691694
// If maxSize is < avg, we should get 1 entry
692695
readAndCheck(cursor, 10, 500, 1);
693696
}
@@ -3885,13 +3888,15 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
38853888
ledger.addEntry(new byte[1024]);
38863889
}
38873890

3888-
// First time, since we don't have info, we'll get 1 single entry
3889-
List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
3890-
assertEquals(entries.size(), 1);
3891+
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
3892+
// will get more messages than before(it only receives 1 messages at the first delivery),
3893+
int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
3894+
List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
3895+
assertEquals(entries.size(), 3);
38913896
entries.forEach(Entry::release);
38923897

38933898
// We should only return 3 entries, based on the max size
3894-
entries = c.readEntriesOrWait(10, 3 * 1024);
3899+
entries = c.readEntriesOrWait(10, 3 * avg);
38953900
assertEquals(entries.size(), 3);
38963901
entries.forEach(Entry::release);
38973902

@@ -4798,5 +4803,82 @@ public void operationFailed(ManagedLedgerException exception) {
47984803
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
47994804
}
48004805

4806+
@Test
4807+
public void testEstimateEntryCountBySize() throws Exception {
4808+
final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", "");
4809+
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
4810+
long entryCount0 =
4811+
ManagedCursorImpl.estimateEntryCountBySize(16, PositionImpl.get(ml.getCurrentLedger().getId(), 0), ml);
4812+
assertEquals(entryCount0, 1);
4813+
// Avoid trimming ledgers.
4814+
ml.openCursor("c1");
4815+
4816+
// Build data.
4817+
for (int i = 0; i < 100; i++) {
4818+
ml.addEntry(new byte[]{1});
4819+
}
4820+
long ledger1 = ml.getCurrentLedger().getId();
4821+
ml.getCurrentLedger().close();
4822+
ml.ledgerClosed(ml.getCurrentLedger());
4823+
for (int i = 0; i < 100; i++) {
4824+
ml.addEntry(new byte[]{1, 2});
4825+
}
4826+
long ledger2 = ml.getCurrentLedger().getId();
4827+
ml.getCurrentLedger().close();
4828+
ml.ledgerClosed(ml.getCurrentLedger());
4829+
for (int i = 0; i < 100; i++) {
4830+
ml.addEntry(new byte[]{1, 2, 3, 4});
4831+
}
4832+
long ledger3 = ml.getCurrentLedger().getId();
4833+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo1 = ml.getLedgersInfo().get(ledger1);
4834+
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo2 = ml.getLedgersInfo().get(ledger2);
4835+
long average1 = ledgerInfo1.getSize() / ledgerInfo1.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
4836+
long average2 = ledgerInfo2.getSize() / ledgerInfo2.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
4837+
long average3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
4838+
assertEquals(average1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
4839+
assertEquals(average2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
4840+
assertEquals(average3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
4841+
4842+
// Test: the individual ledgers.
4843+
long entryCount1 =
4844+
ManagedCursorImpl.estimateEntryCountBySize(average1 * 16, PositionImpl.get(ledger1, 0), ml);
4845+
assertEquals(entryCount1, 16);
4846+
long entryCount2 =
4847+
ManagedCursorImpl.estimateEntryCountBySize(average2 * 8, PositionImpl.get(ledger2, 0), ml);
4848+
assertEquals(entryCount2, 8);
4849+
long entryCount3 =
4850+
ManagedCursorImpl.estimateEntryCountBySize(average3 * 4, PositionImpl.get(ledger3, 0), ml);
4851+
assertEquals(entryCount3, 4);
4852+
4853+
// Test: across ledgers.
4854+
long entryCount4 =
4855+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 8), PositionImpl.get(ledger1, 0), ml);
4856+
assertEquals(entryCount4, 108);
4857+
long entryCount5 =
4858+
ManagedCursorImpl.estimateEntryCountBySize((average2 * 100) + (average3 * 4), PositionImpl.get(ledger2, 0), ml);
4859+
assertEquals(entryCount5, 104);
4860+
long entryCount6 =
4861+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 0), ml);
4862+
assertEquals(entryCount6, 204);
4863+
4864+
long entryCount7 =
4865+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 8), PositionImpl.get(ledger1, 80), ml);
4866+
assertEquals(entryCount7, 28);
4867+
long entryCount8 =
4868+
ManagedCursorImpl.estimateEntryCountBySize((average2 * 20) + (average3 * 4), PositionImpl.get(ledger2, 80), ml);
4869+
assertEquals(entryCount8, 24);
4870+
long entryCount9 =
4871+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 20) + (average2 * 100) + (average3 * 4), PositionImpl.get(ledger1, 80), ml);
4872+
assertEquals(entryCount9, 124);
4873+
4874+
// Test: read more than entries written.
4875+
long entryCount10 =
4876+
ManagedCursorImpl.estimateEntryCountBySize((average1 * 100) + (average2 * 100) + (average3 * 100) + (average3 * 4) , PositionImpl.get(ledger1, 0), ml);
4877+
assertEquals(entryCount10, 304);
4878+
4879+
// cleanup.
4880+
ml.delete();
4881+
}
4882+
48014883
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
48024884
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void testBatchMessageAck() {
8585
.newConsumer()
8686
.topic(topicName)
8787
.subscriptionName(subscriptionName)
88-
.receiverQueueSize(10)
88+
.receiverQueueSize(50)
8989
.subscriptionType(SubscriptionType.Shared)
9090
.enableBatchIndexAcknowledgment(true)
9191
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
@@ -114,27 +114,29 @@ public void testBatchMessageAck() {
114114
consumer.acknowledge(receive1);
115115
consumer.acknowledge(receive2);
116116
Awaitility.await().untilAsserted(() -> {
117-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
117+
// Since https://github.com/apache/pulsar/pull/23931 improved the mechanism of estimate average entry size,
118+
// broker will deliver much messages than before. So edit 18 -> 38 here.
119+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38);
118120
});
119121
Message<byte[]> receive3 = consumer.receive();
120122
Message<byte[]> receive4 = consumer.receive();
121123
consumer.acknowledge(receive3);
122124
consumer.acknowledge(receive4);
123125
Awaitility.await().untilAsserted(() -> {
124-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
126+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
125127
});
126128
// Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436.
127129
consumer.pause();
128130
Message<byte[]> receive5 = consumer.receive();
129131
consumer.negativeAcknowledge(receive5);
130132
Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> {
131-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
133+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
132134
});
133135
// Unblock cmd-flow.
134136
consumer.resume();
135137
consumer.receive();
136138
Awaitility.await().untilAsserted(() -> {
137-
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
139+
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
138140
});
139141
}
140142

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -388,8 +388,13 @@ public void testAvgMessagesPerEntry() throws Exception {
388388
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
389389
.batchingMaxBytes(Integer.MAX_VALUE)
390390
.create();
391-
392-
producer.send("first-message");
391+
// The first messages deliver: 20 msgs.
392+
// Average of "messages per batch" is "1".
393+
for (int i = 0; i < 20; i++) {
394+
producer.send("first-message");
395+
}
396+
// The second messages deliver: 20 msgs.
397+
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".
393398
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
394399
for (int i = 0; i < 20; i++) {
395400
futures.add(producer.sendAsync("message"));
@@ -423,6 +428,7 @@ public void testAvgMessagesPerEntry() throws Exception {
423428
metadataConsumer.put("matchValueReschedule", "producer2");
424429
@Cleanup
425430
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
431+
.receiverQueueSize(20)
426432
.subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
427433

428434
int counter = 0;
@@ -437,14 +443,17 @@ public void testAvgMessagesPerEntry() throws Exception {
437443
}
438444
}
439445

440-
assertEquals(21, counter);
446+
assertEquals(40, counter);
441447

442448
ConsumerStats consumerStats =
443449
admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);
444450

445-
assertEquals(21, consumerStats.getMsgOutCounter());
451+
assertEquals(40, consumerStats.getMsgOutCounter());
446452

447-
// Math.round(1 * 0.9 + 0.1 * (20 / 1))
453+
// The first messages deliver: 20 msgs.
454+
// Average of "messages per batch" is "1".
455+
// The second messages deliver: 20 msgs.
456+
// Average of "messages per batch" is "Math.round(1 * 0.9 + 20 * 0.1) = 2.9 ~ 3".
448457
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
449458
assertEquals(3, avgMessagesPerEntry);
450459
}

0 commit comments

Comments
 (0)