Skip to content

Commit

Permalink
make encoded & decoded values consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Dec 6, 2022
1 parent 4eaaacb commit 19b5768
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 37 deletions.
103 changes: 72 additions & 31 deletions cpp/src/parquet/page_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,47 @@ namespace {

template <typename DType>
void Decode(std::unique_ptr<typename EncodingTraits<DType>::Decoder>& decoder,
const std::string& src, typename DType::c_type* dst) {
decoder->SetData(/*num_values=*/1, reinterpret_cast<const uint8_t*>(src.c_str()),
static_cast<int>(src.size()));
decoder->Decode(dst, /*max_values=*/1);
const std::string& input, std::vector<typename DType::c_type>& output,
size_t index) {
if (ARROW_PREDICT_FALSE(index >= output.size())) {
throw ParquetException("Index out of bound");
}

decoder->SetData(/*num_values=*/1, reinterpret_cast<const uint8_t*>(input.c_str()),
static_cast<int>(input.size()));
decoder->Decode(&output[index], /*max_values=*/1);
}

template <>
void Decode<ByteArrayType>(std::unique_ptr<ByteArrayDecoder>&, const std::string& src,
ByteArray* dst) {
DCHECK_LE(src.size(), static_cast<size_t>(std::numeric_limits<uint32_t>::max()));
dst->len = static_cast<uint32_t>(src.size());
dst->ptr = reinterpret_cast<const uint8_t*>(src.data());
void Decode<BooleanType>(std::unique_ptr<BooleanDecoder>& decoder,
const std::string& input, std::vector<bool>& output,
size_t index) {
if (ARROW_PREDICT_FALSE(index >= output.size())) {
throw ParquetException("Index out of bound");
}

bool value;
decoder->SetData(/*num_values=*/1, reinterpret_cast<const uint8_t*>(input.c_str()),
static_cast<int>(input.size()));
decoder->Decode(&value, /*max_values=*/1);
output[index] = value;
}

template <>
void Decode<ByteArrayType>(std::unique_ptr<ByteArrayDecoder>&, const std::string& input,
std::vector<ByteArray>& output, size_t index) {
if (ARROW_PREDICT_FALSE(index >= output.size())) {
throw ParquetException("Index out of bound");
}

if (ARROW_PREDICT_FALSE(input.size() >
static_cast<size_t>(std::numeric_limits<uint32_t>::max()))) {
throw ParquetException("Invalid encoded byte array length");
}

auto& decoded = output.at(index);
decoded.len = static_cast<uint32_t>(input.size());
decoded.ptr = reinterpret_cast<const uint8_t*>(input.data());
}

template <typename DType>
Expand All @@ -56,27 +85,39 @@ class TypedColumnIndexImpl : public TypedColumnIndex<DType> {
TypedColumnIndexImpl(const ColumnDescriptor& descr,
const format::ColumnIndex& column_index)
: column_index_(column_index) {
int32_t num_non_null_pages = std::accumulate(
// Make sure the number of pages is valid and it does not overflow to int32_t.
if (column_index_.null_pages.size() != column_index_.min_values.size() ||
column_index_.min_values.size() != column_index_.max_values.size() ||
ARROW_PREDICT_FALSE(column_index_.null_pages.size() >=
static_cast<size_t>(std::numeric_limits<int32_t>::max()))) {
throw ParquetException("Invalid column index");
}

size_t num_pages = column_index_.null_pages.size();
size_t num_non_null_pages = std::accumulate(
column_index_.null_pages.cbegin(), column_index_.null_pages.cend(), 0,
[](int32_t num_non_null_pages, bool null_page) {
[](size_t num_non_null_pages, bool null_page) {
return num_non_null_pages + (null_page ? 0 : 1);
});
if (num_non_null_pages > 0) {
min_values_.reserve(num_non_null_pages);
max_values_.reserve(num_non_null_pages);
// Decode min and max values into a compact form (i.e. w/o null page)
auto plain_decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, &descr);
T value;
for (size_t i = 0; i < column_index_.null_pages.size(); ++i) {
if (!column_index_.null_pages[i]) {
non_null_page_indices_.emplace_back(static_cast<int32_t>(i));
Decode<DType>(plain_decoder, column_index_.min_values[i], &value);
min_values_.emplace_back(value);
Decode<DType>(plain_decoder, column_index_.max_values[i], &value);
max_values_.emplace_back(value);
}
DCHECK_LE(num_non_null_pages, num_pages);

// Allocate slots for decoded values.
min_values_.resize(num_pages);
max_values_.resize(num_pages);
non_null_page_indices_.reserve(num_non_null_pages);

// Decode min and max values according to the physical type.
// Note that null page are skipped.
auto plain_decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, &descr);
for (size_t i = 0; i < num_pages; ++i) {
if (!column_index_.null_pages[i]) {
// The check on `num_pages` has guaranteed the cast below is safe.
non_null_page_indices_.emplace_back(static_cast<int32_t>(i));
Decode<DType>(plain_decoder, column_index_.min_values[i], min_values_, i);
Decode<DType>(plain_decoder, column_index_.max_values[i], max_values_, i);
}
}
DCHECK_EQ(num_non_null_pages, non_null_page_indices_.size());
}

const std::vector<bool>& null_pages() const override {
Expand All @@ -101,21 +142,21 @@ class TypedColumnIndexImpl : public TypedColumnIndex<DType> {
return column_index_.null_counts;
}

const std::vector<T>& min_values() const override { return min_values_; }

const std::vector<T>& max_values() const override { return max_values_; }

const std::vector<int32_t>& non_null_page_indices() const override {
return non_null_page_indices_;
}

const std::vector<T>& min_values() const override { return min_values_; }

const std::vector<T>& max_values() const override { return max_values_; }

private:
/// Wrapped thrift column index.
const format::ColumnIndex column_index_;
/// Decoded typed min/max values. Null pages are set to std::nullopt.
/// Decoded typed min/max values. Undefined for null pages.
std::vector<T> min_values_;
std::vector<T> max_values_;
/// A list of page indices for not-null pages.
/// A list of page indices for non-null pages.
std::vector<int32_t> non_null_page_indices_;
};

Expand Down
9 changes: 3 additions & 6 deletions cpp/src/parquet/page_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class PARQUET_EXPORT ColumnIndex {
/// `has_null_counts` should be called first to determine if this information is
/// available.
virtual const std::vector<int64_t>& null_counts() const = 0;

/// \brief A vector of page indices for non-null pages.
virtual const std::vector<int32_t>& non_null_page_indices() const = 0;
};

/// \brief Typed implementation of ColumnIndex.
Expand All @@ -89,12 +92,6 @@ class PARQUET_EXPORT TypedColumnIndex : public ColumnIndex {
///
/// Just like `min_values`, but for upper bounds instead of lower bounds.
virtual const std::vector<T>& max_values() const = 0;

/// \brief A vector of page indices for not-null pages.
///
/// It is helpful to understand the original page id in the values returned from
/// min_values() and max_values() above.
virtual const std::vector<int32_t>& non_null_page_indices() const = 0;
};

using BoolColumnIndex = TypedColumnIndex<BooleanType>;
Expand Down

0 comments on commit 19b5768

Please sign in to comment.