
KAFKA-17480: New consumer commit all consumed should retrieve offsets in background thread #17150

Merged: 1 commit merged into apache:trunk on Nov 7, 2024

Conversation

@FrankYang0529 (Member) commented Sep 10, 2024

When committing all consumed offsets (sync, async, or on close), the new consumer retrieves the offsets from subscriptionState.allConsumed() in the app thread. We should consider retrieving allConsumed in the background thread when processing the events, to avoid inconsistencies: the subscription state could be modified in the background thread after allConsumed was retrieved in the app thread.
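For illustration, a rough sketch of that direction: the commit event carries an Optional set of offsets, and the background ApplicationEventProcessor resolves allConsumed() itself while processing the event. This is only a simplified sketch, not the merged code, and the exact method shapes are assumptions.

    private void process(final SyncCommitEvent event) {
        if (requestManagers.commitRequestManager.isEmpty()) {
            event.future().completeExceptionally(new KafkaException(
                "Unable to sync commit because the CommitRequestManager is not available."));
            return;
        }
        CommitRequestManager manager = requestManagers.commitRequestManager.get();
        // Resolve "all consumed" here, in the background thread, so the snapshot is taken
        // by the same thread that mutates the subscription state.
        Map<TopicPartition, OffsetAndMetadata> offsets =
            event.offsets().orElseGet(subscriptions::allConsumed);
        manager.commitSync(offsets, event.deadlineMs())
            .whenComplete((ignored, error) -> {
                if (error != null)
                    event.future().completeExceptionally(error);
                else
                    event.future().complete(offsets);
            });
    }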

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lianetm added the consumer and ctr (Consumer Threading Refactor, KIP-848) labels on Sep 11, 2024
@lianetm (Contributor) commented Sep 13, 2024

Hey @FrankYang0529, thanks for taking this one! I see it is still marked as a draft, so I haven't looked into it yet, but let me know when it's ready for review and I'll take a look. Thanks!

@FrankYang0529 force-pushed the KAFKA-17480 branch 2 times, most recently from 4576cc6 to 6622353 on September 17, 2024 01:56
@FrankYang0529 marked this pull request as ready for review on September 17, 2024 02:03
@FrankYang0529 (Member Author)

Hi @lianetm, yes, this one is ready for review. Thank you.

@frankvicky (Contributor) left a comment

Hi @FrankYang0529
I have a minor comment for consideration, PTAL

Comment on lines 256 to 280
verify(metadata, never()).updateLastSeenEpochIfNewer(tp, 1);
verify(commitRequestManager, never()).commitSync(offsets, deadlineMs);
@frankvicky (Contributor) commented Sep 18, 2024

nit: I noticed that the handling logic when groupId is absent is almost identical across multiple tests. Maybe you could extract a helper method like verifyNoInteractions(metadata, commitRequestManager, isSynced), as sketched below. WDYT?
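For reference, a sketch of such a helper, built from the Mockito verifications these tests already use (static imports of verify, never, any, anyInt and anyLong assumed). The parameters are the ones suggested above; the name is tweaked here only to avoid clashing with Mockito's own verifyNoInteractions.

    private void verifyNoCommitInteractions(final ConsumerMetadata metadata,
                                            final CommitRequestManager commitRequestManager,
                                            final boolean isSynced) {
        // When no groupId is configured, neither the metadata nor the commit manager
        // should be touched.
        verify(metadata, never()).updateLastSeenEpochIfNewer(any(TopicPartition.class), anyInt());
        if (isSynced) {
            verify(commitRequestManager, never()).commitSync(any(), anyLong());
        } else {
            verify(commitRequestManager, never()).commitAsync(any());
        }
    }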

@lianetm (Contributor) left a comment

Hey @FrankYang0529 , thanks for the patch! Took a first pass.

CommitRequestManager manager = requestManagers.commitRequestManager.get();
CompletableFuture<Void> future = manager.commitAsync(event.offsets());
future.whenComplete(complete(event.future()));
process((CommitEvent) event);
Contributor:

We have 2 separate event types (async and sync), which are joined into one here, only to be split again for the actual processing with:

if (event.type() == Type.COMMIT_ASYNC) {
                future = manager.commitAsync(offsets);
            } else {
                future = manager.commitSync(offsets, event.deadlineMs());
            }

I get that this lets us reuse a bit, but I wonder if it's worth the twisted flow. Could we maybe keep them separate (as they originally are when the events are created), so that process(SyncCommitEvent) ends up calling manager.commitSync and process(AsyncCommitEvent) calls manager.commitAsync, and just encapsulate in helper functions what we want to reuse in both? (e.g. maybeUpdateLastSeenEpochIfNewer() with lines 188-191, which would be called from both, and similarly the logic to retrieve offsets from the event, lines 180-181.) What do you think?
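A sketch of that suggested shape, with the shared pieces pulled out into helpers (helper names like offsetsFrom are placeholders for illustration, not the PR's code):

    private void process(final SyncCommitEvent event) {
        CommitRequestManager manager = requestManagers.commitRequestManager.get();
        Map<TopicPartition, OffsetAndMetadata> offsets = offsetsFrom(event);  // shared helper
        maybeUpdateLastSeenEpochIfNewer(offsets);                             // shared helper
        manager.commitSync(offsets, event.deadlineMs()).whenComplete(complete(event.future()));
    }

    private void process(final AsyncCommitEvent event) {
        CommitRequestManager manager = requestManagers.commitRequestManager.get();
        Map<TopicPartition, OffsetAndMetadata> offsets = offsetsFrom(event);  // shared helper
        maybeUpdateLastSeenEpochIfNewer(offsets);                             // shared helper
        manager.commitAsync(offsets).whenComplete(complete(event.future()));
    }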

@@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSyncCommitEventWithOffsets(boolean withGroupId) {
Contributor:

Since this is testing commit, it does need a group id, so I expect it should only run with withGroupId=true.


@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSyncCommitEventWithCommitAllConsumed(boolean withGroupId) {
Contributor:

Same, only relevant for withGroupId=true, right? (And all the other commit tests down below.)

@kirktrue added the KIP-848 (The Next Generation of the Consumer Rebalance Protocol) and clients labels on Sep 26, 2024
@kirktrue (Collaborator) left a comment

Thanks for the PR @FrankYang0529!

What do you think about wrapping the offset map in an Optional instead of requiring the map to be empty and the flag to be set?

Something like:

    Optional<Map<TopicPartition, OffsetAndMetadata>> offsets = Optional.empty();

IMO, it makes the intent a little clearer. Just a suggestion.

Thanks!
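For illustration, the intent with an Optional would read something like this (a sketch; the field and variable names are placeholders):

    // In the commit event: an empty Optional means "commit all consumed offsets".
    private final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets = Optional.empty();

    // Later, in the background thread, the fallback becomes explicit:
    Map<TopicPartition, OffsetAndMetadata> toCommit = offsets.orElseGet(subscriptions::allConsumed);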

@FrankYang0529 (Member Author)

Hi @lianetm and @kirktrue, thanks for the review. I may need more time for this. I found that AsyncKafkaConsumer#interceptors consumes the offsets. If we use the background thread to get subscriptions.allConsumed(), we need CommitEvent#future to return the offsets, so that AsyncKafkaConsumer#interceptors doesn't receive empty offsets.

@FrankYang0529 marked this pull request as draft on September 27, 2024 14:03
@FrankYang0529 force-pushed the KAFKA-17480 branch 3 times, most recently from 451b3c6 to e43c2e4 on October 7, 2024 07:10
@FrankYang0529 marked this pull request as ready for review on October 7, 2024 07:10
@FrankYang0529 (Member Author)

Hi @lianetm and @kirktrue, I addressed all comments. The PR is ready for review now. Thanks.

@kirktrue (Collaborator) left a comment

Thanks @FrankYang0529 for the PR. It looks pretty straightforward, but I had a couple of questions first.

Comment on lines 235 to 240
private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((topicPartition, offsetAndMetadata) -> {
if (offsetAndMetadata.leaderEpoch().isPresent())
metadata.updateLastSeenEpochIfNewer(topicPartition, offsetAndMetadata.leaderEpoch().get());
});
}
Collaborator:

The logic in ApplicationEventProcessor.maybeUpdateLastSeenEpochIfNewer() looks very similar to AsyncKafkaConsumer.updateLastSeenEpochIfNewer(). Is it possible to extract that into a shared method somewhere, say ConsumerUtils?

Member Author:

Agree, I extracted a new function, ConsumerUtils#maybeUpdateLastSeenEpochIfNewer. Thanks for the suggestion.

assertDoesNotThrow(() -> commitEvent.future().complete(null));
assertTrue(commitEvent.offsets().isPresent());
assertEquals(offsets, commitEvent.offsets().get());
assertDoesNotThrow(() -> commitEvent.future().complete(offsets));
Collaborator:

I know this line is in the original code, but I have a question: in what case would calling CompletableFuture.complete() possibly throw an exception?

Member Author:

Agree, we should not test the Java implementation, which is not part of the Kafka code. Removed assertDoesNotThrow for CompletableFuture.complete(). Thank you.

Comment on lines -607 to -546
verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
Collaborator:

Out of curiosity, why don't we want to verify the metadata was updated? That still happens in the ApplicationEventProcessor, right?

Member Author:

We mock ApplicationEventProcessor in AsyncKafkaConsumerTest, so we can't verify the related implementation there. We test it in https://github.com/apache/kafka/pull/17150/files#diff-87404aabc3e6c99569d678ff904586ab232577240d9932839c52135a34b2b57aR260.

Contributor:

Agree with removing the checks on metadata, but then I think we should remove testCommitSyncLeaderEpochUpdate completely. The metadata update is not done in this component anymore, so this ends up simply testing commitSync, which is already covered. (Coverage for the leader epoch update is now in the CommitRequestManager, which does the metadata update.)

Comment on lines -643 to -582
verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
Collaborator:

Same question here as above.


setupProcessor(true);
doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets, deadlineMs);
Collaborator:

Sorry to be daft, but can you explain why we'd want to complete the Future with null instead of a result? I'm confused how the call to event.future.get() ends up with the correct value 🤔

Member Author:

The future from CommitRequestManager#commitSync is CompletableFuture<Void>, so it's ok to use null.

The reason event.future.get() returns the expected result is that the processor completes it with the offsets here: https://github.com/apache/kafka/pull/17150/files#diff-583aa43613edc803471d93ed124d1cbed165768ed674c747784ab068e0ed7518R233.

acquireAndEnsureOpen();
long commitStart = time.nanoseconds();
try {
SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, calculateDeadlineMs(time, timeout));
CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitFuture = commit(syncCommitEvent);

Timer requestTimer = time.timer(timeout.toMillis());
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(requestTimer, true);


Not really clear to me what point this serves. Why do we care about the async commits if they were all sent prior to the sync commit?

Member Author:

Hi @hachikuji, this line was introduced in #15613.

    /**
     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
     */
    @Override
    public void commitSync() {
        delegate.commitSync();
    }

@lianetm (Contributor) left a comment

Thanks for the updates @FrankYang0529! Left some comments, and there is a bit we're missing, I believe: on consumer close we also trigger a commit of all consumed offsets (still retrieving them directly from the subscription state in the app thread); let's update that one too, please.

void commitSyncAllConsumed(final Timer timer) {
Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();

Thanks!

@@ -911,7 +911,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
wakeupTrigger.setActiveTask(event.future());
try {
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event);
committedOffsets.forEach(this::updateLastSeenEpochIfNewer);
ConsumerUtils.maybeUpdateLastSeenEpochIfNewer(metadata, committedOffsets);
Contributor:

The metadata updates have been progressively moved to the background. Should we do the last push here and move this call to maybeUpdateLastSeenEpochIfNewer to the background ApplicationEventProcessor? (It's already called from there for all commit events; if we move this one, and the call in seek, to their events' process calls, we would probably be done with it and have the metadata consistently updated only in the background.)
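The committed() path could then look roughly like the commit events (only a sketch; the manager's fetch call name and the event accessors used here are assumptions):

    private void process(final FetchCommittedOffsetsEvent event) {
        CommitRequestManager manager = requestManagers.commitRequestManager.get();
        manager.fetchOffsets(event.partitions(), event.deadlineMs())
            .whenComplete((committed, error) -> {
                if (error != null) {
                    event.future().completeExceptionally(error);
                } else {
                    // The epoch update now happens here, in the background thread,
                    // rather than in AsyncKafkaConsumer.
                    maybeUpdateLastSeenEpochIfNewer(committed);
                    event.future().complete(committed);
                }
            });
    }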

@@ -258,4 +258,11 @@ public static KafkaException maybeWrapAsKafkaException(Throwable t, String messa
else
return new KafkaException(message, t);
}

public static void maybeUpdateLastSeenEpochIfNewer(ConsumerMetadata metadata, final Map<TopicPartition, OffsetAndMetadata> offsets) {
@lianetm (Contributor) commented Oct 29, 2024

Once we have all the calls to this metadata update consistently in the background, it will probably make sense to move this helper closer to the ApplicationEventProcessor where it's used. What do you think?

acquireAndEnsureOpen();
try {
AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((r, t) -> {
lastPendingAsyncCommit = commit(asyncCommitEvent).whenComplete((committedOffsets, t) -> {
Contributor:

nit: should we rename 't' to 'error' or something clearer, just like you did with 'r'?

@@ -20,14 +20,15 @@
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Optional;

/**
* Event to commit offsets waiting for a response and retrying on expected retriable errors until
* the timer expires.
Contributor:

Maybe worth adding that if the event contains no offsets, it will commit all consumed offsets retrieved from the subscription state. Makes sense?
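For example, the class javadoc could read something like this (wording is just a suggestion):

    /**
     * Event to commit offsets, waiting for a response and retrying on expected retriable
     * errors until the timer expires. If the event carries no offsets, all consumed offsets
     * retrieved from the subscription state in the background thread are committed instead.
     */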

SyncCommitEvent event = new SyncCommitEvent(Optional.of(offsets), deadlineMs);

setupProcessor(true);
doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
Contributor:

is this line needed? I would expect that we only care about verifying that the metadata was updated, done on ln 260 (no need to return any specific value)


setupProcessor(true);
doReturn(offsets).when(subscriptionState).allConsumed();
doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
Contributor:

needed?

AsyncCommitEvent event = new AsyncCommitEvent(Optional.of(offsets));

setupProcessor(true);
doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
Contributor:

needed?


setupProcessor(true);
doReturn(offsets).when(subscriptionState).allConsumed();
doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
Contributor:

needed?

setupProcessor(true);

processor.process(event);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = assertDoesNotThrow(() -> event.future().get());
Contributor:

what about verifying that we don't generate requests for empty offsets?

verify(commitRequestManager, never()).commitSync(any(), anyLong());

(let's add it also to the other tests that play with the different commits and empty offsets)

@FrankYang0529 (Member Author) commented Oct 30, 2024

Hi @lianetm, thanks for the review. I addressed all comments with the following updates:

  • Moved ConsumerUtils#maybeUpdateLastSeenEpochIfNewer to ApplicationEventProcessor#maybeUpdateLastSeenEpochIfNewer and do all metadata updates in the background thread.
  • Added testFetchCommittedOffsetsEvent to ApplicationEventProcessorTest.
  • When autoCommitEnabled is true, use Optional.empty for the commitSync function. Updated AsyncKafkaConsumerTest#testAutoCommitSyncEnabled to check that SyncCommitEvent#offsets is not present.
  • Updated the default auto-commit setting to false in AsyncKafkaConsumerTest. Before this PR we also waited for the SyncCommitEvent and didn't mock its completion, yet the previous tests were not blocked in close by the event. If auto-commit is enabled by default and a test case calls close explicitly, it may be blocked by the SyncCommitEvent and wait for 30 seconds. I'm not sure whether the root cause is that we changed the future result from Void to Map<TopicPartition, OffsetAndMetadata>.

@kirktrue (Collaborator) left a comment

Thanks for the PR @FrankYang0529!

Overall looks good.

Can we move the bulk of the logic that was added in ApplicationEventProcessor methods into a new method in CommitRequestManager? It seems like we might be able to consolidate the code because the only difference (that I see) between the two is that one calls the CommitRequestManager’s commitAsync() method and the other calls commitSync().

Thanks!

} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
log.warn("Synchronous auto-commit failed: {}", e.getMessage());
Collaborator:

This is unrelated to this PR, but should we log the stack trace of the error? In commitAsync() we log the stack trace, not just the message.

Suggested change:
-    log.warn("Synchronous auto-commit failed: {}", e.getMessage());
+    log.warn("Synchronous auto-commit failed", e);

Comment on lines 195 to 196
Map<TopicPartition, OffsetAndMetadata> offsets = event.offsets().isPresent() ?
event.offsets().get() : subscriptions.allConsumed();
Collaborator:

Minor, but in my opinion this is a little clearer:

Suggested change:
-    Map<TopicPartition, OffsetAndMetadata> offsets = event.offsets().isPresent() ?
-        event.offsets().get() : subscriptions.allConsumed();
+    Map<TopicPartition, OffsetAndMetadata> offsets = event.offsets().orElseGet(subscriptions::allConsumed);

@@ -187,22 +188,62 @@ private void process(final CreateFetchRequestsEvent event) {

private void process(final AsyncCommitEvent event) {
Collaborator:

Can we move the bulk of this logic into a new CommitRequestManager method? We try to keep the process() methods lightweight.

Member Author:

Hi @kirktrue, do you mean updating CommitRequestManager like the following?

    public CompletableFuture<Void> commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
                                              final long deadlineMs) {
        if (!offsets.isPresent() || offsets.get().isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        OffsetCommitRequestState requestState = createOffsetCommitRequest(offsets.get(), deadlineMs);
        commitSyncWithRetries(requestState, result);
        return result;
    }

If we do that, we have to run maybeUpdateLastSeenEpochIfNewer(offsets) after the future completes. Is there any side effect to this change?

Contributor:

Hey @FrankYang0529, I cannot see any undesired side effect, and the change actually makes sense to keep the committed-offsets-related logic in the CommitRequestManager.

I expect we just need to pass the metadata object into the commit manager constructor (it's already available in RequestManagers), and ensure that the commit manager calls maybeUpdateLastSeenEpochIfNewer from fetchCommitted, commitSync and commitAsync (exactly as it's now done in the ApplicationEventProcessor).
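Roughly, the wiring described above (only the metadata-related pieces are shown; the rest of the constructor arguments are elided, so this is a sketch, not the actual signature):

    // CommitRequestManager holds the ConsumerMetadata and updates the last seen epochs
    // itself when offsets are committed or fetched, instead of the ApplicationEventProcessor.
    private final ConsumerMetadata metadata;

    private void maybeUpdateLastSeenEpochIfNewer(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((topicPartition, offsetAndMetadata) ->
            offsetAndMetadata.leaderEpoch().ifPresent(epoch ->
                metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)));
    }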

Member Author:

I was wondering whether the order of calling maybeUpdateLastSeenEpochIfNewer(offsets) affects the result. After checking again, I think it's okay to move the function to CommitRequestManager. Thanks.

Contributor:

I see, and I would say it's fine if we move it to the commitMgr as you did, as long as we update metadata before completing the result futures that the mgr returns

@@ -508,7 +505,7 @@ public void testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id", false);
Collaborator:

Can you move the new false parameter to be aligned with the other parameters?

@@ -846,7 +836,7 @@ public void testUnsubscribeOnClose() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id"));
"client-id", false));
Collaborator:

Same alignment issue here...

@@ -863,7 +853,7 @@ public void testFailedPartitionRevocationOnClose() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id"));
"client-id", false));
Collaborator:

Same alignment issue here...

@@ -881,13 +871,17 @@ public void testAutoCommitSyncEnabled() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id", false);
Collaborator:

Same alignment issue here...

@@ -899,7 +893,7 @@ public void testAutoCommitSyncDisabled() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id", false);
Collaborator:

Same alignment issue here...

@@ -1700,7 +1690,7 @@ public void testEnsurePollEventSentOnConsumerPoll() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id", false);
Collaborator:

Same alignment issue here...

@FrankYang0529 force-pushed the KAFKA-17480 branch 2 times, most recently from 9edef7b to a1ac1c0 on November 1, 2024 07:07
Comment on lines 236 to 221
Map<TopicPartition, OffsetAndMetadata> offsets = event.offsets().orElseGet(subscriptions::allConsumed);
if (offsets.isEmpty()) {
event.future().complete(Collections.emptyMap());
return;
}
Contributor:

Also, this decision to commit all consumed offsets when the commit event has empty offsets could be moved to CommitRequestManager.commitSync, to keep all the commit logic there in one place.
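That could look roughly like the earlier commitSync sketch, with the fallback moved inside the manager (again a sketch under the same assumptions, not the merged code):

    public CompletableFuture<Void> commitSync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets,
                                              final long deadlineMs) {
        // The "no offsets provided, so commit all consumed" decision now lives in the manager,
        // which already has access to the subscription state.
        Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
        if (commitOffsets.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        commitSyncWithRetries(createOffsetCommitRequest(commitOffsets, deadlineMs), result);
        return result;
    }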

@lianetm (Contributor) left a comment

Hey @FrankYang0529 , thanks for the updates! Regarding this change:

Update the default auto-commit setting to false in AsyncKafkaConsumerTest. Before this PR we also waited for the SyncCommitEvent and didn't mock its completion, yet the previous tests were not blocked in close by the event. If auto-commit is enabled by default and a test case calls close explicitly, it may be blocked by the SyncCommitEvent and wait for 30 seconds. I'm not sure whether the root cause is that we changed the future result from Void to Map<TopicPartition, OffsetAndMetadata>.

I expect that the root cause is that before this PR we completed the commit sync event in the app thread if there were no consumed offsets (so the tests didn't require a mocked completion to close). With this PR we always block on the event, even if nothing was consumed (waiting for the background thread to check the subscription state). So the failures you faced are expected, and I agree we need to update the test. Turning auto-commit off as you did makes sense to me, given that we have explicit tests for the autoCommit=true combinations.

The only thing we need to review/fix is that several tests call completeCommitSyncApplicationEventSuccessfully but do not perform commitSync or set auto-commit to true. I would expect those tests don't need that call anymore. Could you check and remove it? Thanks!

@@ -863,7 +861,8 @@ public void testAutoCommitSyncDisabled() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id",
false);
Contributor:

Not introduced by this PR, but this change made me notice that this test is not really testing what it should. We're verifying that no commit sync event is generated if autoCommit is false, but we're only doing consumer.subscribe & seek (which will never generate a sync event anyway). I expect this test would also pass if we passed autoCommit=true.

This test should explicitly call consumer.close after the seek, and then verify that no SyncEvent is generated if autoCommit=false. Makes sense?

@@ -844,14 +837,19 @@ public void testAutoCommitSyncEnabled() {
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
"client-id",
false);
Contributor:

This false is confusing given that the test is called "testAutoCommitSyncEnabled", but I notice it's because the test does not take the config into account and calls commitSyncAllConsumed explicitly. So what about renaming it to testCommitSyncAllConsumed?

@FrankYang0529 (Member Author)

Hi @lianetm / @kirktrue, thanks for the review and suggestions. I made the following changes:

  • Moved the offsets Optional check and maybeUpdateLastSeenEpochIfNewer to CommitRequestManager.
  • Moved the related test cases from ApplicationEventProcessorTest to CommitRequestManagerTest.
  • Removed the unnecessary completeCommitSyncApplicationEventSuccessfully calls from AsyncKafkaConsumerTest.
  • Updated testAutoCommitSyncDisabled to call close explicitly.
  • Renamed testAutoCommitSyncEnabled to testCommitSyncAllConsumed.

@lianetm (Contributor) left a comment

Thanks for the updates @FrankYang0529 ! Here are some more comments.

@@ -198,23 +198,47 @@ private void process(final CreateFetchRequestsEvent event) {
}

private void process(final AsyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
if (requestManagers.commitRequestManager.isEmpty()) {
event.future().complete(Map.of());
Contributor:

Should we rather complete exceptionally? I expect this would only happen during the development phase if a bug is introduced, but still, it would be easier to catch if we make the app event fail because the expected manager is not present. I notice there's no consistency in how we handle these (some process calls complete exceptionally, others just return/complete), but I would lean towards not swallowing the unlikely/unexpected error here.

Member Author:

Thanks for the suggestion. There are some process calls that don't align with this rule. Do we want to create a Jira to handle them, or should I update them in this PR?

Contributor:

Hmm, I would leave it out of this PR. Even though there are some other process calls that should probably fail if the expected manager is not present, in some cases we shouldn't fail and should just return/no-op as they do now (e.g. commitOnClose is a no-op if there is no commit manager, since it's triggered from close regardless of whether a groupId is present or not).

}

private void process(final SyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
if (requestManagers.commitRequestManager.isEmpty()) {
event.future().complete(Map.of());
Contributor:

completeExceptionally?

Comment on lines 231 to 238
future.whenComplete((offsets, throwable) -> {
if (throwable != null) {
log.error("Committing offsets failed", throwable);
event.future().completeExceptionally(throwable);
} else {
event.future().complete(offsets);
}
});
Contributor:

could we simplify this to future.whenComplete(complete(event.future())) ?

Comment on lines 209 to 216
future.whenComplete((offsets, throwable) -> {
if (throwable != null) {
log.error("Committing offsets failed", throwable);
event.future().completeExceptionally(throwable);
} else {
event.future().complete(offsets);
}
});
Contributor:

could we simplify this to future.whenComplete(complete(event.future()))?

Comment on lines 252 to 258
future.whenComplete((value, exception) -> {
if (exception != null)
event.future().completeExceptionally(exception);
else {
event.future().complete(value);
}
});
Contributor:

Why not reuse future.whenComplete(complete(event.future())) like before?

Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp, offsetAndMetadata);
doReturn(offsets).when(subscriptionState).allConsumed();

CommitRequestManager commitRequestManager = create(true, 100);
Contributor:

ditto

@@ -532,6 +544,7 @@ private void fetchOffsetsWithRetries(final OffsetFetchRequestState fetchRequest,
}
if (error == null) {
result.complete(res);
maybeUpdateLastSeenEpochIfNewer(res);
Contributor:

ditto

commitRequest.future.whenComplete((committedOffsets, error) -> {
if (error != null) {
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
} else {
asyncCommitResult.complete(null);
asyncCommitResult.complete(commitOffsets);
maybeUpdateLastSeenEpochIfNewer(commitOffsets);
Contributor:

this should probably go before completing the "asyncCommitResult" future, to ensure that we don't signal the caller that the commit completed if we haven't updated the metadata yet.

Contributor:

Actually, thinking more about this, we shouldn't wait for a response to the request before updating the metadata object with the offsets we sent (before this PR, and in the classic consumer, we update metadata before sending any request). So this call should really go as soon as we determine the set of commitOffsets (after the early return on if (commitOffsets.isEmpty()), maybe?).
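In code, the suggested ordering would be roughly (a fragment for illustration, following the shape of the snippets quoted above):

    Map<TopicPartition, OffsetAndMetadata> commitOffsets = offsets.orElseGet(subscriptions::allConsumed);
    if (commitOffsets.isEmpty()) {
        return CompletableFuture.completedFuture(null);
    }
    // Update the metadata with the offsets we are about to send, before creating or sending
    // the request, mirroring the classic consumer's behaviour.
    maybeUpdateLastSeenEpochIfNewer(commitOffsets);
    OffsetCommitRequestState requestState = createOffsetCommitRequest(commitOffsets, deadlineMs);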

pendingRequests.addOffsetCommitRequest(requestAttempt);

// Retry the same commit request while it fails with RetriableException and the retry
// timeout hasn't expired.
requestAttempt.future.whenComplete((res, error) -> {
if (error == null) {
result.complete(null);
result.complete(requestAttempt.offsets);
maybeUpdateLastSeenEpochIfNewer(requestAttempt.offsets);
Contributor:

ditto

Contributor:

related to comment from above, should we move this to commitSync, before we send the request or receive a response


@FrankYang0529 force-pushed the KAFKA-17480 branch 2 times, most recently from b31521b to feec864 on November 5, 2024 14:41
@lianetm (Contributor) left a comment

Thanks for the updates @FrankYang0529 ! A few more comments.


@@ -72,6 +72,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("ClassDataAbstractionCoupling")
Contributor:

could you double check if we really need this after the latest changes? (I don't see many new deps in the changes anymore)

Member Author:

Yes, we still need this line, or it shows the following error.

[ant:checkstyle] [ERROR] apache/kafka/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:75:1: Class Data Abstraction Coupling is 27 (max allowed is 25) classes [ApplicationEventProcessor, AssignmentChangeEvent, AsyncCommitEvent, CheckAndUpdatePositionsEvent, CommitOnCloseEvent, CompletableFuture, CreateFetchRequestsEvent, FetchCommittedOffsetsEvent, ListOffsetsEvent, LogContext, MockRebalanceListener, MockTime, NetworkClientDelegate.PollResult, OffsetAndMetadata, PollEvent, RequestManagers, ResetOffsetEvent, SeekUnvalidatedEvent, SubscriptionState, SubscriptionState.FetchPosition, SyncCommitEvent, TopicMetadataEvent, TopicPartition, TopicPatternSubscriptionChangeEvent, TopicSubscriptionChangeEvent, UnsubscribeEvent, UpdatePatternSubscriptionEvent]. [ClassDataAbstractionCoupling]

if (!requestManagers.commitRequestManager.isPresent()) {
if (requestManagers.commitRequestManager.isEmpty()) {
event.future().completeExceptionally(new KafkaException("Unable to async commit " +
"offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
Contributor:

typo: CommitRequestManager

if (!requestManagers.commitRequestManager.isPresent()) {
if (requestManagers.commitRequestManager.isEmpty()) {
event.future().completeExceptionally(new KafkaException("Unable to sync commit " +
"offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
Contributor:

typo: CommitRequestManager

}

private void process(final FetchCommittedOffsetsEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
if (requestManagers.commitRequestManager.isEmpty()) {
event.future().completeExceptionally(new KafkaException("Unable to fetch committed " +
"offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
Contributor:

CommitRequestManager

Comment on lines 419 to 420
ExecutionException e = assertThrows(ExecutionException.class, () -> event.future().get());
assertInstanceOf(IllegalStateException.class, e.getCause());
Contributor:

assertFutureThrows?

Comment on lines 457 to 458
ExecutionException e = assertThrows(ExecutionException.class, () -> event.future().get());
assertInstanceOf(IllegalStateException.class, e.getCause());
Contributor:

assertFutureThrows?

@lianetm (Contributor) left a comment

Just minor suggestions left. Also, please merge the latest trunk changes if you haven't. Thanks!

@FrankYang0529 (Member Author)

Hi @lianetm, thanks for the review. I addressed all comments and rebased onto trunk.


setupProcessor(false);
processor.process(event);
assertThrows(ExecutionException.class, () -> event.future().get());
Contributor:

This should also use assertFutureThrows(KafkaException), right? (And, nit/taste: I would import TestUtils.assertFutureThrows and simplify the code to plain assertFutureThrows calls.)
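With a static import of TestUtils.assertFutureThrows (assuming the helper's (future, expectedExceptionClass) parameter order), the assertion collapses to something like:

    setupProcessor(false);
    processor.process(event);
    assertFutureThrows(event.future(), KafkaException.class);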

@lianetm (Contributor) left a comment

Thanks for all the updates @FrankYang0529 ! LGTM.

@FrankYang0529 (Member Author)

Thanks for your review and patience. 🙏

@lianetm merged commit b213c64 into apache:trunk on Nov 7, 2024
8 checks passed
@FrankYang0529 deleted the KAFKA-17480 branch on November 7, 2024 14:46
Labels: clients, consumer, ctr (Consumer Threading Refactor, KIP-848), KIP-848 (The Next Generation of the Consumer Rebalance Protocol)