KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous calls to access global variables #17739

Open
wants to merge 11 commits into base: trunk
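For readers skimming the diff, the fix is easier to follow as a pattern: onComplete now takes the same lock that already guards tryComplete, so neither method can observe a half-updated partitionsAcquired or partitionsAlreadyFetched. The sketch below is a minimal, hypothetical illustration of that pattern; the field and method names mirror DelayedShareFetch, but the types, bodies, and the explicit locking inside tryComplete are simplified placeholders, not the actual Kafka implementation (in the real class the lock comes from DelayedOperation, as the review comment further down notes).

import java.util.LinkedHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified sketch of the locking pattern introduced by this PR.
class DelayedShareFetchSketch {
    private final Lock lock = new ReentrantLock();
    // Shared mutable state written by tryComplete and read by onComplete.
    // LinkedHashMap (rather than HashMap) keeps the partition iteration order stable.
    private final LinkedHashMap<String, Long> partitionsAcquired = new LinkedHashMap<>();
    private final LinkedHashMap<String, Long> partitionsAlreadyFetched = new LinkedHashMap<>();

    boolean tryComplete() {
        lock.lock();
        try {
            // Acquire partitions and, if the fetch can be satisfied, record them.
            partitionsAcquired.put("topic-0", 0L);
            return !partitionsAcquired.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    void onComplete() {
        // Taking the same lock means onComplete never sees a half-updated
        // partitionsAcquired / partitionsAlreadyFetched from a concurrent tryComplete.
        lock.lock();
        try {
            partitionsAlreadyFetched.putAll(partitionsAcquired);
            // ... read from the log and complete the pending future ...
        } finally {
            lock.unlock();
        }
    }

    // Exposed so tests can assert that no code path left the lock held.
    Lock lock() {
        return lock;
    }
}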
53 changes: 31 additions & 22 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -36,11 +36,11 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;

import scala.Tuple2;
@@ -58,8 +58,8 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetchData shareFetchData;
private final ReplicaManager replicaManager;

private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
private final SharePartitionManager sharePartitionManager;
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
@@ -90,14 +90,17 @@ public void onExpiration() {
*/
@Override
public void onComplete() {
// We acquire the lock so that onComplete does not do a dirty read of the global variables
// partitionsAcquired and partitionsAlreadyFetched, since these variables can be updated by a concurrent tryComplete thread.
lock.lock();
Review comment (Collaborator): So now, for share fetch, both tryComplete and onComplete will run under the lock. Seems fine, since the execution should be sequential anyway.

log.trace("Completing the delayed share fetch request for group {}, member {}, "
+ "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
partitionsAcquired.keySet());

if (shareFetchData.future().isDone())
return;

Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (partitionsAcquired.isEmpty())
topicPartitionData = acquirablePartitions();
@@ -114,15 +117,15 @@ public void onComplete() {
topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());

try {
Map<TopicIdPartition, LogReadResult> responseData;
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (partitionsAlreadyFetched.isEmpty())
responseData = readFromLog(topicPartitionData);
else
// There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
// updated in a different tryComplete thread.
responseData = combineLogReadResponse(topicPartitionData, partitionsAlreadyFetched);

Map<TopicIdPartition, FetchPartitionData> fetchPartitionsData = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, FetchPartitionData> fetchPartitionsData = new LinkedHashMap<>();
for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet())
fetchPartitionsData.put(entry.getKey(), entry.getValue().toFetchPartitionData(false));

@@ -141,6 +144,7 @@ public void onComplete() {
replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition ->
replicaManager.completeDelayedShareFetchRequest(
new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
lock.unlock();
}
}

@@ -149,14 +153,14 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();

try {
if (!topicPartitionData.isEmpty()) {
// In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
partitionsAcquired = topicPartitionData;
@@ -193,9 +197,9 @@ public boolean tryComplete() {
* Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
*/
// Visible for testing
Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
// Initialize the topic partitions for which the fetch should be attempted.
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
@@ -230,23 +234,23 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}

private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
Map<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
if (sharePartition.fetchOffsetMetadata().isEmpty()) {
partitionsMissingFetchOffsetMetadata.put(topicIdPartition, partitionData);
}
});
if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
return Collections.emptyMap();
return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
return readFromLog(partitionsMissingFetchOffsetMetadata);
}

private void maybeUpdateFetchOffsetMetadata(
Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
@@ -261,7 +265,7 @@ private void maybeUpdateFetchOffsetMetadata(
}

// minBytes estimation currently assumes the common case where all fetched data is acquirable.
private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
long accumulatedSize = 0;
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
@@ -314,7 +318,7 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)

}

private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
shareFetchData.fetchParams(),
CollectionConverters.asScala(
@@ -324,7 +328,7 @@ private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, F
QuotaFactory.UNBOUNDED_QUOTA,
true);

Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new LinkedHashMap<>();
responseLogResult.foreach(tpLogResult -> {
responseData.put(tpLogResult._1(), tpLogResult._2());
return BoxedUnit.UNIT;
@@ -334,15 +338,15 @@ private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, F
return responseData;
}

private boolean anyPartitionHasLogReadError(Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
private boolean anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
return replicaManagerReadResponse.values().stream()
.anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
}

// Visible for testing.
Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
Map<TopicIdPartition, LogReadResult> existingFetchedData) {
Map<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
@@ -351,7 +355,7 @@ Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
Map<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
return missingTopicPartitionsLogReadResponse;
}
Expand All @@ -363,4 +367,9 @@ void releasePartitionLocks(Set<TopicIdPartition> topicIdPartitions) {
sharePartition.releaseFetchLock();
});
}

// Visible for testing.
Lock lock() {
return lock;
}
}
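The test changes in the next file all add the same post-condition: after each scenario, the delayed operation's lock must be immediately acquirable, which catches any completion path that finished while another thread still held it. A hypothetical, self-contained version of that check is sketched below (JUnit 5 names; FakeDelayedShareFetch is a stand-in, not the real class under test).

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.jupiter.api.Test;

// Hypothetical test mirroring the lock-release check added throughout DelayedShareFetchTest.
class LockReleaseCheckExampleTest {

    // Minimal stand-in for the delayed operation; only what the check needs.
    static class FakeDelayedShareFetch {
        private final Lock lock = new ReentrantLock();

        void onComplete() {
            lock.lock();
            try {
                // ... complete the share fetch request ...
            } finally {
                lock.unlock();
            }
        }

        Lock lock() {
            return lock;
        }
    }

    @Test
    void lockIsReleasedAfterOnComplete() {
        FakeDelayedShareFetch op = new FakeDelayedShareFetch();
        op.onComplete();
        // tryLock() fails if a different thread still holds the lock, so a completion
        // that forgot to unlock on another thread would make this assertion fail.
        assertTrue(op.lock().tryLock());
        // Release immediately so the check itself leaves no lock held.
        op.lock().unlock();
    }
}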
36 changes: 31 additions & 5 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -127,6 +127,8 @@ public void testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartit
assertFalse(delayedShareFetch.tryComplete());
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -182,6 +184,8 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
assertFalse(delayedShareFetch.tryComplete());
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -233,6 +237,8 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
assertFalse(delayedShareFetch.tryComplete());
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -278,6 +284,8 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
assertTrue(delayedShareFetch.tryComplete());
assertTrue(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -320,6 +328,8 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() {
any(), any(), any(ReplicaQuota.class), anyBoolean());
assertTrue(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -367,6 +377,8 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
assertTrue(delayedShareFetch.isCompleted());
assertTrue(shareFetchData.future().isDone());
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -403,6 +415,8 @@ public void testToCompleteAnAlreadyCompletedFuture() {
// Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch.
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
assertEquals(0, shareFetchData.future().join().size());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();

// Force completing the share fetch request for the second time should hit the future completion check and not
// proceed ahead in the function.
@@ -411,6 +425,8 @@
// Verifying that the second forceComplete does not call acquirablePartitions method in DelayedShareFetch.
Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -461,6 +477,8 @@ public void testForceCompleteTriggersDelayedActionsQueue() {

assertEquals(2, delayedShareFetchPurgatory.watched());
assertFalse(shareFetchData1.future().isDone());
assertTrue(delayedShareFetch1.lock().tryLock());
delayedShareFetch1.lock().unlock();

Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
@@ -498,6 +516,8 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
Mockito.verify(replicaManager, times(0)).tryCompleteActions();
Mockito.verify(delayedShareFetch2, times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch2.lock().tryLock());
delayedShareFetch2.lock().unlock();
}

@Test
@@ -529,21 +549,21 @@ public void testCombineLogReadResponse() {
.withSharePartitions(sharePartitions)
.build();

Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new HashMap<>();
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
topicPartitionData.put(tp0, mock(FetchRequest.PartitionData.class));
topicPartitionData.put(tp1, mock(FetchRequest.PartitionData.class));

// Case 1 - logReadResponse contains tp0.
Map<TopicIdPartition, LogReadResult> logReadResponse = Collections.singletonMap(
tp0, mock(LogReadResult.class));
LinkedHashMap<TopicIdPartition, LogReadResult> logReadResponse = new LinkedHashMap<>();
logReadResponse.put(tp0, mock(LogReadResult.class));

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
Map<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
LinkedHashMap<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
assertEquals(topicPartitionData.keySet(), combinedLogReadResponse.keySet());
assertEquals(combinedLogReadResponse.get(tp0), logReadResponse.get(tp0));

// Case 2 - logReadResponse contains tp0 and tp1.
logReadResponse = new HashMap<>();
logReadResponse = new LinkedHashMap<>();
logReadResponse.put(tp0, mock(LogReadResult.class));
logReadResponse.put(tp1, mock(LogReadResult.class));
combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
@@ -597,6 +617,8 @@ public void testExceptionInMinBytesCalculation() {
any(), any(), any(ReplicaQuota.class), anyBoolean());
// releasePartitionLocks will be called twice, once from tryComplete and then from onComplete.
Mockito.verify(delayedShareFetch, times(2)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -629,6 +651,8 @@ public void testLocksReleasedForCompletedFetch() {

assertFalse(spy.tryComplete());
Mockito.verify(sp0, times(1)).releaseFetchLock();
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

@Test
@@ -653,6 +677,8 @@ public void testLocksReleasedAcquireException() {

assertFalse(delayedShareFetch.tryComplete());
Mockito.verify(sp0, times(1)).releaseFetchLock();
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
}

static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) {