Skip to content

Commit

Permalink
KAFKA-17872: Update consumed offsets on records with invalid timestamp (
Browse files Browse the repository at this point in the history
#17710)

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

Reviewers: Bill Bejeck <[email protected]>
  • Loading branch information
mjsax authored Nov 10, 2024
1 parent 4b80591 commit 0bc91be
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ private void updateHead() {
deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
);
droppedRecordsSensor.record();
lastCorruptedRecord = raw;
continue;
}
headRecord = new StampedRecord(deserialized, timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -74,8 +74,7 @@ public class RecordQueueTest {
private final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "mock", "processId", new MockTime());

@SuppressWarnings("rawtypes")
final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
new MockRecordCollector(),
metrics
Expand All @@ -88,19 +87,28 @@ public class RecordQueueTest {
timestampExtractor,
new LogAndFailExceptionHandler(),
context,
new LogContext());
new LogContext()
);
private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
timestampExtractor,
new LogAndContinueExceptionHandler(),
context,
new LogContext());
new LogContext()
);
private final RecordQueue queueThatSkipsInvalidTimestamps = new RecordQueue(
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
new LogAndSkipOnInvalidTimestamp(),
new LogAndFailExceptionHandler(),
context,
new LogContext()
);

private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);

@SuppressWarnings("unchecked")
@BeforeEach
public void before() {
mockSourceNodeWithMetrics.init(context);
Expand Down Expand Up @@ -340,7 +348,7 @@ public void shouldSetTimestampAndRespectMaxTimestampPolicy() {

@Test
public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
final byte[] key = new LongSerializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue,
new RecordHeaders(), Optional.empty()));
Expand All @@ -354,7 +362,7 @@ public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {

@Test
public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
final byte[] value = new LongSerializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value,
new RecordHeaders(), Optional.empty()));
Expand All @@ -368,7 +376,7 @@ public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {

@Test
public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
final byte[] key = new LongSerializer().serialize("foo", 1L);
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 1, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, key, recordValue,
new RecordHeaders(), Optional.empty());
Expand All @@ -381,7 +389,7 @@ public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHan

@Test
public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
final byte[] value = new LongSerializer().serialize("foo", 1L);
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 1, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, recordKey, value,
new RecordHeaders(), Optional.empty());
Expand All @@ -404,7 +412,7 @@ public void shouldThrowOnNegativeTimestamp() {
mockSourceNodeWithMetrics,
new FailOnInvalidTimestamp(),
new LogAndContinueExceptionHandler(),
new InternalMockProcessorContext(),
new InternalMockProcessorContext<>(),
new LogContext());

final StreamsException exception = assertThrows(
Expand All @@ -421,20 +429,25 @@ public void shouldThrowOnNegativeTimestamp() {

@Test
public void shouldDropOnNegativeTimestamp() {
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
new RecordHeaders(), Optional.empty()));
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
"topic",
1,
1,
-1L, // negative timestamp
TimestampType.CREATE_TIME,
0,
0,
recordKey,
recordValue,
new RecordHeaders(),
Optional.empty()
);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(record);

final RecordQueue queue = new RecordQueue(
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
new LogAndSkipOnInvalidTimestamp(),
new LogAndContinueExceptionHandler(),
new InternalMockProcessorContext(),
new LogContext());
queue.addRawRecords(records);
queueThatSkipsInvalidTimestamps.addRawRecords(records);

assertEquals(0, queue.size());
assertEquals(1, queueThatSkipsInvalidTimestamps.size());
assertEquals(new CorruptedRecord(record), queueThatSkipsInvalidTimestamps.poll(0));
}

@Test
Expand Down
Loading

0 comments on commit 0bc91be

Please sign in to comment.