Skip to content

Commit

Permalink
feat(#3453): Add additional schema options to StaticMetadataEnricher
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed Jan 31, 2025
1 parent a061f13 commit d9a6db5
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.processors.transformation.jvm.migrations.StaticMetadataEnrichmentProcessorMigrationV1;
import org.apache.streampipes.processors.transformation.jvm.migrations.StaticMetadataEnrichmentProcessorMigrationV2;
import org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.array.split.SplitArrayProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.counter.BooleanCounterProcessor;
Expand Down Expand Up @@ -94,6 +95,8 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {

@Override
public List<IModelMigrator<?, ?>> migrators() {
return List.of(new StaticMetadataEnrichmentProcessorMigrationV1());
return List.of(
new StaticMetadataEnrichmentProcessorMigrationV1(),
new StaticMetadataEnrichmentProcessorMigrationV2());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.transformation.jvm.migrations;

import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
import org.apache.streampipes.extensions.api.migration.IDataProcessorMigrator;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.migration.MigrationResult;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
import org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata.StaticMetaDataEnrichmentProcessor;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.helpers.Labels;

import java.util.List;

import static org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata.StaticMetaDataEnrichmentProcessor.STATIC_METADATA_INPUT_DESCRIPTION;
import static org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata.StaticMetaDataEnrichmentProcessor.STATIC_METADATA_INPUT_LABEL;

public class StaticMetadataEnrichmentProcessorMigrationV2 implements IDataProcessorMigrator {
@Override
public ModelMigratorConfig config() {
return new ModelMigratorConfig(
StaticMetaDataEnrichmentProcessor.ID,
SpServiceTagPrefix.DATA_PROCESSOR,
1,
2
);
}

@Override
public MigrationResult<DataProcessorInvocation> migrate(DataProcessorInvocation element,
IDataProcessorParameterExtractor extractor)
throws RuntimeException {
element.getStaticProperties()
.stream()
.filter(sp -> sp.getInternalName().equals(StaticMetaDataEnrichmentProcessor.STATIC_METADATA_INPUT))
.map(sp -> (CollectionStaticProperty) sp)
.forEach(sp -> {
addLabelAndDescriptionConfig((StaticPropertyGroup) sp.getStaticPropertyTemplate());
sp.getMembers().forEach(member -> {
addLabelAndDescriptionConfig((StaticPropertyGroup) member);
});
});
return MigrationResult.success(element);
}

private void addLabelAndDescriptionConfig(StaticPropertyGroup group) {
group.setHorizontalRendering(false);
group.getStaticProperties().addAll(
List.of(
makeFreeTextProperty(STATIC_METADATA_INPUT_LABEL),
makeFreeTextProperty(STATIC_METADATA_INPUT_DESCRIPTION)
)
);
}

private StaticProperty makeFreeTextProperty(String id) {
var sp = StaticProperties.stringFreeTextProperty(
Labels.withId(
id));
sp.setOptional(true);
sp.setValue("");
return sp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
public record StaticMetaDataConfiguration(
String runtimeName,
String value,
String dataType
String dataType,
String label,
String description
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@

public class StaticMetaDataEnrichmentProcessor
implements IStreamPipesDataProcessor,
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {

public static final String ID = "org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata";

protected static final String STATIC_METADATA_INPUT = "static-metadata-input";
public static final String STATIC_METADATA_INPUT = "static-metadata-input";
protected static final String STATIC_METADATA_INPUT_RUNTIME_NAME = "static-metadata-input-runtime-name";
protected static final String STATIC_METADATA_INPUT_VALUE = "static-metadata-input-value";
protected static final String STATIC_METADATA_INPUT_DATATYPE = "static-metadata-input-datatype";
public static final String STATIC_METADATA_INPUT_LABEL = "static-metadata-input-label";
public static final String STATIC_METADATA_INPUT_DESCRIPTION = "static-metadata-input-description";

protected static final String OPTION_BOOL = "Bool";
protected static final String OPTION_STRING = "String";
Expand All @@ -75,42 +77,51 @@ public IDataProcessorConfiguration declareConfig() {
return DataProcessorConfiguration.create(
StaticMetaDataEnrichmentProcessor::new,
ProcessingElementBuilder.create(
ID,
1
)
.category(
DataProcessorType.ENRICH)
.withLocales(
Locales.EN)
.withAssets(
ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON
)
.requiredCollection(
Labels.withId(
STATIC_METADATA_INPUT),
StaticProperties.stringFreeTextProperty(
Labels.withId(
STATIC_METADATA_INPUT_RUNTIME_NAME)),
StaticProperties.stringFreeTextProperty(
Labels.withId(
STATIC_METADATA_INPUT_VALUE)),
StaticProperties.singleValueSelection(
Labels.withId(
STATIC_METADATA_INPUT_DATATYPE),
Options.from(
OPTION_BOOL,
OPTION_STRING,
OPTION_FLOAT,
OPTION_INTEGER
)
)
)
.requiredStream(
StreamRequirementsBuilder.any())
.outputStrategy(
OutputStrategies.customTransformation())
.build()
ID,
2
)
.category(
DataProcessorType.ENRICH)
.withLocales(
Locales.EN)
.withAssets(
ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON
)
.requiredStaticProperty(
StaticProperties.collection(
Labels.withId(
STATIC_METADATA_INPUT),
false,
StaticProperties.stringFreeTextProperty(
Labels.withId(
STATIC_METADATA_INPUT_RUNTIME_NAME)),
StaticProperties.stringFreeTextProperty(
Labels.withId(
STATIC_METADATA_INPUT_VALUE)),
StaticProperties.singleValueSelection(
Labels.withId(
STATIC_METADATA_INPUT_DATATYPE),
Options.from(
OPTION_BOOL,
OPTION_STRING,
OPTION_FLOAT,
OPTION_INTEGER
)
),
StaticProperties.stringFreeTextProperty(
Labels.withId(
STATIC_METADATA_INPUT_LABEL)),
StaticProperties.stringFreeTextProperty(
Labels.withId(
STATIC_METADATA_INPUT_DESCRIPTION))
)
)
.requiredStream(
StreamRequirementsBuilder.any())
.outputStrategy(
OutputStrategies.customTransformation())
.build()
);
}

Expand All @@ -123,8 +134,8 @@ public EventSchema resolveOutputStrategy(
var metaDataConfigurations = getMetaDataConfigurations(parameterExtractor);

var eventSchema = processingElement.getInputStreams()
.get(0)
.getEventSchema();
.get(0)
.getEventSchema();

addMetaDataConfigurationPropertiesToEventSchema(metaDataConfigurations, eventSchema);

Expand Down Expand Up @@ -185,7 +196,9 @@ private StaticMetaDataConfiguration getMetaDataConfiguration(StaticPropertyExtra
var runtimeName = memberExtractor.textParameter(STATIC_METADATA_INPUT_RUNTIME_NAME);
var value = memberExtractor.textParameter(STATIC_METADATA_INPUT_VALUE);
var dataType = memberExtractor.selectedSingleValue(STATIC_METADATA_INPUT_DATATYPE, String.class);
return new StaticMetaDataConfiguration(runtimeName, value, dataType);
var label = memberExtractor.textParameter(STATIC_METADATA_INPUT_LABEL);
var description = memberExtractor.textParameter(STATIC_METADATA_INPUT_DESCRIPTION);
return new StaticMetaDataConfiguration(runtimeName, value, dataType, label, description);
}

protected Object castValueOfMetaDataConfiguration(StaticMetaDataConfiguration staticMetaDataConfiguration) {
Expand Down Expand Up @@ -214,7 +227,7 @@ private void addMetaDataConfigurationPropertiesToEventSchema(
for (StaticMetaDataConfiguration metaDataConfiguration : metaDataConfigurations) {
var metaDataEventProperty = getMetaDataEventProperty(metaDataConfiguration);
eventSchema.getEventProperties()
.add(metaDataEventProperty);
.add(metaDataEventProperty);
}
}

Expand All @@ -226,6 +239,8 @@ private EventPropertyPrimitive getMetaDataEventProperty(
transformToStreamPipesDataType(metaDataConfiguration.dataType()),
metaDataConfiguration.runtimeName()
)
.label(metaDataConfiguration.label())
.description(metaDataConfiguration.description())
.scope(PropertyScope.MEASUREMENT_PROPERTY)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ static-metadata-input-value.description=This is used for the runtime value in th
static-metadata-input-datatype.title=Data Type
static-metadata-input-datatype.description=Select the data type of the value

static-metadata-input-label.title=Label
static-metadata-input-label.description=An optional short label which describes the field

static-metadata-input-description.title=Description
static-metadata-input-description.description=An optional description of the field
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.http.entity.ContentType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class CustomTransformOutputSchemaGenerator extends OutputSchemaGenerator<CustomTransformOutputStrategy> {

Expand Down Expand Up @@ -79,7 +80,7 @@ private EventSchema makeRequest() {
}

private EventSchema handleResponse(Response httpResp) throws JsonSyntaxException, IOException {
String resp = httpResp.returnContent().asString();
String resp = httpResp.returnContent().asString(StandardCharsets.UTF_8);

return JacksonSerializer
.getObjectMapper()
Expand Down

0 comments on commit d9a6db5

Please sign in to comment.