Apache Flink & SchemaRegistry with GenericRecords and Avro4s #783

Open
andrelu opened this issue Jul 9, 2023 · 0 comments

Comments

andrelu commented Jul 9, 2023

Hello folks, first of all thanks for this awesome library. It has helped me build a lot of automation around Scala case classes and Avro. I know this library is in maintenance mode; nevertheless, I'm here with some doubts about it. That said, let's say I have a Flink job that outputs data to many topics, where each output is represented by a case class like so:

case class Topic1(attributeA: String, attributeB: String)
case class Topic2(attributeA: String, attributeB: String, attributeC: Int)
case class Topic3(attributeA: String, attributeB: String, attributeD: Long)
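For reference, avro4s can derive the Avro schema for each of these case classes directly. A minimal sketch (assuming avro4s 4.x, where AvroSchema is the schema-derivation entry point):

import com.sksamuel.avro4s.AvroSchema
import org.apache.avro.Schema

// Derive the Avro schema that would be registered for the Topic1 subject
val topic1Schema: Schema = AvroSchema[Topic1]
println(topic1Schema.toString(true)) // pretty-printed schema JSON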

All my outputs are backed by Schema Registry so we can keep track of the schemas our jobs produce. To output the data to Kafka with Schema Registry, I had to create a SerializationSchema that first converts the case class to a GenericRecord and then uses ConfluentRegistryAvroSerializationSchema to properly produce the data to Kafka. The problem is that I'm still facing serialization issues with the RecordEncoder class from avro4s. I managed to handle the org.apache.avro.Schema serialization issues, but I could not find a way to handle this situation with RecordEncoder. Here is how my SerializationSchema class looks right now:

import com.sksamuel.avro4s.{Encoder, SchemaFor}
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema

class CustomAvroSerializationSchema[T >: Null: SchemaFor: Encoder](subjectName: String, schemaRegistryUrl: String) extends SerializationSchema[T]:

  // These avro4s instances end up as fields of the SerializationSchema,
  // so Flink serializes them together with the job graph.
  val schema = SchemaFor[T]
  val encoder = Encoder[T]

  private val confluentSerialization: ConfluentRegistryAvroSerializationSchema[GenericRecord] =
    ConfluentRegistryAvroSerializationSchema
      .forGeneric(
        subjectName,
        schema.schema,
        schemaRegistryUrl
      )

  override def open(context: SerializationSchema.InitializationContext): Unit =
    confluentSerialization.open(context)
    super.open(context)

  override def serialize(element: T): Array[Byte] =
    // encode(schema) returns the T => Any encoding function, applied to the element
    val record = encoder.encode(schema.schema)(element).asInstanceOf[GenericRecord]
    confluentSerialization.serialize(record)
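For what it's worth, one pattern that often sidesteps this kind of Flink closure-serialization problem is to ship only plainly serializable data (the schema as a JSON string, plus a function that rebuilds the encoder) and re-derive the avro4s machinery lazily on each task manager. A rough, untested sketch of that idea; LazyAvroSerializationSchema, schemaJson, and makeEncoder are hypothetical names, and it assumes the rebuild function itself captures no non-serializable state:

import com.sksamuel.avro4s.Encoder
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema

class LazyAvroSerializationSchema[T](
    subjectName: String,
    schemaRegistryUrl: String,
    schemaJson: String,            // hypothetical: the Avro schema as a plain, serializable string
    makeEncoder: () => Encoder[T]  // hypothetical: e.g. () => Encoder[Topic1], run after deserialization
) extends SerializationSchema[T]:

  // @transient lazy vals are excluded from Java serialization and
  // re-created on first use on the task manager.
  @transient private lazy val schema: Schema = new Schema.Parser().parse(schemaJson)
  @transient private lazy val encoder: Encoder[T] = makeEncoder()
  @transient private lazy val confluentSerialization: ConfluentRegistryAvroSerializationSchema[GenericRecord] =
    ConfluentRegistryAvroSerializationSchema.forGeneric(subjectName, schema, schemaRegistryUrl)

  override def open(context: SerializationSchema.InitializationContext): Unit =
    confluentSerialization.open(context)
    super.open(context)

  override def serialize(element: T): Array[Byte] =
    val record = encoder.encode(schema)(element).asInstanceOf[GenericRecord]
    confluentSerialization.serialize(record)

Usage would then look something like new LazyAvroSerializationSchema[Topic1]("topic1-value", url, AvroSchema[Topic1].toString, () => Encoder[Topic1]).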

My question is: is there any other way to use avro4s with Schema Registry and Kafka serializers? I saw the SerDe implementation, but I could not figure out how to adapt it to my situation.
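In case it helps frame the question: for a plain Kafka producer (outside Flink), one conceivable adaptation is a Kafka Serializer[T] that uses avro4s only for the case class -> GenericRecord step and delegates all Schema Registry handling to Confluent's KafkaAvroSerializer. This is not something avro4s ships; Avro4sRegistrySerializer is a hypothetical name, and the sketch assumes avro4s 4.x for Scala 3:

import scala.jdk.CollectionConverters.*
import com.sksamuel.avro4s.{Encoder, SchemaFor}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Serializer

class Avro4sRegistrySerializer[T: SchemaFor: Encoder](schemaRegistryUrl: String) extends Serializer[T]:

  // Confluent's serializer handles subject registration and the wire format
  private val inner = new KafkaAvroSerializer()
  inner.configure(Map[String, Any]("schema.registry.url" -> schemaRegistryUrl).asJava, false)

  override def serialize(topic: String, data: T): Array[Byte] =
    // avro4s handles only the case class -> GenericRecord conversion
    val record = Encoder[T].encode(SchemaFor[T].schema)(data).asInstanceOf[GenericRecord]
    inner.serialize(topic, record)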
