KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) #16982
@@ -2471,6 +2477,8 @@ public void testCurrentLag(GroupProtocol groupProtocol) {

// poll once to update with the current metadata
consumer.poll(Duration.ofMillis(0));
TestUtils.waitForCondition(() -> client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(ApiKeys.FIND_COORDINATOR)),
interesting, makes sense how you keep a single poll, and we just have to wait a bit for the background to actually generate the request. What about encapsulating this bit to make it clearer in all places it's used, to end up with something like:
TestUtils.waitForCondition(() -> requestGenerated(ApiKeys.FIND_COORDINATOR))
Updated it. Thanks for the suggestion.
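For readers following along, the wait-for-request pattern discussed above can be sketched without any Kafka dependency. This is only an illustration: WaitForRequestSketch, its ApiKey enum, the requests list and this waitForCondition are hypothetical stand-ins, not MockClient or TestUtils. A background thread adds the request a little after the "poll" returns, and the test thread waits until the request shows up instead of asserting immediately.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;

// Stand-in sketch: none of these names are Kafka classes.
final class WaitForRequestSketch {
    enum ApiKey { FIND_COORDINATOR, LIST_OFFSETS, FETCH }

    // Requests "sent" so far; thread-safe because two threads touch it.
    static final List<ApiKey> requests = new CopyOnWriteArrayList<>();

    // Hypothetical counterpart of the helper proposed in the review.
    static boolean requestGenerated(ApiKey apiKey) {
        return requests.contains(apiKey);
    }

    // Minimal polling wait; throws AssertionError if the condition never holds in time.
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String message) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting: " + message);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates the background thread building the request shortly after poll(0) returns.
        Thread background = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            requests.add(ApiKey.FIND_COORDINATOR);
        });
        background.start();

        waitForCondition(() -> requestGenerated(ApiKey.FIND_COORDINATOR), 5_000,
                "FIND_COORDINATOR request was never generated");
        background.join();
        System.out.println("request observed");
    }
}
```

Wrapping the stream expression in a named predicate keeps each wait a single readable line at the call site, which is the readability point made in the review.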
1de6793 to 5581e1b
LGTM
LGTM
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));

consumer.seek(tp0, 50L);

if (groupProtocol == GroupProtocol.CONSUMER) {
I see how you make the test pass, and that is fine and works, but I believe we could greatly simplify this test if we take the FindCoordinator, OffsetFetch and Fetch requests out of the picture, since they don't seem relevant to this test.
This test is all about partition offsets (end offsets stored in the leader), compared to a position set manually with seek, so not related at all with committed offsets or affected by fetch.
So, this is the simplification I'm thinking about:
- we create the consumer without groupId -> this will ensure we don't send any OffsetFetch request. We don't really need them given that we're only playing with the partition offsets
- we pause the partition right after assign -> this ensures that we don't issue fetch requests. We don't really need them for this test, and having them makes the test different for both consumers
With those 2 small changes, I would expect we can keep the same test for both consumers, without any specifics for the new consumer, and it would still be true to its purpose, testing what it has always tested. What do you think?
Hey @FrankYang0529, I took another look, left a comment for consideration. Thanks!
5581e1b to bcbc49d
    } catch (UnsupportedVersionException e) {
        return true;
    }
}, "Failed to fetch stable offset");
I think this msg is not quite right. If this condition fails, it means that the "consumer failed to throw UnsupportedVersionException on poll" (we actually expect that it fails to fetch the offsets)
// no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));

// poll once again, which should return the list-offset response
// and hence next call would return correct lag result
client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
Optional<ClientRequest> listOffsetRequest = client.requests().stream().filter(request -> request.requestBuilder().apiKey().equals(ApiKeys.LIST_OFFSETS)).findFirst();
This line seems tricky to read and we need it twice (for now). Would it be clearer to have a helper along the lines of findRequest(client, ApiKeys.LIST_OFFSETS)?
final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
assertTrue(fetchRequest.isPresent());
we probably don't need this assertion given that we already checked that the request was generated on the waitForCondition on ln 2510?
Hi @lianetm, thanks for the review and suggestion. I addressed the other comments. For this one, I would like to keep it. If we remove it, there may be a warning message like 'Optional.get()' without 'isPresent()' check.
Hey @FrankYang0529, some other minor comments left, almost there. Thanks!
bcbc49d to 1e821a1
Thanks for the updates @FrankYang0529 ! Couple of minor comments left.
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) {
@EnumSource(GroupProtocol.class)
public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) throws InterruptedException {
I guess we don't need the throws InterruptedException here anymore?
Thanks for the reminder. Removed it.
}

private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
    return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
Could we maybe reuse and simplify here to return findRequest(client, apiKey).isPresent()?
Sorry, I applied @kirktrue's suggestion, so we can't use findRequest(client, apiKey).isPresent() here.
Thanks for the updates, @FrankYang0529!
Just a couple of questions and a minor readability change request.
Thanks!
private Optional<ClientRequest> findRequest(MockClient client, ApiKeys apiKey) {
    return client.requests().stream().filter(request -> request.requestBuilder().apiKey().equals(apiKey)).findFirst();
}
It appears that the calls to this method quickly fail if there wasn't a match for the request type. How about something like this:
private Optional<ClientRequest> findRequest(MockClient client, ApiKeys apiKey) {
    return client.requests().stream().filter(request -> request.requestBuilder().apiKey().equals(apiKey)).findFirst();
}
private ClientRequest findRequest(MockClient client, ApiKeys apiKey) {
    // Local is named "match" so it does not clash with the lambda parameter "request".
    Optional<ClientRequest> match = client.requests().stream().filter(request -> request.requestBuilder().apiKey().equals(apiKey)).findFirst();
    assertTrue(match.isPresent(), "No " + apiKey + " request was submitted to the client... or whatever");
    return match.get();
}
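As a dependency-free illustration of the same "find or fail" idea, here is a minimal sketch. FindRequestSketch, Request and ApiKey are hypothetical stand-ins rather than Kafka classes, and the use of Optional.orElseThrow is one possible variant, not what the PR ended up with. It also sidesteps the 'Optional.get()' without 'isPresent()' warning mentioned earlier, since the Optional is never unwrapped directly.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in sketch: Request and ApiKey are hypothetical, not Kafka types.
final class FindRequestSketch {
    enum ApiKey { FIND_COORDINATOR, LIST_OFFSETS, FETCH }

    // Only the API key matters for this illustration.
    static final class Request {
        final ApiKey apiKey;

        Request(ApiKey apiKey) {
            this.apiKey = apiKey;
        }
    }

    // Returns the first request with the given key, or fails with a descriptive message.
    static Request findRequest(List<Request> requests, ApiKey apiKey) {
        return requests.stream()
                .filter(request -> request.apiKey == apiKey)
                .findFirst()
                .orElseThrow(() -> new AssertionError("No " + apiKey + " request was submitted"));
    }

    public static void main(String[] args) {
        List<Request> sent = Arrays.asList(
                new Request(ApiKey.FIND_COORDINATOR),
                new Request(ApiKey.LIST_OFFSETS));
        System.out.println(findRequest(sent, ApiKey.LIST_OFFSETS).apiKey);
    }
}
```

Callers get a non-optional value back, so a missing request fails at the lookup with a message naming the API key instead of failing later on an empty Optional.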
// poll once to update with the current metadata
consumer.poll(Duration.ofMillis(0));
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));
Out of curiosity, why do we no longer want to ensure a find coordinator request/response occurred?
As in @lianetm's suggestion #16982 (comment): in testListOffsetShouldUpdateSubscriptions we want to check the endOffsets function, which doesn't need the group coordinator.
1e821a1 to 15c902c
@EnumSource(GroupProtocol.class)
public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) throws InterruptedException {
    setupThrowableConsumer(groupProtocol);
    TestUtils.waitForCondition(() -> {
This change still has me thinking. This test is about a single call to poll(ZERO), which is expected to throw an exception, but the interesting fact is that the exception is generated when building the request here (it does not require the actual send or response). So I wonder if the async consumer should somehow ensure that when poll returns (even with a low timeout), it has allowed for at least one run of the background thread's runOnce?
Hi @lianetm, ApplicationEventProcessor#process(PollEvent) only resets the timer for some request managers. Even if we use ApplicationEventHandler#addAndGet, we still can't guarantee that runOnce is executed. We may need ConsumerNetworkThread to send a BackgroundEvent to make sure that runOnce has happened. However, this approach may make the process more complex. If we want to do this, we can probably create another Jira to handle it. WDYT?
Agree that it's better to discuss separately. Definitely we could consider to align the behaviour of the 2 consumers a bit more regarding poll with low timeouts and the guarantees of requests sent, but there are tradeoffs to consider as @chia7712 pointed out on his comment on the Jira (totally agree on the trade-offs).
client.prepareResponse(listOffsetsResponse(singletonMap(tp0, 90L)));
Could client.prepareResponse be moved before newConsumer?
@FrankYang0529 could you address this? I guess the suggestion comes from the recent fix to some tests that were flaky for a similar situation (#17056)
Hi @lianetm and @TaiJuWu, thanks for the suggestion, but I think we can keep this line just before consumer.endOffsets.
AsyncKafkaConsumer#endOffsets sends a ListOffsetsEvent, and ApplicationEventProcessor#process(ListOffsetsEvent) calls OffsetsRequestManager#fetchOffsets.
OffsetsRequestManager#fetchOffsets builds the request, and AsyncKafkaConsumer#endOffsets uses ApplicationEventHandler#addAndGet to wait for the result, so I think it's safe to put it just before consumer.endOffsets. WDYT? Thank you.
Thanks for your explanations.
makes sense, but I still have a concern with this (wonder if it would make this test flaky). Let's say the moment we get to prepare this response, there is another request that got generated, wouldn't that make that the endOffsets request will never find the response? (Not sure if I'm missing details of how prepareResponse works)
for the record, I was concerned here mostly about fetch requests (since we have a partition assigned and not paused), but just noticed we're not calling consumer.poll (so we shouldn't expect FETCH requests to be generated)
Still, worth mentioning that without this in-flight fix #17035 for Fetch, we could wrongly generate fetch requests even without polling. So enabling this test like this would be flaky for the new consumer without that fix I guess.
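The concern in this thread can be illustrated with a toy model. The sketch below assumes that a response prepared without a matcher is handed to whichever request is sent next, in FIFO order; PreparedResponseSketch and its methods are hypothetical and only stand in for that assumed behaviour, they are not MockClient.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: a FIFO queue standing in for the assumed prepared-response behaviour.
final class PreparedResponseSketch {
    private final Queue<String> prepared = new ArrayDeque<>();

    // Queue a response for whichever request is sent next.
    void prepareResponse(String response) {
        prepared.add(response);
    }

    // Hands the oldest prepared response to this request, or null if none is queued.
    String send(String request) {
        return prepared.poll();
    }

    public static void main(String[] args) {
        PreparedResponseSketch client = new PreparedResponseSketch();
        client.prepareResponse("list-offsets-response");

        // An unrelated request sent first takes the prepared response...
        System.out.println("FETCH got: " + client.send("FETCH"));
        // ...so the request the response was meant for finds nothing.
        System.out.println("LIST_OFFSETS got: " + client.send("LIST_OFFSETS"));
    }
}
```

Under that assumption, the test is only deterministic if no other request can be generated between preparing the response and calling endOffsets, which is why the unexpected FETCH requests matter here.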
Hey @FrankYang0529 , there are some related build failures on |
15c902c to 7037ef9
Hi @lianetm, thanks for the reminder. Rebased it to see latest result.
@@ -2479,23 +2487,29 @@ public void testCurrentLag(GroupProtocol groupProtocol) {
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
// requests: list-offset, fetch
assertEquals(2, client.inFlightRequestCount());
similar to the reasoning that led to this fix, there is a check above for:
assertEquals(0, client.inFlightRequestCount())
I would say that may be flaky for the new consumer, right?
We cannot ensure that no requests will be generated just because the last api we called shouldn't generate one. Given that we called poll before (and there is a background thread running) I expect there could be requests generated for fetching and fetching offsets maybe? The point of that assert is to ensure there was no ListOffsets request generated, so I would say that we should look for that exact request type, and assert there is none. Makes sense?
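A small sketch of the difference between the two assertions, using hypothetical stand-in types (RequestTypeAssertionSketch and its ApiKey enum are not Kafka classes): with a background thread running, other requests may already be in flight, so a count of zero can fail even though no ListOffsets request exists.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in sketch: the in-flight list is a plain list of hypothetical API keys.
final class RequestTypeAssertionSketch {
    enum ApiKey { FIND_COORDINATOR, OFFSET_FETCH, LIST_OFFSETS, FETCH }

    // True when no in-flight request has the given API key.
    static boolean noneInFlight(List<ApiKey> inFlight, ApiKey apiKey) {
        return inFlight.stream().noneMatch(key -> key == apiKey);
    }

    public static void main(String[] args) {
        // The background thread may already have produced unrelated requests.
        List<ApiKey> inFlight = Arrays.asList(ApiKey.OFFSET_FETCH, ApiKey.FETCH);

        // A count-based check is sensitive to those unrelated requests.
        System.out.println("count is zero: " + inFlight.isEmpty());
        // A type-based check only looks at what the test cares about.
        System.out.println("no LIST_OFFSETS: " + noneInFlight(inFlight, ApiKey.LIST_OFFSETS));
    }
}
```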
Hi @lianetm, thanks for catching this. I tried adding Thread.sleep(1000) before assertEquals(0, client.inFlightRequestCount()). It looks like both ListOffsetsRequest and OffsetFetchRequest are sent after getting the FindCoordinatorResponse. I have removed the assertion for now. Do you think we should keep this assertion for the CLASSIC group protocol only? Thank you.
Well, this bit is definitely somewhere the internals of the 2 consumers differ, even though I see them as conceptually aligned:
- what's the same in both consumers: a call to consumer lag will "signal" that it needs to retrieve the log endOffsets
- what's internally done different in both consumers: classic will only generate the request on the next poll (on the single thread it had and didn't want to block waiting for the offsets) vs async consumer, where the background thread poll will pick up the intention already expressed in the app thread and generate the request to get end offsets.
So I would say we keep the assertion (for the classic as you suggested), and it will be helpful to show this difference in the test actually. I would add an explanation for it too: Classic consumer does not send the LIST_OFFSETS right away (requires an explicit poll), different from the new async consumer, that will send the LIST_OFFSETS request in the background thread on the next background thread poll. Makes sense?
With all these tests we're enabling, worth running them repeatedly locally to try to spot any other flakiness similar to the ones we've been catching.
Hi @lianetm, I added the assertion back and comments. I run 50 rounds on my laptop and there is no error. Let's check latest CI result. Thank you.
Hey @FrankYang0529, this testCurrentLag still seems flaky even after the latest changes, I filed https://issues.apache.org/jira/browse/KAFKA-17560 with what I see and where I imagine the flakiness may be, but it needs more thinking probably. Here's a suggestion to make progress:
- we could leave testCurrentLag disabled for the new consumer on this PR (just as it was before)
- we unblock the other 2 tests, which seem to pass consistently after your changes
- we address testCurrentLag in a separate PR, with that jira I created (including the changes you had here, just that I think it needs more)
What do you think?
Hi @lianetm, thanks for filing the Jira. I would like to give "eventually updated" a try in this PR. If it still can't make the test stable, we can put it in next Jira. What do you think? Thank you.
Hi @lianetm, do you think I need to rebase code, so we can have more CI running result? Or, it's good to keep current state? Thank you.
Hi @lianetm, I rebased the code again today and latest CI result looks good. Could you help me review it when you have time? Thank you.
a10b657 to daf9513
Hi @lianetm, I think the test result is stable now. Could you take a look when you have time? Thank you.
fcac466 to 9cd97d3
Hey @FrankYang0529, one more comment here. Almost there! Thanks!
final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));

// correct lag result
assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
// For AsyncKafkaConsumer, subscription sate is updated in background, so the result will eventually be updated.
typo state
final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));

// correct lag result
assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
// For AsyncKafkaConsumer, subscription sate is updated in background, so the result will eventually be updated.
TestUtils.waitForCondition(() -> {
Is this change really needed? In this case we just did a successful fetch, so position is updated to 55 (ln 2541). We should be able to retrieve the lag of 45 (end offsets is already known to be 100). (Is not exactly the same case as above, where we needed to allow for the ListOffsets response to be processed in the background). Makes sense?
Yes, you're right, if consumer.position returns 55, then the subscription state has already been updated. Removed TestUtils.waitForCondition here. Thanks.
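The arithmetic behind the numbers in this thread can be written out as a simplified model. This is only a sketch of the relationship the test relies on (lag is unknown without an end offset, otherwise end offset minus position); LagSketch and its currentLag method are hypothetical and not the consumer's actual implementation.

```java
import java.util.OptionalLong;

// Simplified model of the lag values discussed above; not the consumer's real code.
final class LagSketch {
    // Lag is unknown until an end offset is known; otherwise it is end offset minus position.
    static OptionalLong currentLag(OptionalLong endOffset, long position) {
        return endOffset.isPresent()
                ? OptionalLong.of(endOffset.getAsLong() - position)
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // After seek(tp0, 50L) and before any end offset is known: lag is empty.
        System.out.println(currentLag(OptionalLong.empty(), 50L));
        // After the fetch: end offset 100 and position 55 give a lag of 45.
        System.out.println(currentLag(OptionalLong.of(100L), 55L));
    }
}
```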
…ly for new consumer with poll(0) Signed-off-by: PoAn Yang <[email protected]>
9cd97d3 to b914fba
Thanks @FrankYang0529 ! LGTM.
…ly for new consumer with poll(0) (apache#16982) Reviewers: Lianet Magrans <[email protected]>, TaiJuWu <[email protected]>, Kirk True <[email protected]>, TengYao Chi <[email protected]>
Fix following tests for CONSUMER group protocol:
Committer Checklist (excluded from commit message)