Skip to content

Commit

Permalink
[Feature] Insert into parquet files support binary type (backport #36797) (#37448)
Browse files Browse the repository at this point in the history

Co-authored-by: trueeyu <[email protected]>
  • Loading branch information
mergify[bot] and trueeyu authored Dec 21, 2023
1 parent 05ad522 commit 899a52c
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 10 deletions.
4 changes: 4 additions & 0 deletions be/src/formats/parquet/file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ arrow::Result<::parquet::schema::NodePtr> ParquetBuildHelper::_make_schema_node(
return ::parquet::schema::PrimitiveNode::Make(name, rep_type, ::parquet::LogicalType::None(),
::parquet::Type::DOUBLE, -1, file_column_id.field_id);
}
case TYPE_BINARY:
case TYPE_VARBINARY:
return ::parquet::schema::PrimitiveNode::Make(name, rep_type, ::parquet::LogicalType::None(),
::parquet::Type::BYTE_ARRAY, -1, file_column_id.field_id);
case TYPE_CHAR:
case TYPE_VARCHAR: {
return ::parquet::schema::PrimitiveNode::Make(name, rep_type, ::parquet::LogicalType::String(),
Expand Down
16 changes: 11 additions & 5 deletions be/src/formats/parquet/level_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,12 @@ void LevelBuilder::_write_column_chunk(const LevelBuilderContext& ctx, const Typ
}
case TYPE_CHAR:
case TYPE_VARCHAR: {
_write_varchar_column_chunk(ctx, type_desc, node, col, write_leaf_callback);
_write_byte_array_column_chunk<TYPE_VARCHAR>(ctx, type_desc, node, col, write_leaf_callback);
break;
}
case TYPE_BINARY:
case TYPE_VARBINARY: {
_write_byte_array_column_chunk<TYPE_VARBINARY>(ctx, type_desc, node, col, write_leaf_callback);
break;
}
case TYPE_ARRAY: {
Expand Down Expand Up @@ -293,10 +298,11 @@ void LevelBuilder::_write_datetime_column_chunk(const LevelBuilderContext& ctx,
});
}

void LevelBuilder::_write_varchar_column_chunk(const LevelBuilderContext& ctx, const TypeDescriptor& type_desc,
const ::parquet::schema::NodePtr& node, const ColumnPtr& col,
const CallbackFunction& write_leaf_callback) {
const auto* data_col = down_cast<const RunTimeColumnType<TYPE_VARCHAR>*>(ColumnHelper::get_data_column(col.get()));
template <LogicalType lt>
void LevelBuilder::_write_byte_array_column_chunk(const LevelBuilderContext& ctx, const TypeDescriptor& type_desc,
const ::parquet::schema::NodePtr& node, const ColumnPtr& col,
const CallbackFunction& write_leaf_callback) {
const auto* data_col = down_cast<const RunTimeColumnType<lt>*>(ColumnHelper::get_data_column(col.get()));
const auto* null_col = get_raw_null_column(col);
auto& vo = data_col->get_offset();
auto& vb = data_col->get_bytes();
Expand Down
7 changes: 4 additions & 3 deletions be/src/formats/parquet/level_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ class LevelBuilder {
const ::parquet::schema::NodePtr& node, const ColumnPtr& col,
const CallbackFunction& write_leaf_callback);

void _write_varchar_column_chunk(const LevelBuilderContext& ctx, const TypeDescriptor& type_desc,
const ::parquet::schema::NodePtr& node, const ColumnPtr& col,
const CallbackFunction& write_leaf_callback);
// Writes the column chunk for a byte-array backed leaf column.
// `lt` selects the runtime column type the data column is down-cast to; the
// dispatcher in level_builder.cpp instantiates it with TYPE_VARCHAR (for
// TYPE_CHAR / TYPE_VARCHAR) and TYPE_VARBINARY (for TYPE_BINARY / TYPE_VARBINARY).
// NOTE(review): the template body is defined in level_builder.cpp, so it is
// presumably only meant to be instantiated from that translation unit — confirm.
template <LogicalType lt>
void _write_byte_array_column_chunk(const LevelBuilderContext& ctx, const TypeDescriptor& type_desc,
const ::parquet::schema::NodePtr& node, const ColumnPtr& col,
const CallbackFunction& write_leaf_callback);

void _write_date_column_chunk(const LevelBuilderContext& ctx, const TypeDescriptor& type_desc,
const ::parquet::schema::NodePtr& node, const ColumnPtr& col,
Expand Down
28 changes: 26 additions & 2 deletions be/test/formats/parquet/file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,12 +631,36 @@ TEST_F(FileWriterTest, TestWriteNestedArray) {
Utils::assert_equal_chunk(chunk.get(), read_chunk.get());
}

TEST_F(FileWriterTest, TestVarbinaryNotSupport) {
// Round-trip test for a VARBINARY column: build a single-column chunk, write it
// through the parquet file writer, read it back, and compare the two chunks.
TEST_F(FileWriterTest, TestWriteVarbinary) {
auto type_varbinary = TypeDescriptor::from_logical_type(TYPE_VARBINARY);
std::vector<TypeDescriptor> type_descs{type_varbinary};

auto chunk = std::make_shared<Chunk>();
{
// nullable column: four binary values wrapped together with a null-flag column
auto data_column = BinaryColumn::create();
data_column->append("hello");
data_column->append("world");
data_column->append("starrocks");
data_column->append("lakehouse");

auto null_column = UInt8Column::create();
// one flag per row, alternating 1, 0, 1, 0
std::vector<uint8_t> nulls = {1, 0, 1, 0};
null_column->append_numbers(nulls.data(), nulls.size());
auto nullable_column = NullableColumn::create(data_column, null_column);
chunk->append_column(nullable_column, chunk->num_columns());
}

// write chunk
auto schema = _make_schema(type_descs);
// NOTE(review): the next two assertions contradict each other. This listing is a
// diff rendered without +/- markers, so the `== nullptr` check is presumably the
// removed pre-change line — confirm against the actual file.
ASSERT_TRUE(schema == nullptr);
ASSERT_TRUE(schema != nullptr);
auto st = _write_chunk(chunk, type_descs, schema);
ASSERT_OK(st);

// read chunk and assert equality
auto read_chunk = _read_chunk(type_descs);
ASSERT_TRUE(read_chunk != nullptr);
Utils::assert_equal_chunk(chunk.get(), read_chunk.get());
}

TEST_F(FileWriterTest, TestFieldIdWithStruct) {
Expand Down

0 comments on commit 899a52c

Please sign in to comment.