
Nakadi Avro publishing #370

Open
adyach opened this issue Aug 18, 2022 · 5 comments

@adyach
Contributor

adyach commented Aug 18, 2022

Nakadi recently added support for publishing in Apache Avro format (zalando/nakadi#1430). This issue is a request for discussion of possible approaches to implementing it in nakadi-java.

The Nakadi Avro Envelope allows sending binary events through Nakadi as Avro-schema-defined envelopes consisting of metadata and events. Every published batch of events has to be wrapped in a PublishingBatch. The schemas can be loaded using https://nakadi.io/manual.html#/avro-schemas/name/versions_get. The name can be either batch.publishing or batch.consumption.

For publishing events, an event type with the schema type avro_schema has to be created first.

The client has to support the following mode of operation:

  • the client instance is built during start-up and should make sure that the event type has the schema the user defined. To fulfil this requirement, the client has to check the provided schema against Nakadi using the endpoint above. The response will contain the version of the schema if it exists; otherwise the client should fail to start, because an unknown schema is in use.
  • the returned schema version then has to be used in the metadata version field.
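The start-up check described above could be sketched as follows. This is a minimal sketch, not the nakadi-java API: `resolveVersion` and the `SchemaVersionCheck` class are hypothetical names, and the map stands in for whatever the schema versions endpoint returns.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the start-up check: given the schema versions
// Nakadi knows for an event type, resolve the version matching the locally
// defined schema, or fail fast because an unknown schema is in use.
public class SchemaVersionCheck {

  static Optional<String> resolveVersion(Map<String, String> versionToSchema,
                                         String localSchemaJson) {
    return versionToSchema.entrySet().stream()
        .filter(e -> e.getValue().equals(localSchemaJson))
        .map(Map.Entry::getKey)
        .findFirst();
  }

  public static void main(String[] args) {
    Map<String, String> remote = new LinkedHashMap<>();
    remote.put("1.0.0", "{\"type\":\"record\"}");
    String version = resolveVersion(remote, "{\"type\":\"record\"}")
        .orElseThrow(() -> new IllegalStateException(
            "unknown schema: refusing to start"));
    System.out.println(version); // this version goes into the metadata version field
  }
}
```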

https://github.com/zalando/nakadi/wiki/Nakadi-Avro-Envelope#nakadi-avro-envelope

@adyach
Contributor Author

adyach commented Sep 1, 2022

@dehora I had a look at the way events are published right now in the library. I would like to suggest a separate resource for Avro events, because the resource interface does not match the current EventResource.java. AvroEventResourceReal.java is constructed with an Avro Schema instance, which identifies the schema version via the Nakadi API. The client API accepts a BusinessEventMapped (reused here, but we could come up with another class) whose payload extends the Avro-generated SpecificRecord.java (I would recommend users use Avro-generated classes for this API; there is also a way to serialise POJOs to Avro binary using the Jackson Avro extension). That guarantees the event will be of the expected schema. Happy to work on this, let me know what you think.

diff --git a/gradle/libs.gradle b/gradle/libs.gradle
index 6453cb2..a3fd4c5 100644
--- a/gradle/libs.gradle
+++ b/gradle/libs.gradle
@@ -13,7 +13,8 @@ versions += [
   rxjava2: "2.0.9",
   slf4j: "1.8.0-beta2",
   logback: "1.3.0-alpha5",
-  mockito: "1.+"
+  mockito: "1.+",
+  avro: "1.11.1"
 ]

 libs += [
@@ -29,7 +30,8 @@ libs += [
   slf4jsimple: "org.slf4j:slf4j-simple:$versions.slf4j",
   logback_core: "ch.qos.logback:logback-core:$versions.logback",
   logback_classic: "ch.qos.logback:logback-classic:$versions.logback",
-  mockito_core: "org.mockito:mockito-core:$versions.mockito"
+  mockito_core: "org.mockito:mockito-core:$versions.mockito",
+  avro: "org.apache.avro:avro:$versions.avro"
 ]

 ext {
diff --git a/nakadi-java-client/build.gradle b/nakadi-java-client/build.gradle
index 18a92f7..88bf1e2 100644
--- a/nakadi-java-client/build.gradle
+++ b/nakadi-java-client/build.gradle
@@ -7,6 +7,7 @@ dependencies {
   implementation project.libs.okhttp3log
   implementation project.libs.rxjava2
   implementation project.libs.slf4j
+  implementation project.libs.avro

   testImplementation project.libs.junit
   testImplementation project.libs.logback_core
diff --git a/nakadi-java-client/src/main/java/nakadi/AvroEventResource.java b/nakadi-java-client/src/main/java/nakadi/AvroEventResource.java
new file mode 100644
index 0000000..18fd7c7
--- /dev/null
+++ b/nakadi-java-client/src/main/java/nakadi/AvroEventResource.java
@@ -0,0 +1,11 @@
+package nakadi;
+
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.List;
+
+public interface AvroEventResource {
+
+    <T extends SpecificRecord> BatchItemResponseCollection sendBatch(List<BusinessEventMapped<T>> events);
+
+}
diff --git a/nakadi-java-client/src/main/java/nakadi/AvroEventResourceReal.java b/nakadi-java-client/src/main/java/nakadi/AvroEventResourceReal.java
new file mode 100644
index 0000000..fe29c91
--- /dev/null
+++ b/nakadi-java-client/src/main/java/nakadi/AvroEventResourceReal.java
@@ -0,0 +1,27 @@
+package nakadi;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.List;
+
+public class AvroEventResourceReal implements AvroEventResource {
+
+    private final NakadiClient client;
+    private final String eventType;
+    private final Schema schema;
+
+    public AvroEventResourceReal(final NakadiClient client,
+                                 final String eventType,
+                                 final Schema schema) {
+        this.client = client;
+        this.eventType = eventType;
+        this.schema = schema;
+    }
+
+    @Override
+    public <T extends SpecificRecord> BatchItemResponseCollection sendBatch(final List<BusinessEventMapped<T>> events) {
+        //...
+        return null;
+    }
+}
diff --git a/nakadi-java-client/src/main/java/nakadi/Resources.java b/nakadi-java-client/src/main/java/nakadi/Resources.java
index 017dd0c..65a67ef 100644
--- a/nakadi-java-client/src/main/java/nakadi/Resources.java
+++ b/nakadi-java-client/src/main/java/nakadi/Resources.java
@@ -1,5 +1,7 @@
 package nakadi;

+import org.apache.avro.Schema;
+
 /**
  * Allows access to the API via resource classes.
  */
@@ -49,6 +51,15 @@ public class Resources {
     return new EventResourceReal(client);
   }

+  /**
+   * The resource for binary events
+   *
+   * @return a resource for working with events
+   */
+  public AvroEventResource binaryEvents(String eventType, Schema eventSchema) {
+    return new AvroEventResourceReal(client, eventType, eventSchema);
+  }
+
   /**
    * The resource for subscriptions
    *

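If a dedicated AvroEventResource were added along these lines, calling code might look roughly like the sketch below. SpecificRecord, BusinessEventMapped, and OrderCreated are stand-in stubs here (the real ones would come from Avro codegen and nakadi-java), and the in-memory implementation exists only to make the sketch self-contained; a real implementation would serialize each event with the resolved schema and POST the binary batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of how the proposed AvroEventResource could be exercised.
public class AvroResourceSketch {

  interface SpecificRecord { }            // stand-in for org.apache.avro's interface

  static class BusinessEventMapped<T extends SpecificRecord> {
    final T data;
    BusinessEventMapped(T data) { this.data = data; }
  }

  interface AvroEventResource {
    <T extends SpecificRecord> int sendBatch(List<BusinessEventMapped<T>> events);
  }

  // Trivial stand-in: returns the batch size instead of a BatchItemResponseCollection.
  static class InMemoryResource implements AvroEventResource {
    @Override public <T extends SpecificRecord> int sendBatch(
        List<BusinessEventMapped<T>> events) {
      return events.size();
    }
  }

  static class OrderCreated implements SpecificRecord { } // hypothetical generated class

  public static void main(String[] args) {
    List<BusinessEventMapped<OrderCreated>> batch = new ArrayList<>();
    batch.add(new BusinessEventMapped<>(new OrderCreated()));
    System.out.println(new InMemoryResource().sendBatch(batch));
  }
}
```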
@dehora
Owner

dehora commented Sep 4, 2022

AvroEventResource

This approach seems overly keen to bake Avro in for wildcard event type consumers rather than thinking rigorously about non-JSON representations in general. It puts Apache Avro into the signature space of the client, which is effectively impossible to unwind, and means the signature can only be used for that format. Nakadi signatures should not be tied end to end to a specific format: for example, we don't have a JSONSchemaV4EventResource.

My suggestion is to step back and rethink this in terms of messaging protocols, rather than the specific format choices Zalando happens to be making.

@adyach
Contributor Author

adyach commented Sep 8, 2022

Nakadi's binary publishing API has the same structure as the JSON one; the only difference is the content type (payload format). Looking at the code, it should be possible to use the current client publishing API (EventResource.java) to support different payloads: EventResourceReal.java can be constructed with different serializers (JsonSupport, AvroSupport, etc.). That option allows reusing parts of the code, keeps the API unchanged, and gives an extensible mechanism. Another option I have in mind is to extend EventResource.java with a separate implementation for binary payloads and pluggable classes for serialisation. wdyt?
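The pluggable-serializer option could be sketched roughly as below. This is an assumption about the shape of the abstraction, not the actual nakadi-java API: the interface name, method signatures, and content-type strings are hypothetical, and both implementations are placeholders (a real JsonSupport would use the library's JSON machinery, a real AvroSupport would do Avro binary encoding).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: EventResourceReal would delegate payload encoding to
// whichever SerializationSupport implementation the client was built with.
public class SerializationSketch {

  interface SerializationSupport {
    byte[] serialize(Object event);
    String contentType();
  }

  static class JsonSupport implements SerializationSupport {
    @Override public byte[] serialize(Object event) {
      // placeholder for real JSON encoding
      return String.valueOf(event).getBytes(StandardCharsets.UTF_8);
    }
    @Override public String contentType() { return "application/json"; }
  }

  static class AvroSupport implements SerializationSupport {
    @Override public byte[] serialize(Object event) {
      return new byte[0]; // placeholder: real impl would emit Avro binary
    }
    // assumed content type; the real value is whatever Nakadi's binary API expects
    @Override public String contentType() { return "application/avro-binary"; }
  }

  public static void main(String[] args) {
    SerializationSupport support = new JsonSupport();
    System.out.println(support.contentType());
  }
}
```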

@adyach
Contributor Author

adyach commented Dec 16, 2022

I came up with another approach which allows using the current library API without changing user code. The only required change is to update the library version and set the serialiser implementation to Avro when configuring the Nakadi client (a one-line change for the user). The change is abstracted from the main library code and lives in a separate module, with the fewest possible changes to the core library. Since it is a one-line change for the user, it is very easy to integrate, which is why I think it should be successful at least as an experiment. There is still a risk that it won't be picked up by teams, or that they will hit serialisation issues, because at the moment the implementation uses the latest schema for model serialisation.

Integration:

    final NakadiClient client = NakadiClient.newBuilder()
        ...
        .serializationSupport(AvroSerializationSupport.newInstance())
        ...
        .build();

The change: #383

@dehora do you think we can merge that into the library for experimentation purposes?

@ePaul

ePaul commented Jan 13, 2023

Just for understanding: this will apply to all events sent via this client, and there is no separation by event type?
