Skip to content

Commit

Permalink
feat(#3362): Add SSL/SASL support to Kafka adapter and sink (#3364)
Browse files Browse the repository at this point in the history
* feat(#3362): Add SSL/SASL support to Kafka adapter and sink

* Remove logging
  • Loading branch information
dominikriemer authored Dec 2, 2024
1 parent 719d1d2 commit 2edc5e5
Show file tree
Hide file tree
Showing 25 changed files with 574 additions and 591 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
<jsrosbridge.version>0.2.0</jsrosbridge.version>
<jjwt.version>0.11.2</jjwt.version>
<jts-core.version>1.19.0</jts-core.version>
<kafka.version>3.4.0</kafka.version>
<kafka.version>3.7.1</kafka.version>
<lightcouch.version>0.2.0</lightcouch.version>
<maven-plugin-annotations.version>3.13.0</maven-plugin-annotations.version>
<mailapi.version>1.4.3</mailapi.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,21 @@ public enum Envs {
SP_OPCUA_APPLICATION_URI(
"SP_OPCUA_APPLICATION_URI",
"urn:org:apache:streampipes:opcua:client"
);
),

// Default keystore and truststore
SP_SECURITY_KEYSTORE_FILENAME(
"SP_SECURITY_KEYSTORE_FILENAME",
"/streampipes-security/keystore.pfx"),
SP_SECURITY_KEYSTORE_PASSWORD("SP_SECURITY_KEYSTORE_PASSWORD", ""),
SP_SECURITY_KEYSTORE_TYPE("SP_SECURITY_KEYSTORE_TYPE", "PKCS12"),
SP_SECURITY_KEY_PASSWORD("SP_SECURITY_KEY_PASSWORD", null),
SP_SECURITY_TRUSTSTORE_FILENAME(
"SP_SECURITY_TRUSTSTORE_FILENAME",
"/streampipes-security/truststore.pfx"),
SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""),
SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"),
SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false");

private final String envVariableName;
private String defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ public class GlobalStreamPipesConstants {
public static final String STD_DOCUMENTATION_NAME = "documentation.md";

public static final String INTERNAL_TOPIC_PREFIX = "org-apache-streampipes-internal-";
public static final String CONNECT_TOPIC_PREFIX = "org.apache.streampipes.connect.";

}
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,52 @@ public StringEnvironmentVariable getOpcUaApplicationUri() {
}

@Override
public StringEnvironmentVariable getOPcUaKeystoreType() {
public StringEnvironmentVariable getOpcUaKeystoreType() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
}

@Override
public StringEnvironmentVariable getOpcUaKeystoreAlias() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
}

@Override
public StringEnvironmentVariable getKeystoreFilename() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_FILENAME);
}

@Override
public StringEnvironmentVariable getKeystorePassword() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_PASSWORD);
}

@Override
public StringEnvironmentVariable getKeystoreType() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_TYPE);
}

@Override
public StringEnvironmentVariable getKeyPassword() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_KEY_PASSWORD);
}

@Override
public StringEnvironmentVariable getTruststoreFilename() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_FILENAME);
}

@Override
public StringEnvironmentVariable getTruststorePassword() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_PASSWORD);
}

@Override
public StringEnvironmentVariable getTruststoreType() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_TYPE);
}

@Override
public BooleanEnvironmentVariable getAllowSelfSignedCertificates() {
return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,79 +32,56 @@ public interface Environment {
// Service base configuration

StringEnvironmentVariable getServiceHost();

IntEnvironmentVariable getServicePort();

StringEnvironmentVariable getSpCoreScheme();

StringEnvironmentVariable getSpCoreHost();

IntEnvironmentVariable getSpCorePort();

// Time series storage env variables

StringEnvironmentVariable getTsStorage();

StringEnvironmentVariable getTsStorageProtocol();

StringEnvironmentVariable getTsStorageHost();

IntEnvironmentVariable getTsStoragePort();

StringEnvironmentVariable getTsStorageToken();

StringEnvironmentVariable getTsStorageOrg();

StringEnvironmentVariable getTsStorageBucket();

IntEnvironmentVariable getIotDbSessionPoolSize();

BooleanEnvironmentVariable getIotDbSessionEnableCompression();

StringEnvironmentVariable getIotDbUser();

StringEnvironmentVariable getIotDbPassword();

// CouchDB env variables

StringEnvironmentVariable getCouchDbProtocol();

StringEnvironmentVariable getCouchDbHost();

IntEnvironmentVariable getCouchDbPort();

StringEnvironmentVariable getCouchDbUsername();

StringEnvironmentVariable getCouchDbPassword();


// JWT & Authentication

StringEnvironmentVariable getClientUser();

StringEnvironmentVariable getClientSecret();

StringEnvironmentVariable getJwtSecret();

StringEnvironmentVariable getJwtPublicKeyLoc();

StringEnvironmentVariable getJwtPrivateKeyLoc();

StringEnvironmentVariable getJwtSigningMode();

StringEnvironmentVariable getExtensionsAuthMode();

StringEnvironmentVariable getEncryptionPasscode();

BooleanEnvironmentVariable getOAuthEnabled();

StringEnvironmentVariable getOAuthRedirectUri();

List<OAuthConfiguration> getOAuthConfigurations();

// Messaging
StringEnvironmentVariable getKafkaRetentionTimeMs();

StringEnvironmentVariable getPrioritizedProtocol();


Expand Down Expand Up @@ -164,14 +141,18 @@ public interface Environment {
StringEnvironmentVariable getAllowedUploadFiletypes();

StringEnvironmentVariable getOpcUaSecurityDir();

StringEnvironmentVariable getOpcUaKeystoreFile();

StringEnvironmentVariable getOpcUaKeystorePassword();

StringEnvironmentVariable getOpcUaApplicationUri();

StringEnvironmentVariable getOPcUaKeystoreType();

StringEnvironmentVariable getOpcUaKeystoreType();
StringEnvironmentVariable getOpcUaKeystoreAlias();

StringEnvironmentVariable getKeystoreFilename();
StringEnvironmentVariable getKeystorePassword();
StringEnvironmentVariable getKeystoreType();
StringEnvironmentVariable getKeyPassword();
StringEnvironmentVariable getTruststoreFilename();
StringEnvironmentVariable getTruststorePassword();
StringEnvironmentVariable getTruststoreType();
BooleanEnvironmentVariable getAllowSelfSignedCertificates();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfig;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
import org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
Expand Down Expand Up @@ -67,13 +67,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig {

private static final Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);
KafkaConfig config;
KafkaAdapterConfig config;

public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka";

Expand All @@ -84,14 +83,13 @@ public KafkaProtocol() {
}

private void applyConfiguration(IStaticPropertyExtractor extractor) {
this.config = KafkaConnectUtils.getConfig(extractor, true);
this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, true);
}

private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) throws KafkaException {
private Consumer<byte[], byte[]> createConsumer(KafkaAdapterConfig kafkaConfig) throws KafkaException {
final Properties props = new Properties();

kafkaConfig.getSecurityConfig().appendConfig(props);
kafkaConfig.getAutoOffsetResetConfig().appendConfig(props);
kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props));

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
Expand All @@ -113,34 +111,36 @@ public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor extractor)
throws SpConfigurationException {
RuntimeResolvableOneOfStaticProperty config = extractor
.getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
.getStaticPropertyByName(KafkaConfigProvider.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
var kafkaConfig = new KafkaConfigExtractor().extractAdapterConfig(extractor, false);
boolean hideInternalTopics = extractor.slideToggleValue(KafkaConfigProvider.getHideInternalTopicsKey());

try {
Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
Set<String> topics = consumer.listTopics().keySet();
var consumer = createConsumer(kafkaConfig);
List<String> topics = new ArrayList<>(consumer.listTopics().keySet()).stream().sorted().toList();
consumer.close();

if (hideInternalTopics) {
topics = topics
.stream()
.filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
.collect(Collectors.toSet());
.filter(t -> (!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX)
&& !t.startsWith(GlobalStreamPipesConstants.CONNECT_TOPIC_PREFIX)))
.toList();
}

config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));

return config;
} catch (KafkaException e) {
throw new SpConfigurationException(e.getMessage(), e);
var message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
throw new SpConfigurationException(message, e);
}
}

@Override
public IAdapterConfiguration declareConfig() {

StaticPropertyAlternative latestAlternative = KafkaConnectUtils.getAlternativesLatest();
StaticPropertyAlternative latestAlternative = KafkaConfigProvider.getAlternativesLatest();
latestAlternative.setSelected(true);

return AdapterConfigurationBuilder
Expand All @@ -150,28 +150,28 @@ public IAdapterConfiguration declareConfig() {
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)

.requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(),
KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
KafkaConnectUtils.getAlternativesSaslPlain(),
KafkaConnectUtils.getAlternativesSaslSSL())
.requiredAlternatives(KafkaConfigProvider.getAccessModeLabel(),
KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
KafkaConfigProvider.getAlternativesSaslPlain(),
KafkaConfigProvider.getAlternativesSaslSSL())

.requiredTextParameter(KafkaConnectUtils.getHostLabel())
.requiredIntegerParameter(KafkaConnectUtils.getPortLabel())
.requiredTextParameter(KafkaConfigProvider.getHostLabel())
.requiredIntegerParameter(KafkaConfigProvider.getPortLabel())

.requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(),
KafkaConnectUtils.getAlternativesRandomGroupId(),
KafkaConnectUtils.getAlternativesGroupId())
.requiredAlternatives(KafkaConfigProvider.getConsumerGroupLabel(),
KafkaConfigProvider.getAlternativesRandomGroupId(),
KafkaConfigProvider.getAlternativesGroupId())

.requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true)
.requiredSlideToggle(KafkaConfigProvider.getHideInternalTopicsLabel(), true)

.requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList(
KafkaConnectUtils.HOST_KEY,
KafkaConnectUtils.PORT_KEY))
.requiredAlternatives(KafkaConnectUtils.getAutoOffsetResetConfigLabel(),
KafkaConnectUtils.getAlternativesEarliest(),
latestAlternative,
KafkaConnectUtils.getAlternativesNone())
.requiredSingleValueSelectionFromContainer(KafkaConfigProvider.getTopicLabel(), Arrays.asList(
KafkaConfigProvider.HOST_KEY,
KafkaConfigProvider.PORT_KEY))
.requiredAlternatives(KafkaConfigProvider.getAutoOffsetResetConfigLabel(),
KafkaConfigProvider.getAlternativesEarliest(),
latestAlternative,
KafkaConfigProvider.getAlternativesNone())
.buildConfiguration();
}

Expand All @@ -185,17 +185,11 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
protocol.setBrokerHostname(config.getKafkaHost());
protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));

List<KafkaConfigAppender> kafkaConfigAppenderList = new ArrayList<>(2);
kafkaConfigAppenderList.add(this.config.getSecurityConfig());
kafkaConfigAppenderList.add(this.config.getAutoOffsetResetConfig());

this.kafkaConsumer = new SpKafkaConsumer(protocol,
config.getTopic(),
new BrokerEventProcessor(extractor.selectedParser(), (event) -> {
collector.collect(event);
}),
kafkaConfigAppenderList
);
new BrokerEventProcessor(extractor.selectedParser(), collector),
config.getConfigAppenders()
);

thread = new Thread(this.kafkaConsumer);
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
*
*/

package org.apache.streampipes.messaging.kafka.security;
package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;

import java.util.Properties;
public class KafkaAdapterConfig extends KafkaBaseConfig {

public class KafkaSecurityUnauthenticatedPlainConfig extends KafkaSecurityConfig {
private String groupId;

@Override
public void appendConfig(Properties props) {
public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}
}
Loading

0 comments on commit 2edc5e5

Please sign in to comment.