Skip to content

Commit

Permalink
Merge pull request #6067 from dantengsky/feat-abandon-internal-parque…
Browse files Browse the repository at this point in the history
…t2-patches

refactor: try abandon internal parquet2 patches
  • Loading branch information
BohuTANG authored Jul 7, 2022
2 parents fef12ed + ee005fa commit e267f7a
Show file tree
Hide file tree
Showing 51 changed files with 182 additions and 124 deletions.
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object = { opt-level = 3 }
rustc-demangle = { opt-level = 3 }

[patch.crates-io]
parquet2 = { version = "0.13", optional = true, git = "https://github.com/datafuse-extras/parquet2", branch = "parquet2-0.13-patch2" }
parquet2 = { version = "0.14.1", optional = true, git = "https://github.com/datafuse-extras/parquet2", rev = "3a468fc3c4" }
chrono = { git = "https://github.com/datafuse-extras/chrono", rev = "279f590" }
# https://github.com/calder/rust-goldenfile/pull/7
goldenfile = { git = "https://github.com/datafuse-extras/rust-goldenfile", rev = "16c5783" }
15 changes: 11 additions & 4 deletions common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ arrow-default = [
"arrow/compute_filter",
]
default = ["arrow-default", "parquet-default"]
parquet-default = ["parquet2/lz4"]
parquet-default = [
"parquet2/non_standard_legacy_lz4",
"parquet2/lz4",
"parquet2/zstd",
"parquet2/snappy",
"parquet2/gzip",
"parquet2/brotli",
]
simd = ["arrow/simd"]

[dependencies] # In alphabetical order
Expand All @@ -31,11 +38,11 @@ simd = ["arrow/simd"]
arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, features = [
"io_parquet",
"io_parquet_compression",
], rev = "6608071" }
], rev = "f5f6b7e3" }

# Crates.io dependencies
arrow-format = { version = "0.6.0", features = ["flight-data", "flight-service", "ipc"] }
arrow-format = { version = "0.7.0", features = ["flight-data", "flight-service", "ipc"] }
futures = "0.3.21"
parquet2 = { version = "0.13", default_features = false }
parquet2 = { version = "0.14", default_features = false }

[dev-dependencies]
2 changes: 2 additions & 0 deletions common/arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ pub use arrow_format;
pub use parquet2 as parquet;
pub use parquet_read::read_columns_many_async;
pub use parquet_write::write_parquet_file;

pub type ArrayRef = Box<dyn ::arrow::array::Array>;
9 changes: 5 additions & 4 deletions common/arrow/src/parquet_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ use arrow::datatypes::Schema;
use arrow::error::Result;
use arrow::io::parquet::write::to_parquet_schema;
use arrow::io::parquet::write::RowGroupIterator;
use parquet2::metadata::ThriftFileMetaData;
use parquet2::write::FileWriter;
use parquet2::write::WriteOptions;
use parquet2::FileMetaData;

// a simple wrapper for code reuse
pub fn write_parquet_file<W: Write, A, I>(
writer: &mut W,
row_groups: RowGroupIterator<A, I>,
schema: Schema,
options: WriteOptions,
) -> Result<(u64, FileMetaData)>
) -> Result<(u64, ThriftFileMetaData)>
where
W: Write,
A: AsRef<dyn Array> + 'static + Send + Sync,
Expand All @@ -45,6 +45,7 @@ where
for group in row_groups {
file_writer.write(group?)?;
}
let (size, file_meta_data) = file_writer.end_ext(None)?;
Ok((size, file_meta_data))
let file_size = file_writer.end(None)?;
let (_meta_size, thrift_file_meta_data) = file_writer.into_inner_and_metadata();
Ok((file_size, thrift_file_meta_data))
}
2 changes: 1 addition & 1 deletion common/datablocks/src/data_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::fmt;
use std::sync::Arc;

use common_arrow::arrow::array::Array;
use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::chunk::Chunk;
use common_arrow::ArrayRef;
use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down
4 changes: 2 additions & 2 deletions common/datablocks/src/kernels/data_block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl DataBlock {

let boolean_col: &BooleanColumn = Series::check_get(&predict_boolean_nonull)?;
let rows = boolean_col.len();
let count_zeros = boolean_col.values().null_count();
let count_zeros = boolean_col.values().unset_bits();
Ok(count_zeros != rows)
}

Expand Down Expand Up @@ -67,7 +67,7 @@ impl DataBlock {
filter: &BooleanColumn,
) -> Result<DataBlock> {
let rows = filter.len();
let count_zeros = filter.values().null_count();
let count_zeros = filter.values().unset_bits();
match count_zeros {
0 => Ok(block),
_ => {
Expand Down
7 changes: 2 additions & 5 deletions common/datablocks/src/kernels/data_block_gather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_arrow::arrow::array::growable::make_growable;
use common_arrow::arrow::array::Array;
use common_arrow::arrow::array::ArrayRef;
use common_arrow::ArrayRef;
use common_datavalues::ColumnRef;
use common_datavalues::IntoColumn;
use common_exception::Result;
Expand Down Expand Up @@ -66,8 +64,7 @@ impl DataBlock {
growable.extend(index.0, index.1, 1);
}

let result = growable.as_box();
let result: ArrayRef = Arc::from(result);
let result: ArrayRef = growable.as_box();

match nullable {
false => Ok(result.into_column()),
Expand Down
2 changes: 1 addition & 1 deletion common/datablocks/src/kernels/data_block_take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use common_arrow::arrow::array::growable::make_growable;
use common_arrow::arrow::array::Array;
use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::compute::merge_sort::MergeSlice;
use common_arrow::arrow::types::Index;
use common_arrow::ArrayRef;
use common_datavalues::prelude::*;
use common_exception::Result;

Expand Down
4 changes: 2 additions & 2 deletions common/datablocks/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use common_arrow::arrow::io::parquet::write::RowGroupIterator;
use common_arrow::arrow::io::parquet::write::WriteOptions;
use common_arrow::parquet::compression::CompressionOptions;
use common_arrow::parquet::encoding::Encoding;
use common_arrow::parquet::metadata::ThriftFileMetaData;
use common_arrow::parquet::write::Version;
use common_arrow::parquet::FileMetaData;
use common_arrow::write_parquet_file;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
Expand All @@ -31,7 +31,7 @@ pub fn serialize_data_blocks(
blocks: Vec<DataBlock>,
schema: &DataSchemaRef,
buf: &mut Vec<u8>,
) -> Result<(u64, FileMetaData)> {
) -> Result<(u64, ThriftFileMetaData)> {
let arrow_schema = schema.to_arrow();

let row_group_write_options = WriteOptions {
Expand Down
2 changes: 1 addition & 1 deletion common/datablocks/tests/it/data_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::chunk::Chunk;
use common_arrow::ArrayRef;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
use common_exception::Result;
Expand Down
10 changes: 6 additions & 4 deletions common/datavalues/benches/eq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use criterion::Criterion;

fn add_benchmark(c: &mut Criterion) {
let size = 1048576;
let lhs: ArrayRef = Arc::new(create_primitive_array::<i32>(size, 0.2));
let rhs: ArrayRef = Arc::new(create_primitive_array::<i32>(size, 0.3));
let lhs: ArrayRef = Box::new(create_primitive_array::<i32>(size, 0.2));
let rhs: ArrayRef = Box::new(create_primitive_array::<i32>(size, 0.3));

c.bench_function("arrow2_eq", |b| {
b.iter(|| criterion::black_box(arrow2_eq(&lhs, &rhs)))
Expand All @@ -51,7 +51,7 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| criterion::black_box(databend_eq_simd(&lhs, &rhs)))
});

let rhs: ArrayRef = Arc::new(create_primitive_array::<u32>(size, 0.3));
let rhs: ArrayRef = Box::new(create_primitive_array::<u32>(size, 0.3));
let rhs: ColumnRef = rhs.into_nullable_column();

c.bench_function("databend_diff_type_eq", |b| {
Expand Down Expand Up @@ -143,18 +143,20 @@ fn cast(column: &ColumnRef, data_type: &DataTypeImpl) -> Result<ColumnRef> {
partial: false,
};
let result = cast::cast(arrow_array.as_ref(), &data_type.arrow_type(), arrow_options)?;
let result: ArrayRef = Arc::from(result);
let result: ArrayRef = result;
Ok(result.into_column())
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);

use common_arrow::ArrayRef;
use rand::distributions::Distribution;
use rand::distributions::Standard;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;

/// Returns fixed seedable RNG
pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
Expand Down
8 changes: 5 additions & 3 deletions common/datavalues/benches/if_else_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use criterion::Criterion;

fn add_benchmark(c: &mut Criterion) {
let size = 1048576;
let lhs: ArrayRef = Arc::new(create_primitive_array::<i32>(size, 0.2));
let rhs: ArrayRef = Arc::new(create_primitive_array::<i32>(size, 0.3));
let ifs: ArrayRef = Arc::new(create_boolean_array(size, 0.0, 0.3));
let lhs: ArrayRef = Box::new(create_primitive_array::<i32>(size, 0.2));
let rhs: ArrayRef = Box::new(create_primitive_array::<i32>(size, 0.3));
let ifs: ArrayRef = Box::new(create_boolean_array(size, 0.0, 0.3));

c.bench_function("arrow2_if_else_then", |b| {
b.iter(|| criterion::black_box(arrow2_if_else_then(&lhs, &rhs, &ifs)))
Expand Down Expand Up @@ -92,11 +92,13 @@ fn databend_if_else_then(
criterion_group!(benches, add_benchmark);
criterion_main!(benches);

use common_arrow::ArrayRef;
use rand::distributions::Distribution;
use rand::distributions::Standard;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;

/// Returns fixed seedable RNG
pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
Expand Down
3 changes: 2 additions & 1 deletion common/datavalues/src/columns/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_arrow::arrow::array::*;
use common_arrow::arrow::buffer::Buffer;
use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::arrow::types::Index;
use common_arrow::ArrayRef;

use crate::prelude::*;

Expand Down Expand Up @@ -124,7 +125,7 @@ impl Column for ArrayColumn {
fn as_arrow_array(&self) -> ArrayRef {
let arrow_type = self.data_type().arrow_type();
let array = self.values.as_arrow_array();
Arc::new(LargeListArray::from_data(
Box::new(LargeListArray::from_data(
arrow_type,
self.offsets.clone(),
array,
Expand Down
3 changes: 2 additions & 1 deletion common/datavalues/src/columns/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common_arrow::arrow::array::*;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::bitmap::MutableBitmap;
use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::ArrayRef;

use crate::prelude::*;

Expand Down Expand Up @@ -87,7 +88,7 @@ impl Column for BooleanColumn {

fn as_arrow_array(&self) -> ArrayRef {
let array = BooleanArray::from_data(ArrowType::Boolean, self.values.clone(), None);
Arc::new(array)
Box::new(array)
}

fn arc(&self) -> ColumnRef {
Expand Down
2 changes: 1 addition & 1 deletion common/datavalues/src/columns/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::any::Any;
use std::sync::Arc;

use common_arrow::arrow::array::Array;
use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::ArrayRef;
use common_exception::ErrorCode;
use common_exception::Result;

Expand Down
4 changes: 2 additions & 2 deletions common/datavalues/src/columns/const_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use std::sync::Arc;

use common_arrow::arrow::array::*;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::ArrayRef;

use crate::prelude::*;

Expand Down Expand Up @@ -103,7 +103,7 @@ impl Column for ConstColumn {
}

fn filter(&self, filter: &BooleanColumn) -> ColumnRef {
let length = filter.values().len() - filter.values().null_count();
let length = filter.values().len() - filter.values().unset_bits();
if length == self.len() {
return Arc::new(self.clone());
}
Expand Down
5 changes: 3 additions & 2 deletions common/datavalues/src/columns/null/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use common_arrow::arrow::array::*;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::ArrayRef;

use crate::prelude::*;

Expand Down Expand Up @@ -84,7 +85,7 @@ impl Column for NullColumn {
}

fn as_arrow_array(&self) -> ArrayRef {
Arc::new(NullArray::new_null(ArrowType::Null, self.length))
Box::new(NullArray::new_null(ArrowType::Null, self.length))
}

fn arc(&self) -> ColumnRef {
Expand All @@ -96,7 +97,7 @@ impl Column for NullColumn {
}

fn filter(&self, filter: &BooleanColumn) -> ColumnRef {
let length = filter.values().len() - filter.values().null_count();
let length = filter.values().len() - filter.values().unset_bits();
Arc::new(Self { length })
}

Expand Down
Loading

1 comment on commit e267f7a

@vercel
Copy link

@vercel vercel bot commented on e267f7a Jul 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.rs
databend.vercel.app
databend-databend.vercel.app
databend-git-main-databend.vercel.app

Please sign in to comment.