From efe5d34a4b086a0efbd3e64f998853ca221a591c Mon Sep 17 00:00:00 2001 From: Jiashen Zhang Date: Tue, 26 Sep 2023 22:36:01 -0700 Subject: [PATCH] Address comments and add more tests --- .../column/statistics/Float16Statistics.java | 17 +- .../parquet/column/statistics/Statistics.java | 16 +- .../parquet/schema/PrimitiveComparator.java | 3 +- .../apache/parquet/schema/PrimitiveType.java | 5 - .../schema/TestPrimitiveComparator.java | 22 +-- .../java/org/apache/parquet/type/Float16.java | 10 +- .../type/InvalidFloat16ValueException.java | 3 +- .../org/apache/parquet/type/TestFloat16.java | 19 ++- .../statistics/TestFloat16Statistics.java | 157 ++++++++++++++++++ 9 files changed, 204 insertions(+), 48 deletions(-) create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestFloat16Statistics.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Float16Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Float16Statistics.java index 09f3418417..014f2d3409 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Float16Statistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Float16Statistics.java @@ -19,24 +19,9 @@ package org.apache.parquet.column.statistics; -import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Types; - -public class Float16Statistics extends BinaryStatistics -{ - // A fake type object to be used to generate the proper comparator - private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) - .named("fake_binary_float16_type").withLogicalTypeAnnotation(LogicalTypeAnnotation.float16Type()); - - /** - * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead - */ - @Deprecated - public Float16Statistics() { - this(DEFAULT_FAKE_TYPE); - } +public class Float16Statistics extends BinaryStatistics { Float16Statistics(PrimitiveType type) { super(type); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java index a0c8edae13..05b89ebfff 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java @@ -144,7 +144,7 @@ public Statistics build() { private static class Float16Builder extends Builder { public Float16Builder(PrimitiveType type) { super(type); - assert type.getPrimitiveTypeName() == PrimitiveTypeName.BINARY; + assert type.getPrimitiveTypeName() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; } @Override @@ -157,18 +157,18 @@ public Statistics build() { short max = Float16.fromBytesLittleEndian(bMax.getBytes()); // Drop min/max values in case of NaN as the sorting order of values is undefined for this case if (Float16.isNaN(min) || Float16.isNaN(max)) { - bMin = Binary.fromConstantByteArray(Float16.toBytesLittleEndian(Float16.POSITIVE_ZERO)); - bMax = Binary.fromConstantByteArray(Float16.toBytesLittleEndian(Float16.POSITIVE_ZERO)); + bMin = Binary.fromConstantByteArray(Float16.POSITIVE_ZERO_BYTES_LITTLE_ENDIAN); + bMax = Binary.fromConstantByteArray(Float16.POSITIVE_ZERO_BYTES_LITTLE_ENDIAN); stats.setMinMax(bMin, bMax); ((Statistics) stats).hasNonNullValue = false; } else { // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped - if (Float16.equals(min, Float16.POSITIVE_ZERO)) { - bMin = Binary.fromConstantByteArray(Float16.toBytesLittleEndian(Float16.NEGATIVE_ZERO)); + if (min == Float16.POSITIVE_ZERO) { + bMin = Binary.fromConstantByteArray(Float16.NEGATIVE_ZERO_BYTES_LITTLE_ENDIAN); stats.setMinMax(bMin, bMax); } - if (Float16.equals(max, Float16.NEGATIVE_ZERO)) { - bMax = Binary.fromConstantByteArray(Float16.toBytesLittleEndian(Float16.POSITIVE_ZERO)); + if (max == Float16.NEGATIVE_ZERO) { + bMax = Binary.fromConstantByteArray(Float16.POSITIVE_ZERO_BYTES_LITTLE_ENDIAN); stats.setMinMax(bMin, bMax); } } @@ -268,7 +268,7 @@ public static Builder getBuilderForReading(PrimitiveType type) { return new FloatBuilder(type); case DOUBLE: return new DoubleBuilder(type); - case BINARY: + case FIXED_LEN_BYTE_ARRAY: LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation) { return new Float16Builder(type); diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index 78117f6c66..b920db755e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -285,8 +285,7 @@ public String toString() { static final PrimitiveComparator BINARY_AS_FLOAT16_COMPARATOR = new BinaryComparator() { @Override - int compareBinary(Binary b1, Binary b2) - { + int compareBinary(Binary b1, Binary b2) { return Float16.compare(Float16.fromBytesLittleEndian(b1.getBytes()), Float16.fromBytesLittleEndian(b2.getBytes())); } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 396d6cb98f..0f65310c25 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -260,11 +260,6 @@ public Optional visit(LogicalTypeAnnotation.JsonLogicalType public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); } - - @Override - public Optional visit(LogicalTypeAnnotation.Float16LogicalTypeAnnotation float16LogicalType) { - return of(PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR); - } }).orElseThrow(() -> new ShouldNeverHappenException("No comparator logic implemented for BINARY logical type: " + logicalType)); } }, diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index 9e92d95aee..156a9ca04b 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -274,16 +274,14 @@ public void testBinaryAsSignedIntegerComparatorWithEquals() { @Test public void testFloat16Comparator() { short[] valuesInAscendingOrder = { - (short) 0xfc00, - Float16.MIN_VALUE, - -Float16.MAX_VALUE, - (short) 0xc000, - -Float16.MIN_VALUE, - 0, - Float16.MIN_VALUE, - (short) 0x7bff, - Float16.MAX_VALUE, - (short) 0x7c00}; + (short) 0xfc00, // -Infinity + (short) 0xc000, // -2.0 + -Float16.MAX_VALUE, // -6.109476E-5 + Float16.NEGATIVE_ZERO, // -0 + Float16.POSITIVE_ZERO, // +0 + Float16.MIN_VALUE, // 5.9604645E-8 + Float16.MAX_VALUE, // 65504.0 + (short) 0x7c00}; // Infinity for (int i = 0; i < valuesInAscendingOrder.length; ++i) { for (int j = 0; j < valuesInAscendingOrder.length; ++j) { @@ -297,6 +295,10 @@ public void testFloat16Comparator() { float fj = Float16.toFloat(vj); assertEquals(Float.compare(fi, fj), BINARY_AS_FLOAT16_COMPARATOR.compare( Binary.fromConstantByteArray(bbi.array()), Binary.fromConstantByteArray(bbj.array()))); + if (i < j) { + assertEquals(-1, Float.compare(fi, fj)); + } + } } } diff --git a/parquet-common/src/main/java/org/apache/parquet/type/Float16.java b/parquet-common/src/main/java/org/apache/parquet/type/Float16.java index bb2dad686f..1abd8c94c8 100644 --- a/parquet-common/src/main/java/org/apache/parquet/type/Float16.java +++ b/parquet-common/src/main/java/org/apache/parquet/type/Float16.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.Arrays; /** * The class is a utility class to manipulate half-precision 16-bit @@ -46,8 +47,7 @@ * floating points (float32). * Ref: https://android.googlesource.com/platform/libcore/+/master/luni/src/main/java/libcore/util/FP16.java */ -public class Float16 -{ +public class Float16 { // Smallest negative value a half-precision float may have. public static final short LOWEST_VALUE = (short) 0xfbff; // Maximum positive finite value a half-precision float may have. @@ -60,6 +60,10 @@ public class Float16 public static final short POSITIVE_ZERO = (short) 0x0000; // Negative 0 of type half-precision float. public static final short NEGATIVE_ZERO = (short) 0x8000; + // Byte array in little endian for positive 0 of type half-precision float. + public static final byte[] POSITIVE_ZERO_BYTES_LITTLE_ENDIAN = Float16.toBytesLittleEndian(Float16.POSITIVE_ZERO); + // Byte array in little endian for negative 0 of type half-precision float. + public static final byte[] NEGATIVE_ZERO_BYTES_LITTLE_ENDIAN = Float16.toBytesLittleEndian(Float16.NEGATIVE_ZERO); // A Not-a-Number representation of a half-precision float. static final short NaN = (short) 0x7e00; // Positive infinity of type half-precision float. @@ -304,7 +308,7 @@ public static int compare(short x, short y) { */ public static short fromBytesLittleEndian(byte[] bytes) { if (bytes.length != 2) { - throw new InvalidFloat16ValueException(String.valueOf(bytes)); + throw new InvalidFloat16ValueException(Arrays.toString(bytes)); } ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); diff --git a/parquet-common/src/main/java/org/apache/parquet/type/InvalidFloat16ValueException.java b/parquet-common/src/main/java/org/apache/parquet/type/InvalidFloat16ValueException.java index 883697024b..b38a27e52f 100644 --- a/parquet-common/src/main/java/org/apache/parquet/type/InvalidFloat16ValueException.java +++ b/parquet-common/src/main/java/org/apache/parquet/type/InvalidFloat16ValueException.java @@ -23,8 +23,7 @@ /** * Thrown if Binary is invalid as a Float16 value. */ -public class InvalidFloat16ValueException extends ParquetRuntimeException -{ +public class InvalidFloat16ValueException extends ParquetRuntimeException { private static final long serialVersionUID = 1L; public InvalidFloat16ValueException(String message) { diff --git a/parquet-common/src/test/java/org/apache/parquet/type/TestFloat16.java b/parquet-common/src/test/java/org/apache/parquet/type/TestFloat16.java index adcc898451..510d33a8c8 100644 --- a/parquet-common/src/test/java/org/apache/parquet/type/TestFloat16.java +++ b/parquet-common/src/test/java/org/apache/parquet/type/TestFloat16.java @@ -35,9 +35,9 @@ import static org.apache.parquet.type.Float16.NaN; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; -public class TestFloat16 -{ +public class TestFloat16 { @Test public void testFloat16ToFloat() { // Zeroes @@ -241,4 +241,19 @@ public void testCompare() { assertEquals(1, Float16.compare(toFloat16(12.462f), toFloat16(-12.462f))); assertEquals(-1, Float16.compare(toFloat16(-12.462f), toFloat16(12.462f))); } + + @Test + public void testFromBytesLittleEndian() { + // bytes of 0xfbff stored in Little-Endian + byte[] float16Value = new byte[] {-1, -5}; + short h = Float16.fromBytesLittleEndian(float16Value); + assertEquals(LOWEST_VALUE, h); + byte[] wrongFloat16Bytes = new byte[] {0, 0, 0}; + try { + Float16.fromBytesLittleEndian(wrongFloat16Bytes); + fail("Invalid float16 value"); + } catch (InvalidFloat16ValueException e) { + assertTrue(e.getMessage().contains("[0, 0, 0] is invalid as a Float16 value")); + } + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestFloat16Statistics.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestFloat16Statistics.java new file mode 100644 index 0000000000..853bb60205 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestFloat16Statistics.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import org.apache.parquet.Preconditions; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.apache.parquet.type.Float16; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.float16Type; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.junit.Assert.assertEquals; + +public class TestFloat16Statistics { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private short[] valuesInAscendingOrder = { + (short) 0xfc00, // -Infinity + (short) 0xc000, // -2.0 + -Float16.MAX_VALUE, // -6.109476E-5 + Float16.NEGATIVE_ZERO, // -0 + Float16.POSITIVE_ZERO, // +0 + Float16.MIN_VALUE, // 5.9604645E-8 + Float16.MAX_VALUE, // 65504.0 + (short) 0x7c00}; // Infinity + + @Test + public void testFloat16ColumnIndex() throws IOException + { + MessageType schema = Types.buildMessage(). + required(FIXED_LEN_BYTE_ARRAY).as(float16Type()).length(2).named("col_float16").named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + GroupFactory factory = new SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withConf(conf) + .withDictionaryEncoding(false) + .build()) { + + for (short value : valuesInAscendingOrder) { + writer.write(factory.newGroup().append("col_float16", Binary.fromConstantByteArray(Float16.toBytesLittleEndian(value)))); + } + } + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + + ColumnChunkMetaData column = reader.getFooter().getBlocks().get(0).getColumns().get(0); + ColumnIndex index = reader.readColumnIndex(column); + assertEquals(Collections.singletonList((short) 0xfc00), toFloat16List(index.getMinValues())); + assertEquals(Collections.singletonList((short) 0x7c00), toFloat16List(index.getMaxValues())); + } + } + + @Test + public void testFloat16Statistics() throws IOException { + for (int i = 0; i < valuesInAscendingOrder.length; ++i) { + for (int j = 0; j < valuesInAscendingOrder.length; ++j) { + int minIndex = i; + int maxIndex = j; + + if (Float16.compare(valuesInAscendingOrder[i], valuesInAscendingOrder[j]) > 0) { + minIndex = j; + maxIndex = i; + } + + // Refer to Float16Builder class + if (valuesInAscendingOrder[minIndex] == Float16.POSITIVE_ZERO) { + minIndex = 3; + } + if (valuesInAscendingOrder[maxIndex] == Float16.NEGATIVE_ZERO) { + maxIndex = 4; + } + + MessageType schema = Types.buildMessage(). + required(FIXED_LEN_BYTE_ARRAY).as(float16Type()).length(2).named("col_float16").named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + GroupFactory factory = new SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withConf(conf) + .withDictionaryEncoding(false) + .build()) { + writer.write(factory.newGroup().append("col_float16", Binary.fromConstantByteArray(Float16.toBytesLittleEndian(valuesInAscendingOrder[i])))); + writer.write(factory.newGroup().append("col_float16", Binary.fromConstantByteArray(Float16.toBytesLittleEndian(valuesInAscendingOrder[j])))); + } + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + ColumnChunkMetaData column = reader.getFooter().getBlocks().get(0).getColumns().get(0); + Statistics statistics = column.getStatistics(); + + assertEquals(valuesInAscendingOrder[minIndex], Float16.fromBytesLittleEndian(statistics.getMinBytes())); + assertEquals(valuesInAscendingOrder[maxIndex], Float16.fromBytesLittleEndian(statistics.getMaxBytes())); + } + } + } + } + + private Path newTempPath() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return new Path(file.getAbsolutePath()); + } + + private static List toFloat16List(List buffers) { + return buffers.stream() + .map(buffer -> Float16.fromBytesLittleEndian(buffer.array())) + .collect(Collectors.toList()); + } +}