diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 4cb9ce0cf424..7bc91c197a97 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -57,13 +57,12 @@ public class DelayedShareFetch extends DelayedOperation {
 
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
-
-    private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
-    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
     // The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
    // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
     private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
+    private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
+    private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
 
     DelayedShareFetch(
         ShareFetchData shareFetchData,
@@ -90,31 +89,39 @@ public void onExpiration() {
      */
     @Override
     public void onComplete() {
+        // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
+        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
+        lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, member {}, " +
             "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
             partitionsAcquired.keySet());
-        if (shareFetchData.future().isDone())
-            return;
+        try {
+            LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
+            // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
+            if (partitionsAcquired.isEmpty())
+                topicPartitionData = acquirablePartitions();
+            // tryComplete invoked forceComplete, so we can use the data from tryComplete.
+            else
+                topicPartitionData = partitionsAcquired;
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-        if (partitionsAcquired.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-        else
-            topicPartitionData = partitionsAcquired;
 
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we complete the request with an empty response.
+                shareFetchData.future().complete(Collections.emptyMap());
+                return;
+            }
+            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
+                topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
-            return;
+            completeShareFetchRequest(topicPartitionData);
+        } finally {
+            lock.unlock();
         }
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-            topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
+    }
 
+    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
         try {
-            Map<TopicIdPartition, LogReadResult> responseData;
+            LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
             if (partitionsAlreadyFetched.isEmpty())
                 responseData = readFromLog(topicPartitionData);
             else
@@ -122,7 +129,7 @@ public void onComplete() {
                 // updated in a different tryComplete thread.
                 responseData = combineLogReadResponse(topicPartitionData, partitionsAlreadyFetched);
 
-            Map<TopicIdPartition, FetchPartitionData> fetchPartitionsData = new LinkedHashMap<>();
+            LinkedHashMap<TopicIdPartition, FetchPartitionData> fetchPartitionsData = new LinkedHashMap<>();
             for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet())
                 fetchPartitionsData.put(entry.getKey(), entry.getValue().toFetchPartitionData(false));
 
@@ -149,14 +156,14 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
 
         try {
             if (!topicPartitionData.isEmpty()) {
                 // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
                 // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
                 // those topic partitions.
-                Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
                 maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
                 if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
                     partitionsAcquired = topicPartitionData;
@@ -193,9 +200,9 @@ public boolean tryComplete() {
      * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
      */
     // Visible for testing
-    Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
+    LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         // Initialize the topic partitions for which the fetch should be attempted.
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
             int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
@@ -230,8 +237,8 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
-        Map<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
+    private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
         topicPartitionData.forEach((topicIdPartition, partitionData) -> {
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);
             if (sharePartition.fetchOffsetMetadata().isEmpty()) {
@@ -239,14 +246,14 @@ private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartit
             }
         });
 
         if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
-            return Collections.emptyMap();
+            return new LinkedHashMap<>();
         }
 
         // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
         return readFromLog(partitionsMissingFetchOffsetMetadata);
     }
 
     private void maybeUpdateFetchOffsetMetadata(
-        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
         for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);
@@ -261,7 +268,7 @@ private void maybeUpdateFetchOffsetMetadata(
     }
 
     // minByes estimation currently assumes the common case where all fetched data is acquirable.
-    private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+    private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
         long accumulatedSize = 0;
         for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
@@ -314,7 +321,7 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
 
     }
 
-    private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+    private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
         Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
             shareFetchData.fetchParams(),
             CollectionConverters.asScala(
@@ -324,7 +331,7 @@ private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, 
             true);
 
-        Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
+        LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new LinkedHashMap<>();
         responseLogResult.foreach(tpLogResult -> {
             responseData.put(tpLogResult._1(), tpLogResult._2());
             return BoxedUnit.UNIT;
         });
@@ -334,15 +341,15 @@ private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, 
         return responseData;
     }
 
-    private boolean anyPartitionHasLogReadError(Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
+    private boolean anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
         return replicaManagerReadResponse.values().stream()
             .anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
     }
 
     // Visible for testing.
-    Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
-                                                                Map<TopicIdPartition, LogReadResult> existingFetchedData) {
-        Map<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
+    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                          LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
         topicPartitionData.forEach((topicIdPartition, partitionData) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
                 missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
@@ -351,7 +358,7 @@ Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartitio
             }
         });
 
-        Map<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
         missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
         return missingTopicPartitionsLogReadResponse;
     }
@@ -363,4 +370,9 @@ void releasePartitionLocks(Set<TopicIdPartition> topicIdPartitions) {
             sharePartition.releaseFetchLock();
         });
     }
+
+    // Visible for testing.
+    Lock lock() {
+        return lock;
+    }
 }
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 740bf697de40..7e42fd8d528f 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1602,8 +1602,6 @@ protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffset
     protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
         lock.readLock().lock();
         try {
-            if (findNextFetchOffset.get())
-                return Optional.empty();
             return fetchOffsetMetadata;
         } finally {
             lock.readLock().unlock();
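Taken together, the DelayedShareFetch hunks above make onComplete() read partitionsAcquired and partitionsAlreadyFetched only while holding the operation's lock, since (as the new comment notes) those fields can be updated by tryComplete() running in a different thread. The snippet below is a minimal, self-contained sketch of that pattern, not the real DelayedShareFetch/DelayedOperation API; the class name and map types are illustrative, and it assumes a reentrant lock so the writer can invoke the completion path while still holding it.

    import java.util.LinkedHashMap;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch: the writer (tryComplete) and the reader (onComplete) of the shared
    // map synchronize on the same lock, so onComplete never sees a half-updated map.
    class DirtyReadFreeOperation {
        private final Lock lock = new ReentrantLock(); // reentrant, as assumed above
        private LinkedHashMap<String, Integer> partitionsAcquired = new LinkedHashMap<>();

        boolean tryComplete(LinkedHashMap<String, Integer> acquirable) {
            lock.lock();
            try {
                if (acquirable.isEmpty())
                    return false;
                partitionsAcquired = acquirable; // published while holding the lock
                onComplete();                    // reentrant acquisition, no deadlock
                return true;
            } finally {
                lock.unlock();
            }
        }

        void onComplete() {
            lock.lock();
            try {
                // Either tryComplete already populated partitionsAcquired, or it is
                // still empty; it can never be observed mid-update.
                System.out.println("completing " + partitionsAcquired.keySet());
            } finally {
                lock.unlock();
            }
        }
    }

The essential property is simply that reader and writer share one lock; the rest of the patch (the LinkedHashMap types, the completeShareFetchRequest extraction) keeps the fetch ordering and structure around that change.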
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 0c7b488f1802..e6e52de2cb42 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -127,6 +127,8 @@ public void testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartit
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -182,6 +184,8 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -233,6 +237,8 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -278,6 +284,8 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
         assertTrue(delayedShareFetch.tryComplete());
         assertTrue(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -320,6 +328,8 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() {
             any(), any(), any(ReplicaQuota.class), anyBoolean());
         assertTrue(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -367,6 +377,8 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
         assertTrue(delayedShareFetch.isCompleted());
         assertTrue(shareFetchData.future().isDone());
         Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -403,6 +415,8 @@ public void testToCompleteAnAlreadyCompletedFuture() {
         // Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch.
         Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
         assertEquals(0, shareFetchData.future().join().size());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
 
         // Force completing the share fetch request for the second time should hit the future completion check and not
         // proceed ahead in the function.
@@ -411,6 +425,8 @@ public void testToCompleteAnAlreadyCompletedFuture() {
         // Verifying that the second forceComplete does not call acquirablePartitions method in DelayedShareFetch.
         Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
         Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -461,6 +477,8 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
         assertEquals(2, delayedShareFetchPurgatory.watched());
         assertFalse(shareFetchData1.future().isDone());
+        assertTrue(delayedShareFetch1.lock().tryLock());
+        delayedShareFetch1.lock().unlock();
 
         Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
         partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
@@ -498,6 +516,8 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
         Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
         Mockito.verify(replicaManager, times(0)).tryCompleteActions();
         Mockito.verify(delayedShareFetch2, times(1)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch2.lock().tryLock());
+        delayedShareFetch2.lock().unlock();
     }
 
     @Test
@@ -529,21 +549,21 @@ public void testCombineLogReadResponse() {
             .withSharePartitions(sharePartitions)
             .build();
 
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new HashMap<>();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
         topicPartitionData.put(tp0, mock(FetchRequest.PartitionData.class));
         topicPartitionData.put(tp1, mock(FetchRequest.PartitionData.class));
 
         // Case 1 - logReadResponse contains tp0.
-        Map<TopicIdPartition, LogReadResult> logReadResponse = Collections.singletonMap(
-            tp0, mock(LogReadResult.class));
+        LinkedHashMap<TopicIdPartition, LogReadResult> logReadResponse = new LinkedHashMap<>();
+        logReadResponse.put(tp0, mock(LogReadResult.class));
 
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
-        Map<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
+        LinkedHashMap<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
         assertEquals(topicPartitionData.keySet(), combinedLogReadResponse.keySet());
         assertEquals(combinedLogReadResponse.get(tp0), logReadResponse.get(tp0));
 
         // Case 2 - logReadResponse contains tp0 and tp1.
-        logReadResponse = new HashMap<>();
+        logReadResponse = new LinkedHashMap<>();
         logReadResponse.put(tp0, mock(LogReadResult.class));
         logReadResponse.put(tp1, mock(LogReadResult.class));
         combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
@@ -597,6 +617,8 @@ public void testExceptionInMinBytesCalculation() {
             any(), any(), any(ReplicaQuota.class), anyBoolean());
         // releasePartitionLocks will be called twice, once from tryComplete and then from onComplete.
         Mockito.verify(delayedShareFetch, times(2)).releasePartitionLocks(any());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -629,6 +651,8 @@ public void testLocksReleasedForCompletedFetch() {
 
         assertFalse(spy.tryComplete());
         Mockito.verify(sp0, times(1)).releaseFetchLock();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -653,6 +677,8 @@ public void testLocksReleasedAcquireException() {
 
         assertFalse(delayedShareFetch.tryComplete());
         Mockito.verify(sp0, times(1)).releaseFetchLock();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 67c2a6cce778..2454cf4912ba 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -1718,6 +1718,8 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() {
 
         Mockito.verify(sp1, times(1)).nextFetchOffset();
         Mockito.verify(sp2, times(0)).nextFetchOffset();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -1816,6 +1818,8 @@ public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() {
 
         Mockito.verify(sp1, times(0)).nextFetchOffset();
         Mockito.verify(sp2, times(0)).nextFetchOffset();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -1914,6 +1918,8 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() {
 
         Mockito.verify(sp1, times(1)).nextFetchOffset();
         Mockito.verify(sp2, times(0)).nextFetchOffset();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
@@ -2016,6 +2022,8 @@ public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() {
 
         Mockito.verify(sp1, times(0)).nextFetchOffset();
         Mockito.verify(sp2, times(0)).nextFetchOffset();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
     }
 
     @Test
diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
index 0ad638240c89..f3c818cb9c6c 100644
--- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
+++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
@@ -42,8 +42,8 @@ public abstract class DelayedOperation extends TimerTask {
 
     private final AtomicBoolean completed = new AtomicBoolean(false);
-    // Visible for testing
-    final Lock lock;
+
+    protected final Lock lock;
 
     public DelayedOperation(long delayMs, Optional<Lock> lockOpt) {
         this(delayMs, lockOpt.orElse(new ReentrantLock()));
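A note on the DelayedOperation hunk that closes the patch: the lock field goes from package-private ("Visible for testing") to protected, which is what allows DelayedShareFetch, in a different package, to expose it through the new lock() accessor, and every updated test then checks that the lock is free again once the operation has run. Below is a self-contained sketch of that test idiom with made-up names (FakeDelayedOperation is not real Kafka code), assuming JUnit 5 and a ReentrantLock.

    import static org.junit.jupiter.api.Assertions.assertTrue;

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    import org.junit.jupiter.api.Test;

    public class LockReleasedAssertionExample {

        // Stand-in for a DelayedOperation subclass that exposes its lock for tests.
        static class FakeDelayedOperation {
            protected final Lock lock = new ReentrantLock();

            void forceCompleteLikeWork() {
                lock.lock();
                try {
                    // ... complete the operation under the lock ...
                } finally {
                    lock.unlock(); // leaving this out would leak the lock
                }
            }

            Lock lock() {
                return lock;
            }
        }

        @Test
        public void testLockIsReleasedAfterCompletion() {
            FakeDelayedOperation op = new FakeDelayedOperation();
            op.forceCompleteLikeWork();
            // Same idiom as the tests in this patch: tryLock() succeeds because the lock
            // is free (it would fail if another thread were still holding it), and the
            // test immediately releases what it just acquired.
            assertTrue(op.lock().tryLock());
            op.lock().unlock();
        }
    }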