diff --git a/pom.xml b/pom.xml
index 8f0d8ffded..455675231a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
0.2.0
0.11.2
1.19.0
- 3.4.0
+ 3.7.1
0.2.0
3.13.0
1.4.3
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7a2d033e5a..2339475adb 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -119,7 +119,21 @@ public enum Envs {
SP_OPCUA_APPLICATION_URI(
"SP_OPCUA_APPLICATION_URI",
"urn:org:apache:streampipes:opcua:client"
- );
+ ),
+
+ // Default keystore and truststore
+ SP_SECURITY_KEYSTORE_FILENAME(
+ "SP_SECURITY_KEYSTORE_FILENAME",
+ "/streampipes-security/keystore.pfx"),
+ SP_SECURITY_KEYSTORE_PASSWORD("SP_SECURITY_KEYSTORE_PASSWORD", ""),
+ SP_SECURITY_KEYSTORE_TYPE("SP_SECURITY_KEYSTORE_TYPE", "PKCS12"),
+ SP_SECURITY_KEY_PASSWORD("SP_SECURITY_KEY_PASSWORD", null),
+ SP_SECURITY_TRUSTSTORE_FILENAME(
+ "SP_SECURITY_TRUSTSTORE_FILENAME",
+ "/streampipes-security/truststore.pfx"),
+ SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""),
+ SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"),
+ SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false");
private final String envVariableName;
private String defaultValue;
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
index b452d74c1a..409a9a9090 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java
@@ -23,5 +23,6 @@ public class GlobalStreamPipesConstants {
public static final String STD_DOCUMENTATION_NAME = "documentation.md";
public static final String INTERNAL_TOPIC_PREFIX = "org-apache-streampipes-internal-";
+ public static final String CONNECT_TOPIC_PREFIX = "org.apache.streampipes.connect.";
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index bb5bbac193..3434924e3e 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -349,7 +349,7 @@ public StringEnvironmentVariable getOpcUaApplicationUri() {
}
@Override
- public StringEnvironmentVariable getOPcUaKeystoreType() {
+ public StringEnvironmentVariable getOpcUaKeystoreType() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
}
@@ -357,4 +357,44 @@ public StringEnvironmentVariable getOPcUaKeystoreType() {
public StringEnvironmentVariable getOpcUaKeystoreAlias() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
}
+
+ @Override
+ public StringEnvironmentVariable getKeystoreFilename() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_FILENAME);
+ }
+
+ @Override
+ public StringEnvironmentVariable getKeystorePassword() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_PASSWORD);
+ }
+
+ @Override
+ public StringEnvironmentVariable getKeystoreType() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_TYPE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getKeyPassword() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_KEY_PASSWORD);
+ }
+
+ @Override
+ public StringEnvironmentVariable getTruststoreFilename() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_FILENAME);
+ }
+
+ @Override
+ public StringEnvironmentVariable getTruststorePassword() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_PASSWORD);
+ }
+
+ @Override
+ public StringEnvironmentVariable getTruststoreType() {
+ return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_TYPE);
+ }
+
+ @Override
+ public BooleanEnvironmentVariable getAllowSelfSignedCertificates() {
+ return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED);
+ }
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index cb441f009b..d1c4adf6ae 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -32,79 +32,56 @@ public interface Environment {
// Service base configuration
StringEnvironmentVariable getServiceHost();
-
IntEnvironmentVariable getServicePort();
StringEnvironmentVariable getSpCoreScheme();
StringEnvironmentVariable getSpCoreHost();
-
IntEnvironmentVariable getSpCorePort();
// Time series storage env variables
StringEnvironmentVariable getTsStorage();
-
StringEnvironmentVariable getTsStorageProtocol();
-
StringEnvironmentVariable getTsStorageHost();
-
IntEnvironmentVariable getTsStoragePort();
-
StringEnvironmentVariable getTsStorageToken();
-
StringEnvironmentVariable getTsStorageOrg();
-
StringEnvironmentVariable getTsStorageBucket();
IntEnvironmentVariable getIotDbSessionPoolSize();
-
BooleanEnvironmentVariable getIotDbSessionEnableCompression();
-
StringEnvironmentVariable getIotDbUser();
-
StringEnvironmentVariable getIotDbPassword();
// CouchDB env variables
StringEnvironmentVariable getCouchDbProtocol();
-
StringEnvironmentVariable getCouchDbHost();
-
IntEnvironmentVariable getCouchDbPort();
-
StringEnvironmentVariable getCouchDbUsername();
-
StringEnvironmentVariable getCouchDbPassword();
// JWT & Authentication
StringEnvironmentVariable getClientUser();
-
StringEnvironmentVariable getClientSecret();
StringEnvironmentVariable getJwtSecret();
-
StringEnvironmentVariable getJwtPublicKeyLoc();
-
StringEnvironmentVariable getJwtPrivateKeyLoc();
-
StringEnvironmentVariable getJwtSigningMode();
StringEnvironmentVariable getExtensionsAuthMode();
-
StringEnvironmentVariable getEncryptionPasscode();
BooleanEnvironmentVariable getOAuthEnabled();
-
StringEnvironmentVariable getOAuthRedirectUri();
-
List getOAuthConfigurations();
// Messaging
StringEnvironmentVariable getKafkaRetentionTimeMs();
-
StringEnvironmentVariable getPrioritizedProtocol();
@@ -164,14 +141,18 @@ public interface Environment {
StringEnvironmentVariable getAllowedUploadFiletypes();
StringEnvironmentVariable getOpcUaSecurityDir();
-
StringEnvironmentVariable getOpcUaKeystoreFile();
-
StringEnvironmentVariable getOpcUaKeystorePassword();
-
StringEnvironmentVariable getOpcUaApplicationUri();
-
- StringEnvironmentVariable getOPcUaKeystoreType();
-
+ StringEnvironmentVariable getOpcUaKeystoreType();
StringEnvironmentVariable getOpcUaKeystoreAlias();
+
+ StringEnvironmentVariable getKeystoreFilename();
+ StringEnvironmentVariable getKeystorePassword();
+ StringEnvironmentVariable getKeystoreType();
+ StringEnvironmentVariable getKeyPassword();
+ StringEnvironmentVariable getTruststoreFilename();
+ StringEnvironmentVariable getTruststorePassword();
+ StringEnvironmentVariable getTruststoreType();
+ BooleanEnvironmentVariable getAllowSelfSignedCertificates();
}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index fb387b59ee..90dd1ee2a1 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -31,12 +31,12 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfig;
-import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
import org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
@@ -67,13 +67,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.Set;
import java.util.stream.Collectors;
public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig {
private static final Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);
- KafkaConfig config;
+ KafkaAdapterConfig config;
public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka";
@@ -84,14 +83,13 @@ public KafkaProtocol() {
}
private void applyConfiguration(IStaticPropertyExtractor extractor) {
- this.config = KafkaConnectUtils.getConfig(extractor, true);
+ this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, true);
}
- private Consumer createConsumer(KafkaConfig kafkaConfig) throws KafkaException {
+ private Consumer createConsumer(KafkaAdapterConfig kafkaConfig) throws KafkaException {
final Properties props = new Properties();
- kafkaConfig.getSecurityConfig().appendConfig(props);
- kafkaConfig.getAutoOffsetResetConfig().appendConfig(props);
+ kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props));
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
@@ -113,34 +111,36 @@ public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor extractor)
throws SpConfigurationException {
RuntimeResolvableOneOfStaticProperty config = extractor
- .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
- KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
- boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
+ .getStaticPropertyByName(KafkaConfigProvider.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
+ var kafkaConfig = new KafkaConfigExtractor().extractAdapterConfig(extractor, false);
+ boolean hideInternalTopics = extractor.slideToggleValue(KafkaConfigProvider.getHideInternalTopicsKey());
try {
- Consumer consumer = createConsumer(kafkaConfig);
- Set topics = consumer.listTopics().keySet();
+ var consumer = createConsumer(kafkaConfig);
+ List topics = new ArrayList<>(consumer.listTopics().keySet()).stream().sorted().toList();
consumer.close();
if (hideInternalTopics) {
topics = topics
.stream()
- .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
- .collect(Collectors.toSet());
+ .filter(t -> (!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX)
+ && !t.startsWith(GlobalStreamPipesConstants.CONNECT_TOPIC_PREFIX)))
+ .toList();
}
config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));
return config;
} catch (KafkaException e) {
- throw new SpConfigurationException(e.getMessage(), e);
+ var message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
+ throw new SpConfigurationException(message, e);
}
}
@Override
public IAdapterConfiguration declareConfig() {
- StaticPropertyAlternative latestAlternative = KafkaConnectUtils.getAlternativesLatest();
+ StaticPropertyAlternative latestAlternative = KafkaConfigProvider.getAlternativesLatest();
latestAlternative.setSelected(true);
return AdapterConfigurationBuilder
@@ -150,28 +150,28 @@ public IAdapterConfiguration declareConfig() {
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
- .requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(),
- KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
- KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
- KafkaConnectUtils.getAlternativesSaslPlain(),
- KafkaConnectUtils.getAlternativesSaslSSL())
+ .requiredAlternatives(KafkaConfigProvider.getAccessModeLabel(),
+ KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
+ KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
+ KafkaConfigProvider.getAlternativesSaslPlain(),
+ KafkaConfigProvider.getAlternativesSaslSSL())
- .requiredTextParameter(KafkaConnectUtils.getHostLabel())
- .requiredIntegerParameter(KafkaConnectUtils.getPortLabel())
+ .requiredTextParameter(KafkaConfigProvider.getHostLabel())
+ .requiredIntegerParameter(KafkaConfigProvider.getPortLabel())
- .requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(),
- KafkaConnectUtils.getAlternativesRandomGroupId(),
- KafkaConnectUtils.getAlternativesGroupId())
+ .requiredAlternatives(KafkaConfigProvider.getConsumerGroupLabel(),
+ KafkaConfigProvider.getAlternativesRandomGroupId(),
+ KafkaConfigProvider.getAlternativesGroupId())
- .requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true)
+ .requiredSlideToggle(KafkaConfigProvider.getHideInternalTopicsLabel(), true)
- .requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList(
- KafkaConnectUtils.HOST_KEY,
- KafkaConnectUtils.PORT_KEY))
- .requiredAlternatives(KafkaConnectUtils.getAutoOffsetResetConfigLabel(),
- KafkaConnectUtils.getAlternativesEarliest(),
- latestAlternative,
- KafkaConnectUtils.getAlternativesNone())
+ .requiredSingleValueSelectionFromContainer(KafkaConfigProvider.getTopicLabel(), Arrays.asList(
+ KafkaConfigProvider.HOST_KEY,
+ KafkaConfigProvider.PORT_KEY))
+ .requiredAlternatives(KafkaConfigProvider.getAutoOffsetResetConfigLabel(),
+ KafkaConfigProvider.getAlternativesEarliest(),
+ latestAlternative,
+ KafkaConfigProvider.getAlternativesNone())
.buildConfiguration();
}
@@ -185,17 +185,11 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
protocol.setBrokerHostname(config.getKafkaHost());
protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));
- List kafkaConfigAppenderList = new ArrayList<>(2);
- kafkaConfigAppenderList.add(this.config.getSecurityConfig());
- kafkaConfigAppenderList.add(this.config.getAutoOffsetResetConfig());
-
this.kafkaConsumer = new SpKafkaConsumer(protocol,
config.getTopic(),
- new BrokerEventProcessor(extractor.selectedParser(), (event) -> {
- collector.collect(event);
- }),
- kafkaConfigAppenderList
- );
+ new BrokerEventProcessor(extractor.selectedParser(), collector),
+ config.getConfigAppenders()
+ );
thread = new Thread(this.kafkaConsumer);
thread.start();
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
similarity index 74%
rename from streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
rename to streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
index 235a6eda39..e24dccf040 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java
@@ -16,14 +16,17 @@
*
*/
-package org.apache.streampipes.messaging.kafka.security;
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
-import java.util.Properties;
+public class KafkaAdapterConfig extends KafkaBaseConfig {
-public class KafkaSecurityUnauthenticatedPlainConfig extends KafkaSecurityConfig {
+ private String groupId;
- @Override
- public void appendConfig(Properties props) {
+ public String getGroupId() {
+ return groupId;
+ }
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
}
}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
similarity index 50%
rename from streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
rename to streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
index 7f02e4c5b1..dd43893e89 100644
--- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java
@@ -18,31 +18,20 @@
package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
-import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
-public class KafkaConfig {
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaBaseConfig {
private String kafkaHost;
private Integer kafkaPort;
private String topic;
- private String groupId;
-
- KafkaSecurityConfig securityConfig;
- AutoOffsetResetConfig autoOffsetResetConfig;
+ private List configAppenders;
- public KafkaConfig(String kafkaHost,
- Integer kafkaPort,
- String topic,
- String groupId,
- KafkaSecurityConfig securityConfig,
- AutoOffsetResetConfig autoOffsetResetConfig) {
- this.kafkaHost = kafkaHost;
- this.kafkaPort = kafkaPort;
- this.topic = topic;
- this.groupId = groupId;
- this.securityConfig = securityConfig;
- this.autoOffsetResetConfig = autoOffsetResetConfig;
+ public KafkaBaseConfig() {
+ this.configAppenders = new ArrayList<>();
}
public String getKafkaHost() {
@@ -69,27 +58,11 @@ public void setTopic(String topic) {
this.topic = topic;
}
- public String getGroupId() {
- return groupId;
- }
-
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-
- public KafkaSecurityConfig getSecurityConfig() {
- return securityConfig;
- }
-
- public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
- this.securityConfig = securityConfig;
- }
-
- public AutoOffsetResetConfig getAutoOffsetResetConfig() {
- return autoOffsetResetConfig;
+ public List getConfigAppenders() {
+ return configAppenders;
}
- public void setAutoOffsetResetConfig(AutoOffsetResetConfig autoOffsetResetConfig) {
- this.autoOffsetResetConfig = autoOffsetResetConfig;
+ public void setConfigAppenders(List configAppenders) {
+ this.configAppenders = configAppenders;
}
}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
new file mode 100644
index 0000000000..d06e544e74
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.ArrayList;
+
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.HOST_KEY;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PASSWORD_KEY;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PORT_KEY;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.RANDOM_GROUP_ID;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.SECURITY_MECHANISM;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.TOPIC_KEY;
+import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.USERNAME_KEY;
+
+public class KafkaConfigExtractor {
+
+ public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor extractor,
+ boolean containsTopic) {
+
+ var config = extractCommonConfigs(extractor, new KafkaAdapterConfig());
+
+ var topic = "";
+ if (containsTopic) {
+ topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
+ }
+ config.setTopic(topic);
+
+ if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)) {
+ config.setGroupId("StreamPipesKafkaConsumer" + System.currentTimeMillis());
+ } else {
+ config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT, String.class));
+ }
+
+ StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
+ StaticPropertyAlternatives.class);
+
+ // Set default value if no value is provided.
+ if (alternatives == null) {
+ config.getConfigAppenders().add(new AutoOffsetResetConfig(KafkaConfigProvider.LATEST));
+ } else {
+ String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
+ config.getConfigAppenders().add(new AutoOffsetResetConfig(auto));
+ }
+ return config;
+ }
+
+ public KafkaBaseConfig extractSinkConfig(IParameterExtractor extractor) {
+ var config = extractCommonConfigs(extractor, new KafkaBaseConfig());
+ config.setTopic(extractor.singleValueParameter(TOPIC_KEY, String.class));
+
+ return config;
+ }
+
+ private T extractCommonConfigs(IParameterExtractor extractor,
+ T config) {
+ var configAppenders = new ArrayList();
+ var env = Environments.getEnvironment();
+ config.setKafkaHost(extractor.singleValueParameter(HOST_KEY, String.class));
+ config.setKafkaPort(extractor.singleValueParameter(PORT_KEY, Integer.class));
+
+ var authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE);
+ var securityProtocol = getSecurityProtocol(authentication);
+ configAppenders.add(new KafkaSecurityProtocolConfigAppender(securityProtocol, env));
+
+ // check if SASL authentication is defined
+ if (isSaslSecurityMechanism(securityProtocol)) {
+ String username = extractor.singleValueParameter(USERNAME_KEY, String.class);
+ String password = extractor.secretValue(PASSWORD_KEY);
+ String mechanism = extractor.selectedSingleValue(SECURITY_MECHANISM, String.class);
+
+ configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism, username, password));
+ }
+ config.setConfigAppenders(configAppenders);
+
+ return config;
+ }
+
+ private boolean isSaslSecurityMechanism(SecurityProtocol securityProtocol) {
+ return SecurityProtocol.SASL_PLAINTEXT == securityProtocol || SecurityProtocol.SASL_SSL == securityProtocol;
+ }
+
+ public SecurityProtocol getSecurityProtocol(String selectedSecurityConfiguration) {
+ return switch (selectedSecurityConfiguration) {
+ case "unauthenticated-ssl" -> SecurityProtocol.SSL;
+ case "sasl-plain" -> SecurityProtocol.SASL_PLAINTEXT;
+ case "sasl-ssl" -> SecurityProtocol.SASL_SSL;
+ default -> SecurityProtocol.PLAINTEXT;
+ };
+ }
+}
diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
new file mode 100644
index 0000000000..05f652f780
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;
+
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Label;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import java.util.List;
+
+public class KafkaConfigProvider {
+
+ public static final String TOPIC_KEY = "topic";
+ public static final String HOST_KEY = "host";
+ public static final String PORT_KEY = "port";
+
+ public static final String ACCESS_MODE = "access-mode";
+ public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
+ public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
+ public static final String SASL_PLAIN = "sasl-plain";
+ public static final String SASL_SSL = "sasl-ssl";
+
+ public static final String SECURITY_MECHANISM = "security-mechanism";
+ public static final String USERNAME_GROUP = "username-group";
+ public static final String USERNAME_KEY = "username";
+ public static final String PASSWORD_KEY = "password";
+
+ public static final String CONSUMER_GROUP = "consumer-group";
+ public static final String RANDOM_GROUP_ID = "random-group-id";
+ public static final String GROUP_ID = "group-id";
+ public static final String GROUP_ID_INPUT = "group-id-input";
+
+
+ private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
+
+ public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+ public static final String EARLIEST = "earliest";
+ public static final String LATEST = "latest";
+ public static final String NONE = "none";
+
+ public static Label getTopicLabel() {
+ return Labels.withId(TOPIC_KEY);
+ }
+
+ public static Label getHideInternalTopicsLabel() {
+ return Labels.withId(HIDE_INTERNAL_TOPICS);
+ }
+
+ public static String getHideInternalTopicsKey() {
+ return HIDE_INTERNAL_TOPICS;
+ }
+
+ public static Label getHostLabel() {
+ return Labels.withId(HOST_KEY);
+ }
+
+ public static Label getPortLabel() {
+ return Labels.withId(PORT_KEY);
+ }
+
+ public static Label getAccessModeLabel() {
+ return Labels.withId(ACCESS_MODE);
+ }
+
+ public static Label getConsumerGroupLabel() {
+ return Labels.withId(CONSUMER_GROUP);
+ }
+
+ public static Label getAutoOffsetResetConfigLabel() {
+ return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
+ }
+
+ public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() {
+ return Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_PLAIN));
+ }
+
+ public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() {
+ return Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_SSL));
+ }
+
+ public static StaticPropertyAlternative getAlternativesSaslPlain() {
+ return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_PLAIN),
+ makeAuthenticationGroup()
+ );
+ }
+
+ public static StaticPropertyAlternative getAlternativesSaslSSL() {
+ return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_SSL),
+ makeAuthenticationGroup());
+ }
+
+ public static StaticPropertyAlternative getAlternativesRandomGroupId() {
+ return Alternatives.from(Labels.withId(RANDOM_GROUP_ID));
+ }
+
+ public static StaticPropertyAlternative getAlternativesGroupId() {
+ return Alternatives.from(Labels.withId(KafkaConfigProvider.GROUP_ID),
+ StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.GROUP_ID_INPUT)));
+ }
+
+ public static StaticPropertyAlternative getAlternativesLatest() {
+ return Alternatives.from(Labels.withId(LATEST));
+ }
+
+ public static StaticPropertyAlternative getAlternativesEarliest() {
+ return Alternatives.from(Labels.withId(EARLIEST));
+ }
+
+ public static StaticPropertyAlternative getAlternativesNone() {
+ return Alternatives.from(Labels.withId(NONE));
+ }
+
+ private static StaticPropertyGroup makeAuthenticationGroup() {
+ var group = StaticProperties.group(Labels.withId(KafkaConfigProvider.USERNAME_GROUP),
+ StaticProperties.singleValueSelection(
+ Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM),
+ makeSecurityMechanism()),
+ StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.USERNAME_KEY)),
+ StaticProperties.secretValue(Labels.withId(KafkaConfigProvider.PASSWORD_KEY)));
+ group.setHorizontalRendering(false);
+ return group;
+ }
+
+ public static List