KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll #17165
Hi @FrankYang0529, I opened a PR for this behavior.
df12479
to
42ac730
Compare
Hi @FrankYang0529 , I will close my PR because they do same thing and I leave a comment for |
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
        assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic));
Maybe we need to make the application thread sleep here to make sure the network thread does not send a join request. WDYT?
Hi @TaiJuWu, application and network are different threads. I'm not sure why making the application thread sleep would keep the network thread from sending a join request. Also, this test is meant to make sure KafkaConsumer#subscribe doesn't send a join request, so we should not add other operations that affect the result. Could you elaborate more on your idea? Thank you.
Sorry for the confusion.
The reason for making the application sleep here is to give the network enough time to send the join request even without poll. This helps to reproduce the bug more easily and consistently.
Additionally, to ensure the behavior we’ve promised in the documentation, the two operations (subscribe and poll) should not be too close together in the timeline in this test. Does that make sense?
Hi @TaiJuWu, thanks for the suggestion. It's hard to say how long we would have to wait to be sure KafkaConsumer#subscribe doesn't send a join request. I added two unit tests to check the behavior.
In ApplicationEventProcessorTest#testSubscriptionChangeEvent, we check that ApplicationEventProcessor#process(SubscriptionChangeEvent) doesn't call ConsumerMembershipManager#maybeJoinGroup.
In ConsumerMembershipManagerTest#testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup, we check that ConsumerMembershipManager#onSubscriptionUpdated keeps the member state as UNSUBSCRIBED and that transitionToJoining is called in maybeJoinGroup.
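The behavior these two tests pin down can be sketched as follows. This is a minimal, self-contained illustration and not the Kafka implementation: `MembershipSketch` and its `MemberState` enum are hypothetical stand-ins, and only the method names `onSubscriptionUpdated`, `maybeJoinGroup` and `transitionToJoining` are taken from the discussion.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the member states mentioned in this thread.
enum MemberState { UNSUBSCRIBED, JOINING, STABLE }

// Hypothetical, simplified stand-in for ConsumerMembershipManager.
class MembershipSketch {
    private MemberState state = MemberState.UNSUBSCRIBED;
    // Set when the subscription changes; consumed by the next poll.
    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
    private int joinTransitions = 0;

    // Triggered by subscribe(): records the change but does not join.
    void onSubscriptionUpdated() {
        if (state == MemberState.UNSUBSCRIBED) {
            subscriptionUpdated.set(true);
        }
    }

    // Triggered by poll(): joins only if a subscription change is pending.
    void maybeJoinGroup() {
        if (subscriptionUpdated.compareAndSet(true, false)) {
            transitionToJoining();
        }
    }

    private void transitionToJoining() {
        state = MemberState.JOINING;
        joinTransitions++;
    }

    MemberState state() { return state; }

    int joinTransitions() { return joinTransitions; }
}
```

In this sketch a test can assert the state directly after `onSubscriptionUpdated()` without any sleep, because nothing changes state until `maybeJoinGroup()` is called.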
Thanks for your explanation.
42ac730
to
23ee599
Compare
Hey @FrankYang0529, so it seems this draft is the PR we're keeping, right? Let me know when it's ready for review and I'll be happy to take a look. Thanks!
23ee599
to
7b83257
Compare
Hi @lianetm, yes, the PR is ready for review. Thank you.
91f5a98
to
804f17c
Compare
Hey @FrankYang0529 , thanks for the patch! Left some comments for consideration.
membershipManager.onSubscriptionUpdated();
assertFalse(membershipManager.subscriptionUpdated());
these 2 lines are a bit confusing (we trigger "onSubscriptionUpdated" and expect "subscriptionUpdated" to be false). I would say that the boolean var name is what we should review, because it truly indicates if the member should join (related to comment on the membershipManager.subscriptionUpdated var)
Changed the variable from subscriptionUpdated to shouldTransitionToJoining and added more comments for it. Thank you.
@@ -1617,12 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoNothingIfInGroup() {
what about testSubscriptionUpdateDoesNotTransitionToJoining (or similar)? (not transitioning to joining is really the core bit we were missing and are fixing/testing here, and it is actually consistent with the name you have for the similar test right below)
Thanks for the suggestion. I think whether the member is in the group is important as well, because it determines whether to change the shouldTransitionToJoining variable. I updated the test case name to testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup and the other test case below to testOnSubscriptionUpdatedSetShouldTransitionToJoiningIfNotInGroup. Do you think that makes sense? Thank you.
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
    ConsumerMembershipManager membershipManager = createMembershipManager(null);
    assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
    membershipManager.onSubscriptionUpdated();
should we verify here that there was no transitionToJoining? (to be sure that the transition is only triggered after the call to maybeJoinGroup)
Yes, added it. Thanks.
@@ -3443,6 +3443,34 @@ public void testPreventMultiThread(GroupProtocol groupProtocol) throws Interrupt
}
}

@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testPollSendRequestForRebalance(GroupProtocol groupProtocol) throws InterruptedException {
typo: testPollSendsRequestForRebalance? (and maybe clearer: testPollSendsRequestToJoin?)
@@ -189,6 +191,9 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl

private final Time time;

// AtomicBoolean to track if the subscription has been updated
this is truly more to know if we should join after the subscription has been updated right? (true if not in group)
a01febd
to
23d386f
Compare
Hi @lianetm, I found that |
Hey @FrankYang0529, just in case it helps, we have this https://issues.apache.org/jira/browse/KAFKA-17286 related to that flaky test with some ideas in the comments.
Yes, that helps. I found that both in trunk and this PR, the failed result is reproducible by adding Do you think we let https://issues.apache.org/jira/browse/KAFKA-17286 handle this flaky? Or we should solve it in this PR? Thanks. |
23d386f
to
1cebf23
Compare
Hey @FrankYang0529, sorry for the late reply, I was traveling last week but I'm back, so I will be taking another look at this. You can mark it as ready; we can leave the fix for the flaky test separate, given that we already understand that the root cause is not related to this change. I'll take another look at this one between today and tomorrow. Thanks!
Hey @FrankYang0529, I took another full pass and left some comments for consideration. Thanks!
@@ -466,6 +474,17 @@ String memberIdInfoForLog() {
 */
public void onSubscriptionUpdated() {
    if (state == MemberState.UNSUBSCRIBED) {
I wonder if this check is still needed here? It made sense before because we were doing the actual join, but now we're just setting a flag (that looks more like a "subscriptionUpdated" var now). Then maybeJoinGroup is where we need to check that the state is UNSUBSCRIBED and subscriptionUpdated is set, and then join. Makes sense?
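A rough sketch of the variant suggested here, where `onSubscriptionUpdated` only records the change and the state check moves into `maybeJoinGroup`. The class `DeferredJoinSketch` and the enum `State` are hypothetical names for illustration, and clearing the flag even when the member is already in the group is an assumption of this sketch, not something the thread specifies.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the member states discussed in this thread.
enum State { UNSUBSCRIBED, JOINING, STABLE }

// Hypothetical, simplified membership manager for the suggested variant.
class DeferredJoinSketch {
    private State state;
    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);

    DeferredJoinSketch(State initial) {
        this.state = initial;
    }

    // Only records that the subscription changed; no state check here.
    void onSubscriptionUpdated() {
        subscriptionUpdated.set(true);
    }

    // The state check lives next to the actual transition.
    // Assumption: the flag is cleared on every call, joined or not.
    void maybeJoinGroup() {
        boolean updated = subscriptionUpdated.getAndSet(false);
        if (updated && state == State.UNSUBSCRIBED) {
            state = State.JOINING;
        }
    }

    State state() {
        return state;
    }
}
```

With this shape the flag name matches what it stores (the subscription was updated), and the decision to join is made in one place.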
@@ -246,7 +252,9 @@ private void process(final SubscriptionChangeEvent ignored) {
 */
private void process(final UnsubscribeEvent event) {
    if (requestManagers.consumerHeartbeatRequestManager.isPresent()) {
        System.out.println("UnsubscribeEvent: " + event);
let's remove this please
CompletableFuture<Void> future = requestManagers.consumerHeartbeatRequestManager.get().membershipManager().leaveGroup();
System.out.println("UnsubscribeEvent: " + future.isCompletedExceptionally());
ditto
@@ -244,6 +244,7 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
}, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")

assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
Thread.sleep(1000)
remove?
@@ -524,7 +543,7 @@ public void transitionToJoining() {
 * to leave the group has been sent out.
 */
public CompletableFuture<Void> leaveGroup() {
    if (isNotInGroup()) {
    if (isNotInGroup() || state == MemberState.JOINING) {
I don't quite get how this change relates to this PR, why is it that we need this here?
Also, this is effectively ignoring the leave group (unsubscribe) if JOINING, which I would expect is not right (the member will remain JOINING and may even become STABLE, never leave). Before this change, JOINING + consumer.unsubscribe => LEAVING (member would run the full leave flow and attempt to send the leave HB). With this change, JOINING + consumer.unsubscribe => still JOINING. Is that the intention?
I know that the leave while joining has challenges to solve so that it can be processed correctly (KIP-1082), but I would expect that we keep the intention we had here, even after the KIP-1082 fixes: we should attempt to leave the group when there's a call to unsubscribe while waiting for the join response (JOINING) because the broker may have already processed the join.
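The two behaviors contrasted in this comment can be illustrated with a small sketch. Everything here is hypothetical and heavily simplified: `LeaveSketch`, `LeaveState`, the `skipLeaveWhileJoining` switch and `onLeaveHeartbeatSent` are illustration names, and `isNotInGroup` is reduced to a single state, which is an assumption and not the real check.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the member states discussed in this comment.
enum LeaveState { UNSUBSCRIBED, JOINING, STABLE, LEAVING }

// Hypothetical, simplified membership manager for the leave flow.
class LeaveSketch {
    private LeaveState state;
    private final boolean skipLeaveWhileJoining;
    private CompletableFuture<Void> leaveFuture;

    LeaveSketch(LeaveState initial, boolean skipLeaveWhileJoining) {
        this.state = initial;
        this.skipLeaveWhileJoining = skipLeaveWhileJoining;
    }

    // Assumption: only UNSUBSCRIBED counts as "not in group" in this sketch.
    private boolean isNotInGroup() {
        return state == LeaveState.UNSUBSCRIBED;
    }

    CompletableFuture<Void> leaveGroup() {
        boolean skip = isNotInGroup()
                || (skipLeaveWhileJoining && state == LeaveState.JOINING);
        if (skip) {
            // Nothing is sent to the broker; the member keeps its current state.
            return CompletableFuture.completedFuture(null);
        }
        state = LeaveState.LEAVING;
        leaveFuture = new CompletableFuture<>();
        return leaveFuture;
    }

    // Simulates the leave heartbeat having been sent out.
    void onLeaveHeartbeatSent() {
        state = LeaveState.UNSUBSCRIBED;
        if (leaveFuture != null) {
            leaveFuture.complete(null);
        }
    }

    LeaveState state() {
        return state;
    }
}
```

With `skipLeaveWhileJoining` set to false, a member in JOINING moves to LEAVING and the future completes only after the leave heartbeat. With it set to true, the call returns immediately and the member stays in JOINING, which is the outcome the comment questions.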
1cebf23
to
29f1f4f
Compare
Hi @lianetm, I'm sorry that there was a testing commit. I've removed it. Also, I change |
29f1f4f
to
217df42
Compare
Hey @FrankYang0529 , thanks for the updates. Just some minor comments left.
 * AtomicBoolean to track whether the subscription is updated.
 * If it's true and subscription state is UNSUBSCRIBED, the next {@link #maybeJoinGroup()} will change member state to JOINING.
 */
private final AtomicBoolean hasSubscriptionUpdated = new AtomicBoolean(false);
nit: what about simply subscriptionUpdated ...to track whether the subscription has been updated
Yes, updated the variable and function to subscriptionUpdated. Thank you.
@@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {
there is no "shouldTransitionToJoining" var/concept anymore, so should we update this ? maybe simply ...DoesNotTransitionToJoiningIfInGroup
Updated the test case name to testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup.
}

@Test
public void testOnSubscriptionUpdatedSetShouldTransitionToJoiningIfNotInGroup() {
testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup?
Changed the test case name to testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup. Thanks.
@@ -1137,10 +1137,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
}

@Test
public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {
let's make sure to align these with the names in the consumer tests if they change with the comments above
Yes, aligned the test case names between ConsumerMembershipManagerTest and ShareMembershipManagerTest.
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSubscriptionChangeEvent(boolean withGroupId) {
I would say this test only makes sense with withGroupId=true, right? If there is no groupId, the processor will have null HBMgr and MembershipMgr (there cannot be a SubscriptionChange event without a groupId, since the API call to subscribe requires a groupId)
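A small sketch of why the case without a groupId has nothing to verify. `ProcessorSketch` and `MembershipCallback` are hypothetical names for illustration; the only detail borrowed from the diffs in this thread is that the heartbeat manager is held in an `Optional` and checked with `isPresent()`.

```java
import java.util.Optional;

// Hypothetical stand-in for the membership manager callback.
interface MembershipCallback {
    void onSubscriptionUpdated();
}

// Hypothetical, simplified stand-in for the event processor.
class ProcessorSketch {
    // Empty when the consumer was created without a groupId.
    private final Optional<MembershipCallback> membership;

    ProcessorSketch(Optional<MembershipCallback> membership) {
        this.membership = membership;
    }

    // Returns true only if a membership manager was there to notify.
    boolean processSubscriptionChange() {
        if (!membership.isPresent()) {
            return false;
        }
        membership.get().onSubscriptionUpdated();
        return true;
    }
}
```

Without a group there is no manager to assert against, so a parameterized `false` case would only exercise the early return.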
Yes, I kept only the withGroupId=true case here. Thank you.
217df42
to
a091698
Compare
Hey @FrankYang0529 , could you please update the PR description where it says:
When calling AsyncKafkaConsumer#subscribe, send the JoinRequest if the flag is true.
I guess you meant when calling poll transition to joining if the subscription has been updated?
With that (and the last nit up to you) this looks good to me. Thanks!
 * from the {@link #onSubscriptionUpdated} to fulfill the requirement of the "rebalances will only occur during an
 * active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
 */
public void maybeJoinGroup() {
nit: what do you think about calling this something like onConsumerPoll? Just to make the func self explanatory (on poll -> transition to joining), no need to find usages to understand what this is all about. No strong feeling though, up to you.
Thanks for the suggestion. I updated the PR description and changed maybeJoinGroup to onConsumerPoll.
…consumer.poll Signed-off-by: PoAn Yang <[email protected]>
a091698
to
87575f2
Compare
Thanks @FrankYang0529! LGTM.
FYI, I filed https://issues.apache.org/jira/browse/KAFKA-17623 for a flaky consumer integration test that I noticed here; it has been flaky for a while.
Okay. I will take a look. Thanks for filing the issue.
This PR causes the following build error:
|
uhm builds completed successfully but I guess it conflicted with a recent merge. Checking now and will open PR right away. Thanks @chia7712!
Removing dup method here #17291
Once ASF signs off on the merge queue, we can completely avoid this class of failure on trunk. According to Infra team, we are a month or two away from being able to enable it.
That will be helpful, it was really my miss not asking to have this rebased one last time.
…consumer.poll (apache#17165) Reviewers: Lianet Magrans <[email protected]>, TaiJuWu <[email protected]>
To fulfill "rebalances will only occur during an active call to KafkaConsumer#poll(Duration)", we should not send JoinRequest after AsyncKafkaConsumer#subscribe. Add a flag subscriptionUpdated to AbstractMembershipManager#onSubscriptionUpdated. When calling AsyncKafkaConsumer#subscribe, set the flag to true. When calling AsyncKafkaConsumer#poll, send the JoinRequest if the flag is true.
Committer Checklist (excluded from commit message)