Skip to content

Commit

Permalink
Add Kafka migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Dec 2, 2024
1 parent 2edc5e5 commit b07bb3b
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaAdapterMigrationV1;
import org.apache.streampipes.extensions.connectors.kafka.migration.KafkaSinkMigrationV1;
import org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;

import java.util.Collections;
import java.util.List;

public class KafkaConnectorsModuleExport implements IExtensionModuleExport {
Expand All @@ -45,6 +46,9 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {

@Override
public List<IModelMigrator<?, ?>> migrators() {
return Collections.emptyList();
return List.of(
new KafkaAdapterMigrationV1(),
new KafkaSinkMigrationV1()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public IAdapterConfiguration declareConfig() {
latestAlternative.setSelected(true);

return AdapterConfigurationBuilder
.create(ID, 0, KafkaProtocol::new)
.create(ID, 1, KafkaProtocol::new)
.withSupportedParsers(Parsers.defaultParsers())
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.connectors.kafka.migration;

import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
import org.apache.streampipes.extensions.connectors.kafka.adapter.KafkaProtocol;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.extensions.management.connect.adapter.parser.AvroParser;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.helpers.Labels;

public class KafkaAdapterMigrationV1 implements IAdapterMigrator {
@Override
public ModelMigratorConfig config() {
return new ModelMigratorConfig(
KafkaProtocol.ID,
SpServiceTagPrefix.ADAPTER,
0,
1
);
}

@Override
public MigrationResult<AdapterDescription> migrate(AdapterDescription element,
IStaticPropertyExtractor extractor) throws RuntimeException {

migrateSecurity((StaticPropertyAlternatives) element.getConfig().get(0));
migrateAvro((StaticPropertyAlternatives) element.getConfig().get(6));
element.getConfig().add(3, makeConsumerGroup());
return MigrationResult.success(element);
}

public void migrateSecurity(StaticPropertyAlternatives securityAlternatives) {
migrateGroup(securityAlternatives.getAlternatives().get(2));
migrateGroup(securityAlternatives.getAlternatives().get(3));
}

public void migrateAvro(StaticPropertyAlternatives formatAlternatives) {
var parser = new AvroParser();
var avroParserDescription = new StaticPropertyAlternative(
parser.declareDescription().getName(),
parser.declareDescription().getName(),
parser.declareDescription().getDescription());

avroParserDescription.setStaticProperty(parser.declareDescription().getConfig());
formatAlternatives.getAlternatives().add(
avroParserDescription
);
}

private StaticPropertyAlternatives makeConsumerGroup() {
var consumerGroupAlternatives = StaticProperties.alternatives(
KafkaConfigProvider.getConsumerGroupLabel(),
KafkaConfigProvider.getAlternativesRandomGroupId(),
KafkaConfigProvider.getAlternativesGroupId()
);
consumerGroupAlternatives.getAlternatives().get(0).setSelected(true);
return consumerGroupAlternatives;
}

private void migrateGroup(StaticPropertyAlternative alternative) {
boolean selected = alternative.getSelected();
var securityMechanism = StaticProperties.singleValueSelection(
Labels.withId(KafkaConfigProvider.SECURITY_MECHANISM),
KafkaConfigProvider.makeSecurityMechanism());
securityMechanism.getOptions().get(0).setSelected(selected);
((StaticPropertyGroup) alternative.getStaticProperty()).setHorizontalRendering(false);
((StaticPropertyGroup) alternative.getStaticProperty()).getStaticProperties().add(
0,
securityMechanism
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.connectors.kafka.migration;

import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
import org.apache.streampipes.extensions.connectors.kafka.sink.KafkaPublishSink;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;

public class KafkaSinkMigrationV1 implements IDataSinkMigrator {

@Override
public ModelMigratorConfig config() {
return new ModelMigratorConfig(
KafkaPublishSink.ID,
SpServiceTagPrefix.DATA_SINK,
0,
1
);
}

@Override
public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element,
IDataSinkParameterExtractor extractor) throws RuntimeException {

new KafkaAdapterMigrationV1().migrateSecurity(
(StaticPropertyAlternatives) element.getStaticProperties().get(3));

return MigrationResult.success(element);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaBaseConfig;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
Expand All @@ -38,24 +37,28 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class KafkaPublishSink implements IStreamPipesDataSink {

public static final String ID = "org.apache.streampipes.sinks.brokers.jvm.kafka";
private static final Logger LOG = LoggerFactory.getLogger(KafkaPublishSink.class);

private SpKafkaProducer producer;

private JsonDataFormatDefinition dataFormatDefinition;

private KafkaBaseConfig kafkaConfig;

public KafkaPublishSink() {
}

@Override
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
KafkaPublishSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka", 0)
DataSinkBuilder.create(ID, 1)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
Expand All @@ -80,7 +83,7 @@ public IDataSinkConfiguration declareConfig() {
@Override
public void onPipelineStarted(IDataSinkParameters parameters,
EventSinkRuntimeContext runtimeContext) {
this.kafkaConfig = new KafkaConfigExtractor().extractSinkConfig(parameters.extractor());
var kafkaConfig = new KafkaConfigExtractor().extractSinkConfig(parameters.extractor());
this.dataFormatDefinition = new JsonDataFormatDefinition();

this.producer = new SpKafkaProducer(
Expand All @@ -95,7 +98,7 @@ public void onEvent(Event event) throws SpRuntimeException {
Map<String, Object> rawEvent = event.getRaw();
this.producer.publish(dataFormatDefinition.fromMap(rawEvent));
} catch (SpRuntimeException e) {
e.printStackTrace();
LOG.error(e.getMessage(), e);
}
}

Expand Down

0 comments on commit b07bb3b

Please sign in to comment.