-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17465: Make getMembersFromGroup be non-blocking #17080
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @TaiJuWu
I have few comments, PTAL
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
@TaiJuWu |
Thanks for your remainder, I will address your comments later. |
Hi @frankvicky ,thanks for review, all comments are addressed. |
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TaiJuWu thanks for your updates. a couple of comments are left.
} else { | ||
memberIdentity.setMemberId(member.consumerId()); | ||
List<MemberIdentity> membersToRemove = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please initialize the size by res.members()
?
memberIdentity.setMemberId(member.consumerId()); | ||
List<MemberIdentity> membersToRemove = new ArrayList<>(); | ||
for (final MemberDescription member : res.members()) { | ||
MemberIdentity memberIdentity = new MemberIdentity().setReason(reason); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you prefer to use lambda function, please try to eliminate the local variable. for example:`
membersToRemove.add(member.groupInstanceId()
.map(id -> new MemberIdentity().setGroupInstanceId(id))
.orElseGet(() -> new MemberIdentity().setMemberId(member.consumerId()))
.setReason(reason));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @chia7712 , thanks for review.
I rewrite this part, please take a look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for update.
I have some comments.
RemoveMembersFromConsumerGroupHandler.newFuture(groupId); | ||
|
||
KafkaFutureImpl<List<MemberIdentity>> f; | ||
if (options.removeAll()) f = getMembersFromGroup(groupId, reason); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would recommend not omitting the curly braces, or at the very least, it should break the line.
final SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity, Errors>> future = | ||
RemoveMembersFromConsumerGroupHandler.newFuture(groupId); | ||
|
||
KafkaFutureImpl<List<MemberIdentity>> f; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename this variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All comments are addressed, thanks!
Jira: https://issues.apache.org/jira/browse/KAFKA-17465
Committer Checklist (excluded from commit message)