diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index f32607e9d88e2..304ce8bb709bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -152,13 +152,15 @@ KStream selectKey(final KeyValueMapper} can be transformed into an output record {@code }. + * If you need read access to the input record key, use {@link #mapValues(ValueMapperWithKey)}. * This is a stateless record-by-record operation (cf. * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing). - *

- * The example below counts the number of token of the value string. + * + *

The example below counts the number of token of the value string. *

{@code
      * KStream inputStream = builder.stream("topic");
      * KStream outputStream = inputStream.mapValues(new ValueMapper {
@@ -167,137 +169,58 @@  KStream selectKey(final KeyValueMapper
+     *
      * Setting a new value preserves data co-location with respect to the key.
-     * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)})
+     * Thus, no internal data redistribution is required if a key based operator (like an aggregation
+     * or join) is applied to the result {@code KStream} (cf. {@link #map(KeyValueMapper)}).
+     *
+     * @param mapper
+     *        a {@link ValueMapper} that computes a new value for each input record
+     *
+     * @param  the value type of the result stream
+     *
+     * @return A {@code KStream} that contains records with unmodified key and new values (possibly of a different type).
      *
-     * @param mapper a {@link ValueMapper} that computes a new output value
-     * @param    the value type of the result stream
-     * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type)
      * @see #selectKey(KeyValueMapper)
      * @see #map(KeyValueMapper)
      * @see #flatMap(KeyValueMapper)
      * @see #flatMapValues(ValueMapper)
-     * @see #flatMapValues(ValueMapperWithKey)
-     * @see #process(ProcessorSupplier, String...)
-     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
-     KStream mapValues(final ValueMapper mapper);
+     KStream mapValues(final ValueMapper mapper);
 
     /**
-     * Transform the value of each input record into a new value (with possible new type) of the output record.
-     * The provided {@link ValueMapper} is applied to each input record value and computes a new value for it.
-     * Thus, an input record {@code } can be transformed into an output record {@code }.
-     * This is a stateless record-by-record operation (cf.
-     * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
-     * 

- * The example below counts the number of token of the value string. - *

{@code
-     * KStream inputStream = builder.stream("topic");
-     * KStream outputStream = inputStream.mapValues(new ValueMapper {
-     *     Integer apply(String value) {
-     *         return value.split(" ").length;
-     *     }
-     * });
-     * }
- * Setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)}) + * See {@link #mapValues(ValueMapper)}. * - * @param mapper a {@link ValueMapper} that computes a new output value - * @param named a {@link Named} config used to name the processor in the topology - * @param the value type of the result stream - * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type) - * @see #selectKey(KeyValueMapper) - * @see #map(KeyValueMapper) - * @see #flatMap(KeyValueMapper) - * @see #flatMapValues(ValueMapper) - * @see #flatMapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ - KStream mapValues(final ValueMapper mapper, - final Named named); + KStream mapValues(final ValueMapper mapper, + final Named named); /** - * Transform the value of each input record into a new value (with possible new type) of the output record. - * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it. - * Thus, an input record {@code } can be transformed into an output record {@code }. - * This is a stateless record-by-record operation (cf. - * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing). - *

- * The example below counts the number of tokens of key and value strings. - *

{@code
-     * KStream inputStream = builder.stream("topic");
-     * KStream outputStream = inputStream.mapValues(new ValueMapperWithKey {
-     *     Integer apply(String readOnlyKey, String value) {
-     *         return readOnlyKey.split(" ").length + value.split(" ").length;
-     *     }
-     * });
-     * }
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. - * So, setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)}) + * See {@link #mapValues(ValueMapper)}. * - * @param mapper a {@link ValueMapperWithKey} that computes a new output value - * @param the value type of the result stream - * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type) - * @see #selectKey(KeyValueMapper) - * @see #map(KeyValueMapper) - * @see #flatMap(KeyValueMapper) - * @see #flatMapValues(ValueMapper) - * @see #flatMapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) + *

Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream mapValues(final ValueMapperWithKey mapper); + KStream mapValues(final ValueMapperWithKey mapper); /** - * Transform the value of each input record into a new value (with possible new type) of the output record. - * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it. - * Thus, an input record {@code } can be transformed into an output record {@code }. - * This is a stateless record-by-record operation (cf. - * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing). - *

- * The example below counts the number of tokens of key and value strings. - *

{@code
-     * KStream inputStream = builder.stream("topic");
-     * KStream outputStream = inputStream.mapValues(new ValueMapperWithKey {
-     *     Integer apply(String readOnlyKey, String value) {
-     *         return readOnlyKey.split(" ").length + value.split(" ").length;
-     *     }
-     * });
-     * }
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. - * So, setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #map(KeyValueMapper)}) + * See {@link #mapValues(ValueMapperWithKey)}. * - * @param mapper a {@link ValueMapperWithKey} that computes a new output value - * @param named a {@link Named} config used to name the processor in the topology - * @param the value type of the result stream - * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type) - * @see #selectKey(KeyValueMapper) - * @see #map(KeyValueMapper) - * @see #flatMap(KeyValueMapper) - * @see #flatMapValues(ValueMapper) - * @see #flatMapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ - KStream mapValues(final ValueMapperWithKey mapper, - final Named named); + KStream mapValues(final ValueMapperWithKey mapper, + final Named named); /** - * Transform each record of the input stream into a new record in the output stream (both key and value type can be - * altered arbitrarily). - * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record. + * Create a new {@code KStream} that consists of a modified record for each record in this stream. + * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record + * (possibly of a different key and/or value type) for it. * Thus, an input record {@code } can be transformed into an output record {@code }. * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for * stateful record processing). - *

- * The example below normalizes the String key to upper-case letters and counts the number of token of the value string. + * + *

The example below normalizes the String key to upper-case letters and counts the number of token of the + * value string. *

{@code
      * KStream inputStream = builder.stream("topic");
      * KStream outputStream = inputStream.map(new KeyValueMapper> {
@@ -307,63 +230,32 @@  KStream mapValues(final ValueMapperWithKey
      * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
-     * 

- * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or - * join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)}) * - * @param mapper a {@link KeyValueMapper} that computes a new output record - * @param the key type of the result stream - * @param the value type of the result stream - * @return a {@code KStream} that contains records with new key and value (possibly both of different type) + *

Mapping records might result in an internal data redistribution if a key-based operator (like an + * aggregation or join) is applied to the result {@code KStream} (cf. {@link #mapValues(ValueMapper)}). + * + * @param mapper + * a {@link KeyValueMapper} that computes a new {@link KeyValue} pair for each input record + * + * @param the key type of the result stream + * @param the value type of the result stream + * + * @return A {@code KStream} that contains records with new key and new value (possibly of different types). + * * @see #selectKey(KeyValueMapper) * @see #flatMap(KeyValueMapper) * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) * @see #flatMapValues(ValueMapper) - * @see #flatMapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) */ - KStream map(final KeyValueMapper> mapper); + KStream map(final KeyValueMapper> mapper); /** - * Transform each record of the input stream into a new record in the output stream (both key and value type can be - * altered arbitrarily). - * The provided {@link KeyValueMapper} is applied to each input record and computes a new output record. - * Thus, an input record {@code } can be transformed into an output record {@code }. - * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for - * stateful record processing). - *

- * The example below normalizes the String key to upper-case letters and counts the number of token of the value string. - *

{@code
-     * KStream inputStream = builder.stream("topic");
-     * KStream outputStream = inputStream.map(new KeyValueMapper> {
-     *     KeyValue apply(String key, String value) {
-     *         return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
-     *     }
-     * });
-     * }
- * The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}. - *

- * Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or - * join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)}) + * See {@link #map(KeyValueMapper)}. * - * @param mapper a {@link KeyValueMapper} that computes a new output record - * @param named a {@link Named} config used to name the processor in the topology - * @param the key type of the result stream - * @param the value type of the result stream - * @return a {@code KStream} that contains records with new key and value (possibly both of different type) - * @see #selectKey(KeyValueMapper) - * @see #flatMap(KeyValueMapper) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @see #flatMapValues(ValueMapper) - * @see #flatMapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ - KStream map(final KeyValueMapper> mapper, - final Named named); + KStream map(final KeyValueMapper> mapper, + final Named named); /** * Transform each record of the input stream into zero or more records in the output stream (both key and value type diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 5101d02a03423..1ce61a4bf8b29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -237,24 +237,24 @@ private ProcessorGraphNode internalSelectKey(final KeyValueMapper KStream mapValues(final ValueMapper valueMapper) { + public KStream mapValues(final ValueMapper valueMapper) { return mapValues(withKey(valueMapper)); } @Override - public KStream mapValues(final ValueMapper mapper, - final Named named) { + public KStream mapValues(final ValueMapper mapper, + final Named named) { return mapValues(withKey(mapper), named); } @Override - public KStream mapValues(final ValueMapperWithKey valueMapperWithKey) { + public KStream mapValues(final ValueMapperWithKey valueMapperWithKey) { return mapValues(valueMapperWithKey, NamedInternal.empty()); } @Override - public KStream mapValues(final ValueMapperWithKey valueMapperWithKey, - final Named named) { + public KStream mapValues(final ValueMapperWithKey valueMapperWithKey, + final Named named) { Objects.requireNonNull(valueMapperWithKey, "valueMapperWithKey can't be null"); Objects.requireNonNull(named, "named can't be null"); @@ -279,13 +279,13 @@ public KStream mapValues(final ValueMapperWithKey KStream map(final KeyValueMapper> mapper) { + public KStream map(final KeyValueMapper> mapper) { return map(mapper, NamedInternal.empty()); } @Override - public KStream map(final KeyValueMapper> mapper, - final Named named) { + public KStream map(final KeyValueMapper> mapper, + final Named named) { Objects.requireNonNull(mapper, "mapper can't be null"); Objects.requireNonNull(named, "named can't be null");