Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added a new text embedded format #836

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
415caae
feat: added a new text embedded format
antonio-pedro99 Sep 20, 2023
063b62e
Apply suggestions from code review
antonio-pedro99 Sep 20, 2023
ae69e9f
Apply suggestions from code review
antonio-pedro99 Sep 20, 2023
a79bfbc
doc: undo changes in the javadoc
antonio-pedro99 Sep 20, 2023
4c46a50
Merge branch 'add-string-emb-format' of https://github.com/antonio-pe…
antonio-pedro99 Sep 20, 2023
64d0235
Add container signing and SBOMs (#837)
scholzj Oct 2, 2023
6e17aac
Fix publishing of the SBOM archive in release pipelines (#838)
scholzj Oct 3, 2023
7b6d666
Update Kafka to 3.6.0 (#839)
scholzj Oct 5, 2023
b328404
Some clean and deprecated method replaced (#840)
ppatierno Oct 7, 2023
d8e3f44
Added some finals (#841)
ppatierno Oct 8, 2023
1dff9dc
Update oauth 0.14.0 (#842)
ppatierno Oct 9, 2023
8aa8c07
Fixed wrong oauth version in the CHANGELOG (#844)
ppatierno Oct 11, 2023
7803c9f
Strimzi test container 0.105.0 (#843)
see-quick Oct 11, 2023
2dd9ca8
Bumped Vert.x 4.4.6 and Netty 4.1.100.Final for CVE-2023-44487 fix (#…
ppatierno Oct 11, 2023
2cd8a51
Bumped main branch to 0.28.0-SNAPSHOT (#846)
ppatierno Oct 13, 2023
d6625de
Removed unused methods and made members final (#847)
ppatierno Oct 16, 2023
26a8121
A few cleanup (unused constants and use expression lambdas) (#849)
ppatierno Nov 2, 2023
afcc7c7
Bumped Kubernetes Config Provider to 1.1.2 (#850)
ppatierno Nov 5, 2023
1c25cf3
Removed tracing related param which was used with OpenTracing only (#…
ppatierno Nov 18, 2023
07d61d0
Bump Vert.x and Jackson dependencies (#852)
ppatierno Nov 21, 2023
5e0c6d5
Updated Kafka 3.6.1 (#853)
ppatierno Dec 12, 2023
ff2ad8f
fix typo in HTTPBridgeITAbstract (#854)
antonio-pedro99 Dec 13, 2023
2f56885
Trying to add retries on s390x tests failing (#855)
ppatierno Dec 13, 2023
1fa547a
add text converter with json implementation
antonio-pedro99 Dec 19, 2023
8376fe4
Added missing messaging semantic attributes on the consumer spans (#856)
ppatierno Dec 20, 2023
0c65d26
added converter implementation
antonio-pedro99 Dec 22, 2023
ab3db8e
Remove release section from building guide (#857)
ppatierno Dec 26, 2023
459d720
improve converter and API def
antonio-pedro99 Dec 26, 2023
2b8bee1
remove tests for now
antonio-pedro99 Dec 28, 2023
fa94c27
remove tests for now
antonio-pedro99 Dec 28, 2023
d44e2d0
remove tests for now
antonio-pedro99 Dec 28, 2023
03b0a16
feat: added a new text embedded format
antonio-pedro99 Sep 20, 2023
6d2087a
doc: undo changes in the javadoc
antonio-pedro99 Sep 20, 2023
2e91233
Apply suggestions from code review
antonio-pedro99 Sep 20, 2023
5bb9a9c
Apply suggestions from code review
antonio-pedro99 Sep 20, 2023
2a0add3
add text converter with json implementation
antonio-pedro99 Dec 19, 2023
cf1719d
added converter implementation
antonio-pedro99 Dec 22, 2023
e0e2664
improve converter and API def
antonio-pedro99 Dec 26, 2023
88789d4
remove tests for now
antonio-pedro99 Dec 28, 2023
a497cb2
remove tests for now
antonio-pedro99 Dec 28, 2023
d948aa0
remove tests for now
antonio-pedro99 Dec 28, 2023
b1bc1b4
Merge branch 'add-string-emb-format' of https://github.com/antonio-pe…
antonio-pedro99 Dec 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/BridgeContentType.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ public class BridgeContentType {
/** JSON encoding with BINARY embedded format */
public static final String KAFKA_JSON_BINARY = "application/vnd.kafka.binary.v2+json";


/** JSON encoding with Plain text embedded format */
public static final String KAFKA_JSON_TEXT = "application/vnd.kafka.string.v2+json";
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved

/** Specific Kafka JSON encoding */
public static final String KAFKA_JSON = "application/vnd.kafka.v2+json";

/** JSON encoding */
public static final String JSON = "application/json";

/** Plain text encoding */
public static final String TEXT = "text/plain";
}
17 changes: 14 additions & 3 deletions src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,20 @@
*/
public enum EmbeddedFormat {

/** Define "binary" data as embedded format */
/**
* Define "binary" data as embedded format
*/
BINARY,

/** Define "json" data as embedded format */
JSON;
/**
* Define "json" data as embedded format
*/
JSON,

/**
* Define "text" data as embedded format
*/
TEXT;

/**
* Convert the String value in the corresponding enum
Expand All @@ -28,6 +37,8 @@ public static EmbeddedFormat from(String value) {
return JSON;
case "binary":
return BINARY;
case "text":
return TEXT;
}
throw new IllegalEmbeddedFormatException("Invalid format type.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonDecodeException;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.tracing.SpanHandle;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
Expand Down Expand Up @@ -74,10 +75,10 @@ public class HttpSinkBridgeEndpoint<K, V> extends HttpBridgeEndpoint {
/**
* Constructor
*
* @param bridgeConfig the bridge configuration
* @param context the HTTP bridge context
* @param format the embedded format for consumed messages
* @param keyDeserializer key deserializer for consumed messages
* @param bridgeConfig the bridge configuration
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
* @param context the HTTP bridge context
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
* @param format the embedded format for consumed messages
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
* @param keyDeserializer key deserializer for consumed messages
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
* @param valueDeserializer value deserializer for consumed messages
*/
public HttpSinkBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext<K, V> context, EmbeddedFormat format,
Expand Down Expand Up @@ -111,8 +112,8 @@ public void close() {
* Create a Kafka consumer
*
* @param routingContext the routing context
* @param bodyAsJson HTTP request body bringing consumer settings
* @param handler handler for the request
* @param bodyAsJson HTTP request body bringing consumer settings
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
* @param handler handler for the request
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
*/
private void doCreateConsumer(RoutingContext routingContext, JsonNode bodyAsJson, Handler<HttpBridgeEndpoint> handler) {
// get the consumer group-id
Expand Down Expand Up @@ -528,7 +529,7 @@ public void doUnsubscribe(RoutingContext routingContext) {
/**
* Add a configuration parameter with key and value to the provided Properties bag
*
* @param key key of the configuration parameter
* @param key key of the configuration parameter
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
* @param value value of the configuration parameter
* @param props Properties bag where to put the configuration parameter
*/
Expand Down Expand Up @@ -608,6 +609,8 @@ private MessageConverter<K, V, byte[], byte[]> buildMessageConverter() {
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
}
Expand All @@ -618,6 +621,8 @@ private boolean checkAcceptedBody(String accept) {
return format == EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_BINARY:
return format == EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_TEXT:
return format == EmbeddedFormat.TEXT;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.http.model.HttpBridgeResult;
Expand Down Expand Up @@ -114,7 +115,7 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
}
records = messageConverter.toKafkaRecords(topic, partition, routingContext.body().buffer().getByteBuf().array());

for (ProducerRecord<K, V> record :records) {
for (ProducerRecord<K, V> record : records) {
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
span.inject(record);
}
} catch (Exception e) {
Expand Down Expand Up @@ -216,6 +217,8 @@ private MessageConverter<K, V, byte[], byte[]> buildMessageConverter() {
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
}
Expand Down
antonio-pedro99 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.bridge.http.converter;

import io.strimzi.kafka.bridge.converter.MessageConverter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.util.List;

/**
* Implementation of a message converter to deal with the "text" embedded data format
*/
public class HttpTextMessageConverter implements MessageConverter<byte[], byte[], byte[], byte[]> {

Integer partitionFromBody = null;
byte[] key = null;
byte[] value = null;
Headers headers = new RecordHeaders();

@Override
public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {
return null;
}

@Override
public List<ProducerRecord<byte[], byte[]>> toKafkaRecords(String kafkaTopic, Integer partition, byte[] messages) {
return null;
}

@Override
public byte[] toMessage(String address, ConsumerRecord<byte[], byte[]> record) {
return new byte[0];
}

@Override
public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
return new byte[0];
}
}