From c0838e62ae5bf3016dec52f75e753bff66511c31 Mon Sep 17 00:00:00 2001 From: opeti Date: Wed, 9 Feb 2022 09:05:54 +0100 Subject: [PATCH] feat(topicdata): handle logical types local-timestamp-millis and local-timestamp-micros (#1015) --- .../java/org/akhq/utils/AvroDeserializer.java | 26 ++++++++ .../java/org/akhq/utils/AvroSerializer.java | 62 ++++++++++++++++++- .../org/akhq/utils/AvroDeserializerTest.java | 19 ++++-- 3 files changed, 99 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/akhq/utils/AvroDeserializer.java b/src/main/java/org/akhq/utils/AvroDeserializer.java index 6145f4190..9ef12ee68 100644 --- a/src/main/java/org/akhq/utils/AvroDeserializer.java +++ b/src/main/java/org/akhq/utils/AvroDeserializer.java @@ -14,6 +14,7 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.util.*; import java.util.stream.Collectors; @@ -26,6 +27,8 @@ public class AvroDeserializer { private static final String TIME_MICROS = "time-micros"; private static final String TIMESTAMP_MILLIS = "timestamp-millis"; private static final String TIMESTAMP_MICROS = "timestamp-micros"; + private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis"; + private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros"; private static final DecimalConversion DECIMAL_CONVERSION = new DecimalConversion(); private static final UUIDConversion UUID_CONVERSION = new UUIDConversion(); @@ -34,6 +37,8 @@ public class AvroDeserializer { private static final TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeMillisConversion(); private static final TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimestampMicrosConversion(); private static final TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimestampMillisConversion(); + private static final LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new LocalTimestampMicrosConversion(); + private static final LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new LocalTimestampMillisConversion(); public static Map recordDeserializer(GenericRecord record) { return record @@ -68,6 +73,10 @@ private static Object objectDeserializer(Object value, Schema schema) { return AvroDeserializer.timestampMicrosDeserializer(value, schema, primitiveType, logicalType); case TIMESTAMP_MILLIS: return AvroDeserializer.timestampMillisDeserializer(value, schema, primitiveType, logicalType); + case LOCAL_TIMESTAMP_MICROS: + return AvroDeserializer.localTimestampMicrosDeserializer(value, schema, primitiveType, logicalType); + case LOCAL_TIMESTAMP_MILLIS: + return AvroDeserializer.localTimestampMillisDeserializer(value, schema, primitiveType, logicalType); case UUID: return AvroDeserializer.uuidDeserializer(value, schema, primitiveType, logicalType); default: @@ -153,6 +162,23 @@ private static Instant timestampMillisDeserializer(Object value, Schema schema, throw new IllegalStateException("Unexpected value: " + primitiveType); } + private static LocalDateTime localTimestampMicrosDeserializer(Object value, Schema schema, Type primitiveType, LogicalType logicalType) { + switch (primitiveType) { + case LONG: + return AvroDeserializer.LOCAL_TIMESTAMP_MICROS_CONVERSION.fromLong((Long) value, schema, logicalType); + default: + throw new IllegalStateException("Unexpected value: " + primitiveType); + } + } + + private static LocalDateTime localTimestampMillisDeserializer(Object value, Schema schema, Type primitiveType, LogicalType logicalType) { + if (primitiveType == Type.LONG) { + return AvroDeserializer.LOCAL_TIMESTAMP_MILLIS_CONVERSION.fromLong((Long) value, schema, logicalType); + } + + throw new IllegalStateException("Unexpected value: " + primitiveType); + } + private static LocalTime timeMicrosDeserializer(Object value, Schema schema, Type primitiveType, LogicalType logicalType) { if (primitiveType == Type.LONG) { return AvroDeserializer.TIME_MICROS_CONVERSION.fromLong((Long) value, schema, logicalType); diff --git a/src/main/java/org/akhq/utils/AvroSerializer.java b/src/main/java/org/akhq/utils/AvroSerializer.java index 4c56edea9..bf113d999 100644 --- a/src/main/java/org/akhq/utils/AvroSerializer.java +++ b/src/main/java/org/akhq/utils/AvroSerializer.java @@ -15,7 +15,9 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.util.AbstractMap; @@ -33,6 +35,8 @@ public class AvroSerializer { private static final String TIME_MICROS = "time-micros"; private static final String TIMESTAMP_MILLIS = "timestamp-millis"; private static final String TIMESTAMP_MICROS = "timestamp-micros"; + private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis"; + private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros"; private static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion(); private static final Conversions.UUIDConversion UUID_CONVERSION = new Conversions.UUIDConversion(); @@ -41,6 +45,8 @@ public class AvroSerializer { private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion(); private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion(); private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion(); + private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion(); + private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion(); protected static final String DATE_FORMAT = "yyyy-MM-dd[XXX]"; protected static final String TIME_FORMAT = "HH:mm[:ss][.SSSSSS][XXX]"; @@ -76,13 +82,17 @@ private static Object objectSerializer(Object value, Schema schema) { case DECIMAL: return AvroSerializer.decimalSerializer(value, schema, primitiveType, logicalType); case TIME_MICROS: - return AvroSerializer.timeMicrosDeserializer(value, schema, primitiveType, logicalType); + return AvroSerializer.timeMicrosSerializer(value, schema, primitiveType, logicalType); case TIME_MILLIS: return AvroSerializer.timeMillisSerializer(value, schema, primitiveType, logicalType); case TIMESTAMP_MICROS: return AvroSerializer.timestampMicrosSerializer(value, schema, primitiveType, logicalType); case TIMESTAMP_MILLIS: return AvroSerializer.timestampMillisSerializer(value, schema, primitiveType, logicalType); + case LOCAL_TIMESTAMP_MICROS: + return AvroSerializer.localTimestampMicrosSerializer(value, schema, primitiveType, logicalType); + case LOCAL_TIMESTAMP_MILLIS: + return AvroSerializer.localTimestampMillisSerializer(value, schema, primitiveType, logicalType); case UUID: return AvroSerializer.uuidSerializer(value, schema, primitiveType, logicalType); default: @@ -214,12 +224,60 @@ private static Long timestampMillisSerializer(Object data, Schema schema, Schema throw new IllegalStateException("Unexpected value: " + primitiveType + " on schema " + schema); } + private static Long localTimestampMicrosSerializer(Object data, Schema schema, Schema.Type primitiveType, LogicalType logicalType) { + LocalDateTime value; + + if (data instanceof String) { + try { + value = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, Long.parseLong((String) data) * 1000), ZoneOffset.UTC); + } catch (NumberFormatException ignored) { + value = LocalDateTime.parse((String) data); + } + } else if (data instanceof Long) { + value = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, (Long) data * 1000), ZoneOffset.UTC); + } else if (data instanceof Integer) { + value = LocalDateTime.ofInstant(Instant.ofEpochSecond(0, ((Integer) data).longValue() * 1000), ZoneOffset.UTC); + } else { + value = (LocalDateTime) data; + } + + if (primitiveType == Schema.Type.LONG) { + return AvroSerializer.LOCAL_TIMESTAMP_MICROS_CONVERSION.toLong(value, schema, logicalType); + } + + throw new IllegalStateException("Unexpected value: " + primitiveType + " on schema " + schema); + } + + private static Long localTimestampMillisSerializer(Object data, Schema schema, Schema.Type primitiveType, LogicalType logicalType) { + LocalDateTime value; + + if (data instanceof String) { + try { + value = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong((String) data)), ZoneOffset.UTC); + } catch (NumberFormatException ignored) { + value = LocalDateTime.parse((String) data); + } + } else if (data instanceof Long) { + value = LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) data), ZoneOffset.UTC); + } else if (data instanceof Integer) { + value = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Integer) data).longValue()), ZoneOffset.UTC); + } else { + value = (LocalDateTime) data; + } + + if (primitiveType == Schema.Type.LONG) { + return AvroSerializer.LOCAL_TIMESTAMP_MILLIS_CONVERSION.toLong(value, schema, logicalType); + } + + throw new IllegalStateException("Unexpected value: " + primitiveType + " on schema " + schema); + } + protected static Instant parseDateTime(String data) { TimeZone tz = TimeZone.getDefault(); return DATETIME_FORMAT.withZone(tz.toZoneId()).parse(data, Instant::from); } - private static Long timeMicrosDeserializer(Object data, Schema schema, Schema.Type primitiveType, LogicalType logicalType) { + private static Long timeMicrosSerializer(Object data, Schema schema, Schema.Type primitiveType, LogicalType logicalType) { LocalTime value; if (data instanceof String) { value = LocalTime.parse((String) data, DateTimeFormatter.ofPattern(AvroSerializer.TIME_FORMAT)); diff --git a/src/test/java/org/akhq/utils/AvroDeserializerTest.java b/src/test/java/org/akhq/utils/AvroDeserializerTest.java index 21094a08d..cd9a06f83 100644 --- a/src/test/java/org/akhq/utils/AvroDeserializerTest.java +++ b/src/test/java/org/akhq/utils/AvroDeserializerTest.java @@ -11,6 +11,7 @@ import java.math.BigDecimal; import java.time.*; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,10 +41,12 @@ static Stream primitiveSource() { Arguments.of(UUID.randomUUID(), "{\"type\": \"string\", \"logicalType\": \"uuid\"}"), Arguments.of(LocalDate.now(), "{\"type\": \"int\", \"logicalType\": \"date\"}"), Arguments.of(LocalTime.now(Clock.tickMillis(ZoneId.of("UTC"))), "{\"type\": \"int\", \"logicalType\": \"time-millis\"}"), - Arguments.of(LocalTime.now(), "{\"type\": \"long\", \"logicalType\": \"time-micros\"}"), + Arguments.of(LocalTime.now().truncatedTo(ChronoUnit.MICROS), "{\"type\": \"long\", \"logicalType\": \"time-micros\"}"), Arguments.of(Instant.now(Clock.tickMillis(ZoneId.of("UTC"))), "{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}"), - Arguments.of(Instant.now(), "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}"), - Arguments.of(Instant.now(), "[\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]") + Arguments.of(Instant.now().truncatedTo(ChronoUnit.MICROS), "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}"), + Arguments.of(Instant.now().truncatedTo(ChronoUnit.MICROS), "[\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]"), + Arguments.of(LocalDateTime.now().truncatedTo(ChronoUnit.MICROS), "{\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}"), + Arguments.of(LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS), "{\"type\": \"long\", \"logicalType\": \"local-timestamp-millis\"}") ); } @@ -70,8 +73,10 @@ void testPrimitive(Object value, String type) { static Stream convertionSource() { UUID uuid = UUID.randomUUID(); - LocalTime localTime = LocalTime.now(); - Instant now = Instant.now(); + LocalTime localTime = LocalTime.now().truncatedTo(ChronoUnit.MICROS); + Instant now = Instant.now().truncatedTo(ChronoUnit.MICROS); + LocalDateTime localDateTimeMicros = LocalDateTime.now().truncatedTo(ChronoUnit.MICROS); + LocalDateTime localDateTimeMillis = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS); return Stream.of( Arguments.of("abc", "\"bytes\"", "abc".getBytes()), @@ -81,7 +86,9 @@ static Stream convertionSource() { Arguments.of(uuid.toString(), "{\"type\": \"string\", \"logicalType\": \"uuid\"}", uuid), Arguments.of(LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE), "{\"type\": \"int\", \"logicalType\": \"date\"}", LocalDate.now()), Arguments.of(localTime.format(DateTimeFormatter.ISO_LOCAL_TIME), "{\"type\": \"long\", \"logicalType\": \"time-micros\"}", localTime), - Arguments.of(now.atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}", now) + Arguments.of(now.atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}", now), + Arguments.of(localDateTimeMicros.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), "{\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}", localDateTimeMicros), + Arguments.of(localDateTimeMillis.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), "{\"type\": \"long\", \"logicalType\": \"local-timestamp-millis\"}", localDateTimeMillis) ); }