From 96e80e18838674304adda4392d062854d7a905ac Mon Sep 17 00:00:00 2001 From: Magnus Gundersen Date: Mon, 30 Aug 2021 08:19:54 +0200 Subject: [PATCH] feat(topicdata): configurable json inclusions in Avro to Json serializing (#799) --- README.md | 4 ++ application.example.yml | 7 +++ src/main/java/org/akhq/models/Record.java | 9 ++-- .../akhq/repositories/RecordRepository.java | 6 +++ .../java/org/akhq/utils/AvroDeserializer.java | 2 +- .../org/akhq/utils/AvroToJsonSerializer.java | 50 ++++++++++++------- src/main/resources/application.yml | 5 ++ 7 files changed, 62 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index c961e1e19..f60025cf7 100644 --- a/README.md +++ b/README.md @@ -275,6 +275,10 @@ akhq: #### Pagination * `akhq.pagination.page-size` number of topics per page (default : 25) +#### Avro Serializer +* `akhq.avro-serializer.json.serialization.inclusions` is list of ObjectMapper serialization inclusions that is used for converting Avro message to more + readable Json format in the UI. Supports Enums of JsonInclude.Include from Jackson library + #### Topic List * `akhq.topic.internal-regexps` is list of regexp to be considered as internal (internal topic can't be deleted or updated) * `akhq.topic.stream-regexps` is list of regexp to be considered as internal stream topic diff --git a/application.example.yml b/application.example.yml index 78fb3d1fa..e32d6cd30 100644 --- a/application.example.yml +++ b/application.example.yml @@ -131,6 +131,13 @@ akhq: page-size: 25 # number of elements per page (default : 25) threads: 16 # Number of parallel threads to resolve page + # Configure avro-to-json serializer + avro-serializer: + json.serialization.inclusions: # ObjectMapper serialization inclusions used for avro-to-json conversion for display in the UI. + # Supports Enums in JsonInclude.Include from Jackson library + - NON_NULL + - NON_EMPTY + # Topic list display options (optional) topic: retention: 172800000 # default retention period when creating topic diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java index 260662502..01ff2c01d 100644 --- a/src/main/java/org/akhq/models/Record.java +++ b/src/main/java/org/akhq/models/Record.java @@ -45,6 +45,8 @@ public class Record { private Deserializer kafkaProtoDeserializer; @JsonIgnore private Deserializer kafkaJsonDeserializer; + @JsonIgnore + private AvroToJsonSerializer avroToJsonSerializer; @JsonIgnore private SchemaRegistryClient client; @@ -85,7 +87,7 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte } public Record(SchemaRegistryClient client, ConsumerRecord record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer, - Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, + Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer, ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue, Topic topic) { if (schemaRegistryType == SchemaRegistryType.TIBCO) { this.MAGIC_BYTE = (byte) 0x80; @@ -109,6 +111,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord record this.kafkaAvroDeserializer = kafkaAvroDeserializer; this.protobufToJsonDeserializer = protobufToJsonDeserializer; this.kafkaProtoDeserializer = kafkaProtoDeserializer; + this.avroToJsonSerializer = avroToJsonSerializer; this.kafkaJsonDeserializer = kafkaJsonDeserializer; } @@ -154,7 +157,7 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey) } Message dynamicMessage = (Message)toType; - return AvroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString(); + return avroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString(); } else if ( schema.schemaType().equals(JsonSchema.TYPE) ) { toType = kafkaJsonDeserializer.deserialize(topic.getName(), payload); if ( !(toType instanceof JsonNode) ) { @@ -173,7 +176,7 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey) } GenericRecord record = (GenericRecord) toType; - return AvroToJsonSerializer.toJson(record); + return avroToJsonSerializer.toJson(record); } catch (Exception exception) { this.exceptions.add(exception.getMessage()); diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index 316d33342..7ccaaffe9 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -18,6 +18,7 @@ import org.akhq.models.Topic; import org.akhq.modules.AvroSerializer; import org.akhq.modules.KafkaModule; +import org.akhq.utils.AvroToJsonSerializer; import org.akhq.utils.Debug; import org.apache.kafka.clients.admin.DeletedRecords; import org.apache.kafka.clients.admin.RecordsToDelete; @@ -49,6 +50,9 @@ public class RecordRepository extends AbstractRepository { @Inject private ConfigRepository configRepository; + @Inject + private AvroToJsonSerializer avroToJsonSerializer; + @Inject private TopicRepository topicRepository; @@ -433,6 +437,7 @@ private Record newRecord(ConsumerRecord record, String clusterId this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId), schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(clusterId):null, schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(clusterId):null, + this.avroToJsonSerializer, this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId), avroWireFormatConverter.convertValueToWireFormat(record, client, this.schemaRegistryRepository.getSchemaRegistryType(clusterId)), @@ -450,6 +455,7 @@ private Record newRecord(ConsumerRecord record, BaseOptions opti this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId), schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(options.clusterId):null, schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(options.clusterId):null, + this.avroToJsonSerializer, this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId), avroWireFormatConverter.convertValueToWireFormat(record, client, this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)), diff --git a/src/main/java/org/akhq/utils/AvroDeserializer.java b/src/main/java/org/akhq/utils/AvroDeserializer.java index f25069096..4ea15995c 100644 --- a/src/main/java/org/akhq/utils/AvroDeserializer.java +++ b/src/main/java/org/akhq/utils/AvroDeserializer.java @@ -26,7 +26,7 @@ public class AvroDeserializer { private static final String TIME_MICROS = "time-micros"; private static final String TIMESTAMP_MILLIS = "timestamp-millis"; private static final String TIMESTAMP_MICROS = "timestamp-micros"; - + private static final DecimalConversion DECIMAL_CONVERSION = new DecimalConversion(); private static final UUIDConversion UUID_CONVERSION = new UUIDConversion(); private static final DateConversion DATE_CONVERSION = new DateConversion(); diff --git a/src/main/java/org/akhq/utils/AvroToJsonSerializer.java b/src/main/java/org/akhq/utils/AvroToJsonSerializer.java index 4ec3bfb55..4a104701c 100644 --- a/src/main/java/org/akhq/utils/AvroToJsonSerializer.java +++ b/src/main/java/org/akhq/utils/AvroToJsonSerializer.java @@ -1,32 +1,48 @@ package org.akhq.utils; -import com.fasterxml.jackson.annotation.JsonInclude; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import javax.inject.Singleton; + +import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.micronaut.context.annotation.Value; +import io.micronaut.core.annotation.Nullable; import org.apache.avro.generic.GenericRecord; -import java.io.IOException; -import java.util.Map; -import java.util.TimeZone; - +@Singleton public class AvroToJsonSerializer { - private static final ObjectMapper MAPPER = new ObjectMapper() - .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) - .setSerializationInclusion(JsonInclude.Include.NON_NULL) - .setSerializationInclusion(JsonInclude.Include.NON_EMPTY) - .registerModule(new JavaTimeModule()) - .registerModule(new Jdk8Module()) - .setTimeZone(TimeZone.getDefault()); + private final ObjectMapper mapper; - public static String toJson(GenericRecord record) throws IOException { - Map map = AvroDeserializer.recordDeserializer(record); + public AvroToJsonSerializer(@Value("${akhq.avro-serializer.json.serialization.inclusions}") @Nullable List jsonInclusions) { + List inclusions = jsonInclusions != null ? jsonInclusions : Collections.emptyList(); + this.mapper = createObjectMapper(inclusions); + } - return MAPPER.writeValueAsString(map); + private ObjectMapper createObjectMapper(List jsonInclusions) { + ObjectMapper objectMapper = new ObjectMapper() + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .registerModule(new JavaTimeModule()) + .registerModule(new Jdk8Module()) + .setTimeZone(TimeZone.getDefault()); + for (Include include : jsonInclusions) { + objectMapper = objectMapper.setSerializationInclusion(include); + } + return objectMapper; + } + + public String toJson(GenericRecord record) throws IOException { + Map map = AvroDeserializer.recordDeserializer(record); + return mapper.writeValueAsString(map); } - public static ObjectMapper getMapper() { - return MAPPER; + public ObjectMapper getMapper() { + return mapper; } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index adac48c68..9b409dd70 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -103,6 +103,11 @@ akhq: page-size: 25 threads: 16 + avro-serializer: + json.serialization.inclusions: + - NON_NULL + - NON_EMPTY + topic: replication: 1 retention: 86400000