
Commit 79f6395

Dabz authored and loicgreffier committed Feb 1, 2025
KAFKA-16505: Fix source raw key and value in store caches
1 parent f18457f commit 79f6395

28 files changed: +698 -87 lines changed
 

‎clients/src/main/java/org/apache/kafka/common/utils/Utils.java

+26

@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.common.utils;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Optional;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -314,6 +317,29 @@ public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
         return getNullableArray(buffer, size);
     }
 
+    /**
+     * Starting from the current position, read an optional marker char indicating the presence of a byte array to read.
+     * If the marker is present, read the size of the byte array to read, then read the array.
+     * If the marker is not present, reset the buffer to the saved position and return an empty Optional.
+     * @param marker The marker char to indicate the presence of a byte array
+     * @param buffer The buffer to read a size-prefixed array from
+     * @return The array
+     */
+    public static Optional<byte[]> getOptionalField(final char marker, final ByteBuffer buffer) {
+        if (buffer.remaining() < Character.BYTES) {
+            return Optional.empty();
+        }
+
+        buffer.mark();
+
+        char serializedMarker = buffer.getChar();
+        if (serializedMarker == marker) {
+            return Optional.of(getNullableSizePrefixedArray(buffer));
+        }
+
+        buffer.reset(); // marker is not present, reset the buffer to the saved position
+        return Optional.empty();
+    }
     /**
      * Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position
      * is after the array that is returned.
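
Note: the new Utils.getOptionalField helper pairs a marker char with the existing size-prefixed array format. Below is a minimal round-trip sketch using only the helper shown above; the writer side is hand-rolled for illustration (in the commit itself the fields are written by ProcessorRecordContext#serialize), and the class name is hypothetical.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.apache.kafka.common.utils.Utils;

public class OptionalFieldRoundTrip {
    public static void main(final String[] args) {
        final byte[] rawKey = "ID123-2-ERR".getBytes(StandardCharsets.UTF_8);

        // Writer side: marker char, then int length, then the bytes
        // (the same shape ProcessorRecordContext#serialize uses for 'k' and 'v').
        final ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES + Integer.BYTES + rawKey.length);
        buffer.putChar('k');
        buffer.putInt(rawKey.length);
        buffer.put(rawKey);
        buffer.flip();

        // Reader side: a matching marker yields the array; a missing marker
        // (here: nothing left to read) yields empty and leaves the position untouched.
        final Optional<byte[]> key = Utils.getOptionalField('k', buffer);
        final Optional<byte[]> value = Utils.getOptionalField('v', buffer);
        System.out.println(key.isPresent() + " / " + value.isPresent()); // true / false
    }
}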

‎streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java

+4

@@ -422,6 +422,10 @@ private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerCon
         assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
         assertEquals("TOPIC_NAME", context.topic());
         assertEquals("KSTREAM-PROCESSOR-0000000003", context.processorNodeId());
+        assertTrue(Arrays.equals("ID123-2-ERR".getBytes(), context.sourceRawKey())
+            || Arrays.equals("ID123-5-ERR".getBytes(), context.sourceRawKey()));
+        assertTrue(Arrays.equals("ID123-A2".getBytes(), context.sourceRawValue())
+            || Arrays.equals("ID123-A5".getBytes(), context.sourceRawValue()));
         assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
         assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler"));
     }

‎streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java

+34

@@ -147,4 +147,38 @@ public interface ErrorHandlerContext {
      * @return The timestamp.
      */
     long timestamp();
+
+    /**
+     * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, it will return null.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
+     * to the repartition topic.
+     *
+     * <p> Always returns null if this method is invoked within a
+     * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
+     *
+     * @return the raw byte of the key of the source message
+     */
+    byte[] sourceRawKey();
+
+    /**
+     * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, it will return null.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
+     * to the repartition topic.
+     *
+     * <p> Always returns null if this method is invoked within a
+     * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
+     *
+     * @return the raw byte of the value of the source message
+     */
+    byte[] sourceRawValue();
 }
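
Note: for handler authors, these accessors expose the pre-deserialization bytes of the source record. Below is a sketch of a custom processing exception handler that logs them, assuming the KIP-1033 ProcessingExceptionHandler interface (handle(ErrorHandlerContext, Record<?, ?>, Exception) plus configure from Configurable); the class name is illustrative, not part of this commit.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical handler: logs the raw source bytes carried by the new accessors, then continues.
public class RawRecordLoggingProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // May be null, e.g. for punctuation-triggered records, so guard before decoding.
        final byte[] rawKey = context.sourceRawKey();
        final byte[] rawValue = context.sourceRawValue();

        System.err.printf("Processing error at %s-%d offset %d, raw key=%s, raw value=%s: %s%n",
            context.topic(), context.partition(), context.offset(),
            rawKey == null ? "null" : new String(rawKey, StandardCharsets.UTF_8),
            rawValue == null ? "null" : new String(rawValue, StandardCharsets.UTF_8),
            exception.getMessage());

        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed
    }
}

Such a handler would be registered through the Streams processing exception handler config introduced by KIP-1033; the null checks matter because, as documented above, the raw key and value are absent for punctuation-forwarded records and in the production exception handler path.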

‎streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java

+15 -1

@@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
     private final Headers headers;
     private final String processorNodeId;
     private final TaskId taskId;
+    private final byte[] sourceRawKey;
+    private final byte[] sourceRawValue;
 
     private final long timestamp;
     private final ProcessorContext processorContext;
@@ -44,7 +46,9 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
                                       final Headers headers,
                                       final String processorNodeId,
                                       final TaskId taskId,
-                                      final long timestamp) {
+                                      final long timestamp,
+                                      final byte[] sourceRawKey,
+                                      final byte[] sourceRawValue) {
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
@@ -53,6 +57,8 @@ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
         this.taskId = taskId;
         this.processorContext = processorContext;
         this.timestamp = timestamp;
+        this.sourceRawKey = sourceRawKey;
+        this.sourceRawValue = sourceRawValue;
     }
 
     @Override
@@ -90,6 +96,14 @@ public long timestamp() {
         return timestamp;
     }
 
+    public byte[] sourceRawKey() {
+        return sourceRawKey;
+    }
+
+    public byte[] sourceRawValue() {
+        return sourceRawValue;
+    }
+
     @Override
     public String toString() {
         // we do exclude headers on purpose, to not accidentally log user data

‎streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java

+34

@@ -110,4 +110,38 @@ public interface RecordContext {
      */
     Headers headers();
 
+    /**
+     * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, it will return null.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
+     * to the repartition topic.
+     *
+     * <p> Always returns null if this method is invoked within a
+     * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
+     *
+     * @return the raw byte of the key of the source message
+     */
+    byte[] sourceRawKey();
+
+    /**
+     * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
+     *
+     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, it will return null.
+     *
+     * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
+     * to the repartition topic.
+     *
+     * <p> Always returns null if this method is invoked within a
+     * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)
+     *
+     * @return the raw byte of the value of the source message
+     */
+    byte[] sourceRawValue();
+
 }

‎streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

+4 -1

@@ -260,7 +260,10 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
                 recordContext.offset(),
                 recordContext.partition(),
                 recordContext.topic(),
-                record.headers());
+                record.headers(),
+                recordContext.sourceRawKey(),
+                recordContext.sourceRawValue()
+            );
         }
 
         if (childName == null) {

‎streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java

+3 -2

@@ -215,8 +215,9 @@ public void process(final Record<KIn, VIn> record) {
                 internalProcessorContext.recordContext().headers(),
                 internalProcessorContext.currentNode().name(),
                 internalProcessorContext.taskId(),
-                internalProcessorContext.recordContext().timestamp()
-            );
+                internalProcessorContext.recordContext().timestamp(),
+                internalProcessorContext.recordContext().sourceRawKey(),
+                internalProcessorContext.recordContext().sourceRawValue());
 
             final ProcessingExceptionHandler.ProcessingHandlerResponse response;
             try {

‎streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java

+82 -9

@@ -16,27 +16,30 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
+import static org.apache.kafka.common.utils.Utils.getOptionalField;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
-import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
-
 public class ProcessorRecordContext implements RecordContext, RecordMetadata {
 
     private final long timestamp;
     private final long offset;
     private final String topic;
     private final int partition;
     private final Headers headers;
+    private byte[] sourceRawKey;
+    private byte[] sourceRawValue;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
@@ -48,6 +51,24 @@ public ProcessorRecordContext(final long timestamp,
         this.topic = topic;
         this.partition = partition;
         this.headers = Objects.requireNonNull(headers);
+        this.sourceRawKey = null;
+        this.sourceRawValue = null;
+    }
+
+    public ProcessorRecordContext(final long timestamp,
+                                  final long offset,
+                                  final int partition,
+                                  final String topic,
+                                  final Headers headers,
+                                  final byte[] sourceRawKey,
+                                  final byte[] sourceRawValue) {
+        this.timestamp = timestamp;
+        this.offset = offset;
+        this.topic = topic;
+        this.partition = partition;
+        this.headers = Objects.requireNonNull(headers);
+        this.sourceRawKey = sourceRawKey;
+        this.sourceRawValue = sourceRawValue;
     }
 
     @Override
@@ -75,6 +96,24 @@ public Headers headers() {
         return headers;
     }
 
+    @Override
+    public byte[] sourceRawKey() {
+        return sourceRawKey;
+    }
+
+    @Override
+    public byte[] sourceRawValue() {
+        return sourceRawValue;
+    }
+
+    public void setSourceRawKey(final byte[] sourceRawKey) {
+        this.sourceRawKey = sourceRawKey;
+    }
+
+    public void setSourceRawValue(final byte[] sourceRawValue) {
+        this.sourceRawValue = sourceRawValue;
+    }
+
     public long residentMemorySizeEstimate() {
         long size = 0;
         size += Long.BYTES; // value.context.timestamp
@@ -124,6 +163,18 @@ public byte[] serialize() {
             headerValuesBytes[i] = valueBytes;
         }
 
+        if (sourceRawKey != null) {
+            size += Character.BYTES; // marker for sourceRawKey being present
+            size += Integer.BYTES; // size of sourceRawKey
+            size += sourceRawKey.length;
+        }
+
+        if (sourceRawValue != null) {
+            size += Character.BYTES; // marker for sourceRawValue being present
+            size += Integer.BYTES; // size of sourceRawValue
+            size += sourceRawValue.length;
+        }
+
         final ByteBuffer buffer = ByteBuffer.allocate(size);
         buffer.putLong(timestamp);
         buffer.putLong(offset);
@@ -146,6 +197,18 @@
             }
         }
 
+        if (sourceRawKey != null) {
+            buffer.putChar('k');
+            buffer.putInt(sourceRawKey.length);
+            buffer.put(sourceRawKey);
+        }
+
+        if (sourceRawValue != null) {
+            buffer.putChar('v');
+            buffer.putInt(sourceRawValue.length);
+            buffer.put(sourceRawValue);
+        }
+
         return buffer.array();
     }
 
@@ -173,7 +236,15 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
             headers = new RecordHeaders(headerArr);
         }
 
-        return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
+        final byte[] rawKey = getOptionalField('k', buffer).orElse(null);
+        final byte[] rawValue = getOptionalField('v', buffer).orElse(null);
+
+        return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, rawKey, rawValue);
+    }
+
+    public void freeRawRecord() {
+        this.sourceRawKey = null;
+        this.sourceRawValue = null;
    }
 
     @Override
@@ -189,7 +260,9 @@ public boolean equals(final Object o) {
             offset == that.offset &&
             partition == that.partition &&
             Objects.equals(topic, that.topic) &&
-            Objects.equals(headers, that.headers);
+            Objects.equals(headers, that.headers) &&
+            Arrays.equals(sourceRawKey, that.sourceRawKey) &&
+            Arrays.equals(sourceRawValue, that.sourceRawValue);
     }
 
     /**
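
Note: because the raw key and value are appended as optional 'k'/'v' marker fields, buffers serialized without them (e.g. by older code) still deserialize, simply yielding null raw key and value. A small round-trip sketch against the seven-argument constructor and the serialize/deserialize methods shown above; values are arbitrary, and ProcessorRecordContext is an internal class, so this is for illustration only.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

public class RecordContextRoundTrip {
    public static void main(final String[] args) {
        final ProcessorRecordContext context = new ProcessorRecordContext(
            42L,                                            // timestamp
            7L,                                             // offset
            0,                                              // partition
            "input-topic",                                  // topic
            new RecordHeaders(),                            // headers
            "ID123-2-ERR".getBytes(StandardCharsets.UTF_8), // sourceRawKey
            "ID123-A2".getBytes(StandardCharsets.UTF_8));   // sourceRawValue

        // Layout: timestamp, offset, size-prefixed topic, partition, headers,
        // then the optional 'k' and 'v' marker fields added by this commit.
        final byte[] bytes = context.serialize();
        final ProcessorRecordContext restored = ProcessorRecordContext.deserialize(ByteBuffer.wrap(bytes));

        // The raw key and value survive the round trip.
        System.out.println(Arrays.equals(context.sourceRawKey(), restored.sourceRawKey()));     // true
        System.out.println(Arrays.equals(context.sourceRawValue(), restored.sourceRawValue())); // true
    }
}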

‎streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

+16 -2

@@ -259,6 +259,10 @@ public <K, V> void send(final String topic,
 
         final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
+        // As many records could be in-flight,
+        // freeing raw records in the context to reduce memory pressure
+        freeRawInputRecordFromContext(context);
+
         streamsProducer.send(serializedRecord, (metadata, exception) -> {
             try {
                 // if there's already an exception record, skip logging offsets or new exceptions
@@ -311,6 +315,12 @@ public <K, V> void send(final String topic,
         });
     }
 
+    private static void freeRawInputRecordFromContext(final InternalProcessorContext<Void, Void> context) {
+        if (context != null && context.recordContext() != null) {
+            context.recordContext().freeRawRecord();
+        }
+    }
+
     private <K, V> void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
                                         final String topic,
                                         final K key,
@@ -388,7 +398,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorCo
             recordContext.headers(),
             processorNodeId,
             taskId,
-            recordContext.timestamp()
+            recordContext.timestamp(),
+            context.recordContext().sourceRawKey(),
+            context.recordContext().sourceRawValue()
         ) :
         new DefaultErrorHandlerContext(
             context,
@@ -398,7 +410,9 @@ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorCo
             new RecordHeaders(),
             processorNodeId,
             taskId,
-            -1L
+            -1L,
+            null,
+            null
         );
     }
 
0 commit comments
