[Issue]: Trouble with using Apicurio Schema Registry + Confluent SerDes #389

Closed
CROprogrammer opened this issue Jun 14, 2023 · 21 comments · Fixed by #391

@CROprogrammer
What happened?

Hi,
I'm trying to run a Kafka producer that uses the Apicurio schema registry and Confluent SerDes for serialization.
In the Schema Registry Config element I've specified the Apicurio schema registry and its URL, http://apicurio-registry-croz-event-streaming.mos-cloud.lan.croz.net/. KLoadGen can connect to it and retrieves the existing schemas.
In the KLG - Value Schema Serializer Config element I've set the name strategy to Confluent's RecordNameStrategy and the serializer to Confluent's KafkaAvroSerializer. I've also chosen which schema to use.

The problem is that with the combination of the Apicurio schema registry and Confluent's SerDes I get an error and the test plan can't run. The serializer tries to fetch the schema from http://apicurio-registry-croz-event-streaming.mos-cloud.lan.croz.net/subjects/test.FirstSchemaStreamingPOCTest/versions?normalize=false, because it assumes a Confluent schema registry is being used.
It should instead fetch from http://apicurio-registry-croz-event-streaming.mos-cloud.lan.croz.net/apis/ccompat/v6/subjects/test.FirstSchemaStreamingPOCTest/versions?normalize=false,
because the Apicurio schema registry is being used, and it exposes a Confluent-compatible API under /apis/ccompat/v6 for use with Confluent's SerDes.
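For reference, Confluent clients can usually talk to Apicurio directly when the registry URL includes the ccompat path. A minimal sketch of a plain Java producer configured that way, independent of KLoadGen (the bootstrap address is a placeholder):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public final class CcompatProducerSketch {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer");
    // Point the Confluent client at Apicurio's Confluent-compatible API,
    // not at the registry root:
    props.put("schema.registry.url",
        "http://apicurio-registry-croz-event-streaming.mos-cloud.lan.croz.net/apis/ccompat/v6");
    props.put("value.subject.name.strategy",
        "io.confluent.kafka.serializers.subject.RecordNameStrategy");

    try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
      // send GenericRecords built from the Avro schema below
    }
  }
}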

Schema used is:

{
  "type": "record",
  "name": "FirstSchemaStreamingPOCTest",
  "namespace": "test",
  "fields": [
    {
      "name": "randomNumber",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "randomCode",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

JMX testplan:
Kafka Producer Test Plan.zip

KloadGen Version

KLoadGen 5.6.3

Relevant log output

2023-06-14 11:42:15,158 ERROR o.a.j.t.JMeterThread: Error while processing sampler: 'Kafka Producer'.
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"FirstSchemaStreamingPOCTest","namespace":"test","fields":[{"name":"randomNumber","type":["null","string"],"default":null},{"name":"randomCode","type":["null","string"],"default":null}]}
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61) ~[kloadgen-5.6.3.jar:?]
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kloadgen-5.6.3.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:945) ~[kloadgen-5.6.3.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:905) ~[kloadgen-5.6.3.jar:?]
	at com.sngular.kloadgen.sampler.KafkaProducerSampler.runTest(KafkaProducerSampler.java:149) ~[kloadgen-5.6.3.jar:?]
	at org.apache.jmeter.protocol.java.sampler.JavaSampler.sample(JavaSampler.java:197) ~[ApacheJMeter_java.jar:5.5]
	at org.apache.jmeter.threads.JMeterThread.doSampling(JMeterThread.java:651) ~[ApacheJMeter_core.jar:5.5]
	at org.apache.jmeter.threads.JMeterThread.executeSamplePackage(JMeterThread.java:570) ~[ApacheJMeter_core.jar:5.5]
	at org.apache.jmeter.threads.JMeterThread.processSampler(JMeterThread.java:501) ~[ApacheJMeter_core.jar:5.5]
	at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:268) ~[ApacheJMeter_core.jar:5.5]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: RESTEASY003210: Could not find resource for full path: http://apicurio-registry-croz-event-streaming.mos-cloud.lan.croz.net/subjects/test.FirstSchemaStreamingPOCTest/versions?normalize=false; error code: 404
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:257) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:366) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:337) ~[kloadgen-5.6.3.jar:?]
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:115) ~[kloadgen-5.6.3.jar:?]
	... 11 more

Have you added your JMX Testplan or a replica?

  • I have added a JMX testplan

Have you added your Schema or a replica?

  • I have added the Schema
@github-actions

Thank you for collaborating with the project by giving us feedback! In case it applies, it would be helpful if you could attach an example AVRO and an example test plan for us to reproduce the error. Cheers!

@AdrianLagartera AdrianLagartera self-assigned this Jun 14, 2023
@AdrianLagartera
Member

Thanks for collaborating with us. I will take a look at it and contact you as soon as we have a solution.

@CROprogrammer
Author

Hi, is there any progress with this problem? Thanks :)

@AdrianLagartera
Member

Hi @CROprogrammer, I'm currently testing a solution in the attached branch; feel free to test it yourself.

We added some missing strategies from Apicurio, which could solve your problem. The initial request wasn't an option, though, simply because of the complexity it would add to the application: we would need to handle every situation where someone wants a different combination of serializers and strategies, and as you can see the number of combinations is high, which is not the purpose of KLoadGen.

So we recommend using only the serializers and strategies that match the schema registry you are using, instead of mixing them.

@CROprogrammer
Author

On the project I work on, we use the Apicurio Schema Registry with Confluent SerDes precisely because of Apicurio's compatibility API (so that, if Apicurio ever stops being sufficient for our needs, we can switch to Confluent's schema registry). We wanted to performance test our Kafka clients and the system with KLoadGen.

So does using this combination exclude KLoadGen from performance testing a system with the Apicurio Schema Registry and Confluent SerDes?

@AdrianLagartera
Member

AdrianLagartera commented Jun 27, 2023

It's not that it's incompatible; the problem is that the serializer you want to use, io.confluent.kafka.serializers.KafkaAvroSerializer, cannot be used at the same time as Apicurio, because that serializer's implementation forces a lookup in a Confluent schema registry to find the schema's subject. With literally any other serializer, the message is generated with its fields filled in without any problem, but it won't be Confluent's specific serialization.

Anyway, I don't think there should be any problem with the other serializers. Try, for example, com.sngular.kloadgen.serializer.GenericAvroRecordSerializer, as in the screenshot below.

(Screenshot 2023-06-27 at 12:45:35)

That gives me the following result:

(Screenshot 2023-06-27 at 12:46:12)

I think with this you can run your tests without problems; if not, please let me know and we'll check again whether another solution is possible.

P.S. If you specifically need io.confluent.kafka.serializers.KafkaAvroSerializer, we will need more time to implement the logic for that.

@CROprogrammer
Author

Okay, thanks. I just wanted to make sure, because I will send events to a topic through KLoadGen and my consumer will consume the messages written to that topic. And because the consumer uses Confluent's SerDes, it would not be able to read messages written with another serializer.

@AdrianLagartera
Member

I think you can test it, because there shouldn't be any difference between the other serializers and Confluent's SerDes: you receive a message generated from the Avro structure, so Confluent's SerDes should work. Confluent's SerDes basically just queries your registry and checks that the message has the correct format.

@CROprogrammer
Author

There is a difference that makes using Apicurio's SerDe in KLoadGen together with Confluent's SerDes in the Kafka consumer impossible: Apicurio uses 8 bytes to store the schema ID in the serialized Kafka message, while Confluent uses 4 bytes.
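To illustrate why the two framings are incompatible, a minimal sketch (class name and ID value are hypothetical): Confluent's wire format is 1 magic byte (0x0) plus a 4-byte int schema ID before the Avro payload, while Apicurio's default writes an 8-byte long ID.

import java.nio.ByteBuffer;

public final class WireFormatProbe {

  public static void main(String[] args) {
    // Hypothetical record bytes: magic byte + 8-byte ID (Apicurio default), no payload.
    byte[] apicurioFramed = ByteBuffer.allocate(9).put((byte) 0x0).putLong(42L).array();

    ByteBuffer buf = ByteBuffer.wrap(apicurioFramed);
    buf.get(); // skip the magic byte
    // A Confluent deserializer reads only 4 bytes here...
    int confluentView = buf.getInt();
    // ...so it sees the upper half of the 8-byte long as the schema ID.
    System.out.println("Confluent reads schema ID = " + confluentView); // prints 0, not 42
  }
}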

@CROprogrammer
Author

CROprogrammer commented Jun 28, 2023

Is there any possibility that we could configure the Apicurio SerDe used in KLoadGen so that it uses 4 bytes to store the ID instead of 8?

There is a possibility to configure that in the Apicurio SerDe (see the sketch after this list):

io.apicurio.registry.serde.DefaultIdHandler: stores the ID as an 8-byte long

io.apicurio.registry.serde.Legacy4ByteIdHandler: stores the ID as a 4-byte integer

Using Confluent SerDes in the Kafka clients would then be possible.
What do you think about it?
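For illustration, a minimal sketch of those settings on the Apicurio serializer (a fragment; props stands in for the serializer's configuration map, and both keys are the SerdeConfig constants mentioned in this thread):

import java.util.Properties;

import io.apicurio.registry.serde.Legacy4ByteIdHandler;
import io.apicurio.registry.serde.SerdeConfig;

// Make the Apicurio serializer write the schema ID the way a Confluent
// deserializer expects it: as a 4-byte int in the message body, not in headers.
Properties props = new Properties();
props.put(SerdeConfig.ENABLE_HEADERS, "false");
props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName());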

@AdrianLagartera
Member

> Is there any possibility that we could configure the Apicurio SerDe used in KLoadGen so that it uses 4 bytes to store the ID instead of 8? […]

Well... right now I'm testing exactly that, because I implemented it a few hours ago, haha. In a moment I'll upload the changes and explain the solution we have in mind. And thank you again for the contribution.

@CROprogrammer
Author

hahah great then, happy to hear that :D

@AdrianLagartera
Member

Well, to explain a little what we did: as you commented a few moments ago, the solution is to give you the chance to choose the IdHandler. At the beginning, when we thought about supporting the Confluent serializer, mixing Apicurio with Confluent looked a little hard... so this solution is a really good one, and Apicurio supports it, so good for everybody.

The way to enable it is via a new property in the Java Request (where all the producer properties are set).

(Screenshot 2023-06-28 at 16:27:53)

By default the property is set to "NO"; just change it to "YES" and the default 8-byte ID becomes a 4-byte one. I hope this solution solves your problem. The changes are in the attached branch; try it, and if everything works fine, please let us know so we can close the issue.

@CROprogrammer
Author

The solution worked for me, but I had to change one thing: I disabled the use of headers in the Apicurio SerDe:

if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) {
//      props.put(SerdeConfig.ENABLE_HEADERS, "true");
      props.put(SerdeConfig.ENABLE_HEADERS, "false");
      props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName());
}

I'm not sure whether Confluent SerDes can be used when the schema ID is sent in the Kafka message headers, so it would be great if you could add another config option in JMeter (or somewhere) so that the use of headers can be enabled/disabled.
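For context, Confluent's deserializer only looks for the schema ID in the message body (the 4-byte framing described above), never in headers, so header-based IDs have to stay disabled. A minimal sketch of the consumer side under that assumption (bootstrap address, group ID, and topic are placeholders):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class CcompatConsumerSketch {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "poc-consumer");            // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    // The Confluent deserializer reads the 4-byte ID from the message body and
    // resolves it through Apicurio's ccompat API:
    props.put("schema.registry.url",
        "http://apicurio-registry-croz-event-streaming.mos-cloud.lan.croz.net/apis/ccompat/v6");

    try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("poc-topic")); // placeholder topic
      consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.value()));
    }
  }
}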

@AdrianLagartera
Member

First of all, I'm glad to hear it works fine.

Reading the documentation, I had understood that headers were necessary for the ID_HANDLER property to work, but apparently they are not necessary at all. And as you suggested, I will add a property for that.

@CROprogrammer
Author

Hi,

I've noticed that you've added:

if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_YES)) {
      props.put(SerdeConfig.ENABLE_HEADERS, "true");
}

and headers are enabled by default, so ENABLE_HEADERS ends up true in either case. Maybe check whether the flag is NO, and then disable headers.

(screenshot)
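A sketch of that suggested inversion (FLAG_NO is a hypothetical constant, assumed analogous to the existing ProducerKeysHelper.FLAG_YES):

// Headers are on by default in the Apicurio SerDe, so only the "NO" case
// needs an explicit override.
if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) {
      props.put(SerdeConfig.ENABLE_HEADERS, "false");
}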

@AdrianLagartera
Member

Hmm, yes, you're right. I don't know why they put that other part in the documentation about the header being treated as true when absent... a bit confusing, to be honest. Thanks again for the information.

@CROprogrammer
Author

Yeah, that caused me problems in the past; a very confusing part of the documentation.

@AdrianLagartera
Member

Well at least I am not alone... haha. Now the changes are up.

@CROprogrammer
Author

It's all good now, but it's maybe a bit confusing that you have to set apicurio.avro.header.id: YES to disable it; I would expect to put NO. Just my suggestion.

@AdrianLagartera
Member

Yes, I agree with that suggestion, after clearing up a few doubts about the default behavior. Apparently Apicurio always has headers enabled by default, so there's no point in setting the flag to true when that's already the default. At the beginning I thought we would need to manage that state ourselves. Anyway, I will upload the change.
