Skip to content

Commit

Permalink
MINOR: cleanup KStream JavaDocs (6/N) - map[Values] (apache#18755)
Browse files Browse the repository at this point in the history
Reviewers: Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax authored Feb 5, 2025
1 parent 00dddee commit 5988ee5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 167 deletions.
208 changes: 50 additions & 158 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super V, ? e
final Named named);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
* Create a new {@code KStream} that consists of all records of this stream but with a modified value.
* The provided {@link ValueMapper} is applied to each input record value and computes a new value (possibly
* of a different type) for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* If you need read access to the input record key, use {@link #mapValues(ValueMapperWithKey)}.
* This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
* <p>
* The example below counts the number of tokens of the value string.
*
* <p>The example below counts the number of tokens of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
Expand All @@ -167,137 +169,58 @@ <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super V, ? e
* }
* });
* }</pre>
*
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
* Thus, <em>no</em> internal data redistribution is required if a key-based operator (like an aggregation
* or join) is applied to the result {@code KStream} (cf. {@link #map(KeyValueMapper)}).
*
* @param mapper
* a {@link ValueMapper} that computes a new value for each input record
*
* @param <VOut> the value type of the result stream
*
* @return A {@code KStream} that contains records with unmodified key and new values (possibly of a different type).
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
<VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
* <p>
* The example below counts the number of tokens of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapper<String, Integer>() {
* public Integer apply(String value) {
* return value.split(" ").length;
* }
* });
* }</pre>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
* See {@link #mapValues(ValueMapper)}.
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Named named);
<VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper,
final Named named);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
* <p>
* The example below counts the number of tokens of key and value strings.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
* public Integer apply(String readOnlyKey, String value) {
* return readOnlyKey.split(" ").length + value.split(" ").length;
* }
* });
* }</pre>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* So, setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
* See {@link #mapValues(ValueMapper)}.
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
<VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper);

/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
* <p>
* The example below counts the number of tokens of key and value strings.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.mapValues(new ValueMapperWithKey<String, String, Integer>() {
* public Integer apply(String readOnlyKey, String value) {
* return readOnlyKey.split(" ").length + value.split(" ").length;
* }
* });
* }</pre>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* So, setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
* See {@link #mapValues(ValueMapperWithKey)}.
*
* @param mapper a {@link ValueMapperWithKey} that computes a new output value
* @param named a {@link Named} config used to name the processor in the topology
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
* @see #selectKey(KeyValueMapper)
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Named named);
<VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper,
final Named named);

/**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Create a new {@code KStream} that consists of a modified record for each record in this stream.
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record
* (possibly of a different key and/or value type) for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of tokens of the value string.
*
* <p>The example below normalizes the String key to upper-case letters and counts the number of tokens of the
* value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
Expand All @@ -307,63 +230,32 @@ <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? e
* });
* }</pre>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param <KR> the key type of the result stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with new key and value (possibly both of different type)
* <p>Mapping records might result in an internal data redistribution if a key-based operator (like an
* aggregation or join) is applied to the result {@code KStream} (cf. {@link #mapValues(ValueMapper)}).
*
* @param mapper
* a {@link KeyValueMapper} that computes a new {@link KeyValue} pair for each input record
*
* @param <KOut> the key type of the result stream
* @param <VOut> the value type of the result stream
*
* @return A {@code KStream} that contains records with new key and new value (possibly of different types).
*
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<KOut, VOut> KStream<KOut, VOut> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper);

/**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of tokens of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
* public KeyValue<String, Integer> apply(String key, String value) {
* return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
* }
* });
* }</pre>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
* See {@link #map(KeyValueMapper)}.
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param named a {@link Named} config used to name the processor in the topology
* @param <KR> the key type of the result stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with new key and value (possibly both of different type)
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named);
<KOut, VOut> KStream<KOut, VOut> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper,
final Named named);

/**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,24 +237,24 @@ private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? s
}

@Override
public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> valueMapper) {
public <VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> valueMapper) {
return mapValues(withKey(valueMapper));
}

@Override
public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Named named) {
public <VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper,
final Named named) {
return mapValues(withKey(mapper), named);
}

@Override
public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey) {
public <VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> valueMapperWithKey) {
return mapValues(valueMapperWithKey, NamedInternal.empty());
}

@Override
public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey,
final Named named) {
public <VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> valueMapperWithKey,
final Named named) {
Objects.requireNonNull(valueMapperWithKey, "valueMapperWithKey can't be null");
Objects.requireNonNull(named, "named can't be null");

Expand All @@ -279,13 +279,13 @@ public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super
}

@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
public <KOut, VOut> KStream<KOut, VOut> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper) {
return map(mapper, NamedInternal.empty());
}

@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named) {
public <KOut, VOut> KStream<KOut, VOut> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KOut, ? extends VOut>> mapper,
final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(named, "named can't be null");

Expand Down

0 comments on commit 5988ee5

Please sign in to comment.