Skip to content

Commit edec697

Browse files
KAFKA-18683: Handle slicing of file records for updated start position
1 parent 184b891 commit edec697

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

clients/src/main/java/org/apache/kafka/common/record/FileRecords.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@ private int availableBytes(int position, int size) {
161161

162162
if (position < 0)
163163
throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this);
164-
if (position > currentSizeInBytes - start)
164+
// The position is always relative to the start of the file, so compare it with the file size
165+
// to verify that the position is within the file.
166+
if (position > currentSizeInBytes)
165167
throw new IllegalArgumentException("Slice from position " + position + " exceeds end position of " + this);
166168
if (size < 0)
167169
throw new IllegalArgumentException("Invalid size: " + size + " in read from " + this);

clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.concurrent.ExecutorService;
4545
import java.util.concurrent.Executors;
4646
import java.util.concurrent.Future;
47+
import java.util.stream.IntStream;
4748

4849
import static java.util.Arrays.asList;
4950
import static org.apache.kafka.common.utils.Utils.utf8;
@@ -433,6 +434,38 @@ public void testSearchForTimestamp() throws IOException {
433434
}
434435
}
435436

437+
/**
438+
* Tests slicing an already sliced file records instance whose start position is greater than
439+
* the number of bytes available in that sliced instance.
440+
*/
441+
@Test
442+
public void testSliceForAlreadySlicedFileRecords() throws IOException {
443+
byte[][] values = new byte[][] {
444+
"abcd".getBytes(),
445+
"efgh".getBytes(),
446+
"ijkl".getBytes(),
447+
"mnop".getBytes(),
448+
"qrst".getBytes()
449+
};
450+
try (FileRecords fileRecords = createFileRecords(values)) {
451+
List<RecordBatch> items = batches(fileRecords.slice(0, fileRecords.sizeInBytes()));
452+
453+
// Slice from the fourth message until the end: the offset is the combined size of the first three batches.
454+
int position = IntStream.range(0, 3).map(i -> items.get(i).sizeInBytes()).sum();
455+
FileRecords sliced = fileRecords.slice(position, fileRecords.sizeInBytes() - position);
456+
assertEquals(fileRecords.sizeInBytes() - position, sliced.sizeInBytes());
457+
assertEquals(items.subList(3, items.size()), batches(sliced), "Read starting from the fourth message");
458+
459+
// Slice the already sliced file records again, from the fifth message until the end. The bytes
460+
// available in the sliced file records are now fewer than its start position. However, the slice
461+
// position is relative, so it is reset to skip only the first batch of the sliced file records.
// NOTE(review): the offset is the size of the fifth batch (index 4), used to skip the fourth batch;
// presumably all batches here have the same size -- confirm.
462+
position = items.get(4).sizeInBytes();
463+
FileRecords finalSliced = sliced.slice(position, sliced.sizeInBytes() - position);
464+
assertEquals(sliced.sizeInBytes() - position, finalSliced.sizeInBytes());
465+
assertEquals(items.subList(4, items.size()), batches(finalSliced), "Read starting from the fifth message");
466+
}
467+
}
468+
436469
private void testSearchForTimestamp(RecordVersion version) throws IOException {
437470
File temp = tempFile();
438471
FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true);

0 commit comments

Comments
 (0)