Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: cleanup KStream JavaDocs (6/N) - map[Values] #18755

Merged
merged 2 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 50 additions & 158 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,15 @@ <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super V, ? e
final Named named);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
* Create a new {@code KStream} that consists of all records of this stream but with a modified value.
* The provided {@link ValueMapper} is applied to each input record value and computes a new value (possibly
* of a different type) for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* If you need read access to the input record key, use {@link #mapValues(ValueMapperWithKey)}.
* This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
* <p>
* The example below counts the number of token of the value string.
*
* <p>The example below counts the number of token of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer> {
Expand All @@ -164,137 +166,58 @@ <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super V, ? e
* }
* });
* }</pre>
*
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream} (cf. {@link #map(KeyValueMapper)}).
*
* @param mapper
* a {@link ValueMapper} that computes a new value for each input record
*
* @param <VOut> the value type of the result stream
*
* @return A {@code KStream} that contains records with unmodified key and new values (possibly of a different type).
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
<VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
* <p>
* The example below counts the number of token of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer> {
* Integer apply(String value) {
* return value.split(" ").length;
* }
* });
* }</pre>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
* See {@link #mapValues(ValueMapper)}.
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Named named);
<VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper,
final Named named);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
* <p>
* The example below counts the number of tokens of key and value strings.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer> {
* Integer apply(String readOnlyKey, String value) {
* return readOnlyKey.split(" ").length + value.split(" ").length;
* }
* });
* }</pre>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* So, setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
* See {@link #mapValues(ValueMapper)}.
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
<VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
* <p>
* The example below counts the number of tokens of key and value strings.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer> {
* Integer apply(String readOnlyKey, String value) {
* return readOnlyKey.split(" ").length + value.split(" ").length;
* }
* });
* }</pre>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* So, setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
* See {@link #mapValues(ValueMapperWithKey)}.
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Named named);
<VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper,
final Named named);

/**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Create a new {@code KStream} that consists of a modified record for each record in this stream.
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record
* (possibly of a different key and/or value type) for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
*
* <p>The example below normalizes the String key to upper-case letters and counts the number of token of the
* value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>> {
Expand All @@ -304,63 +227,32 @@ <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? e
* });
* }</pre>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param <KR> the key type of the result stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with new key and value (possibly both of different type)
* <p>Mapping records might result in an internal data redistribution if a key-based operator (like an
* aggregation or join) is applied to the result {@code KStream} (cf. {@link #mapValues(ValueMapper)}).
*
* @param mapper
* a {@link KeyValueMapper} that computes a new {@link KeyValue} pair for each input record
*
* @param <KOut> the key type of the result stream
* @param <VOut> the value type of the result stream
*
* @return A {@code KStream} that contains records with new key and new value (possibly of different types).
*
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<KOut, VOut> KStream<KOut, VOut> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper);

/**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>> {
* KeyValue<String, Integer> apply(String key, String value) {
* return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
* }
* });
* }</pre>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
* See {@link #map(KeyValueMapper)}.
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param named a {@link Named} config used to name the processor in the topology
* @param <KR> the key type of the result stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with new key and value (possibly both of different type)
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named);
<KOut, VOut> KStream<KOut, VOut> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper,
final Named named);

/**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,24 +237,24 @@ private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? s
}

@Override
public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> valueMapper) {
public <VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> valueMapper) {
return mapValues(withKey(valueMapper));
}

@Override
public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Named named) {
public <VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper,
final Named named) {
return mapValues(withKey(mapper), named);
}

@Override
public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey) {
public <VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> valueMapperWithKey) {
return mapValues(valueMapperWithKey, NamedInternal.empty());
}

@Override
public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey,
final Named named) {
public <VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> valueMapperWithKey,
final Named named) {
Objects.requireNonNull(valueMapperWithKey, "valueMapperWithKey can't be null");
Objects.requireNonNull(named, "named can't be null");

Expand All @@ -279,13 +279,13 @@ public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super
}

@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
public <KOut, VOut> KStream<KOut, VOut> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper) {
return map(mapper, NamedInternal.empty());
}

@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named) {
public <KOut, VOut> KStream<KOut, VOut> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper,
final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(named, "named can't be null");

Expand Down