Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(#3355): Add new avro format parser #3358

Draft
wants to merge 7 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions streampipes-extensions-management/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.4</version>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import org.apache.streampipes.extensions.api.connect.IParser;
import org.apache.streampipes.messaging.InternalEventProcessor;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.io.ByteArrayInputStream;

public record BrokerEventProcessor(
IParser parser,
Expand All @@ -39,7 +38,7 @@ public record BrokerEventProcessor(
@Override
public void onEvent(byte[] payload) {
try {
parser.parse(IOUtils.toInputStream(new String(payload), StandardCharsets.UTF_8), collector::collect);
parser.parse(new ByteArrayInputStream(payload), collector::collect);
} catch (ParseException e) {
LOG.error("Error while parsing: " + e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.management.connect.adapter.parser;

import org.apache.streampipes.commons.exceptions.connect.ParseException;
import org.apache.streampipes.extensions.api.connect.IParser;
import org.apache.streampipes.extensions.api.connect.IParserEventHandler;
import org.apache.streampipes.model.connect.grounding.ParserDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.sdk.builder.adapter.ParserDescriptionBuilder;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Options;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Parses binary Avro-encoded messages into StreamPipes events using a
 * user-provided Avro schema. Optionally skips the Confluent schema-registry
 * wire-format header before decoding, and optionally flattens nested records
 * into a single level using underscore-separated keys.
 */
public class AvroParser implements IParser {

  private static final Logger LOG = LoggerFactory.getLogger(AvroParser.class);

  public static final String ID = "org.apache.streampipes.extensions.management.connect.adapter.parser.avro";
  public static final String LABEL = "Avro";
  public static final String DESCRIPTION = "Can be used to read avro records";

  public static final String SCHEMA = "schema";
  public static final String SCHEMA_REGISTRY = "schemaRegistry";
  public static final String FLATTEN_RECORDS = "flattenRecord";

  // Confluent wire format prefix: 1 magic byte + 4-byte schema id.
  private static final int SCHEMA_REGISTRY_HEADER_LENGTH = 5;

  private final ParserUtils parserUtils;
  private DatumReader<GenericRecord> datumReader;
  private boolean schemaRegistry;
  private boolean flattenRecord;

  /**
   * No-arg constructor used for parser discovery. A parser created this way
   * has no schema; obtain a usable instance via {@link #fromDescription(List)}
   * or the schema-based constructor before parsing.
   */
  public AvroParser() {
    parserUtils = new ParserUtils();
  }

  /**
   * @param schemaString   Avro schema (JSON) describing the incoming records
   * @param schemaRegistry whether messages carry the 5-byte schema-registry header
   * @param flattenRecord  whether nested records are flattened into the top level
   * @throws org.apache.avro.SchemaParseException if the schema string is invalid
   */
  public AvroParser(String schemaString, boolean schemaRegistry, boolean flattenRecord) {
    this();
    Schema schema = new Schema.Parser().parse(schemaString);
    this.datumReader = new GenericDatumReader<>(schema);
    this.schemaRegistry = schemaRegistry;
    this.flattenRecord = flattenRecord;
  }

  @Override
  public ParserDescription declareDescription() {
    return ParserDescriptionBuilder.create(ID, LABEL, DESCRIPTION)
        .requiredSingleValueSelection(
            Labels.from(SCHEMA_REGISTRY, "Schema Registry",
                "Do the messages include the schema registry header?"),
            Options.from("yes", "no")
        )
        .requiredSingleValueSelection(
            Labels.from(FLATTEN_RECORDS, "Flatten Records",
                "Should nested records be flattened?"),
            Options.from("no", "yes")
        )
        .requiredCodeblock(Labels.from(SCHEMA, "Schema",
                "The schema of the avro record"),
            "{\n"
                + "  \"namespace\": \"example.avro\",\n"
                + "  \"type\": \"record\",\n"
                + "  \"name\": \"Test\",\n"
                + "  \"fields\": [\n"
                + "    {\"name\": \"id\", \"type\": \"string\"},\n"
                + "    {\"name\": \"value\", \"type\": \"double\"}\n"
                + "  ]\n"
                + "}")
        .build();
  }

  /**
   * Decodes a single Avro record from the stream and derives a schema guess
   * from the resulting event map.
   */
  @Override
  public GuessSchema getGuessSchema(InputStream inputStream) throws ParseException {
    GenericRecord avroRecord = getRecord(inputStream);
    var event = toMap(avroRecord);
    return parserUtils.getGuessSchema(event);
  }

  /**
   * Decodes a single Avro record from the stream and forwards it to the
   * event handler as a map.
   */
  @Override
  public void parse(InputStream inputStream, IParserEventHandler handler) throws ParseException {
    GenericRecord avroRecord = getRecord(inputStream);
    var event = toMap(avroRecord);
    handler.handle(event);
  }

  @Override
  public IParser fromDescription(List<StaticProperty> configuration) {
    var extractor = StaticPropertyExtractor.from(configuration);
    String schema = extractor.codeblockValue(SCHEMA);
    boolean schemaRegistry = extractor
        .selectedSingleValue(SCHEMA_REGISTRY, String.class)
        .equals("yes");
    boolean flattenRecords = extractor
        .selectedSingleValue(FLATTEN_RECORDS, String.class)
        .equals("yes");

    return new AvroParser(schema, schemaRegistry, flattenRecords);
  }

  /**
   * Reads one {@link GenericRecord} from the stream, skipping the
   * schema-registry header first when configured.
   *
   * @throws ParseException if no schema is configured or decoding fails
   */
  private GenericRecord getRecord(InputStream inputStream) throws ParseException {
    if (datumReader == null) {
      // Guard against use of the no-arg-constructed instance, which would
      // otherwise fail with an uninformative NullPointerException.
      throw new ParseException(
          "No Avro schema configured. Create the parser via fromDescription() "
              + "or the schema-based constructor before parsing."
      );
    }
    try {
      if (schemaRegistry) {
        inputStream.skipNBytes(SCHEMA_REGISTRY_HEADER_LENGTH);
      }
      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
      return datumReader.read(null, decoder);
    } catch (IOException e) {
      // Preserve the cause so the root error is not lost downstream.
      throw new ParseException(
          "Error decoding the avro message. Please check the schema.", e
      );
    }
  }

  /**
   * Converts a decoded record into an event map, preserving the schema's
   * field order. Nested records are either flattened (when configured) or
   * converted to nested maps.
   */
  private Map<String, Object> toMap(GenericRecord avroRecord) {
    Map<String, Object> resultMap = new LinkedHashMap<>();
    avroRecord.getSchema().getFields().forEach(field -> {
      String fieldName = field.name();
      Object fieldValue = avroRecord.get(fieldName);
      if (flattenRecord && fieldValue instanceof GenericRecord nestedRecord) {
        resultMap.putAll(unwrapNestedRecord(nestedRecord, fieldName));
      } else {
        resultMap.put(fieldName, toMapHelper(fieldValue));
      }
    });

    return resultMap;
  }

  /**
   * Recursively converts an Avro value (record, array, map, Utf8 or scalar)
   * into plain Java collections and types.
   */
  private Object toMapHelper(Object fieldValue) {
    if (fieldValue instanceof GenericRecord nestedRecord) {
      return toMap(nestedRecord);
    }
    if (fieldValue instanceof GenericData.Array<?> array) {
      List<Object> valueList = new ArrayList<>(array.size());
      array.forEach(value -> valueList.add(toMapHelper(value)));
      return valueList;
    }
    if (fieldValue instanceof Map<?, ?> map) {
      Map<Object, Object> valueMap = new LinkedHashMap<>();
      map.forEach((key, value) -> valueMap.put(convertUTF8(key), toMapHelper(value)));
      return valueMap;
    }
    return convertUTF8(fieldValue);
  }

  /**
   * Flattens a nested record into prefixed keys ({@code prefix_field}),
   * recursing into further nested records. Uses a LinkedHashMap so the
   * schema's field order is preserved, consistent with {@link #toMap}.
   */
  private Map<String, Object> unwrapNestedRecord(GenericRecord nestedRecord, String prefix) {
    Map<String, Object> flatMap = new LinkedHashMap<>();

    nestedRecord.getSchema().getFields().forEach(field -> {
      String fieldName = field.name();
      Object fieldValue = nestedRecord.get(fieldName);
      String newKey = prefix.isEmpty() ? fieldName : prefix + "_" + fieldName;
      if (fieldValue instanceof GenericRecord innerRecord) {
        flatMap.putAll(unwrapNestedRecord(innerRecord, newKey));
      } else {
        flatMap.put(newKey, toMapHelper(fieldValue));
      }
    });

    return flatMap;
  }

  /**
   * Avro decodes strings as {@link Utf8}; convert those to
   * {@link java.lang.String} so downstream consumers see plain strings.
   */
  private Object convertUTF8(Object fieldValue) {
    if (fieldValue instanceof Utf8) {
      return fieldValue.toString();
    }
    return fieldValue;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public static List<IParser> defaultParsers() {
new JsonParsers(),
new CsvParser(),
new XmlParser(),
new ImageParser()
new ImageParser(),
new AvroParser()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) throws
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());

props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer" + System.currentTimeMillis());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());

props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);

Expand Down Expand Up @@ -160,6 +159,10 @@ public IAdapterConfiguration declareConfig() {
.requiredTextParameter(KafkaConnectUtils.getHostLabel())
.requiredIntegerParameter(KafkaConnectUtils.getPortLabel())

.requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(),
KafkaConnectUtils.getAlternativesRandomGroupId(),
KafkaConnectUtils.getAlternativesGroupId())

.requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true)

.requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@ public class KafkaConfig {
private String kafkaHost;
private Integer kafkaPort;
private String topic;
private String groupId;

KafkaSecurityConfig securityConfig;
AutoOffsetResetConfig autoOffsetResetConfig;

public KafkaConfig(String kafkaHost,
Integer kafkaPort,
String topic,
String groupId,
KafkaSecurityConfig securityConfig,
AutoOffsetResetConfig autoOffsetResetConfig) {
this.kafkaHost = kafkaHost;
this.kafkaPort = kafkaPort;
this.topic = topic;
this.groupId = groupId;
this.securityConfig = securityConfig;
this.autoOffsetResetConfig = autoOffsetResetConfig;
}
Expand Down Expand Up @@ -66,6 +69,14 @@ public void setTopic(String topic) {
this.topic = topic;
}

public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}

public KafkaSecurityConfig getSecurityConfig() {
return securityConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public class KafkaConnectUtils {
public static final String USERNAME_KEY = "username";
public static final String PASSWORD_KEY = "password";

public static final String CONSUMER_GROUP = "consumer-group";
public static final String RANDOM_GROUP_ID = "random-group-id";
public static final String GROUP_ID = "group-id";
public static final String GROUP_ID_INPUT = "group-id-input";


private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";

Expand Down Expand Up @@ -88,6 +93,10 @@ public static Label getAccessModeLabel() {
return Labels.withId(ACCESS_MODE);
}

public static Label getConsumerGroupLabel() {
return Labels.withId(CONSUMER_GROUP);
}

public static Label getAutoOffsetResetConfigLabel() {
return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
}
Expand Down Expand Up @@ -124,19 +133,26 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean
new KafkaSecurityUnauthenticatedPlainConfig();
}

String groupId;
if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)){
groupId = "KafkaExampleConsumer" + System.currentTimeMillis();
} else {
groupId = extractor.singleValueParameter(GROUP_ID_INPUT, String.class);
}

StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
StaticPropertyAlternatives.class);

// Set default value if no value is provided.
if (alternatives == null) {
AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST);

return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig);
return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig);
} else {
String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto);

return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig);
return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig);
}
}

Expand Down Expand Up @@ -172,6 +188,14 @@ public static StaticPropertyAlternative getAlternativesSaslSSL() {
StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
}

public static StaticPropertyAlternative getAlternativesRandomGroupId(){
return Alternatives.from(Labels.withId(RANDOM_GROUP_ID));
}

public static StaticPropertyAlternative getAlternativesGroupId(){
return Alternatives.from(Labels.withId(KafkaConnectUtils.GROUP_ID),
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.GROUP_ID_INPUT)));
}

public static StaticPropertyAlternative getAlternativesLatest() {
return Alternatives.from(Labels.withId(LATEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ sasl-ssl.description=Username and password, with ssl encryption

username-group.title=Username and password

consumer-group.title=Consumer Group
consumer-group.description=Use a randomly generated group id or insert a specific one

random-group-id.title=Random group id
random-group-id.description=StreamPipes generates a random group id

group-id.title=Insert group id
group-id.description=Insert the group id

group-id-input.title=Group id
group-id-input.description=

key-deserialization.title=Key Deserializer
key-deserialization.description=

Expand Down
Loading
Loading