KAFKA-17510: Exception handling and purgatory completion on initialization delay #17709
base: trunk
Conversation
@apoorvmittal10 : Thanks for the PR. Left a few comments.
// for respective share partition as completing the full request might result in
// some acquired records to not being sent: https://issues.apache.org/jira/browse/KAFKA-17510
maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
handleInitializationException(sharePartitionKey, shareFetch, throwable);
Since we are triggering delayedShareFetch below, do we need to handle the error for shareFetch here?
I think we should. If the code reaches here it means the SharePartition is not yet initialized or is in some error state, which means no fetch lock will be acquired on the respective SharePartition in the delayed share fetch, hence no further handling in DelayedShareFetch. This code handles that error appropriately.
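To make the intent concrete, here is a minimal sketch of such a handler; the addErroneous and maybeCompleteWithErroneousPartitions helpers are illustrative names, not the actual API:

private void handleInitializationException(SharePartitionKey sharePartitionKey,
                                           ShareFetch shareFetch,
                                           Throwable throwable) {
    // Record the initialization failure against this partition only.
    shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable);
    // No fetch lock will ever be acquired for an uninitialized/errored partition, so the
    // delayed share fetch cannot complete the request for it; if every requested partition
    // has now failed, complete the request here with the per-partition errors.
    if (shareFetch.errorInAllPartitions()) {
        shareFetch.maybeCompleteWithErroneousPartitions();
    }
}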
@@ -198,7 +198,7 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
Should we skip erroneous partitions in shareFetch? Also, when calling sharePartition.maybeAcquireFetchLock(), if the partition is in ERROR or FENCED state, should we add the partition to the erroneous partitions in shareFetch too?
Should we skip erroneous partitions in shareFetch?
The method here iterates over sharePartitions, which were populated in SharePartitionManager, so some of the errored share partitions may never reach this point because they were already added to erroneous. You are right that a share partition could still error out here, so rather than skipping it from the fetch we should follow your second recommendation and add it to erroneous if an exception is received from maybeAcquireFetchLock.
There is already a jira for this, from previous PR comments, which I am planning to do next: https://issues.apache.org/jira/browse/KAFKA-17901
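A rough sketch of that second recommendation; the addErroneous helper on ShareFetch and the buildPartitionData helper (standing in for the existing PartitionData construction) are hypothetical names:

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
    int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
    try {
        if (sharePartition.maybeAcquireFetchLock()) {
            topicPartitionData.put(topicIdPartition, buildPartitionData(topicIdPartition, partitionMaxBytes));
        }
    } catch (Exception e) {
        // Partition is in ERROR/FENCED state; surface the failure for this partition only
        // instead of silently dropping it from the fetch.
        shareFetch.addErroneous(topicIdPartition, e);
    }
});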
If we know there is a partition with an error, we can skip the readFromLog call, which can be a bit expensive.
Makes sense. I added a filter check prior to the replica manager read that filters out any erroneous topic partitions that are present.
}

private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
// The FetchIsolation type that we use for share fetch is FetchIsolation.HIGH_WATERMARK. In the future, we can
// extend it to support other FetchIsolation types.
FetchIsolation isolationType = shareFetchData.fetchParams().isolation;
FetchIsolation isolationType = shareFetch.fetchParams().isolation;
replicaManager.getPartitionOrException above throws an exception. Should we handle that and add it to shareFetch.erroneous?
Done.
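For illustration, one possible shape of that handling; the wrapper name safeEndOffsetMetadata and the addErroneous helper are hypothetical, and the actual change may differ:

private Optional<LogOffsetMetadata> safeEndOffsetMetadata(TopicIdPartition topicIdPartition) {
    try {
        return Optional.of(endOffsetMetadataForTopicPartition(topicIdPartition));
    } catch (KafkaException e) {
        // getPartitionOrException threw, e.g. the partition is offline or this broker is not
        // the leader; record it so the response carries a per-partition error.
        shareFetch.addErroneous(topicIdPartition, e);
        return Optional.empty();
    }
}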
// Do not process the fetch request for this partition as the leader is not initialized yet.
// The fetch request will be retried in the next poll.
// TODO: Add the request to delayed fetch purgatory.
// Skip any handling for this error as the share partition is still loading. The request
When do we get a LeaderNotAvailableException? My understanding is that the throwable is based on the error code from ReadShareGroupStateResponse and it doesn't seem to return LeaderNotAvailableException.
We can only get LeaderNotAvailableException when the SharePartition is in the INITIALIZING state, which means the request should be re-triggered once initialization completes. This exception only indicates that the SharePartition is still initializing and is never returned to the client.
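A minimal sketch of that branch, assuming the same illustrative addErroneous helper as above:

if (throwable instanceof LeaderNotAvailableException) {
    // The share partition is still INITIALIZING; skip any handling here. The delayed share
    // fetch purgatory will complete the request once initialization finishes, and this
    // exception is never returned to the client.
    return;
}
// Any other exception is a real per-partition failure and is surfaced in the response.
shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable);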
}

/**
 * May be complete the share fetch request with the given exception for the topicIdPartitions.
May be => Maybe
Done.
 * @param throwable The exception that occurred while fetching messages.
 */
public void handleFetchException(
    String groupId,
    ShareFetch shareFetch,
Could we move this method to DelayedShareFetch and make it private since it's only called there?
Done.
}
})
}
responses.size == 3
Should we reset responses during retry? Ditto below.
We shouldn't, as the problem we are trying to solve here is that when we enable DefaultStatePersister we do see a delay in the SharePartition getting initialized, which is expected. With a multi topic-partition share fetch call, say tp0 and tp1, there can be a scenario where tp0 is initialized first and triggers the purgatory's checkAndComplete, hence the share fetch responds with acquired records of tp0 only.
I have added the retries here so that the test case is considered successful only when all topic-partitions, tp0 and tp1 in this case, have responded with acquired records.
Before adding topic-partitions to the response array, I check whether the share fetch response actually has acquired records.
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
/**
 * The partitions that had an error during the fetch.
 */
private volatile Map<TopicIdPartition, Throwable> erroneous;
volatile guarantees that a subsequent reader will pick up the latest reference to the map, but not necessarily the latest content in the map.
I have moved the erroneous accesses into synchronized methods, thanks for pointing that out.
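A minimal sketch of the difference (field and method names partly illustrative): volatile only publishes the map reference, whereas synchronized accessors also make the map contents visible and consistent across threads.

private final Map<TopicIdPartition, Integer> partitionMaxBytes; // all requested partitions
private final Map<TopicIdPartition, Throwable> erroneous = new HashMap<>();

public synchronized void addErroneous(TopicIdPartition topicIdPartition, Throwable throwable) {
    erroneous.put(topicIdPartition, throwable);
}

public synchronized boolean errorInAllPartitions() {
    // True once every requested partition has an associated error.
    return erroneous.keySet().containsAll(partitionMaxBytes.keySet());
}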
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
val partitionsCount = shareFetchResponseData.responses().get(0).partitions().size()
if (partitionsCount > 0) {
  assertEquals(1, shareFetchResponseData.responses().size())
How do we guarantee that only 1 partition is included in the response?
This is actually the number of topics in the response, which can only be 1 as we are fetching a single topic with multiple partitions. However, I realized the assertion on the topic count should come before the get; I have fixed that.
@junrao Thanks for the review. I have addressed the comments.
@junrao Please re-review when you get a chance. The build passes on Java 11, and I can see unrelated test failures on Java 23.
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
 * @param topicIdPartitions The topic id partitions to filter.
 * @return The topic id partitions without the erroneous partitions.
 */
public synchronized Set<TopicIdPartition> filterErroneousTopicPartitions(Set<TopicIdPartition> topicIdPartitions) {
The input ordering is important. Could we make that explicit?
Yes, that makes sense, but the problem is that even LinkedHashMap's keySet method returns the Set interface (with an internal implementation of LinkedKeySet). Hence I kept the method signature as Set because of that constraint.
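A sketch of how the method can still preserve the caller's iteration order despite the plain Set signature, assuming the erroneous map guarded as described above:

public synchronized Set<TopicIdPartition> filterErroneousTopicPartitions(Set<TopicIdPartition> topicIdPartitions) {
    // LinkedHashSet keeps the input (insertion) order of the surviving partitions.
    Set<TopicIdPartition> result = new LinkedHashSet<>(topicIdPartitions);
    result.removeAll(erroneous.keySet());
    return result;
}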
 * Check if all the partitions in the request have errored.
 * @return true if all the partitions in the request have errored, false otherwise.
 */
public synchronized boolean isErrored() {
isErrored => errorInAllPartitions ?
Done.
@@ -315,11 +325,17 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
}

private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
// Filter if there already exists any erroneous topic partition.
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
It's probably better to do this in acquirablePartitions() since it avoids acquiring the locks for error partitions.
The getPartitionOrException API is called after acquiring share partitions, hence if some share partition errors out after being acquired, readFromLog would still be triggered for that topic partition. So it's best to filter before the expensive readFromLog call.
Also, if the share partition has errored in a non-recoverable way, then in acquirablePartitions the fetch lock API should return false.
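Putting the two points together, a sketch of the resulting readFromLog flow; readFromReplicaManager stands in for the existing read logic and is an illustrative name:

private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
    // Filter out partitions that have already errored before the expensive replica manager read.
    Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
    if (partitionsToFetch.isEmpty()) {
        // Every requested partition has errored; nothing to read from the log.
        return Collections.emptyMap();
    }
    return readFromReplicaManager(partitionsToFetch, topicPartitionData);
}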
future: added methods to complete the future at a single place.
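For example, a single guarded completion helper along these lines (method name and payload type are illustrative) ensures the share fetch future is completed at most once, regardless of which error or success path reaches it first:

public synchronized void maybeComplete(Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response) {
    if (!future().isDone()) {
        future().complete(response);
    }
}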