
KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous calls to access global variables #17739

Open · wants to merge 11 commits into base: trunk
Conversation

Contributor

@adixitconfluent adixitconfluent commented Nov 10, 2024

About

This PR addresses the following issues:

  1. KAFKA-17984: Potential issue during tryComplete and onComplete simultaneous calls to access global variables
  2. Pending minor comments on AK PR KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 -
    a. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)
    b. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)
    c. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)
    d. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment) - Since we update fetchOffsetMetadata every time we change endOffset, I don't think we should have a dependency on findNextFetchOffset when reading the value of fetchOffsetMetadata

Testing

Testing has been done with new and existing unit tests and existing integration tests.

@github-actions github-actions bot added core Kafka Broker KIP-932 Queues for Kafka labels Nov 10, 2024
@adixitconfluent adixitconfluent marked this pull request as ready for review November 10, 2024 14:41
@adixitconfluent adixitconfluent marked this pull request as draft November 10, 2024 17:30
@adixitconfluent adixitconfluent marked this pull request as ready for review November 10, 2024 18:24
Contributor

@junrao junrao left a comment

@adixitconfluent : Thanks for the PR. Left a comment.

@@ -1602,8 +1602,6 @@ protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffset
protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
    lock.readLock().lock();
    try {
        if (findNextFetchOffset.get())
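
For context, a rough reconstruction of how this accessor reads before the change, assuming the usual try/finally unlock and that the cached field is also named fetchOffsetMetadata (the lines the PR removes are the findNextFetchOffset guard):

    protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
        lock.readLock().lock();
        try {
            // Guard removed by this PR: hide the cached metadata whenever
            // findNextFetchOffset is set, forcing the caller to read from the log.
            if (findNextFetchOffset.get())
                return Optional.empty();
            return fetchOffsetMetadata;
        } finally {
            lock.readLock().unlock();
        }
    }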
Contributor

Since we update fetchOffsetMetadata every time we change endOffset, I don't think we should have a dependency on findNextFetchOffset when reading the value of fetchOffsetMetadata

Hmm, the issue is that nextFetchOffset doesn't return endOffset if findNextFetchOffset is true. Currently we only reset fetchOffsetMetadata when updating the endOffset. It's possible that findNextFetchOffset stays on for multiple fetches without changing endOffset. In that case, we will set fetchOffsetMetadata for the first fetch and keep reusing it for subsequent fetches, which will be incorrect.

Contributor Author

Hi @junrao, so shall we do this, that when the call goes to fetchOffsetMetadata(), we check if findNextFetchOffset is true or not, in case it is true, we do a call to nextFetchOffset() which will correctly update the endOffset if it needs to be updated or not. Finally, we just return fetchOffsetMetadata. Do you think it will work?

Contributor Author

@adixitconfluent adixitconfluent Nov 13, 2024

Hi @junrao, now that I think more about it, IIUC, considering the common case when all fetched data is acquirable:

  1. Acknowledgements, acquisition lock timeout, and release of records on session close are the only places where we set findNextFetchOffset to true.
  2. In all 3 scenarios mentioned above, if there is a change to the endOffset, we update the endOffset (and thereby fetchOffsetMetadata is also updated automatically with our changes).

Hence, I feel that findNextFetchOffset shouldn't be considered when dealing with the common case. In the uncommon case, when the log start offset is later than the fetch offset and we need to archive records, we set findNextFetchOffset to true. But we have done the minBytes implementation only for the common case right now, hence I feel the current change is correct. Please correct me if I am wrong.
cc - @apoorvmittal10

Contributor

Yes, I agree findNextFetchOffset=true is the uncommon case. It might be useful to at least have some kind of consistent behavior for the uncommon case. Since the minBytes estimation will be off anyway in this case, we could choose to consistently satisfy the request immediately or wait for the timeout. With the logic in this PR, because fetchOffsetMetadata can be outdated in this uncommon case, it's not clear when the request will be satisfied.

Contributor Author

@adixitconfluent adixitconfluent Nov 13, 2024

@junrao, so should I do what I suggested in #17739 (comment): when the call goes to fetchOffsetMetadata(), we check whether findNextFetchOffset is true, and if it is, we call nextFetchOffset(), which will update the endOffset if it needs updating, and finally we just return fetchOffsetMetadata? OR, for the uncommon case, should I update fetchOffsetMetadata to Optional.empty() and remove any dependency on findNextFetchOffset?

Contributor

Hmm, my understanding is that endOffset always points to the endOffset of all cached batches in sharePartition. nextFetchOffset() doesn't update endOffset. It only turns findNextFetchOffset off when the next fetch offset reaches endOffset. It could take multiple nextFetchOffset() calls before findNextFetchOffset is reset to off.

Collaborator

@apoorvmittal10 apoorvmittal10 Nov 14, 2024

@adixitconfluent Here is my understanding: we want to keep fetchOffsetMetadata in SharePartition so that when there is no change to endOffset, the existing fetchOffsetMetadata can be used to calculate min bytes, correct?

For this, there could be a couple of scenarios:

  • the end offset is just increasing monotonically (usually), hence the next fetch offset shall point to the new latest offset.
  • there is a reset of some messages and we need to determine the next fetch offset.

For the first, I see you reset the fetchOffsetMetadata whenever you change the endOffset. For the second, you use findNextFetchOffset so that Optional.empty is always sent. So the existing code seems fine to me.

The current code solves mostly everything, but there could be an edge case where only a single message is released from in between, and the batch that holds that released message might not alone satisfy the min bytes criteria. Is that something we are solving?

Can you confirm my understanding above? If everything is true, then the approach I suggest is to also keep a local copy of the intermediate offset and metadata in delayed fetch, and to change updateFetchOffsetMetadata in SharePartition to maybeUpdateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata, long fetchOffset). The method caches the Optional<LogOffsetMetadata> fetchOffsetMetadata only if fetchOffset matches endOffset.

I didn't get how the approach proposed in https://github.com/apache/kafka/pull/17739/files#r1842102368 can solve anything which the current code cannot.

Contributor Author

@adixitconfluent adixitconfluent Nov 14, 2024

@apoorvmittal10:

We want to keep fetchOffsetMetadata in SharePartition so that when there is no change to endOffset, the existing fetchOffsetMetadata can be used to calculate min bytes, correct?

Yes

The current code solves mostly everything, but there could be an edge case where only a single message is released from in between, and the batch that holds that released message might not alone satisfy the min bytes criteria. Is that something we are solving?

We have only solved the common case where all fetched data is acquirable. So, this case is something we haven't solved right now.

Collaborator

Thanks @adixitconfluent for confirming. The simplest way I can think of to handle the common and uncommon cases:

  • We shall maintain fetchOffsetMetadata only corresponding to the end offset and not for intermediate states, as that is the most common case, i.e. fetch from the last offset.

  • The endOffset shall always point to the first offset of a log batch, hence if we fetch fetchOffsetMetadata for the end offset then it should match fetchOffsetMetadata.messageOffset.

  • DelayedShareFetch arrives with a fetchOffset which could match the partition's endOffset or be anywhere earlier; delayed share fetch doesn't know which. Hence it calls fetchOffsetMetadata() on the share partition, and if that provides the fetchOffsetMetadata, then check:
    * If the messageOffset matches the fetchOffset, then we already have the latest copy.
    * If empty, then we anyway need to readFromLog and update fetchOffsetMetadata.
    * If it is ahead of fetchOffset, then we are fetching released records. Hence we readFromLog and keep the fetchOffsetMetadata for the partition in DelayedShareFetch itself, to be re-used. I am suggesting keeping it locally because if a timeout occurs for the request, then we anyway complete the request with whatever we can fetch, and it is guaranteed that some messages will be acquired. Hence this intermediate fetchOffsetMetadata is only relevant for a single session of the DelayedShareFetch request.

  • Change updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata) in SharePartition to boolean maybeUpdateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata, long fetchOffset) and only keep the fetchOffsetMetadata for the endOffset, i.e. check if the fetchOffset matches the endOffset (see the sketch after this comment).

  • fetchOffsetMetadata() should just return the stored fetchOffsetMetadata without any check.

@junrao wdyt? Is there a gap in my overall understanding?
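
To make the suggestion concrete, a minimal sketch of what the two SharePartition methods could look like under this approach; the method name and parameters come from the comment above, while the endOffset field/accessor, the locking, and the bodies are assumptions rather than the actual PR code:

    // Cache the metadata only when it corresponds to the current end offset.
    protected boolean maybeUpdateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata,
                                                     long fetchOffset) {
        lock.writeLock().lock();
        try {
            if (fetchOffset != endOffset)        // intermediate fetch, do not cache
                return false;
            this.fetchOffsetMetadata = fetchOffsetMetadata;
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // The accessor then just returns whatever is cached, with no findNextFetchOffset check.
    protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
        lock.readLock().lock();
        try {
            return fetchOffsetMetadata;
        } finally {
            lock.readLock().unlock();
        }
    }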

Contributor

@apoorvmittal10: Overall, your approach could work.

  1. It seems that fetchOffsetMetadata() needs to take in nextFetchOffset too. This way, if the offset matches, we can avoid the readFromLog call.
  2. We want to be a bit careful with the offset matching. In the common case, the nextFetchOffset is always aligned on batch boundary so it will match fetchOffsetMetadata.messageOffset. However, a user could initialize an arbitrary offset in the middle of a batch. In this case, nextFetchOffset won't match fetchOffsetMetadata.messageOffset since the latter is at the batch boundary. So, we need to understand the impact of that.
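
A sketch of point 1, i.e. passing the caller's next fetch offset into the accessor so a stale cached entry is never handed out; the shape below is an assumption for illustration, not the actual PR code:

    protected Optional<LogOffsetMetadata> fetchOffsetMetadata(long nextFetchOffset) {
        lock.readLock().lock();
        try {
            // Point 2 caveat: if the share group was initialized to an offset in the
            // middle of a batch, messageOffset (batch boundary) will not equal
            // nextFetchOffset, so this path degrades to a readFromLog call.
            if (!fetchOffsetMetadata.isPresent()
                    || fetchOffsetMetadata.get().messageOffset != nextFetchOffset)
                return Optional.empty();         // caller falls back to readFromLog
            return fetchOffsetMetadata;
        } finally {
            lock.readLock().unlock();
        }
    }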

@@ -90,39 +90,50 @@ public void onExpiration() {
     */
    @Override
    public void onComplete() {
        // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
        lock.lock();
Collaborator

So now, for share fetch, tryComplete and onComplete will be under the lock. Seems fine, as the execution should be sequential anyway.
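
A rough sketch of the guarding pattern being discussed; the lock field and the two shared maps are named in the PR comment above, everything else here is illustrative rather than the actual DelayedShareFetch code:

    private final ReentrantLock lock = new ReentrantLock();   // java.util.concurrent.locks

    @Override
    public void onComplete() {
        // Take the same lock as tryComplete so we never observe a half-updated
        // partitionsAcquired / partitionsAlreadyFetched from a concurrent thread.
        lock.lock();
        try {
            // ... complete the share fetch request using the guarded state ...
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean tryComplete() {
        lock.lock();
        try {
            // ... update partitionsAcquired / partitionsAlreadyFetched and decide
            // whether to call forceComplete(); ReentrantLock is reentrant, so a
            // forceComplete -> onComplete call from this thread can re-acquire it ...
            return false;
        } finally {
            lock.unlock();
        }
    }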

Comment on lines 101 to 102
if (shareFetchData.future().isDone())
return;
Collaborator

As we have this check here for share fetch future completion: if locks are acquired for share partitions but the share fetch future is already completed at line 101, then how will they be released? I don't think the code handles that.

Contributor Author

Yeah, I agree that's a super corner-case scenario, but definitely possible. I have pushed a fix for it. Thanks for pointing it out.

Contributor

Hmm, could shareFetchData.future().isDone() be true inside onComplete()? We complete the future only after DelayedOperation.completed is set to true. After that point, onComplete() is not expected to be called again.

Contributor Author

So, if we have 2 different keys corresponding to a ShareFetch request, it could be the case that for one of those keys we get a checkAndComplete call which completes the share fetch request. Now, when the purgatory entry corresponding to the other key times out or has checkAndComplete triggered, by the time the code reaches onComplete the share fetch request's future is already complete, so shareFetchData.future().isDone() would be true and the method would return early.

Contributor

onComplete is always called through forceComplete, right? So, only one thread could ever call onComplete.

    public boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
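            // compareAndSet lets only the first caller flip 'completed' from false to
            // true, so onComplete() below runs at most once across all threads.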
            // cancel the timeout timer
            cancel();
            onComplete();
            return true;
        } else {
            return false;
        }
    }

Contributor Author

@adixitconfluent adixitconfluent Nov 14, 2024

Hi @junrao, you're right. There was a gap in my understanding of the purgatory operation, where I thought a copy of the operation goes to each of the multiple watch keys used for that operation, but this line in the documentation cleared it up.

Note that a delayed operation can be watched on multiple keys. 
It is possible that an operation is completed after it has been added to the watch list for some, but not all the keys. 
In this case, the operation is considered completed and won't be added to the watch list of the remaining keys. 
The expiration reaper thread will remove this operation from any watcher list in which the operation exists.

Hence, I've removed the mentioned condition from the code now. Thanks!

Labels
ci-approved core Kafka Broker KIP-932 Queues for Kafka

4 participants