Skip to content

Commit

Permalink
Avoid reading underlying stream when seeking RLE decoders (#7918)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #7918

`seekToRowGroup` is meant to be a lightweight operation and should not
read the underlying stream.  Currently this is not the case with RLE decoders,
because we eagerly skip the values after seeking, which requires read the RLE
headers from the encoded data.  We have pattern that seek multiple times before
actually reading the data, and the IO and decompression cost of seeking can be
avoided.  This is especially visible in low selectivity queries where
decompression takes majority of the CPU time.

Optimize these repeated IO and decompression away by lazily skipping values in
RLE decoders.  For a typical low selectivity query, this improve the CPU time
from 69.84 hours to 40.2 hours (Presto Java is taking 41.45 hours on the same
query).

Reviewed By: oerling

Differential Revision: D51947121

fbshipit-source-id: 3c242d87b16deba895b659a4c9e407e549e46b05
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 11, 2023
1 parent 16e1af9 commit 4617100
Show file tree
Hide file tree
Showing 17 changed files with 229 additions and 193 deletions.
10 changes: 2 additions & 8 deletions velox/dwio/common/DirectDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,21 @@ void DirectDecoder<isSigned>::seekToRowGroup(
IntDecoder<isSigned>::inputStream->seekToPosition(location);
// force a re-read from the stream
IntDecoder<isSigned>::bufferEnd = IntDecoder<isSigned>::bufferStart;
this->pendingSkip = 0;
}

template void DirectDecoder<true>::seekToRowGroup(
dwio::common::PositionProvider& location);
template void DirectDecoder<false>::seekToRowGroup(
dwio::common::PositionProvider& location);

template <bool isSigned>
void DirectDecoder<isSigned>::skip(uint64_t numValues) {
IntDecoder<isSigned>::skipLongs(numValues);
}

template void DirectDecoder<true>::skip(uint64_t numValues);
template void DirectDecoder<false>::skip(uint64_t numValues);

template <bool isSigned>
template <typename T>
void DirectDecoder<isSigned>::nextValues(
T* data,
uint64_t numValues,
const uint64_t* nulls) {
skipPending();
uint64_t position = 0;
// skipNulls()
if (nulls) {
Expand Down
37 changes: 15 additions & 22 deletions velox/dwio/common/DirectDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#pragma once

#include "velox/common/base/Nulls.h"
#include "velox/dwio/common/DecoderUtil.h"
#include "velox/dwio/common/IntDecoder.h"

Expand All @@ -36,7 +35,13 @@ class DirectDecoder : public IntDecoder<isSigned> {

void seekToRowGroup(dwio::common::PositionProvider&) override;

void skip(uint64_t numValues) override;
using IntDecoder<isSigned>::skip;

void skipPending() final {
auto toSkip = this->pendingSkip;
this->pendingSkip = 0;
this->skipLongs(toSkip);
}

template <typename T>
void nextValues(
Expand All @@ -51,25 +56,12 @@ class DirectDecoder : public IntDecoder<isSigned> {
nextValues<int64_t>(data, numValues, nulls);
}

template <bool hasNulls>
inline void skip(
int32_t numValues,
int32_t current,
const uint64_t* FOLLY_NULLABLE nulls) {
if (!numValues) {
return;
}
if (hasNulls) {
numValues = bits::countNonNulls(nulls, current, current + numValues);
}
IntDecoder<isSigned>::skipLongsFast(numValues);
}

template <bool hasNulls, typename Visitor>
void readWithVisitor(
const uint64_t* FOLLY_NULLABLE nulls,
Visitor visitor,
bool useFastPath = true) {
skipPending();
if constexpr (!std::is_same_v<typename Visitor::DataType, int128_t>) {
if (useFastPath &&
dwio::common::useFastPath<Visitor, hasNulls>(visitor)) {
Expand All @@ -78,7 +70,7 @@ class DirectDecoder : public IntDecoder<isSigned> {
}
}
int32_t current = visitor.start();
skip<hasNulls>(current, 0, nulls);
this->template skip<hasNulls>(current, 0, nulls);
const bool allowNulls = hasNulls && visitor.allowNulls();
for (;;) {
bool atEnd = false;
Expand All @@ -87,7 +79,7 @@ class DirectDecoder : public IntDecoder<isSigned> {
if (!allowNulls) {
toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd);
if (!Visitor::dense) {
skip<false>(toSkip, current, nullptr);
this->template skip<false>(toSkip, current, nullptr);
}
if (atEnd) {
return;
Expand All @@ -113,7 +105,7 @@ class DirectDecoder : public IntDecoder<isSigned> {
skip:
++current;
if (toSkip) {
skip<hasNulls>(toSkip, current, nulls);
this->template skip<hasNulls>(toSkip, current, nulls);
current += toSkip;
}
if (atEnd) {
Expand Down Expand Up @@ -141,6 +133,7 @@ class DirectDecoder : public IntDecoder<isSigned> {
// buffer. If the element would straddle buffers, it is copied to
// *temp and temp is returned.
const void* FOLLY_NONNULL readFixed(int32_t size, void* FOLLY_NONNULL temp) {
skipPending();
auto ptr = super::bufferStart;
if (ptr && ptr + size <= super::bufferEnd) {
super::bufferStart += size;
Expand Down Expand Up @@ -200,7 +193,7 @@ class DirectDecoder : public IntDecoder<isSigned> {
visitor.setHasNulls();
}
if (innerVector->empty()) {
skip<false>(tailSkip, 0, nullptr);
this->template skip<false>(tailSkip, 0, nullptr);
visitor.setAllNull(hasFilter ? 0 : numRows);
return;
}
Expand All @@ -211,7 +204,7 @@ class DirectDecoder : public IntDecoder<isSigned> {
} else {
super::bulkReadRows(*innerVector, data);
}
skip<false>(tailSkip, 0, nullptr);
this->template skip<false>(tailSkip, 0, nullptr);
auto dataRows = innerVector
? folly::Range<const int*>(innerVector->data(), innerVector->size())
: folly::Range<const int32_t*>(rows, outerVector->size());
Expand Down Expand Up @@ -239,7 +232,7 @@ class DirectDecoder : public IntDecoder<isSigned> {
super::bufferEnd,
visitor.filter(),
visitor.hook());
skip<false>(tailSkip, 0, nullptr);
this->template skip<false>(tailSkip, 0, nullptr);
}
} else {
if (super::useVInts) {
Expand Down
12 changes: 9 additions & 3 deletions velox/dwio/common/IntDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ FOLLY_ALWAYS_INLINE void IntDecoder<isSigned>::skipVarints(uint64_t items) {
template <bool isSigned>
FOLLY_ALWAYS_INLINE uint64_t
IntDecoder<isSigned>::skipVarintsInBuffer(uint64_t items) {
VELOX_DCHECK_EQ(pendingSkip, 0);
static constexpr uint64_t kVarintMask = 0x8080808080808080L;
if (bufferStart == bufferEnd) {
const void* bufferPointer;
Expand Down Expand Up @@ -70,20 +71,22 @@ IntDecoder<isSigned>::skipVarintsInBuffer(uint64_t items) {
}

template <bool isSigned>
void IntDecoder<isSigned>::skipLongsFast(uint64_t numValues) {
void IntDecoder<isSigned>::skipLongs(uint64_t numValues) {
VELOX_DCHECK_EQ(pendingSkip, 0);
if (useVInts) {
skipVarints(numValues);
} else {
skipBytes(numValues * numBytes, inputStream.get(), bufferStart, bufferEnd);
}
}

template void IntDecoder<true>::skipLongsFast(uint64_t numValues);
template void IntDecoder<false>::skipLongsFast(uint64_t numValues);
template void IntDecoder<true>::skipLongs(uint64_t numValues);
template void IntDecoder<false>::skipLongs(uint64_t numValues);

template <bool isSigned>
template <typename T>
void IntDecoder<isSigned>::bulkReadFixed(uint64_t size, T* result) {
VELOX_DCHECK_EQ(pendingSkip, 0);
if (isSigned) {
switch (numBytes) {
case 2:
Expand Down Expand Up @@ -127,6 +130,7 @@ void IntDecoder<isSigned>::bulkReadRowsFixed(
RowSet rows,
int32_t initialRow,
T* result) {
VELOX_DCHECK_EQ(pendingSkip, 0);
if (isSigned) {
switch (numBytes) {
case 2:
Expand Down Expand Up @@ -915,6 +919,7 @@ FOLLY_ALWAYS_INLINE void varintSwitch(
template <bool isSigned>
template <typename T>
void IntDecoder<isSigned>::bulkRead(uint64_t size, T* result) {
skipPending();
if (!useVInts) {
bulkReadFixed(size, result);
return;
Expand Down Expand Up @@ -966,6 +971,7 @@ void IntDecoder<isSigned>::bulkReadRows(
RowSet rows,
T* result,
int32_t initialRow) {
skipPending();
if (!useVInts) {
bulkReadRowsFixed(rows, initialRow, result);
return;
Expand Down
57 changes: 35 additions & 22 deletions velox/dwio/common/IntDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <folly/Likely.h>
#include <folly/Range.h>
#include <folly/Varint.h>
#include "velox/common/base/Nulls.h"
#include "velox/common/encode/Coding.h"
#include "velox/dwio/common/IntCodecCommon.h"
#include "velox/dwio/common/SeekableInputStream.h"
Expand Down Expand Up @@ -52,18 +53,24 @@ class IntDecoder {
virtual ~IntDecoder() = default;

/**
* Seek to a specific row group.
* Seek to a specific row group. Should not read the underlying input stream
* to avoid decoding same data multiple times.
*/
virtual void seekToRowGroup(
dwio::common::PositionProvider& positionProvider) = 0;

/**
* Seek over a given number of values.
* Seek over a given number of values. Does not decode the underlying input
* stream.
*/
virtual void skip(uint64_t numValues) = 0;
void skip(uint64_t numValues) {
pendingSkip += numValues;
}

/**
* Read a number of values into the batch.
* Read a number of values into the batch. Should call skipPending() in the
* beginning.
*
* @param data the array to read into
* @param numValues the number of values to read
* @param nulls If the pointer is null, all values are read. If the
Expand Down Expand Up @@ -117,18 +124,10 @@ class IntDecoder {
* Load RowIndex values for the stream being read.
* @return updated start index after this stream's index values.
*/
size_t loadIndices(size_t startIndex) {
size_t loadIndices(size_t startIndex) const {
return inputStream->positionSize() + startIndex + 1;
}

void skipLongs(uint64_t numValues) {
skipLongsFast(numValues);
}

// Optimized variant of skipLongs using popcnt. Used on selective
// path only pending validation.
void skipLongsFast(uint64_t numValues);

// Reads 'size' consecutive T' and stores then in 'result'.
template <typename T>
void bulkRead(uint64_t size, T* FOLLY_NONNULL result);
Expand All @@ -142,6 +141,22 @@ class IntDecoder {
bulkReadRows(RowSet rows, T* FOLLY_NONNULL result, int32_t initialRow = 0);

protected:
// Actually skip the pending entries.
virtual void skipPending() = 0;

template <bool kHasNulls>
inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) {
if constexpr (kHasNulls) {
numValues = bits::countNonNulls(nulls, current, current + numValues);
}
pendingSkip += numValues;
if (pendingSkip > 0) {
skipPending();
}
}

void skipLongs(uint64_t numValues);

template <typename T>
void bulkReadFixed(uint64_t size, T* FOLLY_NONNULL result);

Expand All @@ -163,21 +178,12 @@ class IntDecoder {
template <typename cppType>
cppType readLittleEndianFromBigEndian();

// Applies 'visitor to 'numRows' consecutive values.
template <typename Visitor>
void readDense(int32_t numRows, Visitor& visitor) {
auto data = visitor.mutableValues(numRows);
bulkRead(numRows, data);
visitor.processN(data, numRows);
}

private:
uint64_t skipVarintsInBuffer(uint64_t items);
void skipVarints(uint64_t items);
int128_t readVsHugeInt();
uint128_t readVuHugeInt();

protected:
// note: there is opportunity for performance gains here by avoiding
// this by directly supporting deserialization into the correct
// target data type
Expand Down Expand Up @@ -205,16 +211,19 @@ class IntDecoder {
}
}

protected:
const std::unique_ptr<dwio::common::SeekableInputStream> inputStream;
const char* FOLLY_NULLABLE bufferStart;
const char* FOLLY_NULLABLE bufferEnd;
const bool useVInts;
const uint32_t numBytes;
bool bigEndian;
int64_t pendingSkip = 0;
};

template <bool isSigned>
FOLLY_ALWAYS_INLINE signed char IntDecoder<isSigned>::readByte() {
VELOX_DCHECK_EQ(pendingSkip, 0);
if (UNLIKELY(bufferStart == bufferEnd)) {
int32_t bufferLength;
const void* bufferPointer;
Expand All @@ -230,6 +239,7 @@ FOLLY_ALWAYS_INLINE signed char IntDecoder<isSigned>::readByte() {

template <bool isSigned>
FOLLY_ALWAYS_INLINE uint64_t IntDecoder<isSigned>::readVuLong() {
VELOX_DCHECK_EQ(pendingSkip, 0);
if (LIKELY(bufferEnd - bufferStart >= folly::kMaxVarintLength64)) {
const char* p = bufferStart;
uint64_t val;
Expand Down Expand Up @@ -317,6 +327,7 @@ FOLLY_ALWAYS_INLINE int64_t IntDecoder<isSigned>::readVsLong() {

template <bool isSigned>
inline int64_t IntDecoder<isSigned>::readLongLE() {
VELOX_DCHECK_EQ(pendingSkip, 0);
int64_t result = 0;
if (bufferStart && bufferStart + sizeof(int64_t) <= bufferEnd) {
bufferStart += numBytes;
Expand Down Expand Up @@ -357,6 +368,7 @@ inline int64_t IntDecoder<isSigned>::readLongLE() {
template <bool isSigned>
template <typename cppType>
inline cppType IntDecoder<isSigned>::readLittleEndianFromBigEndian() {
VELOX_DCHECK_EQ(pendingSkip, 0);
cppType bigEndianValue = 0;
// Input is in Big Endian layout of size numBytes.
if (bufferStart && bufferStart + sizeof(int64_t) <= bufferEnd) {
Expand Down Expand Up @@ -413,6 +425,7 @@ inline int128_t IntDecoder<isSigned>::readVsHugeInt() {

template <bool isSigned>
inline uint128_t IntDecoder<isSigned>::readVuHugeInt() {
VELOX_DCHECK_EQ(pendingSkip, 0);
uint128_t value = 0;
uint128_t work;
uint32_t offset = 0;
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/SeekableInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ bool SeekableArrayInputStream::Next(const void** buffer, int32_t* size) {
*buffer = data + position;
*size = static_cast<int32_t>(currentSize);
position += currentSize;
totalRead_ += currentSize;
return true;
}

Expand Down
7 changes: 7 additions & 0 deletions velox/dwio/common/SeekableInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class SeekableArrayInputStream : public SeekableInputStream {
uint64_t length;
uint64_t position;
uint64_t blockSize;
int64_t totalRead_ = 0;
void loadIfAvailable();

public:
Expand Down Expand Up @@ -107,6 +108,12 @@ class SeekableArrayInputStream : public SeekableInputStream {
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t positionSize() override;

/// Return the total number of bytes returned from Next() calls. Intended to
/// be used for test validation.
int64_t totalRead() const {
return totalRead_;
}
};

/**
Expand Down
Loading

0 comments on commit 4617100

Please sign in to comment.