From 7bb56d583ac01d80d02fe3f0ec5583577d0be8b4 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Sun, 1 Dec 2024 21:58:58 +0100 Subject: [PATCH 1/2] feat(#3362): Add SSL/SASL support to Kafka adapter and sink --- pom.xml | 2 +- .../streampipes/commons/constants/Envs.java | 16 +- .../constants/GlobalStreamPipesConstants.java | 1 + .../environment/DefaultEnvironment.java | 42 +++- .../commons/environment/Environment.java | 39 +--- .../kafka/adapter/KafkaProtocol.java | 84 ++++--- .../shared/kafka/KafkaAdapterConfig.java | 13 +- ...{KafkaConfig.java => KafkaBaseConfig.java} | 51 +---- .../shared/kafka/KafkaConfigExtractor.java | 121 ++++++++++ .../shared/kafka/KafkaConfigProvider.java | 153 +++++++++++++ .../kafka/shared/kafka/KafkaConnectUtils.java | 211 ------------------ .../kafka/sink/KafkaParameters.java | 90 -------- .../kafka/sink/KafkaPublishSink.java | 49 ++-- .../strings.en | 19 +- .../strings.en | 3 + .../opcua/config/security/KeyStoreLoader.java | 2 +- .../adapters/KafkaAdapterTester.java | 8 +- .../kafka/config/KafkaConfigAppender.java | 4 +- .../kafka/security/KafkaSecurityConfig.java | 25 --- .../KafkaSecurityProtocolConfigAppender.java | 70 ++++++ ...a => KafkaSecuritySaslConfigAppender.java} | 40 +++- .../security/KafkaSecuritySaslSSLConfig.java | 51 ----- ...KafkaSecurityUnauthenticatedSSLConfig.java | 32 --- .../connect/RuntimeResolvableResource.java | 2 + .../base-runtime-resolvable-input.ts | 1 + ...untime-resolvable-oneof-input.component.ts | 37 ++- 26 files changed, 575 insertions(+), 591 deletions(-) rename streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java => streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java (74%) rename streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/{KafkaConfig.java => KafkaBaseConfig.java} (50%) create mode 100644 streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java create mode 100644 streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java delete mode 100644 streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java delete mode 100644 streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/sink/KafkaParameters.java delete mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java create mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityProtocolConfigAppender.java rename streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/{KafkaSecuritySaslPlainConfig.java => KafkaSecuritySaslConfigAppender.java} (50%) delete mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java delete mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java diff --git a/pom.xml b/pom.xml index 8f0d8ffded..455675231a 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ 0.2.0 0.11.2 1.19.0 - 3.4.0 + 3.7.1 0.2.0 3.13.0 1.4.3 diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 7a2d033e5a..2339475adb 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -119,7 +119,21 @@ public enum Envs { SP_OPCUA_APPLICATION_URI( "SP_OPCUA_APPLICATION_URI", "urn:org:apache:streampipes:opcua:client" - ); + ), + + // Default keystore and truststore + SP_SECURITY_KEYSTORE_FILENAME( + "SP_SECURITY_KEYSTORE_FILENAME", + "/streampipes-security/keystore.pfx"), + SP_SECURITY_KEYSTORE_PASSWORD("SP_SECURITY_KEYSTORE_PASSWORD", ""), + SP_SECURITY_KEYSTORE_TYPE("SP_SECURITY_KEYSTORE_TYPE", "PKCS12"), + SP_SECURITY_KEY_PASSWORD("SP_SECURITY_KEY_PASSWORD", null), + SP_SECURITY_TRUSTSTORE_FILENAME( + "SP_SECURITY_TRUSTSTORE_FILENAME", + "/streampipes-security/truststore.pfx"), + SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""), + SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"), + SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false"); private final String envVariableName; private String defaultValue; diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java index b452d74c1a..409a9a9090 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GlobalStreamPipesConstants.java @@ -23,5 +23,6 @@ public class GlobalStreamPipesConstants { public static final String STD_DOCUMENTATION_NAME = "documentation.md"; public static final String INTERNAL_TOPIC_PREFIX = "org-apache-streampipes-internal-"; + public static final String CONNECT_TOPIC_PREFIX = "org.apache.streampipes.connect."; } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index bb5bbac193..3434924e3e 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -349,7 +349,7 @@ public StringEnvironmentVariable getOpcUaApplicationUri() { } @Override - public StringEnvironmentVariable getOPcUaKeystoreType() { + public StringEnvironmentVariable getOpcUaKeystoreType() { return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE); } @@ -357,4 +357,44 @@ public StringEnvironmentVariable getOPcUaKeystoreType() { public StringEnvironmentVariable getOpcUaKeystoreAlias() { return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS); } + + @Override + public StringEnvironmentVariable getKeystoreFilename() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_FILENAME); + } + + @Override + public StringEnvironmentVariable getKeystorePassword() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_PASSWORD); + } + + @Override + public StringEnvironmentVariable getKeystoreType() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_TYPE); + } + + @Override + public StringEnvironmentVariable getKeyPassword() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_KEY_PASSWORD); + } + + @Override + public StringEnvironmentVariable getTruststoreFilename() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_FILENAME); + } + + @Override + public StringEnvironmentVariable getTruststorePassword() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_PASSWORD); + } + + @Override + public StringEnvironmentVariable getTruststoreType() { + return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_TYPE); + } + + @Override + public BooleanEnvironmentVariable getAllowSelfSignedCertificates() { + return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED); + } } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index cb441f009b..d1c4adf6ae 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -32,79 +32,56 @@ public interface Environment { // Service base configuration StringEnvironmentVariable getServiceHost(); - IntEnvironmentVariable getServicePort(); StringEnvironmentVariable getSpCoreScheme(); StringEnvironmentVariable getSpCoreHost(); - IntEnvironmentVariable getSpCorePort(); // Time series storage env variables StringEnvironmentVariable getTsStorage(); - StringEnvironmentVariable getTsStorageProtocol(); - StringEnvironmentVariable getTsStorageHost(); - IntEnvironmentVariable getTsStoragePort(); - StringEnvironmentVariable getTsStorageToken(); - StringEnvironmentVariable getTsStorageOrg(); - StringEnvironmentVariable getTsStorageBucket(); IntEnvironmentVariable getIotDbSessionPoolSize(); - BooleanEnvironmentVariable getIotDbSessionEnableCompression(); - StringEnvironmentVariable getIotDbUser(); - StringEnvironmentVariable getIotDbPassword(); // CouchDB env variables StringEnvironmentVariable getCouchDbProtocol(); - StringEnvironmentVariable getCouchDbHost(); - IntEnvironmentVariable getCouchDbPort(); - StringEnvironmentVariable getCouchDbUsername(); - StringEnvironmentVariable getCouchDbPassword(); // JWT & Authentication StringEnvironmentVariable getClientUser(); - StringEnvironmentVariable getClientSecret(); StringEnvironmentVariable getJwtSecret(); - StringEnvironmentVariable getJwtPublicKeyLoc(); - StringEnvironmentVariable getJwtPrivateKeyLoc(); - StringEnvironmentVariable getJwtSigningMode(); StringEnvironmentVariable getExtensionsAuthMode(); - StringEnvironmentVariable getEncryptionPasscode(); BooleanEnvironmentVariable getOAuthEnabled(); - StringEnvironmentVariable getOAuthRedirectUri(); - List getOAuthConfigurations(); // Messaging StringEnvironmentVariable getKafkaRetentionTimeMs(); - StringEnvironmentVariable getPrioritizedProtocol(); @@ -164,14 +141,18 @@ public interface Environment { StringEnvironmentVariable getAllowedUploadFiletypes(); StringEnvironmentVariable getOpcUaSecurityDir(); - StringEnvironmentVariable getOpcUaKeystoreFile(); - StringEnvironmentVariable getOpcUaKeystorePassword(); - StringEnvironmentVariable getOpcUaApplicationUri(); - - StringEnvironmentVariable getOPcUaKeystoreType(); - + StringEnvironmentVariable getOpcUaKeystoreType(); StringEnvironmentVariable getOpcUaKeystoreAlias(); + + StringEnvironmentVariable getKeystoreFilename(); + StringEnvironmentVariable getKeystorePassword(); + StringEnvironmentVariable getKeystoreType(); + StringEnvironmentVariable getKeyPassword(); + StringEnvironmentVariable getTruststoreFilename(); + StringEnvironmentVariable getTruststorePassword(); + StringEnvironmentVariable getTruststoreType(); + BooleanEnvironmentVariable getAllowSelfSignedCertificates(); } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java index fb387b59ee..90dd1ee2a1 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java @@ -31,12 +31,12 @@ import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig; -import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfig; -import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider; import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor; import org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers; import org.apache.streampipes.messaging.kafka.SpKafkaConsumer; -import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; import org.apache.streampipes.model.AdapterType; import org.apache.streampipes.model.connect.guess.GuessSchema; import org.apache.streampipes.model.extensions.ExtensionAssetType; @@ -67,13 +67,12 @@ import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.Set; import java.util.stream.Collectors; public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig { private static final Logger logger = LoggerFactory.getLogger(KafkaProtocol.class); - KafkaConfig config; + KafkaAdapterConfig config; public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka"; @@ -84,14 +83,13 @@ public KafkaProtocol() { } private void applyConfiguration(IStaticPropertyExtractor extractor) { - this.config = KafkaConnectUtils.getConfig(extractor, true); + this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, true); } - private Consumer createConsumer(KafkaConfig kafkaConfig) throws KafkaException { + private Consumer createConsumer(KafkaAdapterConfig kafkaConfig) throws KafkaException { final Properties props = new Properties(); - kafkaConfig.getSecurityConfig().appendConfig(props); - kafkaConfig.getAutoOffsetResetConfig().appendConfig(props); + kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props)); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort()); @@ -113,34 +111,36 @@ public StaticProperty resolveConfiguration(String staticPropertyInternalName, IStaticPropertyExtractor extractor) throws SpConfigurationException { RuntimeResolvableOneOfStaticProperty config = extractor - .getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class); - KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false); - boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey()); + .getStaticPropertyByName(KafkaConfigProvider.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class); + var kafkaConfig = new KafkaConfigExtractor().extractAdapterConfig(extractor, false); + boolean hideInternalTopics = extractor.slideToggleValue(KafkaConfigProvider.getHideInternalTopicsKey()); try { - Consumer consumer = createConsumer(kafkaConfig); - Set topics = consumer.listTopics().keySet(); + var consumer = createConsumer(kafkaConfig); + List topics = new ArrayList<>(consumer.listTopics().keySet()).stream().sorted().toList(); consumer.close(); if (hideInternalTopics) { topics = topics .stream() - .filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX)) - .collect(Collectors.toSet()); + .filter(t -> (!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX) + && !t.startsWith(GlobalStreamPipesConstants.CONNECT_TOPIC_PREFIX))) + .toList(); } config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList())); return config; } catch (KafkaException e) { - throw new SpConfigurationException(e.getMessage(), e); + var message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); + throw new SpConfigurationException(message, e); } } @Override public IAdapterConfiguration declareConfig() { - StaticPropertyAlternative latestAlternative = KafkaConnectUtils.getAlternativesLatest(); + StaticPropertyAlternative latestAlternative = KafkaConfigProvider.getAlternativesLatest(); latestAlternative.setSelected(true); return AdapterConfigurationBuilder @@ -150,28 +150,28 @@ public IAdapterConfiguration declareConfig() { .withLocales(Locales.EN) .withCategory(AdapterType.Generic, AdapterType.Manufacturing) - .requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(), - KafkaConnectUtils.getAlternativeUnauthenticatedPlain(), - KafkaConnectUtils.getAlternativeUnauthenticatedSSL(), - KafkaConnectUtils.getAlternativesSaslPlain(), - KafkaConnectUtils.getAlternativesSaslSSL()) + .requiredAlternatives(KafkaConfigProvider.getAccessModeLabel(), + KafkaConfigProvider.getAlternativeUnauthenticatedPlain(), + KafkaConfigProvider.getAlternativeUnauthenticatedSSL(), + KafkaConfigProvider.getAlternativesSaslPlain(), + KafkaConfigProvider.getAlternativesSaslSSL()) - .requiredTextParameter(KafkaConnectUtils.getHostLabel()) - .requiredIntegerParameter(KafkaConnectUtils.getPortLabel()) + .requiredTextParameter(KafkaConfigProvider.getHostLabel()) + .requiredIntegerParameter(KafkaConfigProvider.getPortLabel()) - .requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(), - KafkaConnectUtils.getAlternativesRandomGroupId(), - KafkaConnectUtils.getAlternativesGroupId()) + .requiredAlternatives(KafkaConfigProvider.getConsumerGroupLabel(), + KafkaConfigProvider.getAlternativesRandomGroupId(), + KafkaConfigProvider.getAlternativesGroupId()) - .requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true) + .requiredSlideToggle(KafkaConfigProvider.getHideInternalTopicsLabel(), true) - .requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList( - KafkaConnectUtils.HOST_KEY, - KafkaConnectUtils.PORT_KEY)) - .requiredAlternatives(KafkaConnectUtils.getAutoOffsetResetConfigLabel(), - KafkaConnectUtils.getAlternativesEarliest(), - latestAlternative, - KafkaConnectUtils.getAlternativesNone()) + .requiredSingleValueSelectionFromContainer(KafkaConfigProvider.getTopicLabel(), Arrays.asList( + KafkaConfigProvider.HOST_KEY, + KafkaConfigProvider.PORT_KEY)) + .requiredAlternatives(KafkaConfigProvider.getAutoOffsetResetConfigLabel(), + KafkaConfigProvider.getAlternativesEarliest(), + latestAlternative, + KafkaConfigProvider.getAlternativesNone()) .buildConfiguration(); } @@ -185,17 +185,11 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor, protocol.setBrokerHostname(config.getKafkaHost()); protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic())); - List kafkaConfigAppenderList = new ArrayList<>(2); - kafkaConfigAppenderList.add(this.config.getSecurityConfig()); - kafkaConfigAppenderList.add(this.config.getAutoOffsetResetConfig()); - this.kafkaConsumer = new SpKafkaConsumer(protocol, config.getTopic(), - new BrokerEventProcessor(extractor.selectedParser(), (event) -> { - collector.collect(event); - }), - kafkaConfigAppenderList - ); + new BrokerEventProcessor(extractor.selectedParser(), collector), + config.getConfigAppenders() + ); thread = new Thread(this.kafkaConsumer); thread.start(); diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java similarity index 74% rename from streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java rename to streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java index 235a6eda39..e24dccf040 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java @@ -16,14 +16,17 @@ * */ -package org.apache.streampipes.messaging.kafka.security; +package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; -import java.util.Properties; +public class KafkaAdapterConfig extends KafkaBaseConfig { -public class KafkaSecurityUnauthenticatedPlainConfig extends KafkaSecurityConfig { + private String groupId; - @Override - public void appendConfig(Properties props) { + public String getGroupId() { + return groupId; + } + public void setGroupId(String groupId) { + this.groupId = groupId; } } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java similarity index 50% rename from streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java rename to streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java index 7f02e4c5b1..dd43893e89 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaBaseConfig.java @@ -18,31 +18,20 @@ package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; -import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig; -import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig; +import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; -public class KafkaConfig { +import java.util.ArrayList; +import java.util.List; + +public class KafkaBaseConfig { private String kafkaHost; private Integer kafkaPort; private String topic; - private String groupId; - - KafkaSecurityConfig securityConfig; - AutoOffsetResetConfig autoOffsetResetConfig; + private List configAppenders; - public KafkaConfig(String kafkaHost, - Integer kafkaPort, - String topic, - String groupId, - KafkaSecurityConfig securityConfig, - AutoOffsetResetConfig autoOffsetResetConfig) { - this.kafkaHost = kafkaHost; - this.kafkaPort = kafkaPort; - this.topic = topic; - this.groupId = groupId; - this.securityConfig = securityConfig; - this.autoOffsetResetConfig = autoOffsetResetConfig; + public KafkaBaseConfig() { + this.configAppenders = new ArrayList<>(); } public String getKafkaHost() { @@ -69,27 +58,11 @@ public void setTopic(String topic) { this.topic = topic; } - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } - - public KafkaSecurityConfig getSecurityConfig() { - return securityConfig; - } - - public void setSecurityConfig(KafkaSecurityConfig securityConfig) { - this.securityConfig = securityConfig; - } - - public AutoOffsetResetConfig getAutoOffsetResetConfig() { - return autoOffsetResetConfig; + public List getConfigAppenders() { + return configAppenders; } - public void setAutoOffsetResetConfig(AutoOffsetResetConfig autoOffsetResetConfig) { - this.autoOffsetResetConfig = autoOffsetResetConfig; + public void setConfigAppenders(List configAppenders) { + this.configAppenders = configAppenders; } } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java new file mode 100644 index 0000000000..d06e544e74 --- /dev/null +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; + +import org.apache.streampipes.commons.environment.Environments; +import org.apache.streampipes.extensions.api.extractor.IParameterExtractor; +import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; +import org.apache.streampipes.messaging.kafka.config.AutoOffsetResetConfig; +import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; +import org.apache.streampipes.messaging.kafka.security.KafkaSecurityProtocolConfigAppender; +import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender; +import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; + +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.util.ArrayList; + +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.ACCESS_MODE; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.CONSUMER_GROUP; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.GROUP_ID_INPUT; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.HOST_KEY; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PASSWORD_KEY; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.PORT_KEY; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.RANDOM_GROUP_ID; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.SECURITY_MECHANISM; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.TOPIC_KEY; +import static org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider.USERNAME_KEY; + +public class KafkaConfigExtractor { + + public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor extractor, + boolean containsTopic) { + + var config = extractCommonConfigs(extractor, new KafkaAdapterConfig()); + + var topic = ""; + if (containsTopic) { + topic = extractor.selectedSingleValue(TOPIC_KEY, String.class); + } + config.setTopic(topic); + + if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)) { + config.setGroupId("StreamPipesKafkaConsumer" + System.currentTimeMillis()); + } else { + config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT, String.class)); + } + + StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG, + StaticPropertyAlternatives.class); + + // Set default value if no value is provided. + if (alternatives == null) { + config.getConfigAppenders().add(new AutoOffsetResetConfig(KafkaConfigProvider.LATEST)); + } else { + String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); + config.getConfigAppenders().add(new AutoOffsetResetConfig(auto)); + } + return config; + } + + public KafkaBaseConfig extractSinkConfig(IParameterExtractor extractor) { + var config = extractCommonConfigs(extractor, new KafkaBaseConfig()); + config.setTopic(extractor.singleValueParameter(TOPIC_KEY, String.class)); + + return config; + } + + private T extractCommonConfigs(IParameterExtractor extractor, + T config) { + var configAppenders = new ArrayList(); + var env = Environments.getEnvironment(); + config.setKafkaHost(extractor.singleValueParameter(HOST_KEY, String.class)); + config.setKafkaPort(extractor.singleValueParameter(PORT_KEY, Integer.class)); + + var authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE); + var securityProtocol = getSecurityProtocol(authentication); + configAppenders.add(new KafkaSecurityProtocolConfigAppender(securityProtocol, env)); + + // check if SASL authentication is defined + if (isSaslSecurityMechanism(securityProtocol)) { + String username = extractor.singleValueParameter(USERNAME_KEY, String.class); + String password = extractor.secretValue(PASSWORD_KEY); + String mechanism = extractor.selectedSingleValue(SECURITY_MECHANISM, String.class); + + configAppenders.add(new KafkaSecuritySaslConfigAppender(mechanism, username, password)); + } + config.setConfigAppenders(configAppenders); + + return config; + } + + private boolean isSaslSecurityMechanism(SecurityProtocol securityProtocol) { + return SecurityProtocol.SASL_PLAINTEXT == securityProtocol || SecurityProtocol.SASL_SSL == securityProtocol; + } + + public SecurityProtocol getSecurityProtocol(String selectedSecurityConfiguration) { + return switch (selectedSecurityConfiguration) { + case "unauthenticated-ssl" -> SecurityProtocol.SSL; + case "sasl-plain" -> SecurityProtocol.SASL_PLAINTEXT; + case "sasl-ssl" -> SecurityProtocol.SASL_SSL; + default -> SecurityProtocol.PLAINTEXT; + }; + } +} diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java new file mode 100644 index 0000000000..05f652f780 --- /dev/null +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigProvider.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; + +import org.apache.streampipes.model.staticproperty.Option; +import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; +import org.apache.streampipes.model.staticproperty.StaticPropertyGroup; +import org.apache.streampipes.sdk.StaticProperties; +import org.apache.streampipes.sdk.helpers.Alternatives; +import org.apache.streampipes.sdk.helpers.Label; +import org.apache.streampipes.sdk.helpers.Labels; + +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import java.util.List; + +public class KafkaConfigProvider { + + public static final String TOPIC_KEY = "topic"; + public static final String HOST_KEY = "host"; + public static final String PORT_KEY = "port"; + + public static final String ACCESS_MODE = "access-mode"; + public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain"; + public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl"; + public static final String SASL_PLAIN = "sasl-plain"; + public static final String SASL_SSL = "sasl-ssl"; + + public static final String SECURITY_MECHANISM = "security-mechanism"; + public static final String USERNAME_GROUP = "username-group"; + public static final String USERNAME_KEY = "username"; + public static final String PASSWORD_KEY = "password"; + + public static final String CONSUMER_GROUP = "consumer-group"; + public static final String RANDOM_GROUP_ID = "random-group-id"; + public static final String GROUP_ID = "group-id"; + public static final String GROUP_ID_INPUT = "group-id-input"; + + + private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics"; + + public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; + public static final String EARLIEST = "earliest"; + public static final String LATEST = "latest"; + public static final String NONE = "none"; + + public static Label getTopicLabel() { + return Labels.withId(TOPIC_KEY); + } + + public static Label getHideInternalTopicsLabel() { + return Labels.withId(HIDE_INTERNAL_TOPICS); + } + + public static String getHideInternalTopicsKey() { + return HIDE_INTERNAL_TOPICS; + } + + public static Label getHostLabel() { + return Labels.withId(HOST_KEY); + } + + public static Label getPortLabel() { + return Labels.withId(PORT_KEY); + } + + public static Label getAccessModeLabel() { + return Labels.withId(ACCESS_MODE); + } + + public static Label getConsumerGroupLabel() { + return Labels.withId(CONSUMER_GROUP); + } + + public static Label getAutoOffsetResetConfigLabel() { + return Labels.withId(AUTO_OFFSET_RESET_CONFIG); + } + + public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_PLAIN)); + } + + public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.UNAUTHENTICATED_SSL)); + } + + public static StaticPropertyAlternative getAlternativesSaslPlain() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_PLAIN), + makeAuthenticationGroup() + ); + } + + public static StaticPropertyAlternative getAlternativesSaslSSL() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.SASL_SSL), + makeAuthenticationGroup()); + } + + public static StaticPropertyAlternative getAlternativesRandomGroupId() { + return Alternatives.from(Labels.withId(RANDOM_GROUP_ID)); + } + + public static StaticPropertyAlternative getAlternativesGroupId() { + return Alternatives.from(Labels.withId(KafkaConfigProvider.GROUP_ID), + StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.GROUP_ID_INPUT))); + } + + public static StaticPropertyAlternative getAlternativesLatest() { + return Alternatives.from(Labels.withId(LATEST)); + } + + public static StaticPropertyAlternative getAlternativesEarliest() { + return Alternatives.from(Labels.withId(EARLIEST)); + } + + public static StaticPropertyAlternative getAlternativesNone() { + return Alternatives.from(Labels.withId(NONE)); + } + + private static StaticPropertyGroup makeAuthenticationGroup() { + var group = StaticProperties.group(Labels.withId(KafkaConfigProvider.USERNAME_GROUP), + StaticProperties.singleValueSelection( + Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM), + makeSecurityMechanism()), + StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConfigProvider.USERNAME_KEY)), + StaticProperties.secretValue(Labels.withId(KafkaConfigProvider.PASSWORD_KEY))); + group.setHorizontalRendering(false); + return group; + } + + public static List