Skip to content

Commit

Permalink
#332 Removed generator set up by TopicNameStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
dhergonsngular committed Mar 27, 2023
1 parent 4858372 commit 0fd3920
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,21 @@ public static Descriptors.Descriptor buildProtoDescriptor(final ProtoFileElement

return schemaBuilder.build().getMessageDescriptor(messageElement.getName());
}
private static String getSubjectName(String importedClass, Object metadata) {
List<SchemaReference> references = ((SchemaMetadata) metadata).getReferences();

for(final SchemaReference schemaReference: references){
if(schemaReference.getName().equals(importedClass)){
return schemaReference.getSubject();
private static String getSubjectName(final String importedClass, final Object metadata) {
final List<SchemaReference> references = ((SchemaMetadata) metadata).getReferences();
String subjectName = null;

for (final SchemaReference schemaReference : references) {
if (schemaReference.getName().equals(importedClass)) {
subjectName = schemaReference.getSubject();
break;
}
}

return importedClass;
return Objects.requireNonNullElse(subjectName, importedClass);
}

private static DynamicSchema convertDynamicSchema(final ProtobufSchema importSchema) throws DescriptorValidationException {
return processImported(Arrays.asList(importSchema.rawSchema().toSchema().split("\\n")));
}
Expand Down Expand Up @@ -258,8 +262,8 @@ private static String checkIfGreppable(final String field) {
}

private static MessageDefinition buildProtoMessageDefinition(
final String fieldName, final TypeElement messageElement, final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage,
final int previousDeepLevel) {
final String fieldName, final TypeElement messageElement, final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage,
final int previousDeepLevel) {

final int nextDeepLevel = previousDeepLevel + 1;

Expand All @@ -275,11 +279,11 @@ private static MessageDefinition buildProtoMessageDefinition(
}

private static void extracted(
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final Builder msgDef, final List<FieldElement> fieldElementList,
final int deepLevel, final String messageName) {
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final Builder msgDef, final List<FieldElement> fieldElementList,
final int deepLevel, final String messageName) {

final HashMap<String, TypeElement> nestedTypes = processLevelTypes(globalNestedTypesByLevelAndMessage, msgDef, fieldElementList, deepLevel,
messageName);
messageName);

for (final var elementField : fieldElementList) {
final var elementFieldType = elementField.getType();
Expand Down Expand Up @@ -322,7 +326,7 @@ private static void extracted(
msgDef.addField("repeated", "typemapnumber" + elementField.getName(), elementField.getName(), elementField.getTag());

msgDef.addMessageDefinition(
MessageDefinition.newBuilder("typemapnumber" + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
MessageDefinition.newBuilder("typemapnumber" + elementField.getName()).addField(OPTIONAL, "string", "key", 1).addField(OPTIONAL, realType, "value", 2).build());
} else if (Objects.nonNull(elementField.getLabel())) {
msgDef.addField(elementField.getLabel().toString().toLowerCase(), elementField.getType(), elementField.getName(), elementField.getTag());
} else {
Expand All @@ -332,9 +336,9 @@ private static void extracted(
}

private static HashMap<String, TypeElement> processLevelTypes(
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final Builder msgDef, final List<FieldElement> fieldElementList,
final int deepLevel,
final String messageName) {
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final Builder msgDef, final List<FieldElement> fieldElementList,
final int deepLevel,
final String messageName) {

final List<String> allTypesInstantiatedByAttributesWithSimpleNames = new ArrayList<>();
for (final FieldElement fieldElement : fieldElementList) {
Expand Down Expand Up @@ -367,8 +371,8 @@ private static HashMap<String, TypeElement> processLevelTypes(
}

private static void addDefinition(
final MessageDefinition.Builder msgDef, final String typeName, final TypeElement typeElement,
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final int deepLevel) {
final MessageDefinition.Builder msgDef, final String typeName, final TypeElement typeElement,
final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage, final int deepLevel) {

if (typeElement instanceof EnumElement) {
final var enumElement = (EnumElement) typeElement;
Expand All @@ -385,8 +389,8 @@ private static void addDefinition(
}

private static void fillNestedTypes(
final TypeElement messageElement, final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage,
final int deepLevel) {
final TypeElement messageElement, final HashMap<Integer, HashMap<String, HashMap<String, TypeElement>>> globalNestedTypesByLevelAndMessage,
final int deepLevel) {

HashMap<String, HashMap<String, TypeElement>> messageNestedTypes = globalNestedTypesByLevelAndMessage.get(deepLevel);
if (messageNestedTypes == null) {
Expand Down
50 changes: 20 additions & 30 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static Properties setupCommonProperties(final JavaSamplerContext context)

if (Objects.nonNull(context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
props.put(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG,
context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG));
context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG));
}
final Iterator<String> parameters = context.getParameterNamesIterator();
parameters.forEachRemaining(parameter -> {
Expand Down Expand Up @@ -178,7 +178,7 @@ public static Arguments getCommonConsumerDefaultParameters() {
defaultParameters.addArgument(ConsumerConfig.SECURITY_PROVIDERS_CONFIG, "");
defaultParameters.addArgument(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS);
defaultParameters.addArgument(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM);
defaultParameters.addArgument(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE);
defaultParameters.addArgument(SslConfigs.SSL_PROVIDER_CONFIG, "");
Expand Down Expand Up @@ -300,9 +300,9 @@ private static void verifySecurity(final JavaSamplerContext context, final Prope
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, context.getParameter(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG));

props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG),
ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
""));
propertyOrDefault(context.getParameter(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG),
ProducerKeysHelper.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
""));

props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, context.getParameter(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG));
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, context.getParameter(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
Expand Down Expand Up @@ -338,7 +338,7 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
}

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Objects.requireNonNullElse(jMeterVariables.get(PropsKeysHelper.VALUE_SERIALIZER_CLASS_PROPERTY), ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT));
Objects.requireNonNullElse(jMeterVariables.get(PropsKeysHelper.VALUE_SERIALIZER_CLASS_PROPERTY), ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT));

if (Objects.nonNull(jMeterVariables.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL))) {
final Map<String, String> originals = new HashMap<>();
Expand All @@ -347,29 +347,19 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
props.putAll(originals);

try {
if (props.getProperty(ProducerKeysHelper.VALUE_NAME_STRATEGY).equals(NamingStrategyKeyHelper.TOPIC_NAME_STRATEGY)) {
generator.setUpGenerator(originals, props.getProperty(ProducerKeysHelper.KAFKA_TOPIC_CONFIG),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} else {
generator.setUpGenerator(
originals,
jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
}

generator.setUpGenerator(originals, jMeterVariables.get(PropsKeysHelper.VALUE_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} catch (final KLoadGenException exc) {
if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} else {
throw exc;
}
}
} else {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
}

return generator;
Expand Down Expand Up @@ -404,7 +394,7 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {
}

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Objects.requireNonNullElse(jMeterVariables.get(PropsKeysHelper.VALUE_SERIALIZER_CLASS_PROPERTY), ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT));
Objects.requireNonNullElse(jMeterVariables.get(PropsKeysHelper.VALUE_SERIALIZER_CLASS_PROPERTY), ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT));

if (Objects.nonNull(jMeterVariables.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL))) {
final Map<String, String> originals = new HashMap<>();
Expand All @@ -423,13 +413,13 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {
props.putAll(originals);

generator.setUpGenerator(
originals,
jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
originals,
jMeterVariables.get(PropsKeysHelper.KEY_SUBJECT_NAME),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
} else {
generator.setUpGenerator(
jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA),
(List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES));
}

return generator;
Expand All @@ -439,8 +429,8 @@ public static List<String> populateHeaders(final List<HeaderMapping> kafkaHeader
final List<String> headersSB = new ArrayList<>();
for (final HeaderMapping kafkaHeader : kafkaHeaders) {
final String headerValue = STATELESS_GENERATOR_TOOL.generateObject(kafkaHeader.getHeaderName(), kafkaHeader.getHeaderValue(),
10,
Collections.emptyList()).toString();
10,
Collections.emptyList()).toString();
headersSB.add(kafkaHeader.getHeaderName().concat(":").concat(headerValue));
producerRecord.headers().add(kafkaHeader.getHeaderName(), headerValue.getBytes(StandardCharsets.UTF_8));
}
Expand Down

0 comments on commit 0fd3920

Please sign in to comment.