Vectorize decoding of RleBitPackingHybridDecoder in parquet
m7g.2xlarge EC2 machine with Temurin JDK 22
BenchmarkRleBitPackingDecoder
(bitWidth)  (dataSet)   Mode  Cnt   Scalar score     Vector score     Units
         1     RANDOM  thrpt   10   650.368 ± 0.148   898.939 ± 0.345 ops/s
         2     RANDOM  thrpt   10   665.379 ± 0.162  1017.777 ± 0.130 ops/s
         3     RANDOM  thrpt   10   570.223 ± 0.275   765.640 ± 0.256 ops/s
         4     RANDOM  thrpt   10   620.299 ± 0.361  1073.897 ± 0.584 ops/s
         5     RANDOM  thrpt   10   560.624 ± 0.148   761.582 ± 0.085 ops/s
         6     RANDOM  thrpt   10   449.945 ± 0.201   741.803 ± 0.321 ops/s
         7     RANDOM  thrpt   10   516.331 ± 0.106   736.790 ± 0.535 ops/s
         8     RANDOM  thrpt   10   620.409 ± 0.907  1087.050 ± 0.340 ops/s
         9     RANDOM  thrpt   10   466.077 ± 0.085   824.186 ± 1.486 ops/s
        10     RANDOM  thrpt   10   462.300 ± 0.104   817.194 ± 1.144 ops/s
        11     RANDOM  thrpt   10   359.207 ± 0.046   810.836 ± 0.714 ops/s
        12     RANDOM  thrpt   10   468.968 ± 0.092   813.104 ± 0.238 ops/s
        13     RANDOM  thrpt   10   347.213 ± 0.089   818.207 ± 1.691 ops/s
        14     RANDOM  thrpt   10   346.392 ± 0.144   807.127 ± 1.213 ops/s
        15     RANDOM  thrpt   10   240.605 ± 0.217   801.025 ± 2.004 ops/s
        16     RANDOM  thrpt   10   506.141 ± 0.192  1149.157 ± 0.561 ops/s
        17     RANDOM  thrpt   10   369.903 ± 0.121   480.297 ± 0.588 ops/s
        18     RANDOM  thrpt   10   367.348 ± 0.085   461.633 ± 0.939 ops/s
        19     RANDOM  thrpt   10   250.100 ± 0.043   468.750 ± 1.367 ops/s
        20     RANDOM  thrpt   10   364.844 ± 0.146   461.019 ± 1.270 ops/s
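
In the hybrid encoding, the low bit of each run header selects between an RLE run (low bit 0: one value repeated) and a bit-packed group (low bit 1). The RLE half maps naturally onto the Vector API: broadcast the value once, then store it in vector-width chunks. A minimal sketch of that pattern, assuming the jdk.incubator.vector module is enabled; the class and method names are hypothetical, not Trino's actual decoder:

import jdk.incubator.vector.IntVector;
import jdk.incubator.vector.VectorSpecies;

final class RleRunExpander
{
    private static final VectorSpecies<Integer> SPECIES = IntVector.SPECIES_PREFERRED;

    // Fill out[offset, offset + count) with a single repeated value
    static void expandRun(int value, int[] out, int offset, int count)
    {
        IntVector run = IntVector.broadcast(SPECIES, value);
        int i = offset;
        int vectorLimit = offset + SPECIES.loopBound(count);
        for (; i < vectorLimit; i += SPECIES.length()) {
            run.intoArray(out, i);
        }
        for (; i < offset + count; i++) {
            // scalar tail for the last count % SPECIES.length() elements
            out[i] = value;
        }
    }
}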

BenchmarkFlatDefinitionLevelDecoder
(dataGenerator)  (size)  (vectorizedDecodingEnabled)   Mode  Cnt   Score   Error   Units
         RANDOM  100000                        false  thrpt   10  21.903 ± 0.014  ops/ms
         RANDOM  100000                         true  thrpt   10  32.216 ± 0.025  ops/ms

m5.2xlarge EC2 machine with Temurin JDK 22
BenchmarkBooleanColumnReader
(encoding)   Mode  Cnt  Before Score       After Score       Units
     PLAIN  thrpt   20  1420.173 ± 4.133   2710.464 ± 8.410  ops/s
       RLE  thrpt   20   689.606 ± 2.140    738.302 ± 2.089  ops/s
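
The two largest wins above, PLAIN booleans and flat definition levels with bit width 1, both reduce to unpacking eight values out of each byte. One common way to vectorize that step is to broadcast the packed byte, AND it against the eight single-bit masks in parallel, and materialize the resulting comparison mask. A sketch under the same assumptions as the previous example (illustrative only, not the committed implementation):

import jdk.incubator.vector.ByteVector;
import jdk.incubator.vector.VectorMask;
import jdk.incubator.vector.VectorOperators;
import jdk.incubator.vector.VectorSpecies;

final class BitUnpacker
{
    private static final VectorSpecies<Byte> SPECIES = ByteVector.SPECIES_64; // 8 byte lanes
    private static final ByteVector BIT_MASKS =
            ByteVector.fromArray(SPECIES, new byte[] {1, 2, 4, 8, 16, 32, 64, (byte) 128}, 0);

    // Expand packed.length bytes into 8 * packed.length booleans, LSB first
    static void unpack(byte[] packed, boolean[] out)
    {
        for (int i = 0; i < packed.length; i++) {
            ByteVector b = ByteVector.broadcast(SPECIES, packed[i]);
            VectorMask<Byte> bits = b.and(BIT_MASKS).compare(VectorOperators.NE, (byte) 0);
            bits.intoArray(out, i * 8); // one output lane per bit
        }
    }
}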
raunaqmorarka committed May 9, 2024
1 parent ac52066 commit 636944c
Showing 47 changed files with 2,300 additions and 358 deletions.
5 changes: 5 additions & 0 deletions docs/src/main/sphinx/object-storage/file-formats.md
@@ -109,4 +109,9 @@ with Parquet files performed by supported object storage connectors:
entirely. The equivalent catalog session property is named
`parquet_small_file_threshold`.
- `3MB`
* - `parquet.experimental.vectorized-decoding.enabled`
- Enable using Java Vector API for faster decoding of parquet files.
The equivalent catalog session property is
`parquet_vectorized_decoding_enabled`.
- `true`
:::
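For example, with a catalog named hive (the catalog name here is illustrative), the feature could be toggled per session with SET SESSION hive.parquet_vectorized_decoding_enabled = false.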
20 changes: 20 additions & 0 deletions lib/trino-parquet/pom.xml
@@ -228,6 +228,26 @@
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs combine.children="append">
<compilerArg>${extraJavaVectorArgs}</compilerArg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<additionalOptions combine.self="append">${extraJavaVectorArgs}</additionalOptions>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
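Note that ${extraJavaVectorArgs} is not defined in this diff; presumably it resolves elsewhere (for example, in the parent pom) to flags along the lines of --add-modules=jdk.incubator.vector, so that the incubator Vector API module is available to the compiler and the javadoc tool.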
lib/trino-parquet/src/main/java/io/trino/parquet/ParquetReaderOptions.java
@@ -35,6 +35,7 @@ public class ParquetReaderOptions
private final boolean useColumnIndex;
private final boolean useBloomFilter;
private final DataSize smallFileThreshold;
private final boolean vectorizedDecodingEnabled;

public ParquetReaderOptions()
{
@@ -46,6 +47,7 @@ public ParquetReaderOptions()
useColumnIndex = true;
useBloomFilter = true;
smallFileThreshold = DEFAULT_SMALL_FILE_THRESHOLD;
vectorizedDecodingEnabled = true;
}

private ParquetReaderOptions(
@@ -56,7 +58,8 @@ private ParquetReaderOptions(
DataSize maxBufferSize,
boolean useColumnIndex,
boolean useBloomFilter,
DataSize smallFileThreshold)
DataSize smallFileThreshold,
boolean vectorizedDecodingEnabled)
{
this.ignoreStatistics = ignoreStatistics;
this.maxReadBlockSize = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null");
@@ -67,6 +70,7 @@ private ParquetReaderOptions(
this.useColumnIndex = useColumnIndex;
this.useBloomFilter = useBloomFilter;
this.smallFileThreshold = requireNonNull(smallFileThreshold, "smallFileThreshold is null");
this.vectorizedDecodingEnabled = vectorizedDecodingEnabled;
}

public boolean isIgnoreStatistics()
@@ -94,6 +98,11 @@ public boolean useBloomFilter()
return useBloomFilter;
}

public boolean isVectorizedDecodingEnabled()
{
return vectorizedDecodingEnabled;
}

public DataSize getMaxBufferSize()
{
return maxBufferSize;
@@ -119,7 +128,8 @@ public ParquetReaderOptions withIgnoreStatistics(boolean ignoreStatistics)
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold);
smallFileThreshold,
vectorizedDecodingEnabled);
}

public ParquetReaderOptions withMaxReadBlockSize(DataSize maxReadBlockSize)
@@ -132,7 +142,8 @@ public ParquetReaderOptions withMaxReadBlockSize(DataSize maxReadBlockSize)
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold);
smallFileThreshold,
vectorizedDecodingEnabled);
}

public ParquetReaderOptions withMaxReadBlockRowCount(int maxReadBlockRowCount)
@@ -145,7 +156,8 @@ public ParquetReaderOptions withMaxReadBlockRowCount(int maxReadBlockRowCount)
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold);
smallFileThreshold,
vectorizedDecodingEnabled);
}

public ParquetReaderOptions withMaxMergeDistance(DataSize maxMergeDistance)
@@ -158,7 +170,8 @@ public ParquetReaderOptions withMaxMergeDistance(DataSize maxMergeDistance)
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold);
smallFileThreshold,
vectorizedDecodingEnabled);
}

public ParquetReaderOptions withMaxBufferSize(DataSize maxBufferSize)
@@ -171,7 +184,8 @@ public ParquetReaderOptions withMaxBufferSize(DataSize maxBufferSize)
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold);
smallFileThreshold,
vectorizedDecodingEnabled);
}

public ParquetReaderOptions withUseColumnIndex(boolean useColumnIndex)
@@ -184,7 +198,8 @@ public ParquetReaderOptions withUseColumnIndex(boolean useColumnIndex)
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold);
smallFileThreshold,
vectorizedDecodingEnabled);
}

public ParquetReaderOptions withBloomFilter(boolean useBloomFilter)
@@ -197,7 +212,8 @@ public ParquetReaderOptions withBloomFilter(boolean useBloomFilter)
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold);
smallFileThreshold,
vectorizedDecodingEnabled);
}

public ParquetReaderOptions withSmallFileThreshold(DataSize smallFileThreshold)
@@ -210,6 +226,21 @@ public ParquetReaderOptions withSmallFileThreshold(DataSize smallFileThreshold)
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold);
smallFileThreshold,
vectorizedDecodingEnabled);
}

public ParquetReaderOptions withVectorizedDecodingEnabled(boolean vectorizedDecodingEnabled)
{
return new ParquetReaderOptions(
ignoreStatistics,
maxReadBlockSize,
maxReadBlockRowCount,
maxMergeDistance,
maxBufferSize,
useColumnIndex,
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled);
}
}
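
With these builder-style copies in place, callers can toggle the new flag without disturbing any other option. A minimal usage sketch (the constructor and method name come from the diff above; the surrounding code is illustrative):

import io.trino.parquet.ParquetReaderOptions;

// Vectorized decoding defaults to enabled; opt out explicitly, for example
// when benchmarking the scalar path or targeting narrow-SIMD hardware
ParquetReaderOptions options = new ParquetReaderOptions()
        .withVectorizedDecodingEnabled(false);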
lib/trino-parquet/src/main/java/io/trino/parquet/reader/ColumnReaderFactory.java
@@ -15,12 +15,11 @@

import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.PrimitiveField;
import io.trino.parquet.reader.decoders.ValueDecoder;
import io.trino.parquet.reader.decoders.ValueDecoders;
import io.trino.parquet.reader.flat.ColumnAdapter;
import io.trino.parquet.reader.flat.FlatColumnReader;
import io.trino.parquet.reader.flat.FlatDefinitionLevelDecoder;
import io.trino.spi.TrinoException;
import io.trino.spi.type.AbstractIntType;
import io.trino.spi.type.AbstractLongType;
@@ -48,11 +47,13 @@

import static io.trino.parquet.ParquetEncoding.PLAIN;
import static io.trino.parquet.reader.decoders.ValueDecoder.ValueDecodersProvider;
import static io.trino.parquet.reader.decoders.ValueDecoder.createLevelsDecoder;
import static io.trino.parquet.reader.flat.BinaryColumnAdapter.BINARY_ADAPTER;
import static io.trino.parquet.reader.flat.ByteColumnAdapter.BYTE_ADAPTER;
import static io.trino.parquet.reader.flat.DictionaryDecoder.DictionaryDecoderProvider;
import static io.trino.parquet.reader.flat.DictionaryDecoder.getDictionaryDecoder;
import static io.trino.parquet.reader.flat.Fixed12ColumnAdapter.FIXED12_ADAPTER;
import static io.trino.parquet.reader.flat.FlatDefinitionLevelDecoder.getFlatDefinitionLevelDecoder;
import static io.trino.parquet.reader.flat.Int128ColumnAdapter.INT128_ADAPTER;
import static io.trino.parquet.reader.flat.IntColumnAdapter.INT_ADAPTER;
import static io.trino.parquet.reader.flat.LongColumnAdapter.LONG_ADAPTER;
@@ -83,11 +84,15 @@

public final class ColumnReaderFactory
{
private static final int PREFERRED_BIT_WIDTH = getVectorBitSize();

private final DateTimeZone timeZone;
private final boolean vectorizedDecodingEnabled;

public ColumnReaderFactory(DateTimeZone timeZone)
public ColumnReaderFactory(DateTimeZone timeZone, ParquetReaderOptions readerOptions)
{
this.timeZone = requireNonNull(timeZone, "dateTimeZone is null");
this.vectorizedDecodingEnabled = readerOptions.isVectorizedDecodingEnabled() && isVectorizedDecodingSupported();
}

public ColumnReader create(PrimitiveField field, AggregatedMemoryContext aggregatedMemoryContext)
@@ -96,7 +101,7 @@ public ColumnReader create(PrimitiveField field, AggregatedMemoryContext aggrega
PrimitiveTypeName primitiveType = field.getDescriptor().getPrimitiveType().getPrimitiveTypeName();
LogicalTypeAnnotation annotation = field.getDescriptor().getPrimitiveType().getLogicalTypeAnnotation();
LocalMemoryContext memoryContext = aggregatedMemoryContext.newLocalMemoryContext(ColumnReader.class.getSimpleName());
ValueDecoders valueDecoders = new ValueDecoders(field);
ValueDecoders valueDecoders = new ValueDecoders(field, vectorizedDecodingEnabled);
if (BOOLEAN.equals(type) && primitiveType == PrimitiveTypeName.BOOLEAN) {
return createColumnReader(field, valueDecoders::getBooleanDecoder, BYTE_ADAPTER, memoryContext);
}
@@ -272,7 +277,7 @@ && isIntegerOrDecimalPrimitive(primitiveType)) {
throw unsupportedException(type, field);
}

private static <T> ColumnReader createColumnReader(
private <T> ColumnReader createColumnReader(
PrimitiveField field,
ValueDecodersProvider<T> decodersProvider,
ColumnAdapter<T> columnAdapter,
@@ -282,20 +287,21 @@ private static <T> ColumnReader createColumnReader(
dictionaryPage,
columnAdapter,
decodersProvider.create(PLAIN),
isNonNull);
isNonNull,
vectorizedDecodingEnabled);
if (isFlatColumn(field)) {
return new FlatColumnReader<>(
field,
decodersProvider,
FlatDefinitionLevelDecoder::getFlatDefinitionLevelDecoder,
maxDefinitionLevel -> getFlatDefinitionLevelDecoder(maxDefinitionLevel, vectorizedDecodingEnabled),
dictionaryDecoderProvider,
columnAdapter,
memoryContext);
}
return new NestedColumnReader<>(
field,
decodersProvider,
ValueDecoder::createLevelsDecoder,
maxLevel -> createLevelsDecoder(maxLevel, vectorizedDecodingEnabled),
dictionaryDecoderProvider,
columnAdapter,
memoryContext);
@@ -355,4 +361,23 @@ private static TrinoException unsupportedException(Type type, PrimitiveField fie
{
return new TrinoException(NOT_SUPPORTED, format("Unsupported Trino column type (%s) for Parquet column (%s)", type, field.getDescriptor()));
}

private static boolean isVectorizedDecodingSupported()
{
// Performance gains with vectorized decoding are validated only when the hardware platform provides at least 256 bit width registers
// Graviton 2 machines return false here, whereas x86 and Graviton 3 machines return true
return PREFERRED_BIT_WIDTH >= 256;
}

// get VectorShape bit size via reflection to avoid requiring that the preview feature be enabled
private static int getVectorBitSize()
{
try {
Class<?> clazz = Class.forName("jdk.incubator.vector.VectorShape");
return (int) clazz.getMethod("vectorBitSize").invoke(clazz.getMethod("preferredShape").invoke(null));
}
catch (Throwable e) {
return -1;
}
}
}
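
For reference, the reflective lookup above is equivalent to the direct call below, which itself would require jdk.incubator.vector on the module path at compile time, hence the reflection:

import jdk.incubator.vector.VectorShape;

// Direct form of getVectorBitSize(): typically 512 on AVX-512 hardware,
// 256 on AVX2 and Graviton 3, 128 on Graviton 2 (below the 256-bit threshold)
int preferredBits = VectorShape.preferredShape().vectorBitSize();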
lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java
@@ -149,7 +149,7 @@ public ParquetReader(
this.primitiveFields = getPrimitiveFields(columnFields.stream().map(Column::field).collect(toImmutableList()));
this.rowGroups = requireNonNull(rowGroups, "rowGroups is null");
this.dataSource = requireNonNull(dataSource, "dataSource is null");
this.columnReaderFactory = new ColumnReaderFactory(timeZone);
this.columnReaderFactory = new ColumnReaderFactory(timeZone, options);
this.memoryContext = requireNonNull(memoryContext, "memoryContext is null");
this.currentRowGroupMemoryContext = memoryContext.newAggregatedMemoryContext();
this.options = requireNonNull(options, "options is null");