Skip to content

Commit a68b90c

Browse files
mjsaxpdruley
authored andcommitted
MINOR: cleanup KStream JavaDocs (5/N) - stream-globalTable-inner-join (apache#18747)
Reviewers: Lucas Brutschy <[email protected]>
1 parent b7cd2de commit a68b90c

File tree

2 files changed

+159
-185
lines changed

2 files changed

+159
-185
lines changed

streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

+82-108
Original file line numberDiff line numberDiff line change
@@ -2097,134 +2097,108 @@ <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
20972097
final Joined<K, V, VT> joined);
20982098

20992099
/**
2100-
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
2100+
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi-join.
21012101
* The join is a primary key table lookup join with join attribute
2102-
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
2102+
* {@code keyValueMapper.map(streamRecord) == tableRecord.key}.
21032103
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
2104-
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
2105-
* state.
2104+
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time)
2105+
* internal {@link GlobalKTable} state.
21062106
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
21072107
* state and will not produce any result records.
2108-
* <p>
2109-
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
2108+
*
2109+
* <p>For each {@code KStream} record that finds a joining record in the {@link GlobalKTable} the provided
21102110
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
2111-
* The key of the result record is the same as the key of this {@code KStream}.
2112-
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
2113-
* and thus no output record will be added to the resulting {@code KStream}.
2111+
* The key of the result record is the same as the stream record's key.
2112+
* If you need read access to the {@code KStream} key, use {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
2113+
* If a {@code KStream} input record's value is {@code null} or if the provided {@link KeyValueMapper keySelector}
2114+
* returns {@code null}, the input record will be dropped, and no join computation is triggered.
2115+
* If a {@link GlobalKTable} input record's key is {@code null} the input record will be dropped, and the table
2116+
* state won't be updated.
2117+
* {@link GlobalKTable} input records with {@code null} values are considered deletes (so-called tombstone) for
2118+
* the table.
2119+
*
2120+
* <p>Example, using the first value attribute as join key:
2121+
* <table border='1'>
2122+
* <tr>
2123+
* <th>KStream</th>
2124+
* <th>GlobalKTable</th>
2125+
* <th>state</th>
2126+
* <th>result</th>
2127+
* </tr>
2128+
* <tr>
2129+
* <td>&lt;K1:(GK1,A)&gt;</td>
2130+
* <td></td>
2131+
* <td></td>
2132+
* <td></td>
2133+
* </tr>
2134+
* <tr>
2135+
* <td></td>
2136+
* <td>&lt;GK1:b&gt;</td>
2137+
* <td>&lt;GK1:b&gt;</td>
2138+
* <td></td>
2139+
* </tr>
2140+
* <tr>
2141+
* <td>&lt;K1:(GK1,C)&gt;</td>
2142+
* <td></td>
2143+
* <td>&lt;GK1:b&gt;</td>
2144+
* <td>&lt;K1:ValueJoiner((GK1,C),b)&gt;</td>
2145+
* </tr>
2146+
* </table>
2147+
*
2148+
* In contrast to {@link #join(KTable, ValueJoiner)}, there is no co-partitioning requirement between this
2149+
* {@code KStream} and the {@link GlobalKTable}.
2150+
* Also note that there are no ordering guarantees between the updates on the left and the right side of this join,
2151+
* since updates to the {@link GlobalKTable} are in no way synchronized.
2152+
* Therefore, the result of the join is inherently non-deterministic.
2153+
*
2154+
* @param globalTable
2155+
* the {@link GlobalKTable} to be joined with this stream
2156+
* @param keySelector
2157+
* a {@link KeyValueMapper} that computes the join key for stream input records
2158+
* @param joiner
2159+
* a {@link ValueJoiner} that computes the join result for a pair of matching records
2160+
*
2161+
* @param <GlobalKey> the key type of the global table
2162+
* @param <GlobalValue> the value type of the global table
2163+
* @param <VOut> the value type of the result stream
2164+
*
2165+
* @return A {@code KStream} that contains join-records, one for each matched stream record, with the corresponding
2166+
* key and a value computed by the given {@link ValueJoiner}.
21142167
*
2115-
* @param globalTable the {@link GlobalKTable} to be joined with this stream
2116-
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
2117-
* to the key of the {@link GlobalKTable}
2118-
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
2119-
* @param <GK> the key type of {@link GlobalKTable}
2120-
* @param <GV> the value type of the {@link GlobalKTable}
2121-
* @param <RV> the value type of the resulting {@code KStream}
2122-
* @return a {@code KStream} that contains join-records for each key and values computed by the given
2123-
* {@link ValueJoiner}, one output for each input {@code KStream} record
21242168
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
21252169
*/
2126-
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
2127-
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
2128-
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);
2170+
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
2171+
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
2172+
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner);
21292173

21302174
/**
2131-
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
2132-
* The join is a primary key table lookup join with join attribute
2133-
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
2134-
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
2135-
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
2136-
* state.
2137-
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
2138-
* state and will not produce any result records.
2139-
* <p>
2140-
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
2141-
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
2142-
* The key of the result record is the same as the key of this {@code KStream}.
2143-
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
2144-
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
2145-
* and thus no output record will be added to the resulting {@code KStream}.
2175+
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
21462176
*
2147-
* @param globalTable the {@link GlobalKTable} to be joined with this stream
2148-
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
2149-
* to the key of the {@link GlobalKTable}
2150-
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
2151-
* @param <GK> the key type of {@link GlobalKTable}
2152-
* @param <GV> the value type of the {@link GlobalKTable}
2153-
* @param <RV> the value type of the resulting {@code KStream}
2154-
* @return a {@code KStream} that contains join-records for each key and values computed by the given
2155-
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
2156-
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
2177+
* <p>Note that the {@link KStream} key is read-only and must not be modified, as this can lead to corrupt partitioning.
21572178
*/
2158-
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
2159-
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
2160-
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner);
2179+
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
2180+
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
2181+
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner);
21612182

21622183
/**
2163-
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
2164-
* The join is a primary key table lookup join with join attribute
2165-
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
2166-
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
2167-
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
2168-
* state.
2169-
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
2170-
* state and will not produce any result records.
2171-
* <p>
2172-
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
2173-
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
2174-
* The key of the result record is the same as the key of this {@code KStream}.
2175-
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
2176-
* and thus no output record will be added to the resulting {@code KStream}.
2184+
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
21772185
*
2178-
* @param globalTable the {@link GlobalKTable} to be joined with this stream
2179-
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
2180-
* to the key of the {@link GlobalKTable}
2181-
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
2182-
* @param named a {@link Named} config used to name the processor in the topology
2183-
* @param <GK> the key type of {@link GlobalKTable}
2184-
* @param <GV> the value type of the {@link GlobalKTable}
2185-
* @param <RV> the value type of the resulting {@code KStream}
2186-
* @return a {@code KStream} that contains join-records for each key and values computed by the given
2187-
* {@link ValueJoiner}, one output for each input {@code KStream} record
2188-
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
2186+
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
21892187
*/
2190-
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
2191-
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
2192-
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
2193-
final Named named);
2188+
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
2189+
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
2190+
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner,
2191+
final Named named);
21942192

21952193
/**
2196-
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi join.
2197-
* The join is a primary key table lookup join with join attribute
2198-
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
2199-
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
2200-
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
2201-
* state.
2202-
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
2203-
* state and will not produce any result records.
2204-
* <p>
2205-
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
2206-
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
2207-
* The key of the result record is the same as the key of this {@code KStream}.
2208-
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
2209-
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
2210-
* and thus no output record will be added to the resulting {@code KStream}.
2194+
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
22112195
*
2212-
* @param globalTable the {@link GlobalKTable} to be joined with this stream
2213-
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
2214-
* to the key of the {@link GlobalKTable}
2215-
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
2216-
* @param named a {@link Named} config used to name the processor in the topology
2217-
* @param <GK> the key type of {@link GlobalKTable}
2218-
* @param <GV> the value type of the {@link GlobalKTable}
2219-
* @param <RV> the value type of the resulting {@code KStream}
2220-
* @return a {@code KStream} that contains join-records for each key and values computed by the given
2221-
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
2222-
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
2196+
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
22232197
*/
2224-
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalTable,
2225-
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
2226-
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> joiner,
2227-
final Named named);
2198+
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
2199+
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
2200+
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
2201+
final Named named);
22282202

22292203
/**
22302204
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.

0 commit comments

Comments
 (0)