Skip to content

Commit

Permalink
fixed no schema matches error (strimzi#895)
Browse files Browse the repository at this point in the history
* fixed no schema matches error

Signed-off-by: Antonio Pedro <[email protected]>

* Added missings defintions and improve the test

Signed-off-by: Antonio Pedro <[email protected]>

* improve CHANGELOG.md

Signed-off-by: António Pedro <[email protected]>

* Update CHANGELOG.md

Co-authored-by: Paolo Patierno <[email protected]>
Signed-off-by: António Pedro <[email protected]>

---------

Signed-off-by: Antonio Pedro <[email protected]>
Signed-off-by: António Pedro <[email protected]>
Co-authored-by: Paolo Patierno <[email protected]>
  • Loading branch information
antonio-pedro99 and ppatierno authored Apr 26, 2024
1 parent 3bf3818 commit 86783da
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 0.29.0

* Dependency updates (Vert.x 4.5.7, Netty 4.1.108.Final)
* Added support for records key and value to be JSON array when using the `json` embedded format.

## 0.28.0

Expand Down
15 changes: 15 additions & 0 deletions src/main/resources/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,9 @@
"properties": {
"key": {
"oneOf": [
{
"type": "array"
},
{
"type": "object"
},
Expand Down Expand Up @@ -1809,6 +1812,9 @@
},
"value": {
"oneOf": [
{
"type": "array"
},
{
"type": "object",
"nullable": true
Expand All @@ -1821,6 +1827,9 @@
},
"key": {
"oneOf": [
{
"type": "array"
},
{
"type": "object"
},
Expand Down Expand Up @@ -1887,6 +1896,9 @@
"properties": {
"value": {
"oneOf": [
{
"type": "array"
},
{
"type": "object"
},
Expand All @@ -1897,6 +1909,9 @@
},
"key": {
"oneOf": [
{
"type": "array"
},
{
"type": "object"
},
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/openapiv2.json
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,7 @@
"properties": {
"key": {
"type": [
"array",
"object",
"string"
]
Expand All @@ -1348,6 +1349,7 @@
},
"value": {
"type": [
"array",
"object",
"string",
"null"
Expand Down Expand Up @@ -1626,13 +1628,15 @@
},
"value": {
"type": [
"array",
"object",
"string",
"null"
]
},
"key": {
"type": [
"array",
"object",
"string"
]
Expand Down Expand Up @@ -1695,12 +1699,14 @@
"properties": {
"value": {
"type": [
"array",
"object",
"string"
]
},
"key": {
"type": [
"array",
"object",
"string"
]
Expand Down
97 changes: 97 additions & 0 deletions src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,103 @@ void sendSimpleMessageWithKey(VertxTestContext context) throws InterruptedExcept
});
}

@Test
void sendSimpleMessangeWithArrayKey(VertxTestContext context) throws InterruptedException, ExecutionException {
KafkaFuture<Void> future = adminClientFacade.createTopic(topic, 2, 1);

String value = "message-value";
JsonArray key = new JsonArray();
key.add("some-element").add(new JsonObject().put("some-field", "element-2"));

JsonArray records = new JsonArray();
JsonObject json = new JsonObject();
json.put("value", value);
json.put("key", key);
records.add(json);

JsonObject root = new JsonObject();
root.put("records", records);

future.get();

producerService()
.sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_JSON)
.sendJsonObject(root, verifyOK(context));

Properties consumerProperties = Consumer.fillDefaultProperties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri);

KafkaConsumer<byte[], String> consumer = KafkaConsumer.create(vertx, consumerProperties,
new ByteArrayDeserializer(), new KafkaJsonDeserializer<>(String.class));
consumer.handler(record -> {
context.verify(() -> {
assertThat(record.value(), is(value));
assertThat(record.topic(), is(topic));
assertThat(record.partition(), notNullValue());
assertThat(record.offset(), is(0L));
assertThat(record.key(), is(key.toBuffer().getBytes()));
});
LOGGER.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
consumer.close();
context.completeNow();
});

consumer.subscribe(topic, done -> {
if (!done.succeeded()) {
context.failNow(done.cause());
}
});
}


@Test
void sendSimpleMessageWithArrayValue(VertxTestContext context) throws InterruptedException, ExecutionException {
KafkaFuture<Void> future = adminClientFacade.createTopic(topic, 2, 1);

JsonArray value = new JsonArray();
value.add("some-element").add(new JsonObject().put("some-field", "element-2"));

JsonArray records = new JsonArray();
JsonObject json = new JsonObject();
json.put("value", value);
records.add(json);

JsonObject root = new JsonObject();
root.put("records", records);

future.get();

producerService()
.sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_JSON)
.sendJsonObject(root, verifyOK(context));

Properties consumerProperties = Consumer.fillDefaultProperties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri);

KafkaConsumer<String, byte[]> consumer = KafkaConsumer.create(vertx, consumerProperties,
new StringDeserializer(), new ByteArrayDeserializer());
consumer.handler(record -> {
context.verify(() -> {
assertThat(record.value(), is(value.toBuffer().getBytes()));
assertThat(record.topic(), is(topic));
assertThat(record.partition(), notNullValue());
assertThat(record.offset(), is(0L));
assertThat(record.key(), nullValue());
});
LOGGER.info("Message consumed topic={} partition={} offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
consumer.close();
context.completeNow();
});

consumer.subscribe(topic, done -> {
if (!done.succeeded()) {
context.failNow(done.cause());
}
});
}

@Disabled("Will be check in the next PR, this is just external tests for Bridge")
@DisabledIfEnvironmentVariable(named = "EXTERNAL_BRIDGE", matches = "((?i)TRUE(?-i))")
@Test
Expand Down

0 comments on commit 86783da

Please sign in to comment.