
KAFKA-18641: AsyncKafkaConsumer could lose records with auto offset commit #18737

Merged: 14 commits into apache:trunk on Feb 20, 2025

Conversation

frankvicky
Collaborator

@frankvicky frankvicky commented Jan 29, 2025

JIRA: KAFKA-18641
Please refer to jira ticket for further details.
The application thread advances positions, but SubscriptionState#allConsumed() is called by a background thread. In the current architecture, there is no way to synchronize the offsets between the two threads, which leads to inconsistency between the committed offsets and the actually consumed records.

This patch implements a waiting mechanism to ensure the background thread has generated commit requests before allowing the application thread to start a new poll cycle and fetch new messages.
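The handshake described above can be sketched as a simplified, self-contained simulation (hypothetical names such as `PollEvent` and `commitRequestsGenerated`; not the actual Kafka classes): the application thread enqueues a poll event and blocks, bounded by a timeout, until the background thread has generated the commit requests; only then does it proceed to fetch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified simulation of the handshake: the app thread submits a poll
// event and waits (bounded) until the background thread has generated
// the commit requests, before it is allowed to fetch new records.
public class PollHandshakeSketch {

    static final class PollEvent {
        final CompletableFuture<Void> commitRequestsGenerated = new CompletableFuture<>();
    }

    // Returns true once the background thread has signalled that commit
    // requests were generated, i.e. the app thread may start fetching.
    static boolean pollOnce(long timeoutMs) {
        BlockingQueue<PollEvent> eventQueue = new LinkedBlockingQueue<>();
        Thread background = new Thread(() -> {
            try {
                PollEvent event = eventQueue.take();
                // ... generate auto-commit requests from consumed offsets ...
                event.commitRequestsGenerated.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.start();
        try {
            PollEvent event = new PollEvent();
            eventQueue.put(event);
            event.commitRequestsGenerated.get(timeoutMs, TimeUnit.MILLISECONDS);
            background.join();
            return true; // safe to fetch now
        } catch (Exception e) {
            return false; // timed out or interrupted
        }
    }

    public static void main(String[] args) {
        System.out.println("may fetch: " + pollOnce(30_000L));
    }
}
```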

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@frankvicky
Collaborator Author

Hi @lianetm

I am currently moving the invocation from the background thread to the application thread. This ensures there will be no gap between committed offsets and actually consumed records.

However, this change raises some considerations. If we want to ensure SubscriptionState#allConsumed() is invoked by the application thread, we need to rely on events or event processor helper methods to deliver the offsets.

One last consideration is regarding AsyncKafkaConsumer#commitSync(final Duration timeout). Currently, it always passes Optional#empty() as an argument, which causes the background thread to invoke SubscriptionState#allConsumed(). Since this patch prevents invoking SubscriptionState#allConsumed() from the background thread, I think we should update AsyncKafkaConsumer#commitSync(final Duration timeout) to pass SubscriptionState#allConsumed() as an argument instead. WDYT?

Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);


SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout));

@lianetm
Member

lianetm commented Jan 29, 2025

Hey @frankvicky , thanks for tackling this promptly!

High-level comment: we've been intentionally keeping most of the subscription state actions in the background thread, to avoid race conditions that we had in the past (ex. app thread trying to access subscription state info for a partition that was no longer assigned because the background had removed it with a reconciliation). The trick is that the subscription state changes in the background (ex. when reconciliation completes and updates the assignment, or when events like seek or updateFetchPositions are processed and update positions).

So thinking about this case, what about the option of having the PollEvent processing (in the background thread) trigger all actions the CommitMgr has to at the beginning of each poll iteration:

  1. update auto-commit timer (already done)
  2. take a snapshot of the subscriptionState.allConsumed to be used on any commit of all consumed until the next poll (new, to fix the gap)

With that, all commits will continue to retrieve the allConsumed in the background as they do now, and the fix is more about "when" to retrieve them.

  • before this PR the allConsumed are retrieved freely when needed to be used for committing (and that's wrong because there could be a fetch half-way)
  • with this fix, the allConsumed to commit would only be retrieved at the beginning of the poll loop before fetching (but in the background, via a PollEvent event).

I'm thinking out loud here, could be missing stuff, let me know what do you think ;)
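For illustration, the snapshot idea above might look roughly like this (a minimal sketch with hypothetical names, not the real CommitRequestManager; `String` stands in for `TopicPartition` and `Long` for `OffsetAndMetadata`):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Rough sketch of the snapshot proposal: the background thread captures
// the consumed offsets once per poll iteration (while processing the
// PollEvent), and every "commit all consumed" until the next poll reads
// from that snapshot instead of the live subscription state.
public class CommitSnapshotSketch {
    private Map<String, Long> snapshot = Collections.emptyMap();

    // Called while processing a PollEvent in the background thread.
    void onPollEvent(Map<String, Long> allConsumed) {
        // ... update the auto-commit timer (already done today) ...
        this.snapshot = Collections.unmodifiableMap(new HashMap<>(allConsumed));
    }

    // Any commit of "all consumed" uses the snapshot, not the live state.
    Map<String, Long> offsetsToCommit() {
        return snapshot;
    }

    public static void main(String[] args) {
        CommitSnapshotSketch mgr = new CommitSnapshotSketch();
        mgr.onPollEvent(Map.of("topic-0", 5L));
        // Positions may advance in the app thread afterwards; the commit
        // still uses the offsets captured at the start of the poll.
        System.out.println(mgr.offsetsToCommit());
    }
}
```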

@kirktrue kirktrue added KIP-848 The Next Generation of the Consumer Rebalance Protocol ctr Consumer Threading Refactor (KIP-848) labels Jan 29, 2025
Contributor

@kirktrue kirktrue left a comment


Thanks for the PR @frankvicky!

I left a few comments/questions, but I think this is headed in the right direction.

QQ: the offsets map isn't cleared after it's used. Should it be?

Thanks!

@@ -88,6 +87,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
private final boolean throwOnFetchStableOffsetUnsupported;
final PendingRequests pendingRequests;
private boolean closing = false;
private Map<TopicPartition, OffsetAndMetadata> allConsumed = Map.of();
Contributor

I find the name a little confusing. (I know we get it from SubscriptionState.) What about latestPartitionOffsets or something? If nothing else, please add a comment to make it clear what this map contains.

This might be a stylistic choice, but what's the advantage of this over using Collections.emptyMap()?

Comment on lines 637 to 653
public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
return allConsumed;
}

public void allConsumed(Map<TopicPartition, OffsetAndMetadata> allConsumed) {
this.allConsumed = allConsumed;
}

Contributor

We should probably stick to the convention of using set for setters. Also, it would be prudent to make the given map immutable so that when other code invokes the getter they're not able to modify it.

Suggested change
public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
return allConsumed;
}
public void allConsumed(Map<TopicPartition, OffsetAndMetadata> allConsumed) {
this.allConsumed = allConsumed;
}
public Map<TopicPartition, OffsetAndMetadata> latestPartitionOffsets() {
return latestPartitionOffsets;
}
public void setLatestPartitionOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
this.latestPartitionOffsets = Collections.unmodifiableMap(offsets);
}

Comment on lines 747 to 748
applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
applicationEventHandler.add(new PollEvent(timer.currentTimeMs(), subscriptions.allConsumed()));
Contributor

We've done a lot of work to ensure the background thread "owns" the current subscription state. This seems to go against those efforts.

Can we add a comment to this line that explains why we don't want the background thread to "own" the set of consumed partitions? This would be helpful since later on during the poll process we do implicitly let the background thread determine the partitions to fetch (rather than having the application thread pass in the partitions).

Comment on lines 207 to 212
commitRequestManager.updateAutoCommitTimer(event.pollTimeMs());
commitRequestManager.allConsumed(event.allConsumed());
Contributor

Can we consolidate the auto-commit timer and the offsets map in a single method call that's more clearly named? Are there other places that call updateAutoCommitTimer() other than this path?

Comment on lines 574 to 586
requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
commitRequestManager.allConsumed(subscriptions.allConsumed()));
Contributor

Why is allConsumed() called here again, but this time with the data from SubscriptionState.allConsumed()? Please add some comments to explain. Thanks.

super(Type.POLL);
this.pollTimeMs = pollTimeMs;
this.allConsumed = allConsumed;
Contributor

Please wrap this in a call to Collections.unmodifiableMap().

@github-actions github-actions bot removed the triage PRs from the community label Jan 30, 2025
@frankvicky
Collaborator Author

frankvicky commented Jan 30, 2025

Hi @lianetm

take a snapshot of the subscriptionState.allConsumed to be used on any commit of all consumed until the next poll (new, to fix the gap)

I think poll is not the only thing that will affect the offsets. Following this logic, shouldn't this also apply to seek and assign?
Since operations other than poll can affect the offsets, if we go this way we should review those operations as well.

For example:
kafka.api.PlaintextConsumerCommitTest#testAutoCommitOnClose.
Currently, we rely on commitSync and commitAsync, which have invoked subscriptionState.allConsumed behind the scenes to get the offset.

@lianetm
Member

lianetm commented Jan 30, 2025

Hey @frankvicky , good point (but you mean seek and position, I guess?). Those are the ones, other than poll, that can update the positions. Agreed, we need to consider them too.

In the end it's truly only 2 events behind the 3 calls (SeekUnvalidatedEvent and CheckAndUpdatePositionsEvent), so one option would be just to make sure that when we process those events in the background, the commitMgr updates the positions snapshot to commit (after the event completes, to make sure it has the positions updated). With this, we wouldn't need to change anything in the PollEvent I expect. Thoughts?
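A rough sketch of that option (hypothetical, simplified names; the real events and managers differ): the background thread refreshes the offsets snapshot in the `whenComplete` callback of the event's internal future, i.e. only after the positions were updated, and then completes the event's own future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: after the background thread finishes an event that can move
// positions (seek / check-and-update-positions), it refreshes the commit
// manager's snapshot before completing the event's future, so the
// snapshot always reflects the updated positions.
public class EventSnapshotRefreshSketch {

    static Map<String, Long> process(CompletableFuture<Boolean> internalResult,
                                     Map<String, Long> allConsumedAfterEvent) {
        AtomicReference<Map<String, Long>> snapshot = new AtomicReference<>(Map.of());
        CompletableFuture<Boolean> eventFuture = new CompletableFuture<>();
        internalResult.whenComplete((value, error) -> {
            if (error == null) {
                // Positions are updated now; refresh the snapshot to commit.
                snapshot.set(allConsumedAfterEvent);
                eventFuture.complete(value);
            } else {
                eventFuture.completeExceptionally(error);
            }
        });
        internalResult.complete(true); // simulate the event finishing
        return snapshot.get();
    }

    public static void main(String[] args) {
        Map<String, Long> snap = process(new CompletableFuture<>(), Map.of("t-0", 42L));
        System.out.println(snap);
    }
}
```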

@frankvicky frankvicky force-pushed the KAFKA-18641 branch 2 times, most recently from 1117b1f to 970d2a2 on January 31, 2025 06:19
@github-actions github-actions bot removed the small Small PRs label Jan 31, 2025
Comment on lines 254 to 257
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitSync(
event.offsets().orElseGet(subscriptions::allConsumed),
event.deadlineMs()
);
Collaborator Author

Hi @lianetm
I believe commitSync also needs to get the latest consumed offsets, so I added a subscriptions::allConsumed invocation here.

Member

uhm not sure, relates to comment above

@frankvicky
Collaborator Author

Hi @kirktrue
Thanks for the review.
Since the current version is hugely different from the previous one, please take another look.

Member

@lianetm lianetm left a comment

Hey @frankvicky ! Thanks for the updates! Most important comment to discuss is #18737 (comment)

@@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) {
*/
private void process(final CheckAndUpdatePositionsEvent event) {
CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
future.whenComplete(complete(event.future()));
final CompletableFuture<Boolean> b = event.future();
future.whenComplete((BiConsumer<? super Boolean, ? super Throwable>) (value, exception) -> {
Member

do we really need to cast the (value, exception) here?

Collaborator Author

No, I should have cleaned up after using the inline-refactor function of the IDE. I will remove this unneeded cast in the next commit.

}

public void setLatestPartitionOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
this.latestPartitionOffsets = Collections.unmodifiableMap(offsets);
Member

should we add a debug log here to know that we're updating the all consumed positions to be committed? (I expect it will be helpful to track the flow if needed)

Collaborator Author

Yes, it would be helpful

verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
assertTrue(future.isDone());
Map<TopicPartition, OffsetAndMetadata> commitOffsets = assertDoesNotThrow(() -> future.get());
assertEquals(offsets, commitOffsets);
}

@Test
public void testCommitAsyncWithEmptyAllConsumedOffsets() {
public void testCommitAsyncWithEmptyLatestPartitionOffsetsOffsets() {
Member

I would say the test name still applies as it was (just that allConsumed is taken when we know it has been returned). The new one seems a bit confusing

@@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) {
*/
private void process(final CheckAndUpdatePositionsEvent event) {
CompletableFuture<Boolean> future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
future.whenComplete(complete(event.future()));
final CompletableFuture<Boolean> b = event.future();
Member

is there a reason why we need this var? (vs using event.future directly to complete below)

Collaborator Author

related comment: #18737 (comment)

@@ -389,7 +388,7 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
* future will be completed with a {@link RetriableCommitFailedException}.
*/
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(this::latestPartitionOffsets);
Member

uhm I don't think we should change here, and it's actually dangerous I believe. This is my reasoning (please correct me at any point): we have 2 kinds of commit operations in this manager:

  1. commits triggered automatically in the background (commit before rebalance and auto-commit on the interval)
  2. commits triggered by API calls (commitSync and commitAsync, which are only triggered by a consumer.commitSync/Async call or consumer.close. Note that these could be for specific offsets, or for allConsumed)

My take is that with this PR we need to change only 1, which are the ones affected by the race condition with the fetch happening within a consumer poll iteration. Those commits that happen automatically cannot take the allConsumed from the subscription state because we could be in the middle of a consumer poll iteration in the app thread (with positions advanced but the records not returned yet). So agree with the changes to maybeAutoCommitSyncBeforeRevocation and maybeAutoCommitAsync to not use subscriptionState.allConsumed.

But the commits grouped in 2 (triggered by consumer API calls) can and should use the allConsumed from the subscriptionState, I expect, as they happen outside of the poll loop. So first, they don't land in the race we're targeting, and most importantly, we cannot even ensure that the commitMgr latestPartitionOffsets has the positions returned when they are called (this is the dangerous part).

Ex. single call to poll that returns 5 records + commitSync()/commitAsync()
If that commit takes the latestPartitionOffsets from the commitReqMgr, wouldn't that be 0? The latestPartitionOffsets is only updated on the next call to poll (if any), which makes sense, because that's the only time, when running a continuous poll, that we can safely assume that the records have been returned (on the previous iteration). Makes sense?
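The split described above could be sketched like this (a hypothetical, simplified manager, not the real CommitRequestManager; `String`/`Long` stand in for `TopicPartition`/`OffsetAndMetadata`): automatic background commits use the per-poll snapshot, while API-triggered commits without explicit offsets read the live subscription state, since they run outside the poll loop's race window.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Sketch of the two kinds of commits: background-triggered commits use
// the per-poll snapshot; consumer-API commits (commitSync/commitAsync)
// use explicit offsets if given, else the live allConsumed.
public class CommitSourceSketch {
    private final Supplier<Map<String, Long>> liveAllConsumed;
    private Map<String, Long> latestPartitionOffsets = Map.of();

    CommitSourceSketch(Supplier<Map<String, Long>> liveAllConsumed) {
        this.liveAllConsumed = liveAllConsumed;
    }

    void setLatestPartitionOffsets(Map<String, Long> offsets) {
        this.latestPartitionOffsets = offsets;
    }

    // Auto-commit on the interval / before revocation: snapshot only.
    Map<String, Long> offsetsForAutoCommit() {
        return latestPartitionOffsets;
    }

    // commitSync()/commitAsync(): explicit offsets, else live allConsumed.
    Map<String, Long> offsetsForApiCommit(Optional<Map<String, Long>> explicit) {
        return explicit.orElseGet(liveAllConsumed);
    }

    public static void main(String[] args) {
        CommitSourceSketch mgr = new CommitSourceSketch(() -> Map.of("t-0", 5L));
        System.out.println("auto-commit uses: " + mgr.offsetsForAutoCommit());
        System.out.println("api commit uses: " + mgr.offsetsForApiCommit(Optional.empty()));
    }
}
```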

Collaborator Author

Makes sense.
For commitSync()/commitAsync() calls without arguments, we need a way to get the consumed offsets.

To ensure we always have the most up-to-date offset information at the time of commit, we should directly access subscriptionState.allConsumed. This gives us the current state rather than a potentially stale snapshot.


Contributor

@junrao junrao left a comment

@frankvicky : Thanks for the PR. Left a comment.

@@ -324,7 +323,7 @@ public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final long de

CompletableFuture<Void> result = new CompletableFuture<>();
OffsetCommitRequestState requestState =
createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
createOffsetCommitRequest(latestPartitionOffsets, deadlineMs);
Contributor

There is a subtle issue here. AbstractMembershipManager does the following steps when revoking a partition.

        // Mark partitions as pending revocation to stop fetching from the partitions (no new
        // fetches sent out, and no in-flight fetches responses processed).
        markPendingRevocationToPauseFetching(revokedPartitions);

        // Commit offsets if auto-commit enabled before reconciling a new assignment. Request will
        // be retried until it succeeds, fails with non-retriable error, or timer expires.
        CompletableFuture<Void> commitResult;

        commitResult = signalReconciliationStarted();

The first step marks the revoked partition as pendingRevocation, which prevents the partition's data from being returned in future consumer.poll() calls. However, when we get here, it's possible that a batch of records have just been returned to the application thread before the first step, but those records haven't been processed yet. So latestPartitionOffsets is not up to date yet. We need to wait for the next setLatestPartitionOffsets() call to happen. At that point, we know any record returned to the application will have been processed and no more records can be given to the application. So, it's safe to commit the offset at that point.

Collaborator Author

Hi @junrao
Thanks for the review.

Yes, this is indeed a problem. Since assignment reconciliation is triggered from a different path (ConsumerGroupHeartbeat) and not in the normal user app consume loop, I think we could update latestPartitionOffsets in ConsumerMembershipManager#signalReconciliationStarted().

In this way, we could get the following benefits:

  • Getting the latest latestPartitionOffsets after marking the revoked partition as pendingRevocation.
  • We could avoid createOffsetCommitRequest and autoCommitSyncBeforeRevocationWithRetries always invoking subscription#allConsumed, which will lead to the gap between the app thread and the background thread.

WDYT?

Member

if we get subscriptions.allConsumed() on signalReconciliationStarted, I'm afraid we could be retrieving positions that have been advanced in the app thread but not processed by the app yet? I believe this is what @junrao was referring to with:

We need to wait for the next setLatestPartitionOffsets() call to happen. At that point, we know any record returned to the application will have been processed and no more records can be given to the application. So, it's safe to commit the offset at that point.

Collaborator Author

Oops, it seems I misunderstood it 😓

Member

This is a tricky one, but I wonder if there is a simple fix at a higher level. At the moment we're triggering reconciliation freely in the background (when polling all managers, polling the membershipMgr is the one triggering it), and as I see it, that's probably what's conceptually wrong here? Should we consider triggering reconciliations only when processing a PollEvent?

With that this situation here disappears, because we would be generating the commit to revoke before any fetching happens (and even considering that the commit needs to be retried, at that point we know we had marked the partition as pending for revocation already, so no new fetches for it).

Seems conceptually right and simple, but I could be missing something. What do you think?

Collaborator Author

It's a solution worth trying since we could see that every poll is the point at which the latest offsets have just been committed.
However, one concern is that the reconciliation process could be delayed if the user application's per-loop time is unstable or slow. The original purpose of the reconciliation mechanism was to allow the background thread to process reconciliation immediately and effectively, so I'm uncertain if this trade-off would be worthwhile. 🤔

Member

@lianetm lianetm Feb 6, 2025

yes, I get your concern (and I'm still going over all this), but this is how I see it now:

  1. we would be only waiting to trigger the reconciliation, not to process it until the end (once triggered, it will carry on in the background as always, not blocking on anything new, just the commit request to complete and the callbacks, as always, we should keep this behaviour)
  2. we're just saying we will align the reconciliation triggering with the consumer poll (like the classic does btw) because we need to wait for stable positions to start reconciling a new assignment. So yes, there is a delay to start reconciling, but it's for correctness: we have to commit before a rebalance, but we cannot guarantee we can commit correctly the consumed positions if we don't have stable positions.

Looking at the poll loop from a high level, these are the 3 main blocks:

  1. app thread poll start (PollEvent)
  2. update fetch positions
  3. fetch

So we're saying we change to trigger a reconciliation only on 1, when we have stable positions, so we know the allConsumed to commit (and then rebalance). Of course that means that if we get a HB response with a new assignment right after the PollEvent (1), we would have to wait until the next PollEvent to start reconciling that assignment. But with the current version of triggering reconciliations freely in the background, that's exactly the root cause of the problem imo: we start a reconciliation when 2/3 are happening, and it's a mess because we cannot determine the allConsumed to commit, it's a moving target (until we know the records have been returned, and that's on the next PollEvent).

Thoughts?

Member

More brainstorming to improve this idea a bit: all the above matters only if auto-commit is enabled. Also, it's important to keep resolving the topic names for the assigned topic IDs asap (to request the metadata needed; that part doesn't need to wait for any in-flight fetch), so even though we need the above, we shouldn't completely throw away the attempt to reconcile on the membershipMgr.poll.

So one option that comes to mind is to keep calling maybeReconcile(canCommit=false) from the membershipMgr.poll, so we don't delay the resolution of the topics, and don't delay reconciliation when auto-commit is disabled (the new param would simply short-circuit right after resolving metadata, before markReconciliationInProgress, if autoCommitEnabled && !canCommit). Then we also need a call to maybeReconcile(canCommit=true) when a PollEvent is processed (this is what the above comment is about: basically the one to reconcile with auto-commit enabled, only possible when

we know any record returned to the application will have been processed and no more records can be given to the application. So, it's safe to commit the offset at that point.

Collaborator Author

Makes sense to me.
In this way, we have two scenarios:

autoCommitEnabled=true

ConsumerMembershipManager.poll will invoke maybeReconcile(canCommit=false), which will only resolve the topic names for the assigned topic IDs and return early before markReconciliationInProgress due to canCommit=false. The PollEvent will then invoke maybeReconcile(canCommit=true) and complete the entire reconciliation process.

autoCommitEnabled=false

This follows a similar pattern to autoCommitEnabled=true, with the key difference that we don't handle auto-commits here; instead, committing should be managed through onPartitionsRevoked (users should invoke commitSync in their listener implementation).

This approach allows us to ensure certain positions before reconciliation, with only a minimal delay as trade-off. Sounds good to me.
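A minimal sketch of the two-phase idea (`maybeReconcile` and the `canCommit` parameter come from the discussion above; the step names are invented for illustration): the background poll resolves topic names early but, with auto-commit enabled, defers the actual reconciliation until the PollEvent grants `canCommit=true`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: membershipMgr.poll calls maybeReconcile(false) to resolve
// topic names asap; the PollEvent calls maybeReconcile(true) once
// positions are stable, the only point at which an auto-commit-enabled
// member may start reconciling (and committing).
public class ReconcileSketch {
    final boolean autoCommitEnabled;
    final List<String> steps = new ArrayList<>();

    ReconcileSketch(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    void maybeReconcile(boolean canCommit) {
        steps.add("resolveTopicNames"); // always safe, do it asap
        if (autoCommitEnabled && !canCommit) {
            return; // wait for the next PollEvent to reconcile
        }
        steps.add("markReconciliationInProgress");
        steps.add("commitAndReconcile");
    }

    public static void main(String[] args) {
        ReconcileSketch mgr = new ReconcileSketch(true);
        mgr.maybeReconcile(false); // background poll: names only
        mgr.maybeReconcile(true);  // PollEvent: full reconciliation
        System.out.println(mgr.steps);
    }
}
```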

@github-actions github-actions bot added the small Small PRs label Feb 2, 2025
@frankvicky frankvicky force-pushed the KAFKA-18641 branch 5 times, most recently from bc643a9 to 7a229d9 on February 5, 2025 03:00
Contributor

@junrao junrao left a comment

@frankvicky : Thanks for the updated PR. One minor comment.

commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
// all commit request generation points have been passed,
// so it's safe to notify the app thread that it can proceed and start fetching
event.future().complete(null);
Contributor

Technically, the event hasn't been fully processed yet. So, it's a bit weird to complete the event midway. Perhaps we could introduce a separate future like reconcileAndAutoCommit and wait for that.
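A tiny sketch of that suggestion (hypothetical names): the PollEvent carries a dedicated `reconcileAndAutoCommit` future that is completed midway through processing, while the event's own completion happens only at the end.

```java
import java.util.concurrent.CompletableFuture;

// Sketch: two futures on one event, so the app thread can wait on the
// intermediate step without the event being "completed" prematurely.
public class PollEventFuturesSketch {

    static final class PollEvent {
        // Completed as soon as reconciliation + auto-commit are triggered.
        final CompletableFuture<Void> reconcileAndAutoCommit = new CompletableFuture<>();
        // Completed only when the whole event has been processed.
        final CompletableFuture<Void> processed = new CompletableFuture<>();
    }

    // Background-thread processing: signal the intermediate step first,
    // finish the event afterwards.
    static void process(PollEvent event) {
        // ... trigger reconciliation and auto-commit request generation ...
        event.reconcileAndAutoCommit.complete(null);
        // ... remaining PollEvent processing ...
        event.processed.complete(null);
    }

    public static void main(String[] args) {
        PollEvent event = new PollEvent();
        process(event);
        System.out.println("both done: "
                + (event.reconcileAndAutoCommit.isDone() && event.processed.isDone()));
    }
}
```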

@frankvicky frankvicky force-pushed the KAFKA-18641 branch 2 times, most recently from dfd031e to 7468c53 on February 17, 2025 03:38
Member

@lianetm lianetm left a comment

Thanks @frankvicky ! Some minor comments aligning the docs with the changes

// This will trigger async auto-commits of consumed positions when hitting
// the interval time or on partition revocation
applicationEventHandler.add(event);
// Wait for reconciliation and auto-commit to complete to ensure all commit requests are processed
Member

Suggested change
// Wait for reconciliation and auto-commit to complete to ensure all commit requests are processed
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests retrieve the positions to commit

// Make sure to let the background thread know that we are still polling.
applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
// This will trigger async auto-commits of consumed positions when hitting
// the interval time or on partition revocation
Member

Suggested change
// the interval time or on partition revocation
// the interval time or reconciling new assignments

@@ -186,7 +186,6 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return drainPendingOffsetCommitRequests();
}

maybeAutoCommitAsync();
Member

The java doc stayed behind after this change (still reads The function will also try to autocommit the offsets, if feature is enabled.)

Comment on lines 31 to 32
* <li>auto-commit on revocation</li>
* <li>auto-commit</li>
Member

Suggested change
* <li>auto-commit on revocation</li>
* <li>auto-commit</li>
* <li>auto-commit on rebalance</li>
* <li>auto-commit on the interval</li>

@frankvicky
Collaborator Author

Hi @lianetm
I have just updated the patch based on the latest comments.
PTAL 🙇🏼

Member

@lianetm lianetm left a comment

Thanks! One more comment

applicationEventHandler.add(event);
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
// retrieve the positions to commit before proceeding with fetching new records
ConsumerUtils.getResult(event.reconcileAndAutoCommit());
Member

@lianetm lianetm Feb 17, 2025

should we pass the default api timeout here? Under normal execution, this will just complete right away (local actions in the background), but if the background thread is faulty (ie. died) and the event can't be processed, the consumer would hang here indefinitely (instead of timing out). Note that I suggest the default api timeout and not the timeout from param because we could have poll(ZERO), and that 0 shouldn't apply to the inter-thread communication, which is what we're doing here. Same for the blocking call we added for offsetsReady.

We do this same approach in other api calls btw, ex. seek. Makes sense?

Collaborator Author

Makes sense.
Previously, I thought it was crucial for the app thread to wait for reconciliation and auto-commit completion, but I didn't consider the possibility of a faulty background thread.
Given that, we also need to apply this timeout to offsetsReady, right?

ConsumerUtils.getResult(commitEvent.offsetsReady());
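A simplified version of such a bounded wait (not the real ConsumerUtils; the exception handling is a stand-in): inter-thread waits use the default API timeout rather than the poll timeout, so a poll(ZERO) still waits long enough for the background thread, while a dead background thread cannot hang the consumer forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: bound every inter-thread wait by the default API timeout, so
// a faulty background thread surfaces as a timeout instead of a hang.
public class BoundedWaitSketch {

    static <T> T getResult(CompletableFuture<T> future, long defaultApiTimeoutMs) {
        try {
            return future.get(defaultApiTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new RuntimeException("background thread did not respond in time", e);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Normal case: the background completed the future, returns right away.
        Integer value = getResult(CompletableFuture.completedFuture(7), 1_000L);
        System.out.println("value: " + value);
    }
}
```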

Member

@lianetm lianetm left a comment

Thanks @frankvicky ! Looks good to me. Let's just wait to see if @junrao has any other comments.

@chia7712
Member

@frankvicky could you please fix the conflicts?

Contributor

@junrao junrao left a comment

@frankvicky : Thanks for the updated PR. LGTM. Just a minor comment.

@@ -123,8 +123,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
private final Optional<String> serverAssignor;

/**
* Manager to perform commit requests needed before revoking partitions (if auto-commit is
* enabled)
* Manager to perform commit requests needed before rebalance (if auto-commit is enabled)
Contributor

This makes it sound like that commitRequestManager is only used for auto offset commit, but it's used for commit calls from the users too.

Collaborator Author

Hi @junrao
IMHO, this javadoc actually describes the purpose of commitRequestManager in this specific class, where it's only used for auto-commit before rebalance.
While you're correct that commitRequestManager handles user commit calls too, that functionality isn't used here.
Does it make sense?

Contributor

@frankvicky : Thanks for the explanation. This looks good to me then.

Contributor

@junrao junrao left a comment

@frankvicky : Thanks for the explanation. LGTM


@lianetm lianetm merged commit 709bfc5 into apache:trunk Feb 20, 2025
9 checks passed
lianetm pushed a commit that referenced this pull request Feb 20, 2025
Labels: Blocker (identified as solving a blocker for a release), ci-approved, clients, consumer, ctr (Consumer Threading Refactor, KIP-848), KIP-848 (The Next Generation of the Consumer Rebalance Protocol)
6 participants