Skip to content

Commit f14bd2e

Browse files
committed
KAFKA-16505: Fix source raw key and value in store caches
1 parent 7c987d4 commit f14bd2e

File tree

5 files changed

+345
-75
lines changed

5 files changed

+345
-75
lines changed

clients/src/main/java/org/apache/kafka/common/utils/Utils.java

+26
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
*/
1717
package org.apache.kafka.common.utils;
1818

19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
21+
import java.util.Optional;
1922
import org.apache.kafka.common.KafkaException;
2023
import org.apache.kafka.common.config.ConfigDef;
2124
import org.apache.kafka.common.config.ConfigException;
@@ -314,6 +317,29 @@ public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
314317
return getNullableArray(buffer, size);
315318
}
316319

320+
/**
321+
* Starting from the current position, read an optional marker char indicating the presence of a byte array to read.
322+
* If the marker is present, read the size of the byte array to read, then read the array.
323+
* If the marker is not present, reset the buffer to the saved position and return an empty Optional.
324+
* @param marker The marker char to indicate the presence of a byte array
325+
* @param buffer The buffer to read a size-prefixed array from
326+
* @return The array
327+
*/
328+
public static Optional<byte[]> getOptionalField(final char marker, final ByteBuffer buffer) {
329+
if (buffer.remaining() < Character.BYTES) {
330+
return Optional.empty();
331+
}
332+
333+
buffer.mark();
334+
335+
char serializedMarker = buffer.getChar();
336+
if (serializedMarker == marker) {
337+
return Optional.of(getNullableSizePrefixedArray(buffer));
338+
}
339+
340+
buffer.reset(); // marker is not present, reset the buffer to the saved position
341+
return Optional.empty();
342+
}
317343
/**
318344
* Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position
319345
* is after the array that is returned.

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java

+19-29
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static java.nio.charset.StandardCharsets.UTF_8;
2020
import static java.util.Objects.requireNonNull;
2121
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
22+
import static org.apache.kafka.common.utils.Utils.getOptionalField;
2223

2324
import java.nio.ByteBuffer;
2425
import java.util.Arrays;
@@ -162,6 +163,18 @@ public byte[] serialize() {
162163
headerValuesBytes[i] = valueBytes;
163164
}
164165

166+
if (sourceRawKey != null) {
167+
size += Character.BYTES; // marker for sourceRawKey being present
168+
size += Integer.BYTES; // size of sourceRawKey
169+
size += sourceRawKey.length;
170+
}
171+
172+
if (sourceRawValue != null) {
173+
size += Character.BYTES; // marker for sourceRawValue being present
174+
size += Integer.BYTES; // size of sourceRawValue
175+
size += sourceRawValue.length;
176+
}
177+
165178
final ByteBuffer buffer = ByteBuffer.allocate(size);
166179
buffer.putLong(timestamp);
167180
buffer.putLong(offset);
@@ -185,39 +198,13 @@ public byte[] serialize() {
185198
}
186199

187200
if (sourceRawKey != null) {
201+
buffer.putChar('k');
188202
buffer.putInt(sourceRawKey.length);
189203
buffer.put(sourceRawKey);
190204
}
191205

192206
if (sourceRawValue != null) {
193-
buffer.putInt(sourceRawValue.length);
194-
buffer.put(sourceRawValue);
195-
}
196-
197-
return buffer.array();
198-
}
199-
200-
public byte[] serializeRawKeyValue() {
201-
int size = 0;
202-
203-
if (sourceRawKey != null) {
204-
size += Integer.BYTES; // size of sourceRawKey
205-
size += sourceRawKey.length;
206-
}
207-
208-
if (sourceRawValue != null) {
209-
size += Integer.BYTES; // size of sourceRawValue
210-
size += sourceRawValue.length;
211-
}
212-
213-
final ByteBuffer buffer = ByteBuffer.allocate(size);
214-
215-
if (sourceRawKey != null) {
216-
buffer.putInt(sourceRawKey.length);
217-
buffer.put(sourceRawKey);
218-
}
219-
220-
if (sourceRawValue != null) {
207+
buffer.putChar('v');
221208
buffer.putInt(sourceRawValue.length);
222209
buffer.put(sourceRawValue);
223210
}
@@ -249,7 +236,10 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
249236
headers = new RecordHeaders(headerArr);
250237
}
251238

252-
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
239+
final byte[] rawKey = getOptionalField('k', buffer).orElse(null);
240+
final byte[] rawValue = getOptionalField('v', buffer).orElse(null);
241+
242+
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, rawKey, rawValue);
253243
}
254244

255245
public void freeRawRecord() {

streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java

-12
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,6 @@ static BufferValue deserialize(final ByteBuffer buffer) {
8181

8282
final byte[] newValue = getNullableSizePrefixedArray(buffer);
8383

84-
// Backward compatibility: if there is remaining data in the buffer, it is the raw key and value.
85-
if (buffer.remaining() > 0) {
86-
context.setSourceRawKey(getNullableSizePrefixedArray(buffer));
87-
context.setSourceRawValue(getNullableSizePrefixedArray(buffer));
88-
}
89-
9084
return new BufferValue(priorValue, oldValue, newValue, context);
9185
}
9286

@@ -99,15 +93,13 @@ ByteBuffer serialize(final int endPadding) {
9993
final int sizeOfNewValue = newValue == null ? 0 : newValue.length;
10094

10195
final byte[] serializedContext = recordContext.serialize();
102-
final byte[] serializedRawKeyValueContext = recordContext.serializeRawKeyValue();
10396

10497
final ByteBuffer buffer = ByteBuffer.allocate(
10598
serializedContext.length
10699
+ sizeOfValueLength + sizeOfPriorValue
107100
+ sizeOfValueLength + sizeOfOldValue
108101
+ sizeOfValueLength + sizeOfNewValue
109102
+ endPadding
110-
+ serializedRawKeyValueContext.length
111103
);
112104

113105
buffer.put(serializedContext);
@@ -125,10 +117,6 @@ ByteBuffer serialize(final int endPadding) {
125117

126118
addValue(buffer, newValue);
127119

128-
if (serializedRawKeyValueContext.length != 0) {
129-
buffer.put(serializedRawKeyValueContext);
130-
}
131-
132120
return buffer;
133121
}
134122

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java

+145-4
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,29 @@
1616
*/
1717
package org.apache.kafka.streams.processor.internals;
1818

19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
21+
import static org.apache.kafka.common.utils.Utils.getOptionalField;
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.junit.jupiter.api.Assertions.assertFalse;
24+
import static org.junit.jupiter.api.Assertions.assertThrows;
25+
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
27+
import java.nio.ByteBuffer;
28+
import java.util.Optional;
1929
import org.apache.kafka.common.header.Headers;
2030
import org.apache.kafka.common.header.internals.RecordHeaders;
21-
2231
import org.junit.jupiter.api.Test;
23-
24-
import static org.junit.jupiter.api.Assertions.assertEquals;
25-
import static org.junit.jupiter.api.Assertions.assertThrows;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.CsvSource;
2634

2735
public class ProcessorRecordContextTest {
2836
// timestamp + offset + partition: 8 + 8 + 4
2937
private static final long MIN_SIZE = 20L;
38+
private static final String SOURCE_RAW_KEY = "sourceRawKey";
39+
private static final byte[] SOURCE_RAW_KEY_BYTES = SOURCE_RAW_KEY.getBytes();
40+
private static final String SOURCE_RAW_VALUE = "sourceRawValue";
41+
private static final byte[] SOURCE_RAW_VALUE_BYTES = SOURCE_RAW_VALUE.getBytes();
3042

3143
@Test
3244
public void shouldNotAllowNullHeaders() {
@@ -111,4 +123,133 @@ public void shouldEstimateNullValueInHeaderAsZero() {
111123

112124
assertEquals(MIN_SIZE + 10L, context.residentMemorySizeEstimate());
113125
}
126+
127+
@Test
128+
public void shouldSerializeProcessorRecordContext() {
129+
final ProcessorRecordContext context = new ProcessorRecordContext(
130+
42L,
131+
73L,
132+
0,
133+
"topic",
134+
new RecordHeaders()
135+
);
136+
137+
final ByteBuffer serializedContext = ByteBuffer.wrap(context.serialize());
138+
139+
assertEquals(42L, serializedContext.getLong());
140+
assertEquals(73L, serializedContext.getLong());
141+
assertEquals("topic", new String(getNullableSizePrefixedArray(serializedContext), UTF_8));
142+
assertEquals(0, serializedContext.getInt());
143+
assertEquals(0, serializedContext.getInt());
144+
assertFalse(serializedContext.hasRemaining());
145+
}
146+
147+
@Test
148+
public void shouldSerializeProcessorRecordContextWithRawKey() {
149+
final ProcessorRecordContext context = new ProcessorRecordContext(
150+
42L,
151+
73L,
152+
0,
153+
"topic",
154+
new RecordHeaders(),
155+
SOURCE_RAW_KEY_BYTES,
156+
null
157+
);
158+
159+
final ByteBuffer serializedContext = ByteBuffer.wrap(context.serialize());
160+
161+
assertEquals(42L, serializedContext.getLong());
162+
assertEquals(73L, serializedContext.getLong());
163+
assertEquals("topic", new String(getNullableSizePrefixedArray(serializedContext), UTF_8));
164+
assertEquals(0, serializedContext.getInt());
165+
assertEquals(0, serializedContext.getInt());
166+
167+
final Optional<byte[]> rawKey = getOptionalField('k', serializedContext);
168+
assertTrue(rawKey.isPresent());
169+
assertEquals(SOURCE_RAW_KEY, new String(rawKey.get(), UTF_8));
170+
171+
assertFalse(serializedContext.hasRemaining());
172+
}
173+
174+
@Test
175+
public void shouldSerializeProcessorRecordContextWithRawValue() {
176+
final ProcessorRecordContext context = new ProcessorRecordContext(
177+
42L,
178+
73L,
179+
0,
180+
"topic",
181+
new RecordHeaders(),
182+
null,
183+
SOURCE_RAW_VALUE_BYTES
184+
);
185+
186+
final ByteBuffer serializedContext = ByteBuffer.wrap(context.serialize());
187+
188+
assertEquals(42L, serializedContext.getLong());
189+
assertEquals(73L, serializedContext.getLong());
190+
assertEquals("topic", new String(getNullableSizePrefixedArray(serializedContext), UTF_8));
191+
assertEquals(0, serializedContext.getInt());
192+
assertEquals(0, serializedContext.getInt());
193+
194+
final Optional<byte[]> rawValue = getOptionalField('v', serializedContext);
195+
assertTrue(rawValue.isPresent());
196+
assertEquals(SOURCE_RAW_VALUE, new String(rawValue.get(), UTF_8));
197+
198+
assertFalse(serializedContext.hasRemaining());
199+
}
200+
201+
@Test
202+
public void shouldSerializeProcessorRecordContextWithBothRawKeyAndRawValue() {
203+
final ProcessorRecordContext context = new ProcessorRecordContext(
204+
42L,
205+
73L,
206+
0,
207+
"topic",
208+
new RecordHeaders(),
209+
SOURCE_RAW_KEY_BYTES,
210+
SOURCE_RAW_VALUE_BYTES
211+
);
212+
213+
final ByteBuffer serializedContext = ByteBuffer.wrap(context.serialize());
214+
215+
assertEquals(42L, serializedContext.getLong());
216+
assertEquals(73L, serializedContext.getLong());
217+
assertEquals("topic", new String(getNullableSizePrefixedArray(serializedContext), UTF_8));
218+
assertEquals(0, serializedContext.getInt());
219+
assertEquals(0, serializedContext.getInt());
220+
221+
final Optional<byte[]> rawKey = getOptionalField('k', serializedContext);
222+
assertTrue(rawKey.isPresent());
223+
assertEquals(SOURCE_RAW_KEY, new String(rawKey.get(), UTF_8));
224+
225+
final Optional<byte[]> rawValue = getOptionalField('v', serializedContext);
226+
assertTrue(rawValue.isPresent());
227+
assertEquals(SOURCE_RAW_VALUE, new String(rawValue.get(), UTF_8));
228+
229+
assertFalse(serializedContext.hasRemaining());
230+
}
231+
232+
@ParameterizedTest
233+
@CsvSource(value = {
234+
"null,null",
235+
"rawKey,null",
236+
"null,rawValue",
237+
"rawKey,rawValue"
238+
}, nullValues = "null")
239+
public void shouldDeserializeProcessorRecordContext(final String rawKey, final String rawValue) {
240+
final ProcessorRecordContext context = new ProcessorRecordContext(
241+
42L,
242+
73L,
243+
0,
244+
"topic",
245+
new RecordHeaders(),
246+
rawKey != null ? rawKey.getBytes() : null,
247+
rawValue != null ? rawValue.getBytes() : null
248+
);
249+
250+
final byte[] serializedContext = context.serialize();
251+
final ProcessorRecordContext deserializedContext = ProcessorRecordContext.deserialize(ByteBuffer.wrap(serializedContext));
252+
253+
assertEquals(context, deserializedContext);
254+
}
114255
}

0 commit comments

Comments
 (0)