From 62762a261b0c446d79af70d2adf3a57addd9aa40 Mon Sep 17 00:00:00 2001 From: "N.A" Date: Wed, 17 Jul 2024 21:54:12 +0800 Subject: [PATCH 1/3] add DeltaLengthByteArray support BinaryView/StringView add corresponding test. --- cpp/src/parquet/encoding_test.cc | 71 ++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 974e9ce62263c..f3fbcc1de9bd8 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -2234,6 +2234,77 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { } } +TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryViewDirectPut) { + const int64_t size = 50; + const int32_t min_length = 0; + const int32_t max_length = 10; + const int32_t num_unique = 10; + const double null_probability = 0.25; + auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); + auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); + + auto castTo = [](std::shared_ptr<::arrow::Array> result, + const std::shared_ptr<::arrow::DataType>& to_type) { + ::arrow::compute::CastOptions options; + options.to_type = to_type; + EXPECT_OK_AND_ASSIGN( + auto tmp, CallFunction("cast", {::arrow::Datum{result}}, &options, nullptr)); + result = tmp.make_array(); + return result; + }; + + auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) { + ASSERT_NO_THROW(encoder->Put(*values)); + const auto* binary_view_array = + checked_cast(values.get()); + // For DeltaLength encoding, the estimated size should be at least the total byte size + // BinaryViewArray doesn't have a total value length method, so calculate by hand. + int64_t total_size = 0; + for (int64_t i = 0; i < binary_view_array->length(); i++) { + total_size += binary_view_array->GetView(i).size(); + } + EXPECT_GE(encoder->EstimatedDataEncodedSize(), total_size) + << "Estimated size should be at least the total byte size"; + const auto buf = encoder->FlushValues(); + + const int num_values = static_cast(values->length() - values->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + typename EncodingTraits::Accumulator acc; + if (values->type()->id() == ::arrow::Type::type::STRING_VIEW) { + acc.builder = std::make_unique<::arrow::StringBuilder>(); + } else { + acc.builder = std::make_unique<::arrow::BinaryBuilder>(); + } + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + + ASSERT_OK(acc.builder->Finish(&result)); + ASSERT_EQ(values->length(), result->length()); + ASSERT_OK(result->ValidateFull()); + + ::arrow::AssertArraysEqual(*values, *castTo(result, values->type()), true); + }; + + ::arrow::random::RandomArrayGenerator rag(42); + auto values = rag.StringView(0, min_length, max_length, null_probability); + CheckSeed(values); + for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + rag = ::arrow::random::RandomArrayGenerator(seed); + values = rag.StringView(size, min_length, max_length, null_probability); + CheckSeed(values); + + values = castTo( + rag.BinaryWithRepeats(size, num_unique, min_length, max_length, null_probability), + ::arrow::binary_view()); + CheckSeed(values); + } +} + TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowDirectPut) { auto CheckEncode = [](std::shared_ptr<::arrow::Array> values, std::shared_ptr<::arrow::Array> lengths) { From 97170eea5dd7ff472f010f74870a19a0fc9c25c3 Mon Sep 17 00:00:00 2001 From: "N.A" Date: Sun, 2 Feb 2025 16:35:01 +0800 Subject: [PATCH 2/3] add support on encoding BinaryViewArray and corresponding tests. --- cpp/src/parquet/encoder.cc | 42 ++++++--- cpp/src/parquet/encoding_test.cc | 141 +++++++++++++++++++++++++------ 2 files changed, 144 insertions(+), 39 deletions(-) diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc index f41eb9a19123c..8929929f30dd3 100644 --- a/cpp/src/parquet/encoder.cc +++ b/cpp/src/parquet/encoder.cc @@ -156,10 +156,19 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder { } protected: - template + template void PutBinaryArray(const ArrayType& array) { - const int64_t total_bytes = - array.value_offset(array.length()) - array.value_offset(0); + const int64_t total_bytes = [](const ArrayType& a) { + if constexpr (isView) { + int64_t bts = 0; + for (int i = 0; i < a.length(); i++) { + bts += static_cast(a.GetView(i).size()); + } + return bts; + } else { + return a.value_offset(a.length()) - a.value_offset(0); + } + }(array); PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t))); PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline( @@ -249,18 +258,21 @@ void PlainEncoder::Put(const ::arrow::Array& values) { ParquetException::NYI("direct put of " + values.type()->ToString()); } -void AssertBaseBinary(const ::arrow::Array& values) { - if (!::arrow::is_base_binary_like(values.type_id())) { - throw ParquetException("Only BaseBinaryArray and subclasses supported"); +void AssertBaseBinaryOrBinaryView(const ::arrow::Array& values) { + if (!::arrow::is_base_binary_like(values.type_id()) && + !::arrow::is_binary_view_like(values.type_id())) { + throw ParquetException( + "Only BaseBinaryArray and subclasses or binary_view_like supported"); } } template <> inline void PlainEncoder::Put(const ::arrow::Array& values) { - AssertBaseBinary(values); - + AssertBaseBinaryOrBinaryView(values); if (::arrow::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast(values)); + } else if (::arrow::is_binary_view_like(values.type_id())) { + PutBinaryArray(checked_cast(values)); } else { DCHECK(::arrow::is_large_binary_like(values.type_id())); PutBinaryArray(checked_cast(values)); @@ -752,9 +764,11 @@ void DictEncoderImpl::Put(const ::arrow::Array& values) { template <> void DictEncoderImpl::Put(const ::arrow::Array& values) { - AssertBaseBinary(values); + AssertBaseBinaryOrBinaryView(values); if (::arrow::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast(values)); + } else if (::arrow::is_binary_view_like(values.type_id())) { + PutBinaryArray(checked_cast(values)); } else { DCHECK(::arrow::is_large_binary_like(values.type_id())); PutBinaryArray(checked_cast(values)); @@ -803,11 +817,13 @@ void DictEncoderImpl::PutDictionary(const ::arrow::Array& values) { template <> void DictEncoderImpl::PutDictionary(const ::arrow::Array& values) { - AssertBaseBinary(values); + AssertBaseBinaryOrBinaryView(values); AssertCanPutDictionary(this, values); if (::arrow::is_binary_like(values.type_id())) { PutBinaryDictionaryArray(checked_cast(values)); + } else if (::arrow::is_binary_view_like(values.type_id())) { + PutBinaryDictionaryArray(checked_cast(values)); } else { DCHECK(::arrow::is_large_binary_like(values.type_id())); PutBinaryDictionaryArray(checked_cast(values)); @@ -1304,9 +1320,11 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, }; void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) { - AssertBaseBinary(values); + AssertBaseBinaryOrBinaryView(values); if (::arrow::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast(values)); + } else if (::arrow::is_binary_view_like(values.type_id())) { + PutBinaryArray(checked_cast(values)); } else { PutBinaryArray(checked_cast(values)); } @@ -1577,6 +1595,8 @@ void DeltaByteArrayEncoder::Put(const ::arrow::Array& values) { PutBinaryArray(checked_cast(values)); } else if (::arrow::is_fixed_size_binary(values.type_id())) { PutBinaryArray(checked_cast(values)); + } else if (::arrow::is_binary_view_like(values.type_id())) { + PutBinaryArray(checked_cast(values)); } else { throw ParquetException("Only BaseBinaryArray and subclasses supported"); } diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index f3fbcc1de9bd8..7c7f3ef38548b 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -627,6 +627,80 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { } } +// Util +int64_t BinaryViewTotalSize(const ::arrow::BinaryViewArray& array) { + int64_t total_size = 0; + for (int i = 0; i < array.length(); i++) { + total_size += array.GetView(i).size(); + } + return total_size; +} + +std::shared_ptr<::arrow::Array> CastBinaryTypesHelper( + std::shared_ptr<::arrow::Array> result, std::shared_ptr<::arrow::DataType> type) { + if (::arrow::is_large_binary_like(type->id())) { + ::arrow::compute::CastOptions options; + if (::arrow::is_string(type->id())) { + options.to_type = ::arrow::large_utf8(); + } else { + options.to_type = ::arrow::large_binary(); + } + EXPECT_OK_AND_ASSIGN( + auto tmp, CallFunction("cast", {::arrow::Datum{result}}, &options, nullptr)); + result = tmp.make_array(); + } else if (::arrow::is_binary_view_like(type->id())) { + ::arrow::compute::CastOptions options; + options.to_type = type; + EXPECT_OK_AND_ASSIGN( + auto tmp, CallFunction("cast", {::arrow::Datum{result}}, &options, nullptr)); + result = tmp.make_array(); + } + return result; +} + +TEST(PlainEncodingAdHoc, ArrowBinaryViewDirectPut) { + const int64_t size = 50; + const int32_t min_length = 0; + const int32_t max_length = 10; + const double null_probability = 0.25; + + auto CheckSeed = [&](int seed) { + ::arrow::random::RandomArrayGenerator rag(seed); + auto values = rag.StringView(size, min_length, max_length, null_probability); + + auto encoder = MakeTypedEncoder(Encoding::PLAIN); + auto decoder = MakeTypedDecoder(Encoding::PLAIN); + + ASSERT_NO_THROW(encoder->Put(*values)); + // For Plain encoding, the estimated size should be at least the total byte size + EXPECT_GE(encoder->EstimatedDataEncodedSize(), + BinaryViewTotalSize(dynamic_cast(*values))) + << "Estimated size should be at least the total byte size"; + + auto buf = encoder->FlushValues(); + + int num_values = static_cast(values->length() - values->null_count()); + decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + + typename EncodingTraits::Accumulator acc; + acc.builder = std::make_unique<::arrow::StringBuilder>(); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.builder->Finish(&result)); + ASSERT_EQ(50, result->length()); + ::arrow::AssertArraysEqual(*values, *CastBinaryTypesHelper(result, values->type()), + true); + }; + + for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + CheckSeed(seed); + } +} + // Check that one can put several Arrow arrays into a given encoder // and decode to the right values (see GH-36939) TEST(BooleanArrayEncoding, AdHocRoundTrip) { @@ -1182,6 +1256,40 @@ TEST(DictEncodingAdHoc, ArrowBinaryDirectPut) { ::arrow::AssertArraysEqual(*values, *result); } +TEST(DictEncodingAdHoc, ArrowBinaryViewDirectPut) { + // Implemented as part of ARROW-3246 + const int64_t size = 50; + const int64_t min_length = 0; + const int64_t max_length = 10; + const double null_probability = 0.1; + ::arrow::random::RandomArrayGenerator rag(0); + auto values = rag.StringView(size, min_length, max_length, null_probability); + + auto owned_encoder = MakeTypedEncoder(Encoding::PLAIN, + /*use_dictionary=*/true); + + auto encoder = dynamic_cast*>(owned_encoder.get()); + + ASSERT_NO_THROW(encoder->Put(*values)); + + std::unique_ptr decoder; + std::shared_ptr buf, dict_buf; + int num_values = static_cast(values->length() - values->null_count()); + GetDictDecoder(encoder, num_values, &buf, &dict_buf, nullptr, &decoder); + + typename EncodingTraits::Accumulator acc; + acc.builder.reset(new ::arrow::StringBuilder); + ASSERT_EQ(num_values, + decoder->DecodeArrow(static_cast(values->length()), + static_cast(values->null_count()), + values->null_bitmap_data(), values->offset(), &acc)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(acc.builder->Finish(&result)); + ::arrow::AssertArraysEqual(*values, *CastBinaryTypesHelper(result, values->type()), + true); +} + TYPED_TEST(EncodingAdHocTyped, DictArrowDirectPut) { this->Dict(0); } TEST(DictEncodingAdHoc, PutDictionaryPutIndices) { @@ -2164,22 +2272,6 @@ TEST(DeltaLengthByteArrayEncoding, RejectBadBuffer) { ASSERT_THROW(decoder->DecodeArrow(3, 0, nullptr, 0, &acc), ParquetException); } -std::shared_ptr<::arrow::Array> CastBinaryTypesHelper( - std::shared_ptr<::arrow::Array> result, std::shared_ptr<::arrow::DataType> type) { - if (::arrow::is_large_binary_like(type->id())) { - ::arrow::compute::CastOptions options; - if (::arrow::is_string(type->id())) { - options.to_type = ::arrow::large_utf8(); - } else { - options.to_type = ::arrow::large_binary(); - } - EXPECT_OK_AND_ASSIGN( - auto tmp, CallFunction("cast", {::arrow::Datum{result}}, &options, nullptr)); - result = tmp.make_array(); - } - return result; -} - TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryDirectPut) { const int64_t size = 50; const int32_t min_length = 0; @@ -2243,16 +2335,6 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryViewDirectPut) { auto encoder = MakeTypedEncoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); auto decoder = MakeTypedDecoder(Encoding::DELTA_LENGTH_BYTE_ARRAY); - auto castTo = [](std::shared_ptr<::arrow::Array> result, - const std::shared_ptr<::arrow::DataType>& to_type) { - ::arrow::compute::CastOptions options; - options.to_type = to_type; - EXPECT_OK_AND_ASSIGN( - auto tmp, CallFunction("cast", {::arrow::Datum{result}}, &options, nullptr)); - result = tmp.make_array(); - return result; - }; - auto CheckSeed = [&](std::shared_ptr<::arrow::Array> values) { ASSERT_NO_THROW(encoder->Put(*values)); const auto* binary_view_array = @@ -2287,7 +2369,8 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryViewDirectPut) { ASSERT_EQ(values->length(), result->length()); ASSERT_OK(result->ValidateFull()); - ::arrow::AssertArraysEqual(*values, *castTo(result, values->type()), true); + ::arrow::AssertArraysEqual(*values, *CastBinaryTypesHelper(result, values->type()), + true); }; ::arrow::random::RandomArrayGenerator rag(42); @@ -2298,7 +2381,7 @@ TEST(DeltaLengthByteArrayEncodingAdHoc, ArrowBinaryViewDirectPut) { values = rag.StringView(size, min_length, max_length, null_probability); CheckSeed(values); - values = castTo( + values = CastBinaryTypesHelper( rag.BinaryWithRepeats(size, num_unique, min_length, max_length, null_probability), ::arrow::binary_view()); CheckSeed(values); @@ -2579,11 +2662,13 @@ TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) { CheckEncode(::arrow::ArrayFromJSON(::arrow::large_utf8(), values), encoded); CheckEncode(::arrow::ArrayFromJSON(::arrow::binary(), values), encoded); CheckEncode(::arrow::ArrayFromJSON(::arrow::large_binary(), values), encoded); + CheckEncode(::arrow::ArrayFromJSON(::arrow::binary_view(), values), encoded); CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::utf8(), values)); CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_utf8(), values)); CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary(), values)); CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::large_binary(), values)); + CheckDecode(encoded, ::arrow::ArrayFromJSON(::arrow::binary_view(), values)); }; { From fdf33c46d3fb60000c29d4fd7e6779f34ee42cfd Mon Sep 17 00:00:00 2001 From: "N.A" Date: Tue, 4 Feb 2025 19:01:31 +0800 Subject: [PATCH 3/3] convert arrow view schema to bytearray --- cpp/src/parquet/arrow/arrow_schema_test.cc | 24 ++++++++++++++++++++++ cpp/src/parquet/arrow/schema.cc | 2 ++ cpp/src/parquet/column_writer.cc | 3 ++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index a6e04e54259c9..ec4eaeeabbf3d 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -68,6 +68,8 @@ const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO); const auto BINARY = ::arrow::binary(); const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4); +const auto BINARY_VIEW = ::arrow::binary_view(); +const auto UTF8_VIEW = ::arrow::utf8_view(); class TestConvertParquetSchema : public ::testing::Test { public: @@ -1014,6 +1016,14 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { "binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE)); arrow_fields.push_back(::arrow::field("binary", BINARY)); + parquet_fields.push_back(PrimitiveNode::Make( + "string_view", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8)); + arrow_fields.push_back(::arrow::field("string_view", UTF8_VIEW)); + + parquet_fields.push_back(PrimitiveNode::Make( + "binary_view", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE)); + arrow_fields.push_back(::arrow::field("binary_view", BINARY_VIEW)); + ASSERT_OK(ConvertSchema(arrow_fields)); ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); @@ -1031,6 +1041,8 @@ TEST_F(TestConvertArrowSchema, ArrowFields) { std::vector cases = { {"boolean", ::arrow::boolean(), LogicalType::None(), ParquetType::BOOLEAN, -1}, {"binary", ::arrow::binary(), LogicalType::None(), ParquetType::BYTE_ARRAY, -1}, + {"binary_view", ::arrow::binary_view(), LogicalType::None(), + ParquetType::BYTE_ARRAY, -1}, {"large_binary", ::arrow::large_binary(), LogicalType::None(), ParquetType::BYTE_ARRAY, -1}, {"fixed_size_binary", ::arrow::fixed_size_binary(64), LogicalType::None(), @@ -1046,6 +1058,8 @@ TEST_F(TestConvertArrowSchema, ArrowFields) { {"float32", ::arrow::float32(), LogicalType::None(), ParquetType::FLOAT, -1}, {"float64", ::arrow::float64(), LogicalType::None(), ParquetType::DOUBLE, -1}, {"utf8", ::arrow::utf8(), LogicalType::String(), ParquetType::BYTE_ARRAY, -1}, + {"utf8_view", ::arrow::utf8_view(), LogicalType::String(), ParquetType::BYTE_ARRAY, + -1}, {"large_utf8", ::arrow::large_utf8(), LogicalType::String(), ParquetType::BYTE_ARRAY, -1}, {"decimal(1, 0)", ::arrow::decimal128(1, 0), LogicalType::Decimal(1, 0), @@ -1175,6 +1189,16 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) { arrow_fields.push_back( ::arrow::field("binary", ::arrow::dictionary(::arrow::int8(), ::arrow::binary()))); + parquet_fields.push_back(PrimitiveNode::Make( + "string_view", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8)); + arrow_fields.push_back(::arrow::field( + "string_view", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8_view()))); + + parquet_fields.push_back(PrimitiveNode::Make( + "binary_view", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE)); + arrow_fields.push_back(::arrow::field( + "binary_view", ::arrow::dictionary(::arrow::int8(), ::arrow::binary_view()))); + ASSERT_OK(ConvertSchema(arrow_fields)); ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index c19e2b9e48bb3..d37d6d9cba638 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -342,11 +342,13 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, break; case ArrowTypeId::LARGE_STRING: case ArrowTypeId::STRING: + case ArrowTypeId::STRING_VIEW: type = ParquetType::BYTE_ARRAY; logical_type = LogicalType::String(); break; case ArrowTypeId::LARGE_BINARY: case ArrowTypeId::BINARY: + case ArrowTypeId::BINARY_VIEW: type = ParquetType::BYTE_ARRAY; break; case ArrowTypeId::FIXED_SIZE_BINARY: { diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 12cbcf20affa4..64f8b675d1e59 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -2274,7 +2274,8 @@ template <> Status TypedColumnWriterImpl::WriteArrowDense( const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) { - if (!::arrow::is_base_binary_like(array.type()->id())) { + if (!::arrow::is_base_binary_like(array.type()->id()) && + !::arrow::is_binary_view_like(array.type()->id())) { ARROW_UNSUPPORTED(); }