Skip to content

Commit

Permalink
feat(topicdata): configurable json inclusions in Avro to Json seriali…
Browse files Browse the repository at this point in the history
…zing (#799)
  • Loading branch information
magnusgundersen authored and tchiotludo committed Oct 24, 2021
1 parent 489be59 commit 96e80e1
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 21 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ akhq:
#### Pagination
* `akhq.pagination.page-size` number of topics per page (default : 25)

#### Avro Serializer
* `akhq.avro-serializer.json.serialization.inclusions` is list of ObjectMapper serialization inclusions that is used for converting Avro message to more
readable Json format in the UI. Supports Enums of JsonInclude.Include from Jackson library

#### Topic List
* `akhq.topic.internal-regexps` is list of regexp to be considered as internal (internal topic can't be deleted or updated)
* `akhq.topic.stream-regexps` is list of regexp to be considered as internal stream topic
Expand Down
7 changes: 7 additions & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ akhq:
page-size: 25 # number of elements per page (default : 25)
threads: 16 # Number of parallel threads to resolve page

# Configure avro-to-json serializer
avro-serializer:
json.serialization.inclusions: # ObjectMapper serialization inclusions used for avro-to-json conversion for display in the UI.
# Supports Enums in JsonInclude.Include from Jackson library
- NON_NULL
- NON_EMPTY

# Topic list display options (optional)
topic:
retention: 172800000 # default retention period when creating topic
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class Record {
private Deserializer kafkaProtoDeserializer;
@JsonIgnore
private Deserializer kafkaJsonDeserializer;
@JsonIgnore
private AvroToJsonSerializer avroToJsonSerializer;

@JsonIgnore
private SchemaRegistryClient client;
Expand Down Expand Up @@ -85,7 +87,7 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
}

public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer,
Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue, Topic topic) {
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
this.MAGIC_BYTE = (byte) 0x80;
Expand All @@ -109,6 +111,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
this.kafkaProtoDeserializer = kafkaProtoDeserializer;
this.avroToJsonSerializer = avroToJsonSerializer;
this.kafkaJsonDeserializer = kafkaJsonDeserializer;
}

Expand Down Expand Up @@ -154,7 +157,7 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}

Message dynamicMessage = (Message)toType;
return AvroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
return avroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
} else if ( schema.schemaType().equals(JsonSchema.TYPE) ) {
toType = kafkaJsonDeserializer.deserialize(topic.getName(), payload);
if ( !(toType instanceof JsonNode) ) {
Expand All @@ -173,7 +176,7 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}

GenericRecord record = (GenericRecord) toType;
return AvroToJsonSerializer.toJson(record);
return avroToJsonSerializer.toJson(record);

} catch (Exception exception) {
this.exceptions.add(exception.getMessage());
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.akhq.models.Topic;
import org.akhq.modules.AvroSerializer;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.Debug;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
Expand Down Expand Up @@ -49,6 +50,9 @@ public class RecordRepository extends AbstractRepository {
@Inject
private ConfigRepository configRepository;

@Inject
private AvroToJsonSerializer avroToJsonSerializer;

@Inject
private TopicRepository topicRepository;

Expand Down Expand Up @@ -433,6 +437,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId
this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(clusterId):null,
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(clusterId):null,
this.avroToJsonSerializer,
this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(clusterId)),
Expand All @@ -450,6 +455,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(options.clusterId):null,
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(options.clusterId):null,
this.avroToJsonSerializer,
this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)),
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/akhq/utils/AvroDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class AvroDeserializer {
private static final String TIME_MICROS = "time-micros";
private static final String TIMESTAMP_MILLIS = "timestamp-millis";
private static final String TIMESTAMP_MICROS = "timestamp-micros";

private static final DecimalConversion DECIMAL_CONVERSION = new DecimalConversion();
private static final UUIDConversion UUID_CONVERSION = new UUIDConversion();
private static final DateConversion DATE_CONVERSION = new DateConversion();
Expand Down
50 changes: 33 additions & 17 deletions src/main/java/org/akhq/utils/AvroToJsonSerializer.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,48 @@
package org.akhq.utils;

import com.fasterxml.jackson.annotation.JsonInclude;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import javax.inject.Singleton;

import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.util.Map;
import java.util.TimeZone;

@Singleton
public class AvroToJsonSerializer {
private static final ObjectMapper MAPPER = new ObjectMapper()
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.setSerializationInclusion(JsonInclude.Include.NON_EMPTY)
.registerModule(new JavaTimeModule())
.registerModule(new Jdk8Module())
.setTimeZone(TimeZone.getDefault());
private final ObjectMapper mapper;

public static String toJson(GenericRecord record) throws IOException {
Map<String, Object> map = AvroDeserializer.recordDeserializer(record);
public AvroToJsonSerializer(@Value("${akhq.avro-serializer.json.serialization.inclusions}") @Nullable List<Include> jsonInclusions) {
List<Include> inclusions = jsonInclusions != null ? jsonInclusions : Collections.emptyList();
this.mapper = createObjectMapper(inclusions);
}

return MAPPER.writeValueAsString(map);
private ObjectMapper createObjectMapper(List<Include> jsonInclusions) {
ObjectMapper objectMapper = new ObjectMapper()
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.registerModule(new JavaTimeModule())
.registerModule(new Jdk8Module())
.setTimeZone(TimeZone.getDefault());
for (Include include : jsonInclusions) {
objectMapper = objectMapper.setSerializationInclusion(include);
}
return objectMapper;
}

public String toJson(GenericRecord record) throws IOException {
Map<String, Object> map = AvroDeserializer.recordDeserializer(record);
return mapper.writeValueAsString(map);
}

public static ObjectMapper getMapper() {
return MAPPER;
public ObjectMapper getMapper() {
return mapper;
}
}
5 changes: 5 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ akhq:
page-size: 25
threads: 16

avro-serializer:
json.serialization.inclusions:
- NON_NULL
- NON_EMPTY

topic:
replication: 1
retention: 86400000
Expand Down

0 comments on commit 96e80e1

Please sign in to comment.