Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43244: [C++][Parquet] Supports write BinaryView/StringView to Parquet file #45413

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cpp/src/parquet/arrow/arrow_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
const auto BINARY = ::arrow::binary();
const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4);
const auto BINARY_VIEW = ::arrow::binary_view();
const auto UTF8_VIEW = ::arrow::utf8_view();

class TestConvertParquetSchema : public ::testing::Test {
public:
Expand Down Expand Up @@ -1014,6 +1016,14 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
"binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE));
arrow_fields.push_back(::arrow::field("binary", BINARY));

parquet_fields.push_back(PrimitiveNode::Make(
"string_view", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
arrow_fields.push_back(::arrow::field("string_view", UTF8_VIEW));

parquet_fields.push_back(PrimitiveNode::Make(
"binary_view", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE));
arrow_fields.push_back(::arrow::field("binary_view", BINARY_VIEW));

ASSERT_OK(ConvertSchema(arrow_fields));

ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
Expand All @@ -1031,6 +1041,8 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
std::vector<FieldConstructionArguments> cases = {
{"boolean", ::arrow::boolean(), LogicalType::None(), ParquetType::BOOLEAN, -1},
{"binary", ::arrow::binary(), LogicalType::None(), ParquetType::BYTE_ARRAY, -1},
{"binary_view", ::arrow::binary_view(), LogicalType::None(),
ParquetType::BYTE_ARRAY, -1},
{"large_binary", ::arrow::large_binary(), LogicalType::None(),
ParquetType::BYTE_ARRAY, -1},
{"fixed_size_binary", ::arrow::fixed_size_binary(64), LogicalType::None(),
Expand All @@ -1046,6 +1058,8 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
{"float32", ::arrow::float32(), LogicalType::None(), ParquetType::FLOAT, -1},
{"float64", ::arrow::float64(), LogicalType::None(), ParquetType::DOUBLE, -1},
{"utf8", ::arrow::utf8(), LogicalType::String(), ParquetType::BYTE_ARRAY, -1},
{"utf8_view", ::arrow::utf8_view(), LogicalType::String(), ParquetType::BYTE_ARRAY,
-1},
{"large_utf8", ::arrow::large_utf8(), LogicalType::String(),
ParquetType::BYTE_ARRAY, -1},
{"decimal(1, 0)", ::arrow::decimal128(1, 0), LogicalType::Decimal(1, 0),
Expand Down Expand Up @@ -1175,6 +1189,16 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) {
arrow_fields.push_back(
::arrow::field("binary", ::arrow::dictionary(::arrow::int8(), ::arrow::binary())));

parquet_fields.push_back(PrimitiveNode::Make(
"string_view", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
arrow_fields.push_back(::arrow::field(
"string_view", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8_view())));

parquet_fields.push_back(PrimitiveNode::Make(
"binary_view", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE));
arrow_fields.push_back(::arrow::field(
"binary_view", ::arrow::dictionary(::arrow::int8(), ::arrow::binary_view())));

ASSERT_OK(ConvertSchema(arrow_fields));

ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,13 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
break;
case ArrowTypeId::LARGE_STRING:
case ArrowTypeId::STRING:
case ArrowTypeId::STRING_VIEW:
type = ParquetType::BYTE_ARRAY;
logical_type = LogicalType::String();
break;
case ArrowTypeId::LARGE_BINARY:
case ArrowTypeId::BINARY:
case ArrowTypeId::BINARY_VIEW:
type = ParquetType::BYTE_ARRAY;
break;
case ArrowTypeId::FIXED_SIZE_BINARY: {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2274,7 +2274,8 @@ template <>
Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
if (!::arrow::is_base_binary_like(array.type()->id())) {
if (!::arrow::is_base_binary_like(array.type()->id()) &&
!::arrow::is_binary_view_like(array.type()->id())) {
ARROW_UNSUPPORTED();
}

Expand Down
42 changes: 31 additions & 11 deletions cpp/src/parquet/encoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,19 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
}

protected:
template <typename ArrayType>
template <bool isView = false, typename ArrayType>
void PutBinaryArray(const ArrayType& array) {
const int64_t total_bytes =
array.value_offset(array.length()) - array.value_offset(0);
const int64_t total_bytes = [](const ArrayType& a) {
if constexpr (isView) {
int64_t bts = 0;
for (int i = 0; i < a.length(); i++) {
bts += static_cast<int64_t>(a.GetView(i).size());
}
return bts;
} else {
return a.value_offset(a.length()) - a.value_offset(0);
}
}(array);
PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t)));

PARQUET_THROW_NOT_OK(::arrow::VisitArraySpanInline<typename ArrayType::TypeClass>(
Expand Down Expand Up @@ -249,18 +258,21 @@ void PlainEncoder<DType>::Put(const ::arrow::Array& values) {
ParquetException::NYI("direct put of " + values.type()->ToString());
}

void AssertBaseBinary(const ::arrow::Array& values) {
if (!::arrow::is_base_binary_like(values.type_id())) {
throw ParquetException("Only BaseBinaryArray and subclasses supported");
void AssertBaseBinaryOrBinaryView(const ::arrow::Array& values) {
if (!::arrow::is_base_binary_like(values.type_id()) &&
!::arrow::is_binary_view_like(values.type_id())) {
throw ParquetException(
"Only BaseBinaryArray and subclasses or binary_view_like supported");
}
}

template <>
inline void PlainEncoder<ByteArrayType>::Put(const ::arrow::Array& values) {
AssertBaseBinary(values);

AssertBaseBinaryOrBinaryView(values);
if (::arrow::is_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_binary_view_like(values.type_id())) {
PutBinaryArray<true>(checked_cast<const ::arrow::BinaryViewArray&>(values));
} else {
DCHECK(::arrow::is_large_binary_like(values.type_id()));
PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
Expand Down Expand Up @@ -752,9 +764,11 @@ void DictEncoderImpl<FLBAType>::Put(const ::arrow::Array& values) {

template <>
void DictEncoderImpl<ByteArrayType>::Put(const ::arrow::Array& values) {
AssertBaseBinary(values);
AssertBaseBinaryOrBinaryView(values);
if (::arrow::is_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_binary_view_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
} else {
DCHECK(::arrow::is_large_binary_like(values.type_id()));
PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
Expand Down Expand Up @@ -803,11 +817,13 @@ void DictEncoderImpl<FLBAType>::PutDictionary(const ::arrow::Array& values) {

template <>
void DictEncoderImpl<ByteArrayType>::PutDictionary(const ::arrow::Array& values) {
AssertBaseBinary(values);
AssertBaseBinaryOrBinaryView(values);
AssertCanPutDictionary(this, values);

if (::arrow::is_binary_like(values.type_id())) {
PutBinaryDictionaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_binary_view_like(values.type_id())) {
PutBinaryDictionaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
} else {
DCHECK(::arrow::is_large_binary_like(values.type_id()));
PutBinaryDictionaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
Expand Down Expand Up @@ -1304,9 +1320,11 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl,
};

void DeltaLengthByteArrayEncoder::Put(const ::arrow::Array& values) {
AssertBaseBinary(values);
AssertBaseBinaryOrBinaryView(values);
if (::arrow::is_binary_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
} else if (::arrow::is_binary_view_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
} else {
PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
}
Expand Down Expand Up @@ -1577,6 +1595,8 @@ void DeltaByteArrayEncoder<DType>::Put(const ::arrow::Array& values) {
PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
} else if (::arrow::is_fixed_size_binary(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::FixedSizeBinaryArray&>(values));
} else if (::arrow::is_binary_view_like(values.type_id())) {
PutBinaryArray(checked_cast<const ::arrow::BinaryViewArray&>(values));
} else {
throw ParquetException("Only BaseBinaryArray and subclasses supported");
}
Expand Down
Loading
Loading