Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(#3362): Add SSL/SASL support to Kafka adapter and sink #3364

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
<jsrosbridge.version>0.2.0</jsrosbridge.version>
<jjwt.version>0.11.2</jjwt.version>
<jts-core.version>1.19.0</jts-core.version>
<kafka.version>3.4.0</kafka.version>
<kafka.version>3.7.1</kafka.version>
<lightcouch.version>0.2.0</lightcouch.version>
<maven-plugin-annotations.version>3.13.0</maven-plugin-annotations.version>
<mailapi.version>1.4.3</mailapi.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,21 @@ public enum Envs {
SP_OPCUA_APPLICATION_URI(
"SP_OPCUA_APPLICATION_URI",
"urn:org:apache:streampipes:opcua:client"
);
),

// Default keystore and truststore
SP_SECURITY_KEYSTORE_FILENAME(
"SP_SECURITY_KEYSTORE_FILENAME",
"/streampipes-security/keystore.pfx"),
SP_SECURITY_KEYSTORE_PASSWORD("SP_SECURITY_KEYSTORE_PASSWORD", ""),
SP_SECURITY_KEYSTORE_TYPE("SP_SECURITY_KEYSTORE_TYPE", "PKCS12"),
SP_SECURITY_KEY_PASSWORD("SP_SECURITY_KEY_PASSWORD", null),
SP_SECURITY_TRUSTSTORE_FILENAME(
"SP_SECURITY_TRUSTSTORE_FILENAME",
"/streampipes-security/truststore.pfx"),
SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""),
SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"),
SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false");

private final String envVariableName;
private String defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ public class GlobalStreamPipesConstants {
public static final String STD_DOCUMENTATION_NAME = "documentation.md";

public static final String INTERNAL_TOPIC_PREFIX = "org-apache-streampipes-internal-";
public static final String CONNECT_TOPIC_PREFIX = "org.apache.streampipes.connect.";

}
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,52 @@ public StringEnvironmentVariable getOpcUaApplicationUri() {
}

@Override
public StringEnvironmentVariable getOPcUaKeystoreType() {
public StringEnvironmentVariable getOpcUaKeystoreType() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_TYPE);
}

@Override
public StringEnvironmentVariable getOpcUaKeystoreAlias() {
return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_ALIAS);
}

@Override
public StringEnvironmentVariable getKeystoreFilename() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_FILENAME);
}

@Override
public StringEnvironmentVariable getKeystorePassword() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_PASSWORD);
}

@Override
public StringEnvironmentVariable getKeystoreType() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_KEYSTORE_TYPE);
}

@Override
public StringEnvironmentVariable getKeyPassword() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_KEY_PASSWORD);
}

@Override
public StringEnvironmentVariable getTruststoreFilename() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_FILENAME);
}

@Override
public StringEnvironmentVariable getTruststorePassword() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_PASSWORD);
}

@Override
public StringEnvironmentVariable getTruststoreType() {
return new StringEnvironmentVariable(Envs.SP_SECURITY_TRUSTSTORE_TYPE);
}

@Override
public BooleanEnvironmentVariable getAllowSelfSignedCertificates() {
return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,79 +32,56 @@ public interface Environment {
// Service base configuration

StringEnvironmentVariable getServiceHost();

IntEnvironmentVariable getServicePort();

StringEnvironmentVariable getSpCoreScheme();

StringEnvironmentVariable getSpCoreHost();

IntEnvironmentVariable getSpCorePort();

// Time series storage env variables

StringEnvironmentVariable getTsStorage();

StringEnvironmentVariable getTsStorageProtocol();

StringEnvironmentVariable getTsStorageHost();

IntEnvironmentVariable getTsStoragePort();

StringEnvironmentVariable getTsStorageToken();

StringEnvironmentVariable getTsStorageOrg();

StringEnvironmentVariable getTsStorageBucket();

IntEnvironmentVariable getIotDbSessionPoolSize();

BooleanEnvironmentVariable getIotDbSessionEnableCompression();

StringEnvironmentVariable getIotDbUser();

StringEnvironmentVariable getIotDbPassword();

// CouchDB env variables

StringEnvironmentVariable getCouchDbProtocol();

StringEnvironmentVariable getCouchDbHost();

IntEnvironmentVariable getCouchDbPort();

StringEnvironmentVariable getCouchDbUsername();

StringEnvironmentVariable getCouchDbPassword();


// JWT & Authentication

StringEnvironmentVariable getClientUser();

StringEnvironmentVariable getClientSecret();

StringEnvironmentVariable getJwtSecret();

StringEnvironmentVariable getJwtPublicKeyLoc();

StringEnvironmentVariable getJwtPrivateKeyLoc();

StringEnvironmentVariable getJwtSigningMode();

StringEnvironmentVariable getExtensionsAuthMode();

StringEnvironmentVariable getEncryptionPasscode();

BooleanEnvironmentVariable getOAuthEnabled();

StringEnvironmentVariable getOAuthRedirectUri();

List<OAuthConfiguration> getOAuthConfigurations();

// Messaging
StringEnvironmentVariable getKafkaRetentionTimeMs();

StringEnvironmentVariable getPrioritizedProtocol();


Expand Down Expand Up @@ -164,14 +141,18 @@ public interface Environment {
StringEnvironmentVariable getAllowedUploadFiletypes();

StringEnvironmentVariable getOpcUaSecurityDir();

StringEnvironmentVariable getOpcUaKeystoreFile();

StringEnvironmentVariable getOpcUaKeystorePassword();

StringEnvironmentVariable getOpcUaApplicationUri();

StringEnvironmentVariable getOPcUaKeystoreType();

StringEnvironmentVariable getOpcUaKeystoreType();
StringEnvironmentVariable getOpcUaKeystoreAlias();

StringEnvironmentVariable getKeystoreFilename();
StringEnvironmentVariable getKeystorePassword();
StringEnvironmentVariable getKeystoreType();
StringEnvironmentVariable getKeyPassword();
StringEnvironmentVariable getTruststoreFilename();
StringEnvironmentVariable getTruststorePassword();
StringEnvironmentVariable getTruststoreType();
BooleanEnvironmentVariable getAllowSelfSignedCertificates();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfig;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConnectUtils;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
import org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
Expand Down Expand Up @@ -67,13 +67,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig {

private static final Logger logger = LoggerFactory.getLogger(KafkaProtocol.class);
KafkaConfig config;
KafkaAdapterConfig config;

public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka";

Expand All @@ -84,14 +83,13 @@ public KafkaProtocol() {
}

private void applyConfiguration(IStaticPropertyExtractor extractor) {
this.config = KafkaConnectUtils.getConfig(extractor, true);
this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, true);
}

private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) throws KafkaException {
private Consumer<byte[], byte[]> createConsumer(KafkaAdapterConfig kafkaConfig) throws KafkaException {
final Properties props = new Properties();

kafkaConfig.getSecurityConfig().appendConfig(props);
kafkaConfig.getAutoOffsetResetConfig().appendConfig(props);
kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props));

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
Expand All @@ -113,34 +111,36 @@ public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor extractor)
throws SpConfigurationException {
RuntimeResolvableOneOfStaticProperty config = extractor
.getStaticPropertyByName(KafkaConnectUtils.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
.getStaticPropertyByName(KafkaConfigProvider.TOPIC_KEY, RuntimeResolvableOneOfStaticProperty.class);
var kafkaConfig = new KafkaConfigExtractor().extractAdapterConfig(extractor, false);
boolean hideInternalTopics = extractor.slideToggleValue(KafkaConfigProvider.getHideInternalTopicsKey());

try {
Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
Set<String> topics = consumer.listTopics().keySet();
var consumer = createConsumer(kafkaConfig);
List<String> topics = new ArrayList<>(consumer.listTopics().keySet()).stream().sorted().toList();
consumer.close();

if (hideInternalTopics) {
topics = topics
.stream()
.filter(t -> !t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX))
.collect(Collectors.toSet());
.filter(t -> (!t.startsWith(GlobalStreamPipesConstants.INTERNAL_TOPIC_PREFIX)
&& !t.startsWith(GlobalStreamPipesConstants.CONNECT_TOPIC_PREFIX)))
.toList();
}

config.setOptions(topics.stream().map(Option::new).collect(Collectors.toList()));

return config;
} catch (KafkaException e) {
throw new SpConfigurationException(e.getMessage(), e);
var message = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
throw new SpConfigurationException(message, e);
}
}

@Override
public IAdapterConfiguration declareConfig() {

StaticPropertyAlternative latestAlternative = KafkaConnectUtils.getAlternativesLatest();
StaticPropertyAlternative latestAlternative = KafkaConfigProvider.getAlternativesLatest();
latestAlternative.setSelected(true);

return AdapterConfigurationBuilder
Expand All @@ -150,28 +150,28 @@ public IAdapterConfiguration declareConfig() {
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)

.requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(),
KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
KafkaConnectUtils.getAlternativesSaslPlain(),
KafkaConnectUtils.getAlternativesSaslSSL())
.requiredAlternatives(KafkaConfigProvider.getAccessModeLabel(),
KafkaConfigProvider.getAlternativeUnauthenticatedPlain(),
KafkaConfigProvider.getAlternativeUnauthenticatedSSL(),
KafkaConfigProvider.getAlternativesSaslPlain(),
KafkaConfigProvider.getAlternativesSaslSSL())

.requiredTextParameter(KafkaConnectUtils.getHostLabel())
.requiredIntegerParameter(KafkaConnectUtils.getPortLabel())
.requiredTextParameter(KafkaConfigProvider.getHostLabel())
.requiredIntegerParameter(KafkaConfigProvider.getPortLabel())

.requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(),
KafkaConnectUtils.getAlternativesRandomGroupId(),
KafkaConnectUtils.getAlternativesGroupId())
.requiredAlternatives(KafkaConfigProvider.getConsumerGroupLabel(),
KafkaConfigProvider.getAlternativesRandomGroupId(),
KafkaConfigProvider.getAlternativesGroupId())

.requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true)
.requiredSlideToggle(KafkaConfigProvider.getHideInternalTopicsLabel(), true)

.requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList(
KafkaConnectUtils.HOST_KEY,
KafkaConnectUtils.PORT_KEY))
.requiredAlternatives(KafkaConnectUtils.getAutoOffsetResetConfigLabel(),
KafkaConnectUtils.getAlternativesEarliest(),
latestAlternative,
KafkaConnectUtils.getAlternativesNone())
.requiredSingleValueSelectionFromContainer(KafkaConfigProvider.getTopicLabel(), Arrays.asList(
KafkaConfigProvider.HOST_KEY,
KafkaConfigProvider.PORT_KEY))
.requiredAlternatives(KafkaConfigProvider.getAutoOffsetResetConfigLabel(),
KafkaConfigProvider.getAlternativesEarliest(),
latestAlternative,
KafkaConfigProvider.getAlternativesNone())
.buildConfiguration();
}

Expand All @@ -185,17 +185,11 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
protocol.setBrokerHostname(config.getKafkaHost());
protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));

List<KafkaConfigAppender> kafkaConfigAppenderList = new ArrayList<>(2);
kafkaConfigAppenderList.add(this.config.getSecurityConfig());
kafkaConfigAppenderList.add(this.config.getAutoOffsetResetConfig());

this.kafkaConsumer = new SpKafkaConsumer(protocol,
config.getTopic(),
new BrokerEventProcessor(extractor.selectedParser(), (event) -> {
collector.collect(event);
}),
kafkaConfigAppenderList
);
new BrokerEventProcessor(extractor.selectedParser(), collector),
config.getConfigAppenders()
);

thread = new Thread(this.kafkaConsumer);
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
*
*/

package org.apache.streampipes.messaging.kafka.security;
package org.apache.streampipes.extensions.connectors.kafka.shared.kafka;

import java.util.Properties;
public class KafkaAdapterConfig extends KafkaBaseConfig {

public class KafkaSecurityUnauthenticatedPlainConfig extends KafkaSecurityConfig {
private String groupId;

@Override
public void appendConfig(Properties props) {
public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}
}
Loading
Loading