Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

332 comsngularkloadgenserializerprotobufserializer nullpointerexception #355

Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/schemas.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,23 @@ The same applies when there is more than one level. The last child in the last l
The other exception applies to JSON Schema: in order to support `null` values, the collections within objects (type : object) cannot be null. Therefore, they will be **required by default** and they will be **initialized as an empty collection** if the object that contains them is not null.

> Within JSON Schema, a maximum of **2 nesting levels** is allowed.


### Protobuf Schema - Schema Registry

If you need to use the Protobuf Schema with the Schema Registry, you must set the subject to the same name as the protobuf file, as in the following example:

```
{
"schema": "...",
"schemaType": "PROTOBUF",
"references": [
{
"name": "[the_name_you_want].proto",
"subject": "[the_name_you_want]",
"version": ...
}]
}

```
> This example is based on a request to the Schema Registry
2 changes: 1 addition & 1 deletion pom-maven-central.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.4.1</version>
<version>5.4.2</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.4.1</version>
<version>5.4.2</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down Expand Up @@ -235,6 +235,17 @@
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
<developer>
<id>alfredo9f</id>
<name>Alfredo González</name>
<email>[email protected]</email>
<organization>Sngular</organization>
<organizationUrl>https://www.sngular.com</organizationUrl>
<roles>
<role>Senior Backend Developer</role>
</roles>
<timezone>Europe/Madrid</timezone>
</developer>
</developers>

<scm>
Expand Down
4 changes: 2 additions & 2 deletions schema_registry_docker/docker-compose-noauth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ services:
environment:
AUTH_ENABLED: false
kafka-manager:
image: kafkamanager/kafka-manager
image: iunera/cmak
depends_on:
- kafka
- zookeeper
ports:
- 9000:9000
environment:
ZK_HOSTS: http://zookeper:2181
ZK_HOSTS: zookeeper:2181
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public class ProtobufObjectCreatorFactory implements ObjectCreatorFactory {

public ProtobufObjectCreatorFactory(final Object schema, final Object metadata) throws DescriptorValidationException, IOException {
if (schema instanceof ParsedSchema) {
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema());
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) ((ParsedSchema) schema).rawSchema(), metadata);
} else if (schema instanceof ProtoFileElement) {
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema);
this.schema = SchemaProcessorUtils.buildProtoDescriptor((ProtoFileElement) schema, metadata);
} else {
throw new KLoadGenException("Unsupported schema type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.sngular.kloadgen.model.FieldValueMapping;
import com.sngular.kloadgen.util.JMeterHelper;
import com.sngular.kloadgen.util.ProtobufHelper;
import com.squareup.wire.schema.internal.parser.EnumElement;
import com.squareup.wire.schema.internal.parser.FieldElement;
import com.squareup.wire.schema.internal.parser.MessageElement;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.TypeElement;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.threads.JMeterContextService;

public class SchemaProcessorUtils {

Expand Down Expand Up @@ -142,11 +147,11 @@ public static String[] splitAndNormalizeFullFieldName(final String fullFieldName
return Arrays.stream(fields).map(field -> field.replaceAll("\\[.*]", "")).toArray(String[]::new);
}

public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema) throws Descriptors.DescriptorValidationException, IOException {
public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement schema, final Object metadata) throws Descriptors.DescriptorValidationException, IOException {

final DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
final List<String> imports = schema.getImports();
for (String importedClass : imports) {
for (final String importedClass : imports) {
try (final InputStream resourceStream = SchemaProcessorUtils.class.getClassLoader().getResourceAsStream(importedClass)) {
if (null != resourceStream) {
final String schemaToString = new String(resourceStream.readAllBytes());
Expand All @@ -156,6 +161,12 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
schemaBuilder.addDependency(importedSchema.getFileDescriptorSet().getFile(0).getName());
schemaBuilder.addSchema(importedSchema);
}
} else {
final var importSchema = JMeterHelper.getParsedSchema(getSubjectName(importedClass, metadata), JMeterContextService.getContext().getProperties());
if (!ProtobufHelper.NOT_ACCEPTED_IMPORTS.contains(importedClass)) {
schemaBuilder.addDependency(((ProtobufSchema) importSchema).toDescriptor().getFullName());
schemaBuilder.addSchema(convertDynamicSchema((ProtobufSchema) importSchema));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not an error, but I think it would be cleaner to create a variable to hold the value of importSchema because that way you avoid to do the cast several times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

}
}
}
}
Expand All @@ -173,6 +184,24 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement
return schemaBuilder.build().getMessageDescriptor(messageElement.getName());
}

/**
 * Resolves the Schema Registry subject under which an imported proto file is registered.
 *
 * <p>Looks the import path up among the schema's declared references and returns the matching
 * reference's subject. Falls back to the import path itself when the metadata declares no
 * references, no reference name matches, or the matching reference has a {@code null} subject.
 *
 * @param importedClass the import path as written in the proto file (e.g. {@code "foo.proto"})
 * @param metadata      schema metadata fetched from the registry; expected to be a
 *                      {@link SchemaMetadata} — TODO confirm no other type reaches this point
 * @return the subject name to query, never {@code null}
 */
private static String getSubjectName(final String importedClass, final Object metadata) {
  final List<SchemaReference> references = ((SchemaMetadata) metadata).getReferences();

  // Guard: schemas registered without references return an empty/null list; iterating it
  // blindly was a source of NullPointerException.
  if (CollectionUtils.isEmpty(references)) {
    return importedClass;
  }

  return references.stream()
                   .filter(reference -> importedClass.equals(reference.getName()))
                   .findFirst()
                   // Optional.map yields empty when getSubject() is null, so the orElse
                   // below reproduces the previous requireNonNullElse fallback.
                   .map(SchemaReference::getSubject)
                   .orElse(importedClass);
}

/**
 * Converts a registry-fetched Protobuf schema into a {@link DynamicSchema} by rendering it
 * back to its textual form and feeding the individual lines to {@code processImported}.
 *
 * @param importSchema the parsed Protobuf schema retrieved for an import reference
 * @return the dynamic schema rebuilt from the schema's textual representation
 * @throws DescriptorValidationException if the rebuilt descriptor fails validation
 */
private static DynamicSchema convertDynamicSchema(final ProtobufSchema importSchema) throws DescriptorValidationException {
  final String renderedSchema = importSchema.rawSchema().toSchema();
  final List<String> schemaLines = Arrays.asList(renderedSchema.split("\\n"));
  return processImported(schemaLines);
}

/**
 * Builds a predicate that accepts only meaningful proto-source lines: a line passes when it
 * is non-empty and carries no {@code //} comment marker.
 *
 * @return predicate used to filter imported schema lines before parsing
 */
private static Predicate<String> isValid() {
  // De Morgan form of the same condition; the cheap emptiness check runs first.
  return line -> !(line.isEmpty() || line.contains("//"));
}
Expand All @@ -184,7 +213,7 @@ private static DynamicSchema processImported(final List<String> importedLines) t
String packageName = "";
final var linesIterator = importedLines.listIterator();
while (linesIterator.hasNext()) {
final var fileLine = linesIterator.next();
final var fileLine = linesIterator.next().trim();

if (fileLine.startsWith("package")) {
packageName = StringUtils.chop(fileLine.substring(7).trim());
Expand All @@ -211,9 +240,11 @@ private static MessageDefinition buildMessage(final String messageName, final Li
while (messageLines.hasNext() && !exit) {
final var field = messageLines.next().trim().split("\\s");
if (ProtobufHelper.isValidType(field[0])) {
messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfChoppable(field[3])));
messageDefinition.addField(OPTIONAL, field[0], field[1], Integer.parseInt(checkIfGreppable(field[3])));
} else if (ProtobufHelper.LABEL.contains(field[0])) {
messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfChoppable(field[4])));
messageDefinition.addField(field[0], field[1], field[2], Integer.parseInt(checkIfGreppable(field[4])));
} else if ("message".equalsIgnoreCase(field[0])) {
messageDefinition.addMessageDefinition(buildMessage(field[1], messageLines));
} else if ("}".equalsIgnoreCase(field[0])) {
exit = true;
}
Expand All @@ -222,7 +253,7 @@ private static MessageDefinition buildMessage(final String messageName, final Li
return messageDefinition.build();
}

private static String checkIfChoppable(final String field) {
private static String checkIfGreppable(final String field) {
String choppedField = field;
if (field.endsWith(";")) {
choppedField = StringUtils.chop(field);
Expand All @@ -231,8 +262,8 @@ private static String checkIfChoppable(final String field) {
}

private static MessageDefinition buildProtoMessageDefinition(
final String fieldName, final TypeElement messageElement, final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage,
final int previousDeepLevel) {
final String fieldName, final TypeElement messageElement, final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage,
final int previousDeepLevel) {

final int nextDeepLevel = previousDeepLevel + 1;

Expand All @@ -241,20 +272,20 @@ private static MessageDefinition buildProtoMessageDefinition(
final MessageDefinition.Builder msgDef = MessageDefinition.newBuilder(fieldName);
final var element = (MessageElement) messageElement;
extracted(globalNestedTypesByLevelAndMessage, msgDef, element.getFields(), nextDeepLevel, fieldName);
for (var optionalField : element.getOneOfs()) {
for (final var optionalField : element.getOneOfs()) {
extracted(globalNestedTypesByLevelAndMessage, msgDef, optionalField.getFields(), nextDeepLevel, fieldName);
}
return msgDef.build();
}

private static void extracted(
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final Builder msgDef, final List<FieldElement> fieldElementList,
final int deepLevel, final String messageName) {
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final Builder msgDef, final List<FieldElement> fieldElementList,
final int deepLevel, final String messageName) {

final HashMap<String, TypeElement> nestedTypes = processLevelTypes(globalNestedTypesByLevelAndMessage, msgDef, fieldElementList, deepLevel,
messageName);
messageName);
5uso-sng marked this conversation as resolved.
Show resolved Hide resolved

for (var elementField : fieldElementList) {
for (final var elementField : fieldElementList) {
final var elementFieldType = elementField.getType();
final var dotType = checkDotType(elementFieldType);
if (nestedTypes.containsKey(elementFieldType)) {
Expand Down Expand Up @@ -295,7 +326,7 @@ private static void extracted(
msgDef.addField("repeated", "typemapnumber" + elementField.getName(), elementField.getName(), elementField.getTag());

msgDef.addMessageDefinition(
MessageDefinition.newBuilder("typemapnumber" + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
MessageDefinition.newBuilder("typemapnumber" + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You use the value of "typemapnumber" several times, so it would be nice to create a variable with that value.

} else if (Objects.nonNull(elementField.getLabel())) {
msgDef.addField(elementField.getLabel().toString().toLowerCase(), elementField.getType(), elementField.getName(), elementField.getTag());
} else {
Expand All @@ -305,9 +336,9 @@ private static void extracted(
}

private static HashMap<String, TypeElement> processLevelTypes(
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final Builder msgDef, final List<FieldElement> fieldElementList,
final int deepLevel,
final String messageName) {
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final Builder msgDef, final List<FieldElement> fieldElementList,
final int deepLevel,
final String messageName) {

final List<String> allTypesInstantiatedByAttributesWithSimpleNames = new ArrayList<>();
for (final FieldElement fieldElement : fieldElementList) {
Expand Down Expand Up @@ -340,13 +371,13 @@ private static HashMap<String, TypeElement> processLevelTypes(
}

private static void addDefinition(
final MessageDefinition.Builder msgDef, final String typeName, final TypeElement typeElement,
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final int deepLevel) {
final MessageDefinition.Builder msgDef, final String typeName, final TypeElement typeElement,
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final int deepLevel) {

if (typeElement instanceof EnumElement) {
final var enumElement = (EnumElement) typeElement;
final EnumDefinition.Builder builder = EnumDefinition.newBuilder(enumElement.getName());
for (var constant : enumElement.getConstants()) {
for (final var constant : enumElement.getConstants()) {
builder.addValue(constant.getName(), constant.getTag());
}
msgDef.addEnumDefinition(builder.build());
Expand All @@ -358,8 +389,8 @@ private static void addDefinition(
}

private static void fillNestedTypes(
final TypeElement messageElement, final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage,
final int deepLevel) {
final TypeElement messageElement, final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage,
final int deepLevel) {

HashMap<String, HashMap<String, TypeElement>> messageNestedTypes = globalNestedTypesByLevelAndMessage.get(deepLevel);
if (messageNestedTypes == null) {
Expand Down
Loading