Skip to content

Commit e1b274a

Browse files
committed
rebase fixup
1 parent 4f94cc7 commit e1b274a

File tree

1 file changed

+11
-74
lines changed
  • streams/src/main/java/org/apache/kafka/streams/kstream/internals

1 file changed

+11
-74
lines changed

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

+11-74
Original file line numberDiff line numberDiff line change
@@ -1057,14 +1057,14 @@ public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> table,
10571057
}
10581058

10591059
@SuppressWarnings({"unchecked", "resource"})
1060-
private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
1061-
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
1062-
final JoinedInternal<K, V, VO> joinedInternal,
1063-
final boolean leftJoin) {
1060+
private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(final KTable<K, VTable> table,
1061+
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
1062+
final JoinedInternal<K, V, VTable> joinedInternal,
1063+
final boolean leftJoin) {
10641064
Objects.requireNonNull(table, "table can't be null");
10651065
Objects.requireNonNull(joiner, "joiner can't be null");
10661066

1067-
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) table));
1067+
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));
10681068

10691069
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
10701070

@@ -1073,7 +1073,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
10731073
Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
10741074

10751075
if (joinedInternal.gracePeriod() != null) {
1076-
if (!((KTableImpl<K, ?, VO>) table).graphNode.isOutputVersioned().orElse(true)) {
1076+
if (!((KTableImpl<K, ?, VTable>) table).graphNode.isOutputVersioned().orElse(true)) {
10771077
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
10781078
}
10791079
final String bufferName = name + "-Buffer";
@@ -1086,19 +1086,19 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
10861086
);
10871087
}
10881088

1089-
final ProcessorSupplier<K, V, K, VR> processorSupplier = new KStreamKTableJoin<>(
1090-
((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
1089+
final ProcessorSupplier<K, V, K, VOut> processorSupplier = new KStreamKTableJoin<>(
1090+
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier(),
10911091
joiner,
10921092
leftJoin,
10931093
Optional.ofNullable(joinedInternal.gracePeriod()),
10941094
bufferStoreBuilder
10951095
);
10961096

1097-
final ProcessorParameters<K, V, K, VR> processorParameters = new ProcessorParameters<>(processorSupplier, name);
1098-
final StreamTableJoinNode<K, V, VR> streamTableJoinNode = new StreamTableJoinNode<>(
1097+
final ProcessorParameters<K, V, K, VOut> processorParameters = new ProcessorParameters<>(processorSupplier, name);
1098+
final StreamTableJoinNode<K, V, VOut> streamTableJoinNode = new StreamTableJoinNode<>(
10991099
name,
11001100
processorParameters,
1101-
((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
1101+
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier().storeNames(),
11021102
this.name,
11031103
joinedInternal.gracePeriod()
11041104
);
@@ -1219,69 +1219,6 @@ private <GlobalKey, GlobalValue, VOut> KStream<K, VOut> globalTableJoin(
12191219
builder);
12201220
}
12211221

1222-
@SuppressWarnings({"unchecked", "resource"})
1223-
private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(final KTable<K, VTable> table,
1224-
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
1225-
final JoinedInternal<K, V, VTable> joinedInternal,
1226-
final boolean leftJoin) {
1227-
Objects.requireNonNull(table, "table can't be null");
1228-
Objects.requireNonNull(joiner, "joiner can't be null");
1229-
1230-
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));
1231-
1232-
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
1233-
1234-
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
1235-
1236-
Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
1237-
1238-
if (joinedInternal.gracePeriod() != null) {
1239-
if (!((KTableImpl<K, ?, VTable>) table).graphNode.isOutputVersioned().orElse(true)) {
1240-
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
1241-
}
1242-
final String bufferName = name + "-Buffer";
1243-
bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(
1244-
bufferName,
1245-
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
1246-
joinedInternal.leftValueSerde() != null ? joinedInternal.leftValueSerde() : valueSerde,
1247-
joinedInternal.gracePeriod(),
1248-
name)
1249-
);
1250-
}
1251-
1252-
final ProcessorSupplier<K, V, K, VOut> processorSupplier = new KStreamKTableJoin<>(
1253-
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier(),
1254-
joiner,
1255-
leftJoin,
1256-
Optional.ofNullable(joinedInternal.gracePeriod()),
1257-
bufferStoreBuilder
1258-
);
1259-
1260-
final ProcessorParameters<K, V, K, VOut> processorParameters = new ProcessorParameters<>(processorSupplier, name);
1261-
final StreamTableJoinNode<K, V, VOut> streamTableJoinNode = new StreamTableJoinNode<>(
1262-
name,
1263-
processorParameters,
1264-
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier().storeNames(),
1265-
this.name,
1266-
joinedInternal.gracePeriod()
1267-
);
1268-
1269-
builder.addGraphNode(graphNode, streamTableJoinNode);
1270-
if (leftJoin) {
1271-
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
1272-
}
1273-
1274-
// do not have serde for joined result
1275-
return new KStreamImpl<>(
1276-
name,
1277-
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
1278-
null,
1279-
allSourceNodes,
1280-
false,
1281-
streamTableJoinNode,
1282-
builder);
1283-
}
1284-
12851222
@Override
12861223
public <KOut, VOut> KStream<KOut, VOut> process(
12871224
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,

0 commit comments

Comments
 (0)