Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 2cba3b1

Browse files
committedOct 16, 2024·
WIP finish PscDynamicTableFactoryTest
1 parent e713312 commit 2cba3b1

File tree

5 files changed

+120
-92
lines changed

5 files changed

+120
-92
lines changed
 

‎psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ public class PscConnectorOptions {
121121
"Topic names from which the table is read. Either 'topic-uri' or 'topic-uri-pattern' must be set for source. "
122122
+ "Option 'topic-uri' is required for sink.");
123123

124-
public static final ConfigOption<String> TOPIC_URI_PATTERN =
125-
ConfigOptions.key("topic-uri-pattern")
124+
public static final ConfigOption<String> TOPIC_PATTERN =
125+
ConfigOptions.key("topic-pattern")
126126
.stringType()
127127
.noDefaultValue()
128128
.withDescription(

‎psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscConnectorOptionsUtil.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
6060
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER;
6161
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI;
62-
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI_PATTERN;
62+
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_PATTERN;
6363
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TRANSACTIONAL_ID_PREFIX;
6464
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FIELDS_INCLUDE;
6565
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FORMAT;
@@ -108,11 +108,11 @@ public static void validateTableSinkOptions(ReadableConfig tableOptions) {
108108

109109
public static void validateSourceTopic(ReadableConfig tableOptions) {
110110
Optional<List<String>> topic = tableOptions.getOptional(TOPIC_URI);
111-
Optional<String> pattern = tableOptions.getOptional(TOPIC_URI_PATTERN);
111+
Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
112112

113113
if (topic.isPresent() && pattern.isPresent()) {
114114
throw new ValidationException(
115-
"Option 'topic' and 'topic-pattern' shouldn't be set together.");
115+
"Option 'topic-uri' and 'topic-pattern' shouldn't be set together.");
116116
}
117117

118118
if (!topic.isPresent() && !pattern.isPresent()) {
@@ -122,17 +122,17 @@ public static void validateSourceTopic(ReadableConfig tableOptions) {
122122

123123
public static void validateSinkTopic(ReadableConfig tableOptions) {
124124
String errorMessageTemp =
125-
"Flink Kafka sink currently only supports single topic, but got %s: %s.";
125+
"Flink PSC sink currently only supports single topic, but got %s: %s.";
126126
if (!isSingleTopicUri(tableOptions)) {
127-
if (tableOptions.getOptional(TOPIC_URI_PATTERN).isPresent()) {
127+
if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
128128
throw new ValidationException(
129129
String.format(
130130
errorMessageTemp,
131131
"'topic-pattern'",
132-
tableOptions.get(TOPIC_URI_PATTERN)));
132+
tableOptions.get(TOPIC_PATTERN)));
133133
} else {
134134
throw new ValidationException(
135-
String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC_URI)));
135+
String.format(errorMessageTemp, "'topic-uri'", tableOptions.get(TOPIC_URI)));
136136
}
137137
}
138138
}
@@ -208,7 +208,7 @@ public static List<String> getSourceTopicUris(ReadableConfig tableOptions) {
208208
}
209209

210210
public static Pattern getSourceTopicUriPattern(ReadableConfig tableOptions) {
211-
return tableOptions.getOptional(TOPIC_URI_PATTERN).map(Pattern::compile).orElse(null);
211+
return tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
212212
}
213213

214214
private static boolean isSingleTopicUri(ReadableConfig tableOptions) {

‎psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicSource.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import java.util.HashMap;
6262
import java.util.LinkedHashMap;
6363
import java.util.List;
64+
import java.util.Locale;
6465
import java.util.Map;
6566
import java.util.Objects;
6667
import java.util.Properties;
@@ -215,7 +216,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
215216
context.createTypeInformation(producedDataType);
216217

217218
final PscSource<RowData> kafkaSource =
218-
createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);
219+
createPscSource(keyDeserialization, valueDeserialization, producedTypeInfo);
219220

220221
return new DataStreamScanProvider() {
221222
@Override
@@ -373,7 +374,7 @@ public int hashCode() {
373374

374375
// --------------------------------------------------------------------------------------------
375376

376-
protected PscSource<RowData> createKafkaSource(
377+
protected PscSource<RowData> createPscSource(
377378
DeserializationSchema<RowData> keyDeserialization,
378379
DeserializationSchema<RowData> valueDeserialization,
379380
TypeInformation<RowData> producedTypeInfo) {
@@ -402,6 +403,7 @@ protected PscSource<RowData> createKafkaSource(
402403
properties.getProperty(
403404
PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET,
404405
PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_NONE);
406+
offsetResetConfig = getResetStrategy(offsetResetConfig);
405407
pscSourceBuilder.setStartingOffsets(
406408
OffsetsInitializer.committedOffsets(offsetResetConfig));
407409
break;
@@ -427,6 +429,23 @@ protected PscSource<RowData> createKafkaSource(
427429
return pscSourceBuilder.build();
428430
}
429431

432+
private String getResetStrategy(String offsetResetConfig) {
433+
final String[] validResetStrategies = {"EARLIEST", "LATEST", "NONE"};
434+
return Arrays.stream(validResetStrategies)
435+
.filter(ors -> ors.equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
436+
.findAny()
437+
.orElseThrow(
438+
() ->
439+
new IllegalArgumentException(
440+
String.format(
441+
"%s can not be set to %s. Valid values: [%s]",
442+
PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET,
443+
offsetResetConfig,
444+
Arrays.stream(validResetStrategies)
445+
.map(String::toLowerCase)
446+
.collect(Collectors.joining(",")))));
447+
}
448+
430449
private PscDeserializationSchema<RowData> createPscDeserializationSchema(
431450
DeserializationSchema<RowData> keyDeserialization,
432451
DeserializationSchema<RowData> valueDeserialization,

‎psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARALLELISM;
7474
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.SINK_PARTITIONER;
7575
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI;
76-
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_URI_PATTERN;
76+
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TOPIC_PATTERN;
7777
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.TRANSACTIONAL_ID_PREFIX;
7878
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FIELDS_INCLUDE;
7979
import static com.pinterest.flink.streaming.connectors.psc.table.PscConnectorOptions.VALUE_FORMAT;
@@ -128,7 +128,7 @@ public Set<ConfigOption<?>> optionalOptions() {
128128
options.add(VALUE_FORMAT);
129129
options.add(VALUE_FIELDS_INCLUDE);
130130
options.add(TOPIC_URI);
131-
options.add(TOPIC_URI_PATTERN);
131+
options.add(TOPIC_PATTERN);
132132
options.add(PROPS_GROUP_ID);
133133
options.add(SCAN_STARTUP_MODE);
134134
options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
@@ -147,7 +147,7 @@ public Set<ConfigOption<?>> forwardOptions() {
147147
return Stream.of(
148148
PROPS_GROUP_ID,
149149
TOPIC_URI,
150-
TOPIC_URI_PATTERN,
150+
TOPIC_PATTERN,
151151
SCAN_STARTUP_MODE,
152152
SCAN_STARTUP_SPECIFIC_OFFSETS,
153153
SCAN_TOPIC_PARTITION_DISCOVERY,

‎psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/table/PscDynamicTableFactoryTest.java

+86-77
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)
Please sign in to comment.