diff --git a/streampipes-extensions-management/pom.xml b/streampipes-extensions-management/pom.xml index 3c0655c595..9c0427023b 100644 --- a/streampipes-extensions-management/pom.xml +++ b/streampipes-extensions-management/pom.xml @@ -94,6 +94,11 @@ com.opencsv opencsv + + org.apache.avro + avro + 1.11.4 + diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java index 2ad113e8a7..b3ed8c6fd1 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java @@ -23,11 +23,10 @@ import org.apache.streampipes.extensions.api.connect.IParser; import org.apache.streampipes.messaging.InternalEventProcessor; -import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; +import java.io.ByteArrayInputStream; public record BrokerEventProcessor( IParser parser, @@ -39,7 +38,7 @@ public record BrokerEventProcessor( @Override public void onEvent(byte[] payload) { try { - parser.parse(IOUtils.toInputStream(new String(payload), StandardCharsets.UTF_8), collector::collect); + parser.parse(new ByteArrayInputStream(payload), collector::collect); } catch (ParseException e) { LOG.error("Error while parsing: " + e.getMessage()); } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java new file mode 100644 index 0000000000..32bf26f489 --- /dev/null +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.extensions.management.connect.adapter.parser; + +import org.apache.streampipes.commons.exceptions.connect.ParseException; +import org.apache.streampipes.extensions.api.connect.IParser; +import org.apache.streampipes.extensions.api.connect.IParserEventHandler; +import org.apache.streampipes.model.connect.grounding.ParserDescription; +import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.staticproperty.StaticProperty; +import org.apache.streampipes.sdk.builder.adapter.ParserDescriptionBuilder; +import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Options; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.util.Utf8; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class AvroParser implements IParser { + + private static final Logger LOG = LoggerFactory.getLogger(AvroParser.class); + + public static final String ID = "org.apache.streampipes.extensions.management.connect.adapter.parser.avro"; + public static final String LABEL = "Avro"; + public static final String DESCRIPTION = "Can be used to read avro records"; + + public static final String SCHEMA = "schema"; + public static final String SCHEMA_REGISTRY = "schemaRegistry"; + public static final String FLATTEN_RECORDS = "flattenRecord"; + + private final ParserUtils parserUtils; + private DatumReader datumReader; + private boolean schemaRegistry; + private boolean flattenRecord; + + + public AvroParser() { + parserUtils = new ParserUtils(); + } + + public AvroParser(String schemaString, boolean schemaRegistry, boolean flattenRecord) { + this(); + Schema schema = new Schema.Parser().parse(schemaString); + this.datumReader = new GenericDatumReader<>(schema); + this.schemaRegistry = schemaRegistry; + this.flattenRecord = flattenRecord; + } + + @Override + public ParserDescription declareDescription() { + return ParserDescriptionBuilder.create(ID, LABEL, DESCRIPTION) + .requiredSingleValueSelection( + Labels.from(SCHEMA_REGISTRY, "Schema Registry", + "Does the messages include the schema registry header?"), + Options.from("yes", "no") + ) + .requiredSingleValueSelection( + Labels.from(FLATTEN_RECORDS, "Flatten Records", + "Should nested records be flattened?"), + Options.from("no", "yes") + ) + .requiredCodeblock(Labels.from(SCHEMA, "Schema", + "The schema of the avro record"), + "{\n" + + " \"namespace\": \"example.avro\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"Test\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"string\"},\n" + + " {\"name\": \"value\", \"type\": \"double\"}\n" + + " ]\n" + + "}") + .build(); + } + + + @Override + public GuessSchema getGuessSchema(InputStream inputStream) throws ParseException { + GenericRecord avroRecord = getRecord(inputStream); + var event = toMap(avroRecord); + return parserUtils.getGuessSchema(event); + } + + @Override + public void parse(InputStream inputStream, IParserEventHandler handler) throws ParseException { + GenericRecord avroRecord = getRecord(inputStream); + var event = toMap(avroRecord); + handler.handle(event); + } + + @Override + public IParser fromDescription(List configuration) { + var extractor = StaticPropertyExtractor.from(configuration); + String schema = extractor.codeblockValue(SCHEMA); + boolean schemaRegistry = extractor + .selectedSingleValue(SCHEMA_REGISTRY, String.class) + .equals("yes"); + boolean flattenRecords = extractor + .selectedSingleValue(FLATTEN_RECORDS, String.class) + .equals("yes"); + + return new AvroParser(schema, schemaRegistry, flattenRecords); + } + + private GenericRecord getRecord(InputStream inputStream) throws ParseException { + try { + if (schemaRegistry) { + inputStream.skipNBytes(5); + } + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new ParseException( + "Error decoding the avro message. Please check the schema." + ); + } + } + + + private Map toMap(GenericRecord avroRecord) { + Map resultMap = new LinkedHashMap<>(); + avroRecord.getSchema().getFields().forEach(field -> { + String fieldName = field.name(); + Object fieldValue = avroRecord.get(fieldName); + if (flattenRecord && fieldValue instanceof GenericRecord){ + Map flatMap = unwrapNestedRecord((GenericRecord) fieldValue, fieldName); + resultMap.putAll(flatMap); + } else { + resultMap.put(fieldName, toMapHelper(fieldValue)); + } + }); + + return resultMap; + } + + private Object toMapHelper(Object fieldValue) { + if (fieldValue instanceof GenericRecord){ + return toMap((GenericRecord) fieldValue); + } + if (fieldValue instanceof GenericData.Array){ + List valueList = new ArrayList<>(); + ((GenericData.Array) fieldValue).forEach(value -> valueList.add(toMapHelper(value))); + return valueList; + } + if (fieldValue instanceof Map){ + Map valueMap = new LinkedHashMap<>(); + ((Map) fieldValue).forEach((key, value1) -> valueMap.put(convertUTF8(key), toMapHelper(value1))); + return valueMap; + } + return convertUTF8(fieldValue); + } + + private Map unwrapNestedRecord(GenericRecord nestedRecord, String prefix) { + Map flatMap = new HashMap<>(); + + nestedRecord.getSchema().getFields().forEach(field -> { + String fieldName = field.name(); + Object fieldValue = nestedRecord.get(fieldName); + String newKey = prefix.isEmpty() ? fieldName : prefix + "_" + fieldName; + if (fieldValue instanceof GenericRecord) { + flatMap.putAll(unwrapNestedRecord((GenericRecord) fieldValue, newKey)); + } else { + flatMap.put(newKey, toMapHelper(fieldValue)); + } + }); + + return flatMap; + } + + private Object convertUTF8(Object fieldValue) { + if (fieldValue instanceof Utf8){ + return fieldValue.toString(); + } + return fieldValue; + } + +} diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java index 3c8a0110da..1dc2937792 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java @@ -30,7 +30,8 @@ public static List defaultParsers() { new JsonParsers(), new CsvParser(), new XmlParser(), - new ImageParser() + new ImageParser(), + new AvroParser() ); } } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java index f2e0a76918..fb387b59ee 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java @@ -96,8 +96,7 @@ private Consumer createConsumer(KafkaConfig kafkaConfig) throws props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, - "KafkaExampleConsumer" + System.currentTimeMillis()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId()); props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000); @@ -160,6 +159,10 @@ public IAdapterConfiguration declareConfig() { .requiredTextParameter(KafkaConnectUtils.getHostLabel()) .requiredIntegerParameter(KafkaConnectUtils.getPortLabel()) + .requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(), + KafkaConnectUtils.getAlternativesRandomGroupId(), + KafkaConnectUtils.getAlternativesGroupId()) + .requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true) .requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList( diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java index 770cfc0561..7f02e4c5b1 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfig.java @@ -26,6 +26,7 @@ public class KafkaConfig { private String kafkaHost; private Integer kafkaPort; private String topic; + private String groupId; KafkaSecurityConfig securityConfig; AutoOffsetResetConfig autoOffsetResetConfig; @@ -33,11 +34,13 @@ public class KafkaConfig { public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic, + String groupId, KafkaSecurityConfig securityConfig, AutoOffsetResetConfig autoOffsetResetConfig) { this.kafkaHost = kafkaHost; this.kafkaPort = kafkaPort; this.topic = topic; + this.groupId = groupId; this.securityConfig = securityConfig; this.autoOffsetResetConfig = autoOffsetResetConfig; } @@ -66,6 +69,14 @@ public void setTopic(String topic) { this.topic = topic; } + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + public KafkaSecurityConfig getSecurityConfig() { return securityConfig; } diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java index f56a230cbb..aa473dcbf7 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConnectUtils.java @@ -56,6 +56,11 @@ public class KafkaConnectUtils { public static final String USERNAME_KEY = "username"; public static final String PASSWORD_KEY = "password"; + public static final String CONSUMER_GROUP = "consumer-group"; + public static final String RANDOM_GROUP_ID = "random-group-id"; + public static final String GROUP_ID = "group-id"; + public static final String GROUP_ID_INPUT = "group-id-input"; + private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics"; @@ -88,6 +93,10 @@ public static Label getAccessModeLabel() { return Labels.withId(ACCESS_MODE); } + public static Label getConsumerGroupLabel() { + return Labels.withId(CONSUMER_GROUP); + } + public static Label getAutoOffsetResetConfigLabel() { return Labels.withId(AUTO_OFFSET_RESET_CONFIG); } @@ -124,6 +133,13 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean new KafkaSecurityUnauthenticatedPlainConfig(); } + String groupId; + if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)){ + groupId = "KafkaExampleConsumer" + System.currentTimeMillis(); + } else { + groupId = extractor.singleValueParameter(GROUP_ID_INPUT, String.class); + } + StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG, StaticPropertyAlternatives.class); @@ -131,12 +147,12 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean if (alternatives == null) { AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST); - return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig); } else { String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG); AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto); - return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig); + return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig); } } @@ -172,6 +188,14 @@ public static StaticPropertyAlternative getAlternativesSaslSSL() { StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY)))); } + public static StaticPropertyAlternative getAlternativesRandomGroupId(){ + return Alternatives.from(Labels.withId(RANDOM_GROUP_ID)); + } + + public static StaticPropertyAlternative getAlternativesGroupId(){ + return Alternatives.from(Labels.withId(KafkaConnectUtils.GROUP_ID), + StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.GROUP_ID_INPUT))); + } public static StaticPropertyAlternative getAlternativesLatest() { return Alternatives.from(Labels.withId(LATEST)); diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en index fd8ef54bc4..81b8b78859 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en @@ -51,6 +51,18 @@ sasl-ssl.description=Username and password, with ssl encryption username-group.title=Username and password +consumer-group.title=Consumer Group +consumer-group.description=Use random group id or insert a specific one + +random-group-id.title=Random group id +random-group-id.description=StreamPipes generates a random group id + +group-id.title=Insert group id +group-id.description=Insert the group id + +group-id-input.title=Group id +group-id-input.description= + key-deserialization.title=Key Deserializer key-deserialization.description= diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml index 88cede6913..3b9e002677 100644 --- a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml +++ b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml @@ -64,6 +64,11 @@ streampipes-connectors-kafka 0.97.0-SNAPSHOT + + org.apache.avro + avro + 1.11.4 + org.apache.streampipes streampipes-connectors-mqtt diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java index 75883a244a..b5e242eb50 100644 --- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java +++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/KafkaAdapterTester.java @@ -66,7 +66,7 @@ public IAdapterConfiguration prepareAdapter() throws AdapterException { list.add(new Option(TOPIC)); ((RuntimeResolvableOneOfStaticProperty) configuration.getAdapterDescription() .getConfig() - .get(4)) + .get(5)) .setOptions(list); List> configs = new ArrayList<>(); configs.add(Map.of(KafkaConnectUtils.HOST_KEY, kafkaContainer.getBrokerHost())); @@ -89,17 +89,25 @@ public IAdapterConfiguration prepareAdapter() throws AdapterException { .get(0) .setSelected(true); + // Set consumer group to random group id + ((StaticPropertyAlternatives) (desc) + .getConfig() + .get(3)) + .getAlternatives() + .get(0) + .setSelected(true); + // Set AUTO_OFFSET_RESET_CONFIG configuration to Earliest option ((StaticPropertyAlternatives) (desc) .getConfig() - .get(5)) + .get(6)) .getAlternatives() .get(0) .setSelected(true); ((StaticPropertyAlternatives) (desc) .getConfig() - .get(5)) + .get(6)) .getAlternatives() .get(1) .setSelected(false); @@ -107,7 +115,7 @@ public IAdapterConfiguration prepareAdapter() throws AdapterException { // Set format to Json ((StaticPropertyAlternatives) (desc) .getConfig() - .get(6)) + .get(7)) .getAlternatives() .get(0) .setSelected(true); diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java index 8a3d351638..591f8b4b67 100644 --- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java +++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/adapter/ParserDescriptionBuilder.java @@ -53,6 +53,7 @@ protected void prepareBuild() { elementDescription.getName(), elementDescription.getDescription(), getStaticProperties()); + group.setHorizontalRendering(false); this.elementDescription.setConfig(group); } }