From 62bdb6ec60a2970b6f238fba1534d06103f6deb8 Mon Sep 17 00:00:00 2001 From: baishen Date: Tue, 10 Jan 2023 10:01:05 +0800 Subject: [PATCH 1/6] refactor(storage): refactor bloom index to use vectorized siphash function --- Cargo.lock | 2 + src/query/storages/common/index/Cargo.toml | 2 + src/query/storages/common/index/src/bloom.rs | 155 ++++++++++++++---- .../common/index/src/filters/filter.rs | 6 + .../storages/common/index/src/filters/xor8.rs | 13 ++ .../index/tests/it/filters/bloom_filter.rs | 6 +- .../common/index/tests/it/filters/xor8.rs | 30 ++++ 7 files changed, 180 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05772f29056fd..981a9ab584917 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7674,10 +7674,12 @@ version = "0.1.0" dependencies = [ "anyerror", "cbordata", + "common-arrow", "common-catalog", "common-exception", "common-expression", "common-functions", + "common-sql", "criterion", "match-template", "rand 0.8.5", diff --git a/src/query/storages/common/index/Cargo.toml b/src/query/storages/common/index/Cargo.toml index f2945f7c0d0e1..911bacd568d9e 100644 --- a/src/query/storages/common/index/Cargo.toml +++ b/src/query/storages/common/index/Cargo.toml @@ -15,10 +15,12 @@ test = false ignored = ["xorfilter-rs"] [dependencies] +common-arrow = { path = "../../../../common/arrow" } common-catalog = { path = "../../../catalog" } common-exception = { path = "../../../../common/exception" } common-expression = { path = "../../../expression" } common-functions = { path = "../../../functions" } +common-sql = { path = "../../../sql" } storages-common-table-meta = { path = "../table-meta" } diff --git a/src/query/storages/common/index/src/bloom.rs b/src/query/storages/common/index/src/bloom.rs index 94f8dccee25c3..773ac7ba3ec14 100644 --- a/src/query/storages/common/index/src/bloom.rs +++ b/src/query/storages/common/index/src/bloom.rs @@ -13,18 +13,29 @@ // limitations under the License. 
 use std::collections::HashMap;
+use std::ops::Deref;
 use std::sync::Arc;
 
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_expression::arrow::constant_bitmap;
 use common_expression::converts::scalar_to_datavalue;
 use common_expression::types::DataType;
+use common_expression::types::NullableType;
+use common_expression::types::Number;
+use common_expression::types::NumberDataType;
+use common_expression::types::UInt64Type;
+use common_expression::types::ValueType;
 use common_expression::BlockEntry;
+use common_expression::Column;
 use common_expression::ConstantFolder;
 use common_expression::DataBlock;
+use common_expression::DataField;
 use common_expression::Domain;
+use common_expression::Evaluator;
 use common_expression::Expr;
 use common_expression::FunctionContext;
+use common_expression::Literal;
 use common_expression::Scalar;
 use common_expression::Span;
 use common_expression::TableDataType;
@@ -33,6 +44,7 @@ use common_expression::TableSchema;
 use common_expression::TableSchemaRef;
 use common_expression::Value;
 use common_functions::scalars::BUILTIN_FUNCTIONS;
+use common_sql::executor::PhysicalScalar;
 use storages_common_table_meta::meta::V2BloomBlock;
 use storages_common_table_meta::meta::Versioned;
 
@@ -125,42 +137,100 @@ impl BlockFilter {
             return Err(ErrorCode::BadArguments("block is empty"));
         }
 
+        let mut exprs = Vec::new();
+        let mut fields = Vec::new();
+        let mut columns = Vec::new();
+        for i in 0..blocks[0].num_columns() {
+            let data_type = &blocks[0].get_by_offset(i).data_type;
+            if Xor8Filter::is_supported_type(data_type) {
+                let source_field = source_schema.field(i);
+                let return_type = if data_type.is_nullable() {
+                    DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64)))
+                } else {
+                    DataType::Number(NumberDataType::UInt64)
+                };
+                let field = DataField::new(source_field.name().as_str(), return_type.clone());
+                fields.push(field);
+
+                let arg = PhysicalScalar::IndexedVariable {
+                    index: columns.len(),
+                    data_type: data_type.clone(),
+                    display_name: source_field.name().clone(),
+                };
+                let scalar = PhysicalScalar::Function {
+                    name: "siphash".to_string(),
+                    params: vec![],
+                    args: vec![arg],
+                    return_type,
+                };
+                let expr = scalar.as_expr()?;
+                exprs.push(expr);
+
+                let source_columns = blocks
+                    .iter()
+                    .map(|block| {
+                        let value = &block.get_by_offset(i).value;
+                        value.convert_to_full_column(data_type, block.num_rows())
+                    })
+                    .collect::<Vec<_>>();
+                let column = Column::concat(&source_columns);
+                columns.push(column);
+            }
+        }
+
         let mut filter_fields = vec![];
         let mut filter_columns = vec![];
         let mut column_distinct_count = HashMap::<usize, usize>::new();
-        for i in 0..blocks[0].num_columns() {
-            if Xor8Filter::is_supported_type(&blocks[0].get_by_offset(i).data_type) {
-                // create filter per column
-                let mut filter_builder = Xor8Builder::create();
-
-                // ingest the same column data from all blocks
-                for block in blocks {
-                    let entry = block.get_by_offset(i);
-                    match &entry.value {
-                        Value::Scalar(scalar) => filter_builder.add_key(&scalar),
-                        Value::Column(col) => {
-                            col.iter()
-                                .for_each(|scalar| filter_builder.add_key(&scalar));
+        let data_block = DataBlock::new_from_columns(columns);
+        let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS);
+        for (field, expr) in fields.iter().zip(exprs.iter()) {
+            let value = evaluator
+                .run(expr)
+                .map_err(|(_, e)| ErrorCode::Internal(format!("eval siphash failed: {}.", e)))?;
+            let col = value.convert_to_full_column(field.data_type(), data_block.num_rows());
+            let (column, validity) = if field.data_type().is_nullable() {
+                let nullable_column =
+                    NullableType::<UInt64Type>::try_downcast_column(&col).unwrap();
+                (nullable_column.column, nullable_column.validity)
+            } else {
+                let column = UInt64Type::try_downcast_column(&col).unwrap();
+                let validity = constant_bitmap(true, column.len()).into();
+                (column, validity)
+            };
+
+            // create filter per column
+            let mut filter_builder = Xor8Builder::create();
+            let filter = if validity.unset_bits() > 0 {
+                let mut digests = Vec::with_capacity(column.len());
+                UInt64Type::iter_column(&column)
+                    .zip(validity.iter())
+                    .for_each(|(v, b)| {
+                        if !b {
+                            digests.push(0);
+                        } else {
+                            digests.push(v);
                         }
-                    }
-                }
-
-                let filter = filter_builder.build()?;
-                if let Some(len) = filter.len() {
-                    column_distinct_count.insert(i, len);
-                }
-
-                let filter_name = Self::build_filter_column_name(source_schema.field(i).name());
-                filter_fields.push(TableField::new(&filter_name, TableDataType::String));
-                // create filter column
-                let serialized_bytes = filter.to_bytes()?;
-                let filter_value = Value::Scalar(Scalar::String(serialized_bytes));
-                filter_columns.push(BlockEntry {
-                    data_type: DataType::String,
-                    value: filter_value,
-                });
+                    });
+                filter_builder.build_from_digests(&digests)?
+            } else {
+                filter_builder.build_from_digests(column.deref())?
+            };
+
+            if let Some(len) = filter.len() {
+                let idx = source_schema.index_of(field.name().as_str()).unwrap();
+                column_distinct_count.insert(idx, len);
             }
+
+            let filter_name = Self::build_filter_column_name(field.name());
+            filter_fields.push(TableField::new(&filter_name, TableDataType::String));
+            // create filter column
+            let serialized_bytes = filter.to_bytes()?;
+            let filter_value = Value::Scalar(Scalar::String(serialized_bytes));
+            filter_columns.push(BlockEntry {
+                data_type: DataType::String,
+                value: filter_value,
+            });
         }
 
         let filter_schema = Arc::new(TableSchema::new(filter_fields));
@@ -261,7 +331,30 @@ impl BlockFilter {
             let datavalue = scalar_to_datavalue(target);
             filter.contains(&datavalue)
         } else {
-            filter.contains(target)
+            let digest = if let Scalar::Null = target {
+                0u64
+            } else {
+                let arg = PhysicalScalar::Constant {
+                    value: Literal::try_from(target.clone()).unwrap(),
+                    data_type: ty.clone(),
+                };
+                let scalar = PhysicalScalar::Function {
+                    name: "siphash".to_string(),
+                    params: vec![],
+                    args: vec![arg],
+                    return_type: DataType::Number(NumberDataType::UInt64),
+                };
+                let expr = scalar.as_expr()?;
+
+                let data_block = DataBlock::empty();
+                let evaluator = Evaluator::new(&data_block, self.func_ctx, &BUILTIN_FUNCTIONS);
+                let value = evaluator.run(&expr).map_err(|(_, e)| {
+                    ErrorCode::Internal(format!("eval siphash failed: {}.", e))
+                })?;
+                let number_scalar = value.into_scalar().unwrap().into_number().unwrap();
+                u64::try_downcast_scalar(&number_scalar).unwrap()
+            };
+            filter.contains_digest(digest)
         };
 
         if contains {
diff --git a/src/query/storages/common/index/src/filters/filter.rs b/src/query/storages/common/index/src/filters/filter.rs
index f84ac13dab734..eb9a5c2bba368 100644
--- a/src/query/storages/common/index/src/filters/filter.rs
+++ b/src/query/storages/common/index/src/filters/filter.rs
@@ -31,6 +31,9 @@ pub trait Filter: Sized {
     /// False positive: returning `true` only a key **probably** presents.
     fn contains<K: Hash + ?Sized>(&self, key: &K) -> bool;
 
+    /// Check if the pre-computed digest is in the filter.
+    fn contains_digest(&self, digest: u64) -> bool;
+
     /// Serialize the filter.
     fn to_bytes(&self) -> Result<Vec<u8>, Self::CodecError>;
 
@@ -50,6 +53,9 @@ pub trait FilterBuilder {
     /// This methods can be called more than once.
     fn add_keys<K: Hash>(&mut self, keys: &[K]);
 
+    /// Build the filter from the pre-computed digest.
+    fn build_from_digests(&mut self, digests: &[u64]) -> Result<Self::Filter, Self::Error>;
+
     /// Build the filter with added keys.
     fn build(self) -> Result<Self::Filter, Self::Error>;
 }
diff --git a/src/query/storages/common/index/src/filters/xor8.rs b/src/query/storages/common/index/src/filters/xor8.rs
index a8ab6b8fb0a65..51d241e2bacc9 100644
--- a/src/query/storages/common/index/src/filters/xor8.rs
+++ b/src/query/storages/common/index/src/filters/xor8.rs
@@ -65,6 +65,15 @@ impl FilterBuilder for Xor8Builder {
         self.builder.populate(keys)
     }
 
+    fn build_from_digests(&mut self, digests: &[u64]) -> Result<Self::Filter, Self::Error> {
+        let f = self
+            .builder
+            .build_from_digests(digests)
+            .map_err(|e| Xor8BuildingError::new(&e))?;
+
+        Ok(Xor8Filter { filter: f })
+    }
+
     fn build(mut self) -> Result<Self::Filter, Self::Error> {
         let f = self
             .builder
@@ -94,6 +103,10 @@ impl Filter for Xor8Filter {
         self.filter.contains(key)
     }
 
+    fn contains_digest(&self, digest: u64) -> bool {
+        self.filter.contains_digest(digest)
+    }
+
     fn to_bytes(&self) -> Result<Vec<u8>, Xor8CodecError> {
         let mut buf: Vec<u8> = vec![];
         let cbor_val = self
diff --git a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
index 5ab8e0f7f8dcf..12e0c5ac7c598 100644
--- a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
+++ b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
@@ -44,7 +44,7 @@ fn test_bloom_filter() -> Result<()> {
         TableField::new("0", TableDataType::Number(NumberDataType::UInt8)),
         TableField::new("1", TableDataType::String),
     ]));
-    let chunks = vec![
+    let blocks = vec![
         DataBlock::new(
             vec![
                 BlockEntry {
@@ -63,13 +63,13 @@ fn test_bloom_filter() -> Result<()> {
             StringType::from_data(vec!["b", "c"]),
         ]),
     ];
-    let chunks_ref = chunks.iter().collect::<Vec<_>>();
+    let blocks_ref = blocks.iter().collect::<Vec<_>>();
 
     let index = BlockFilter::try_create(
         FunctionContext::default(),
         schema,
         LatestBloom::VERSION,
-        &chunks_ref,
+        &blocks_ref,
     )?;
 
     assert_eq!(
diff --git a/src/query/storages/common/index/tests/it/filters/xor8.rs b/src/query/storages/common/index/tests/it/filters/xor8.rs
index db112d400720a..cdf0cfe3090c0 100644
--- a/src/query/storages/common/index/tests/it/filters/xor8.rs
+++ b/src/query/storages/common/index/tests/it/filters/xor8.rs
@@ -215,3 +215,33 @@ fn test_xor_bitmap_data_block() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn test_xor_bitmap_from_digests() -> Result<()> {
+    let numbers = 1_000_000;
+
+    let size = 8 * numbers;
+    let keys: Vec<u64> = (0..numbers).collect();
+    let mut builder = Xor8Builder::create();
+    let filter = builder.build_from_digests(&keys)?;
+    for key in keys.iter() {
+        assert!(filter.contains_digest(*key), "key {} not present", key);
+    }
+
+    let val = filter.to_bytes()?;
+    let (_, n) = Xor8Filter::from_bytes(&val)?;
+    assert_eq!(n, val.len(), "{} {}", n, val.len());
+
+    // Lock the size.
+    assert_eq!(n, 1230069);
+
+    // u64 bitmap enc:1230069, raw:8000000, ratio:0.15375863
+    println!(
+        "u64 bitmap enc:{}, raw:{}, ratio:{}",
+        val.len(),
+        size,
+        val.len() as f32 / size as f32
+    );
+
+    Ok(())
+}

From a007b4cc08fc8d752b300eff529199f24747cc35 Mon Sep 17 00:00:00 2001
From: baishen
Date: Wed, 11 Jan 2023 11:04:46 +0800
Subject: [PATCH 2/6] pre-calculate constant scalar

---
 src/query/storages/common/index/src/bloom.rs  | 93 +++++++++++--------
 .../index/tests/it/filters/bloom_filter.rs    | 56 ++++++-----
 src/query/storages/fuse/src/pruning/pruner.rs | 27 +++++-
 3 files changed, 111 insertions(+), 65 deletions(-)

diff --git a/src/query/storages/common/index/src/bloom.rs b/src/query/storages/common/index/src/bloom.rs
index 773ac7ba3ec14..18f58f285e83e 100644
--- a/src/query/storages/common/index/src/bloom.rs
+++ b/src/query/storages/common/index/src/bloom.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 
 use common_exception::ErrorCode;
 use common_exception::Result;
-use common_expression::arrow::constant_bitmap;
 use common_expression::converts::scalar_to_datavalue;
 use common_expression::types::DataType;
 use common_expression::types::NullableType;
@@ -157,13 +156,13 @@ impl BlockFilter {
                     data_type: data_type.clone(),
                     display_name: source_field.name().clone(),
                 };
-                let scalar = PhysicalScalar::Function {
+                let siphash_func = PhysicalScalar::Function {
                     name: "siphash".to_string(),
                     params: vec![],
                     args: vec![arg],
                     return_type,
                 };
-                let expr = scalar.as_expr()?;
+                let expr = siphash_func.as_expr()?;
                 exprs.push(expr);
 
                 let source_columns = blocks
@@ -192,16 +191,16 @@ impl BlockFilter {
             let (column, validity) = if field.data_type().is_nullable() {
                 let nullable_column =
                     NullableType::<UInt64Type>::try_downcast_column(&col).unwrap();
-                (nullable_column.column, nullable_column.validity)
+                (nullable_column.column, Some(nullable_column.validity))
             } else {
                 let column = UInt64Type::try_downcast_column(&col).unwrap();
-                let validity = constant_bitmap(true, column.len()).into();
-                (column, validity)
+                (column, None)
             };
 
             // create filter per column
             let mut filter_builder = Xor8Builder::create();
-            let filter = if validity.unset_bits() > 0 {
+            let filter = if validity.as_ref().map(|v| v.unset_bits()).unwrap_or(0) > 0 {
+                let validity = validity.unwrap();
                 let mut digests = Vec::with_capacity(column.len());
                 UInt64Type::iter_column(&column)
                     .zip(validity.iter())
@@ -252,12 +251,16 @@ impl BlockFilter {
     ///
     /// Otherwise return `Uncertain`.
     #[tracing::instrument(level = "debug", name = "block_filter_index_eval", skip_all)]
-    pub fn eval(&self, mut expr: Expr<String>) -> Result<FilterEvalResult> {
+    pub fn eval(
+        &self,
+        mut expr: Expr<String>,
+        scalar_map: &HashMap<Scalar, u64>,
+    ) -> Result<FilterEvalResult> {
         visit_expr_column_eq_constant(
             &mut expr,
             &mut |span, col_name, scalar, ty, return_type| {
                 // If the column doesn't contain the constant, we rewrite the expression to `false`.
-                if self.find(col_name, scalar, ty)? == FilterEvalResult::MustFalse {
+                if self.find(col_name, scalar, ty, scalar_map)?
+                    == FilterEvalResult::MustFalse {
                     Ok(Some(Expr::Constant {
                         span,
                         scalar: Scalar::Boolean(false),
@@ -290,11 +293,41 @@ impl BlockFilter {
         }
     }
 
+    /// calculate digest for constant scalar
+    pub fn calculate_target_digest(
+        func_ctx: FunctionContext,
+        scalar: &Scalar,
+        data_type: &DataType,
+    ) -> Result<u64> {
+        let arg = PhysicalScalar::Constant {
+            value: Literal::try_from(scalar.clone())?,
+            data_type: data_type.clone(),
+        };
+        let siphash_func = PhysicalScalar::Function {
+            name: "siphash".to_string(),
+            params: vec![],
+            args: vec![arg],
+            return_type: DataType::Number(NumberDataType::UInt64),
+        };
+        let expr = siphash_func.as_expr()?;
+
+        let data_block = DataBlock::empty();
+        let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS);
+        let value = evaluator
+            .run(&expr)
+            .map_err(|(_, e)| ErrorCode::Internal(format!("eval siphash failed: {}.", e)))?;
+        let number_scalar = value.into_scalar().unwrap().into_number().unwrap();
+        let digest = u64::try_downcast_scalar(&number_scalar).unwrap();
+        Ok(digest)
+    }
+
     /// Find all columns that match the pattern of `col = <constant>` in the expression.
-    pub fn find_eq_columns(expr: &Expr<String>) -> Result<Vec<String>> {
+    pub fn find_eq_columns(expr: &Expr<String>) -> Result<Vec<(String, Scalar, DataType)>> {
         let mut cols = Vec::new();
-        visit_expr_column_eq_constant(&mut expr.clone(), &mut |_, col_name, _, _, _| {
-            cols.push(col_name.to_string());
+        visit_expr_column_eq_constant(&mut expr.clone(), &mut |_, col_name, scalar, ty, _| {
+            if Xor8Filter::is_supported_type(ty) && !scalar.is_null() {
+                cols.push((col_name.to_string(), scalar.clone(), ty.clone()));
+            }
             Ok(None)
         })?;
         Ok(cols)
@@ -306,7 +339,13 @@ impl BlockFilter {
         format!("Bloom({})", column_name)
     }
 
-    fn find(&self, column_name: &str, target: &Scalar, ty: &DataType) -> Result<FilterEvalResult> {
+    fn find(
+        &self,
+        column_name: &str,
+        target: &Scalar,
+        ty: &DataType,
+        scalar_map: &HashMap<Scalar, u64>,
+    ) -> Result<FilterEvalResult> {
         let filter_column = &Self::build_filter_column_name(column_name);
 
         if !self.filter_schema.has_field(filter_column)
@@ -331,30 +370,10 @@ impl BlockFilter {
             let datavalue = scalar_to_datavalue(target);
             filter.contains(&datavalue)
         } else {
-            let digest = if let Scalar::Null = target {
-                0u64
-            } else {
-                let arg = PhysicalScalar::Constant {
-                    value: Literal::try_from(target.clone()).unwrap(),
-                    data_type: ty.clone(),
-                };
-                let scalar = PhysicalScalar::Function {
-                    name: "siphash".to_string(),
-                    params: vec![],
-                    args: vec![arg],
-                    return_type: DataType::Number(NumberDataType::UInt64),
-                };
-                let expr = scalar.as_expr()?;
-
-                let data_block = DataBlock::empty();
-                let evaluator = Evaluator::new(&data_block, self.func_ctx, &BUILTIN_FUNCTIONS);
-                let value = evaluator.run(&expr).map_err(|(_, e)| {
-                    ErrorCode::Internal(format!("eval siphash failed: {}.", e))
-                })?;
-                let number_scalar = value.into_scalar().unwrap().into_number().unwrap();
-                u64::try_downcast_scalar(&number_scalar).unwrap()
-            };
-            filter.contains_digest(digest)
+            match scalar_map.get(target) {
+                Some(digest) => filter.contains_digest(*digest),
+                None => true,
+            }
         };
 
         if contains {
diff --git a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
index 12e0c5ac7c598..db98c141e8d2f 100644
--- a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
+++ b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 //
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_exception::Result;
@@ -116,27 +117,36 @@ fn test_bloom_filter() -> Result<()> {
 }
 
 fn eval_index(index: &BlockFilter, col_name: &str, val: Scalar, ty: DataType) -> FilterEvalResult {
-    index
-        .eval(
-            check_function(
-                None,
-                "eq",
-                &[],
-                &[
-                    Expr::ColumnRef {
-                        span: None,
-                        id: col_name.to_string(),
-                        data_type: ty.clone(),
-                    },
-                    Expr::Constant {
-                        span: None,
-                        scalar: val,
-                        data_type: ty,
-                    },
-                ],
-                &BUILTIN_FUNCTIONS,
-            )
-            .unwrap(),
-        )
-        .unwrap()
+    let expr = check_function(
+        None,
+        "eq",
+        &[],
+        &[
+            Expr::ColumnRef {
+                span: None,
+                id: col_name.to_string(),
+                data_type: ty.clone(),
+            },
+            Expr::Constant {
+                span: None,
+                scalar: val,
+                data_type: ty,
+            },
+        ],
+        &BUILTIN_FUNCTIONS,
+    )
+    .unwrap();
+
+    let point_query_cols = BlockFilter::find_eq_columns(&expr).unwrap();
+
+    let mut scalar_map = HashMap::<Scalar, u64>::new();
+    let func_ctx = FunctionContext::default();
+    for (_, scalar, ty) in point_query_cols.iter() {
+        if !scalar_map.contains_key(scalar) {
+            let digest = BlockFilter::calculate_target_digest(func_ctx, scalar, ty).unwrap();
+            scalar_map.insert(scalar.clone(), digest);
+        }
+    }
+
+    index.eval(expr, &scalar_map).unwrap()
 }
diff --git a/src/query/storages/fuse/src/pruning/pruner.rs b/src/query/storages/fuse/src/pruning/pruner.rs
index e752786556b1f..28a08163a5d9f 100644
--- a/src/query/storages/fuse/src/pruning/pruner.rs
+++ b/src/query/storages/fuse/src/pruning/pruner.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_catalog::table_context::TableContext;
@@ -20,6 +21,7 @@ use common_expression::type_check::check_function;
 use common_expression::ConstantFolder;
 use common_expression::Domain;
 use common_expression::Expr;
+use common_expression::Scalar;
 use common_expression::TableSchemaRef;
 use common_functions::scalars::BUILTIN_FUNCTIONS;
 use opendal::Operator;
@@ -43,6 +45,9 @@ struct FilterPruner {
     /// the expression that would be evaluate
     filter_expression: Expr<String>,
 
+    /// pre-calculated digest for constant Scalar
+    scalar_map: HashMap<Scalar, u64>,
+
     /// the data accessor
     dal: Operator,
 
@@ -55,6 +60,7 @@ impl FilterPruner {
         ctx: Arc<dyn TableContext>,
         index_columns: Vec<String>,
         filter_expression: Expr<String>,
+        scalar_map: HashMap<Scalar, u64>,
         dal: Operator,
         data_schema: TableSchemaRef,
     ) -> Self {
@@ -62,6 +68,7 @@ impl FilterPruner {
             ctx,
             index_columns,
             filter_expression,
+            scalar_map,
             dal,
             data_schema,
         }
@@ -79,6 +86,7 @@ impl Pruner for FilterPruner {
                 self.dal.clone(),
                 &self.data_schema,
                 &self.filter_expression,
+                &self.scalar_map,
                 &self.index_columns,
                 loc,
                 index_length,
@@ -149,15 +157,22 @@ pub fn new_filter_pruner(
 
     if !point_query_cols.is_empty() {
         // convert to filter column names
-        let filter_block_cols = point_query_cols
-            .iter()
-            .map(|n| BlockFilter::build_filter_column_name(n))
-            .collect();
+        let mut filter_block_cols = vec![];
+        let mut scalar_map = HashMap::<Scalar, u64>::new();
+        let func_ctx = ctx.try_get_function_context()?;
+        for (col_name, scalar, ty) in point_query_cols.iter() {
+            filter_block_cols.push(BlockFilter::build_filter_column_name(col_name));
+            if !scalar_map.contains_key(scalar) {
+                let digest = BlockFilter::calculate_target_digest(func_ctx, scalar, ty)?;
+                scalar_map.insert(scalar.clone(), digest);
+            }
+        }
 
         return Ok(Some(Arc::new(FilterPruner::new(
            ctx.clone(),
            filter_block_cols,
            optimized_expr,
+           scalar_map,
            dal,
            schema.clone(),
        ))));
@@ -173,12 +188,14 @@ mod util {
     use storages_common_index::FilterEvalResult;
 
     use super::*;
 
+    #[allow(clippy::too_many_arguments)]
     #[tracing::instrument(level = "debug", skip_all)]
     pub async fn should_keep_by_filter(
         ctx: Arc<dyn TableContext>,
         dal: Operator,
         schema: &TableSchemaRef,
         filter_expr: &Expr<String>,
+        scalar_map: &HashMap<Scalar, u64>,
         filter_col_names: &[String],
         index_location: &Location,
         index_length: u64,
@@ -196,7 +213,7 @@ mod util {
                 filter.filter_block,
                 index_location.1,
             )?
-            .eval(filter_expr.clone())?
+            .eval(filter_expr.clone(), scalar_map)?
                 != FilterEvalResult::MustFalse),
             Err(e) if e.code() == ErrorCode::DEPRECATED_INDEX_FORMAT => {
                 // In case that the index is no longer supported, just return ture to indicate

From 7443abfd754177cdec4de315218df4fa0690d35d Mon Sep 17 00:00:00 2001
From: baishen
Date: Wed, 11 Jan 2023 15:33:32 +0800
Subject: [PATCH 3/6] fix index build digests

---
 .../common/index/benches/build_from_block.rs  | 94 ++++++++++++++++++-
 src/query/storages/common/index/src/bloom.rs  | 28 +++---
 .../common/index/src/filters/filter.rs        |  4 +-
 .../storages/common/index/src/filters/xor8.rs |  9 +-
 .../common/index/tests/it/filters/xor8.rs     | 13 ++-
 5 files changed, 120 insertions(+), 28 deletions(-)

diff --git a/src/query/storages/common/index/benches/build_from_block.rs b/src/query/storages/common/index/benches/build_from_block.rs
index ec2834da88d9e..cba38fdb67a6a 100644
--- a/src/query/storages/common/index/benches/build_from_block.rs
+++ b/src/query/storages/common/index/benches/build_from_block.rs
@@ -15,9 +15,21 @@
 #[macro_use]
 extern crate criterion;
 
+use std::ops::Deref;
+
+use common_arrow::arrow::buffer::Buffer;
 use common_expression::types::number::NumberColumn;
 use common_expression::types::string::StringColumnBuilder;
+use common_expression::types::DataType;
+use common_expression::types::NumberDataType;
+use common_expression::types::UInt64Type;
+use common_expression::types::ValueType;
 use common_expression::Column;
+use common_expression::DataBlock;
+use common_expression::Evaluator;
+use common_expression::FunctionContext;
+use common_functions::scalars::BUILTIN_FUNCTIONS;
+use common_sql::executor::PhysicalScalar;
 use criterion::Criterion;
 use rand::prelude::random;
 use rand::rngs::StdRng;
@@ -87,6 +99,80 @@ fn bench_string(c: &mut Criterion) {
     );
 }
 
+fn bench_u64_using_digests(c: &mut Criterion) {
+    let column = rand_i64_column(1_000_000);
+
+    let mut builder = Xor8Builder::create();
+    let digests = calculate_digests(&column, &DataType::Number(NumberDataType::Int64));
+    builder.add_digests(digests.deref());
+    let filter = builder.build().unwrap();
+
+    for i in 0..digests.len() {
+        let digest = unsafe { digests.get_unchecked(i) };
+        assert!(filter.contains_digest(*digest), "digest {} not present", digest);
+    }
+
+    c.bench_function(
+        "xor8_filter_u64_1m_rows_build_from_column_to_digests",
+        |b| {
+            b.iter(|| {
+                let mut builder = Xor8Builder::create();
+                let digests = calculate_digests(&column, &DataType::Number(NumberDataType::Int64));
+                builder.add_digests(digests.deref());
+                let _filter = criterion::black_box(builder.build().unwrap());
+            })
+        },
+    );
+}
+
+fn bench_string_using_digests(c: &mut Criterion) {
+    let column = rand_str_column(1_000_000, 32);
+
+    let mut builder = Xor8Builder::create();
+    let digests = calculate_digests(&column, &DataType::String);
+    builder.add_digests(digests.deref());
+    let filter = builder.build().unwrap();
+
+    for i in 0..digests.len() {
+        let digest = unsafe { digests.get_unchecked(i) };
+        assert!(filter.contains_digest(*digest), "digest {} not present", digest);
+    }
+
+    c.bench_function(
"xor8_filter_string16to32_1m_rows_build_from_column_to_digests", + |b| { + b.iter(|| { + let mut builder = Xor8Builder::create(); + let digests = calculate_digests(&column, &DataType::String); + builder.add_digests(digests.deref()); + let _filter = criterion::black_box(builder.build().unwrap()); + }) + }, + ); +} + +fn calculate_digests(column: &Column, data_type: &DataType) -> Buffer { + let arg = PhysicalScalar::IndexedVariable { + index: 0, + data_type: data_type.clone(), + display_name: "a".to_string(), + }; + let siphash_func = PhysicalScalar::Function { + name: "siphash".to_string(), + params: vec![], + args: vec![arg], + return_type: DataType::Boolean, + }; + let expr = siphash_func.as_expr().unwrap(); + + let func_ctx = FunctionContext::default(); + let data_block = DataBlock::new_from_columns(vec![column.clone()]); + let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS); + let value = evaluator.run(&expr).unwrap(); + let col = value.convert_to_full_column(data_type, data_block.num_rows()); + UInt64Type::try_downcast_column(&col).unwrap() +} + fn rand_i64_column(n: i32) -> Column { let seed: u64 = random(); @@ -116,5 +202,11 @@ fn rand_str_column(n: i32, len: i32) -> Column { Column::String(builder.build()) } -criterion_group!(benches, bench_u64, bench_string); +criterion_group!( + benches, + bench_u64, + bench_u64_using_digests, + bench_string, + bench_string_using_digests +); criterion_main!(benches); diff --git a/src/query/storages/common/index/src/bloom.rs b/src/query/storages/common/index/src/bloom.rs index 18f58f285e83e..9c9a58b68eec3 100644 --- a/src/query/storages/common/index/src/bloom.rs +++ b/src/query/storages/common/index/src/bloom.rs @@ -199,22 +199,22 @@ impl BlockFilter { // create filter per column let mut filter_builder = Xor8Builder::create(); - let filter = if validity.as_ref().map(|v| v.unset_bits()).unwrap_or(0) > 0 { + if validity.as_ref().map(|v| v.unset_bits()).unwrap_or(0) > 0 { let validity = validity.unwrap(); - let mut digests = Vec::with_capacity(column.len()); - UInt64Type::iter_column(&column) - .zip(validity.iter()) - .for_each(|(v, b)| { - if !b { - digests.push(0); - } else { - digests.push(v); - } - }); - filter_builder.build_from_digests(&digests)? + let mut column_iter = column.deref().iter(); + let mut validity_iter = validity.into_iter(); + filter_builder.add_digests(std::iter::from_fn(move || { + if let Some(validity) = validity_iter.next() { + let digest = column_iter.next().unwrap(); + if !validity { Some(&0) } else { Some(digest) } + } else { + None + } + })); } else { - filter_builder.build_from_digests(column.deref())? - }; + filter_builder.add_digests(column.deref()); + } + let filter = filter_builder.build()?; if let Some(len) = filter.len() { let idx = source_schema.index_of(field.name().as_str()).unwrap(); diff --git a/src/query/storages/common/index/src/filters/filter.rs b/src/query/storages/common/index/src/filters/filter.rs index eb9a5c2bba368..b9281a8d56dae 100644 --- a/src/query/storages/common/index/src/filters/filter.rs +++ b/src/query/storages/common/index/src/filters/filter.rs @@ -53,8 +53,8 @@ pub trait FilterBuilder { /// This methods can be called more than once. fn add_keys(&mut self, keys: &[K]); - /// Build the filter from the pre-computed digest. - fn build_from_digests(&mut self, digests: &[u64]) -> Result; + /// Populate with pre-compute collection of 64-bit digests. + fn add_digests<'i, I: IntoIterator>(&mut self, digests: I); /// Build the filter with added keys. 
     fn build(self) -> Result<Self::Filter, Self::Error>;
 }
diff --git a/src/query/storages/common/index/src/filters/xor8.rs b/src/query/storages/common/index/src/filters/xor8.rs
index 51d241e2bacc9..0f47c5964a1a8 100644
--- a/src/query/storages/common/index/src/filters/xor8.rs
+++ b/src/query/storages/common/index/src/filters/xor8.rs
@@ -65,13 +65,8 @@ impl FilterBuilder for Xor8Builder {
         self.builder.populate(keys)
     }
 
-    fn build_from_digests(&mut self, digests: &[u64]) -> Result<Self::Filter, Self::Error> {
-        let f = self
-            .builder
-            .build_from_digests(digests)
-            .map_err(|e| Xor8BuildingError::new(&e))?;
-
-        Ok(Xor8Filter { filter: f })
+    fn add_digests<'i, I: IntoIterator<Item = &'i u64>>(&mut self, digests: I) {
+        self.builder.populate_digests(digests)
     }
 
     fn build(mut self) -> Result<Self::Filter, Self::Error> {
diff --git a/src/query/storages/common/index/tests/it/filters/xor8.rs b/src/query/storages/common/index/tests/it/filters/xor8.rs
index cdf0cfe3090c0..f6a44666c64ed 100644
--- a/src/query/storages/common/index/tests/it/filters/xor8.rs
+++ b/src/query/storages/common/index/tests/it/filters/xor8.rs
@@ -221,11 +221,16 @@ fn test_xor_bitmap_from_digests() -> Result<()> {
     let numbers = 1_000_000;
 
     let size = 8 * numbers;
-    let keys: Vec<u64> = (0..numbers).collect();
+    let digests: Vec<u64> = (0..numbers).collect();
     let mut builder = Xor8Builder::create();
-    let filter = builder.build_from_digests(&keys)?;
-    for key in keys.iter() {
-        assert!(filter.contains_digest(*key), "key {} not present", key);
+    builder.add_digests(&digests);
+    let filter = builder.build()?;
+    for digest in digests.iter() {
+        assert!(
+            filter.contains_digest(*digest),
+            "digest {} not present",
+            digest
+        );
     }
 
     let val = filter.to_bytes()?;

From 685995897ab73d0ef36460d963b48cf691801c79 Mon Sep 17 00:00:00 2001
From: baishen
Date: Wed, 11 Jan 2023 18:26:53 +0800
Subject: [PATCH 4/6] stop writing empty index parquet files

---
 .../tests/it/storages/fuse/block_writer.rs    | 30 ++++---
 src/query/storages/common/index/src/bloom.rs  |  9 +-
 .../index/tests/it/filters/bloom_filter.rs    |  3 +-
 .../storages/fuse/src/operations/fuse_sink.rs | 85 +++++++++++--------
 .../mutation/compact/compact_transform.rs     | 62 ++++++++------
 .../mutation/deletion/deletion_source.rs      | 41 ++++++---
 6 files changed, 138 insertions(+), 92 deletions(-)

diff --git a/src/query/service/tests/it/storages/fuse/block_writer.rs b/src/query/service/tests/it/storages/fuse/block_writer.rs
index 9ebf05e59cb56..c429cfe91fd81 100644
--- a/src/query/service/tests/it/storages/fuse/block_writer.rs
+++ b/src/query/service/tests/it/storages/fuse/block_writer.rs
@@ -85,7 +85,7 @@ impl<'a> BlockWriter<'a> {
             col_metas,
             cluster_stats,
             location,
-            Some(bloom_filter_index_location),
+            bloom_filter_index_location,
             bloom_filter_index_size,
             Compression::Lz4Raw,
         );
@@ -98,23 +98,27 @@ impl<'a> BlockWriter<'a> {
         schema: TableSchemaRef,
         block: &DataBlock,
         block_id: Uuid,
-    ) -> Result<(u64, Location)> {
+    ) -> Result<(u64, Option<Location>)> {
         let location = self
             .location_generator
             .block_bloom_index_location(&block_id);
         let bloom_index =
             BlockFilter::try_create(FunctionContext::default(), schema, location.1, &[block])?;
-        let index_block = bloom_index.filter_block;
-        let mut data = Vec::with_capacity(DEFAULT_BLOOM_INDEX_WRITE_BUFFER_SIZE);
-        let index_block_schema = &bloom_index.filter_schema;
-        let (size, _) = blocks_to_parquet(
-            index_block_schema,
-            vec![index_block],
-            &mut data,
-            TableCompression::None,
-        )?;
-        write_data(&data, data_accessor, &location.0).await?;
-        Ok((size, location))
+        if let Some(bloom_index) = bloom_index {
+            let index_block = bloom_index.filter_block;
+            let mut data = Vec::with_capacity(DEFAULT_BLOOM_INDEX_WRITE_BUFFER_SIZE);
+            let index_block_schema = &bloom_index.filter_schema;
+            let (size, _) = blocks_to_parquet(
+                index_block_schema,
+                vec![index_block],
+                &mut data,
+                TableCompression::None,
+            )?;
+            write_data(&data, data_accessor, &location.0).await?;
+            Ok((size, Some(location)))
+        } else {
+            Ok((0u64, None))
+        }
     }
 }
diff --git a/src/query/storages/common/index/src/bloom.rs b/src/query/storages/common/index/src/bloom.rs
index 9c9a58b68eec3..fe2c5631d92a6 100644
--- a/src/query/storages/common/index/src/bloom.rs
+++ b/src/query/storages/common/index/src/bloom.rs
@@ -131,7 +131,7 @@ impl BlockFilter {
         source_schema: TableSchemaRef,
         version: u64,
         blocks: &[&DataBlock],
-    ) -> Result<Self> {
+    ) -> Result<Option<Self>> {
         if blocks.is_empty() {
             return Err(ErrorCode::BadArguments("block is empty"));
         }
@@ -176,6 +176,9 @@ impl BlockFilter {
                 columns.push(column);
             }
         }
+        if columns.is_empty() {
+            return Ok(None);
+        }
 
         let mut filter_fields = vec![];
         let mut filter_columns = vec![];
         let mut column_distinct_count = HashMap::<usize, usize>::new();
@@ -235,14 +238,14 @@ impl BlockFilter {
         let filter_schema = Arc::new(TableSchema::new(filter_fields));
         let filter_block = DataBlock::new(filter_columns, 1);
 
-        Ok(Self {
+        Ok(Some(Self {
             func_ctx,
             version,
             source_schema,
             filter_schema,
             filter_block,
             column_distinct_count,
-        })
+        }))
     }
 
     /// Apply the predicate expression, return the result.
diff --git a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
index db98c141e8d2f..1ceccd18d46b7 100644
--- a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
+++ b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
@@ -71,7 +71,8 @@ fn test_bloom_filter() -> Result<()> {
         schema,
         LatestBloom::VERSION,
         &blocks_ref,
-    )?;
+    )?
+    .unwrap();
 
     assert_eq!(
         FilterEvalResult::MustFalse,
diff --git a/src/query/storages/fuse/src/operations/fuse_sink.rs b/src/query/storages/fuse/src/operations/fuse_sink.rs
index b87a8243989b6..2eaa650baaedb 100644
--- a/src/query/storages/fuse/src/operations/fuse_sink.rs
+++ b/src/query/storages/fuse/src/operations/fuse_sink.rs
@@ -59,6 +59,7 @@ pub struct BloomIndexState {
     pub(crate) data: Vec<u8>,
     pub(crate) size: u64,
     pub(crate) location: Location,
+    pub(crate) column_distinct_count: HashMap<usize, usize>,
 }
 
 impl BloomIndexState {
@@ -67,7 +68,7 @@ impl BloomIndexState {
         source_schema: TableSchemaRef,
         block: &DataBlock,
         location: Location,
-    ) -> Result<(Self, HashMap<usize, usize>)> {
+    ) -> Result<Option<Self>> {
         // write index
         let bloom_index = BlockFilter::try_create(
             ctx.try_get_function_context()?,
@@ -75,23 +76,25 @@ impl BloomIndexState {
             location.1,
             &[block],
         )?;
-        let index_block = bloom_index.filter_block;
-        let mut data = Vec::with_capacity(100 * 1024);
-        let index_block_schema = &bloom_index.filter_schema;
-        let (size, _) = blocks_to_parquet(
-            index_block_schema,
-            vec![index_block],
-            &mut data,
-            TableCompression::None,
-        )?;
-        Ok((
-            Self {
+        if let Some(bloom_index) = bloom_index {
+            let index_block = bloom_index.filter_block;
+            let mut data = Vec::with_capacity(100 * 1024);
+            let index_block_schema = &bloom_index.filter_schema;
+            let (size, _) = blocks_to_parquet(
+                index_block_schema,
+                vec![index_block],
+                &mut data,
+                TableCompression::None,
+            )?;
+            Ok(Some(Self {
                 data,
                 size,
                 location,
-            },
-            bloom_index.column_distinct_count,
-        ))
+                column_distinct_count: bloom_index.column_distinct_count,
+            }))
+        } else {
+            Ok(None)
+        }
     }
 }
 
@@ -103,7 +106,7 @@ enum State {
         size: u64,
         meta_data: HashMap<ColumnId, ColumnMeta>,
         block_statistics: BlockStatistics,
-        bloom_index_state: BloomIndexState,
+        bloom_index_state: Option<BloomIndexState>,
     },
     GenerateSegment,
     SerializedSegment {
@@ -216,18 +219,19 @@ impl Processor for FuseTableSink {
                 let (block_location, block_id) = self.meta_locations.gen_block_location();
 
                 let location = self.meta_locations.block_bloom_index_location(&block_id);
-                let (bloom_index_state, column_distinct_count) = BloomIndexState::try_create(
+                let bloom_index_state = BloomIndexState::try_create(
                     self.ctx.clone(),
                     self.source_schema.clone(),
                     &block,
                     location,
                 )?;
-
+                let column_distinct_count =
+                    bloom_index_state.as_ref().map(|i| i.column_distinct_count.clone());
                 let block_statistics = BlockStatistics::from(
                     &block,
                     block_location.0,
                     cluster_stats,
-                    Some(column_distinct_count),
+                    column_distinct_count,
                 )?;
 
                 // we need a configuration of block size threshold here
@@ -311,27 +315,40 @@ impl Processor for FuseTableSink {
                 let start = Instant::now();
 
                 // write bloom filter index
-                io::write_data(
-                    &bloom_index_state.data,
-                    &self.data_accessor,
-                    &bloom_index_state.location.0,
-                )
-                .await?;
-
-                // Perf.
-                {
-                    metrics_inc_block_index_write_nums(1);
-                    metrics_inc_block_index_write_bytes(bloom_index_state.data.len() as u64);
-                    metrics_inc_block_index_write_milliseconds(start.elapsed().as_millis() as u64);
+                if let Some(ref bloom_index_state) = bloom_index_state {
+                    io::write_data(
+                        &bloom_index_state.data,
+                        &self.data_accessor,
+                        &bloom_index_state.location.0,
+                    )
+                    .await?;
+
+                    // Perf.
+                    {
+                        metrics_inc_block_index_write_nums(1);
+                        metrics_inc_block_index_write_bytes(bloom_index_state.data.len() as u64);
+                        metrics_inc_block_index_write_milliseconds(
+                            start.elapsed().as_millis() as u64
+                        );
+                    }
                 }
 
-                let bloom_filter_index_size = bloom_index_state.size;
+                let (bloom_index_location, bloom_index_size) =
+                    if let Some(bloom_index_state) = bloom_index_state {
+                        (
+                            Some(bloom_index_state.location.clone()),
+                            bloom_index_state.size,
+                        )
+                    } else {
+                        (None, 0u64)
+                    };
+
                 self.accumulator.add_block(
                     size,
                     meta_data,
                     block_statistics,
-                    Some(bloom_index_state.location),
-                    bloom_filter_index_size,
+                    bloom_index_location,
+                    bloom_index_size,
                     self.write_settings.table_compression.into(),
                 )?;
 
diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs
index 77e6860099a25..619ea3ff83bae 100644
--- a/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs
+++ b/src/query/storages/fuse/src/operations/mutation/compact/compact_transform.rs
@@ -57,8 +57,8 @@ use crate::statistics::reducers::reduce_block_metas;
 struct SerializeState {
     block_data: Vec<u8>,
     block_location: String,
-    index_data: Vec<u8>,
-    index_location: String,
+    index_data: Option<Vec<u8>>,
+    index_location: Option<String>,
 }
 
 enum State {
@@ -237,26 +237,30 @@ impl Processor for CompactTransform {
                     let (block_location, block_id) = self.location_gen.gen_block_location();
 
                     // build block index.
-                    let (index_data, index_size, index_location) = {
-                        // write index
-                        let func_ctx = self.ctx.try_get_function_context()?;
-                        let bloom_index = BlockFilter::try_create(
-                            func_ctx,
-                            self.schema.clone(),
-                            block_location.1,
-                            &[&new_block],
-                        )?;
-                        let index_block = bloom_index.filter_block;
-                        let location = self.location_gen.block_bloom_index_location(&block_id);
-                        let mut data = Vec::with_capacity(100 * 1024);
-                        let index_block_schema = &bloom_index.filter_schema;
-                        let (size, _) = blocks_to_parquet(
-                            index_block_schema,
-                            vec![index_block],
-                            &mut data,
-                            TableCompression::None,
-                        )?;
-                        (data, size, location)
+                    let func_ctx = self.ctx.try_get_function_context()?;
+                    let bloom_index = BlockFilter::try_create(
+                        func_ctx,
+                        self.schema.clone(),
+                        block_location.1,
+                        &[&new_block],
+                    )?;
+
+                    let (index_data, index_size, index_location) = match bloom_index {
+                        Some(bloom_index) => {
+                            // write index
+                            let index_block = bloom_index.filter_block;
+                            let location = self.location_gen.block_bloom_index_location(&block_id);
+                            let mut data = Vec::with_capacity(100 * 1024);
+                            let index_block_schema = &bloom_index.filter_schema;
+                            let (size, _) = blocks_to_parquet(
+                                index_block_schema,
+                                vec![index_block],
+                                &mut data,
+                                TableCompression::None,
+                            )?;
+                            (Some(data), size, Some(location))
+                        }
+                        None => (None, 0u64, None),
                     };
 
                     // serialize data block.
@@ -277,7 +281,7 @@ impl Processor for CompactTransform {
                         col_metas,
                         None,
                         block_location.clone(),
-                        Some(index_location.clone()),
+                        index_location.clone(),
                         index_size,
                         self.write_settings.table_compression.into(),
                     );
@@ -288,7 +292,7 @@ impl Processor for CompactTransform {
                         block_data,
                         block_location: block_location.0,
                         index_data,
-                        index_location: index_location.0,
+                        index_location: index_location.map(|l| l.0),
                     });
                 }
                 self.state = State::SerializedBlocks(serialize_states);
@@ -411,10 +415,14 @@ impl Processor for CompactTransform {
                         metrics_inc_compact_block_write_bytes(state.block_data.len() as u64);
                     }
 
-                    // write block data.
-                    write_data(&state.block_data, dal, &state.block_location).await?;
                     // write index data.
-                    write_data(&state.index_data, dal, &state.index_location).await
+                    if let (Some(index_data), Some(index_location)) =
+                        (state.index_data, state.index_location)
+                    {
+                        write_data(&index_data, dal, &index_location).await?;
+                    }
+                    // write block data.
+                    write_data(&state.block_data, dal, &state.block_location).await
                 });
 
diff --git a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs b/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs
index 71326a2a17ddc..e612cc9ccb695 100644
--- a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs
+++ b/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs
@@ -61,8 +61,8 @@ type DataChunks = Vec<(usize, Vec<u8>)>;
 struct SerializeState {
     block_data: Vec<u8>,
     block_location: String,
-    index_data: Vec<u8>,
-    index_location: String,
+    index_data: Option<Vec<u8>>,
+    index_location: Option<String>,
 }
 
 enum State {
@@ -281,13 +281,15 @@ impl Processor for DeletionSource {
 
                 // build block index.
                 let location = self.location_gen.block_bloom_index_location(&block_id);
-                let (bloom_index_state, column_distinct_count) = BloomIndexState::try_create(
+                let bloom_index_state = BloomIndexState::try_create(
                     self.ctx.clone(),
                     self.source_schema.clone(),
                     &block,
                     location,
                 )?;
-                let col_stats = gen_columns_statistics(&block, Some(column_distinct_count))?;
+                let column_distinct_count =
+                    bloom_index_state.as_ref().map(|i| i.column_distinct_count.clone());
+                let col_stats = gen_columns_statistics(&block, column_distinct_count)?;
 
                 // serialize data block.
                 let mut block_data = Vec::with_capacity(100 * 1024 * 1024);
@@ -300,6 +302,17 @@ impl Processor for DeletionSource {
                 )?;
                 let col_metas = util::column_metas(&meta_data)?;
 
+                let (index_data, index_location, index_size) =
+                    if let Some(bloom_index_state) = bloom_index_state {
+                        (
+                            Some(bloom_index_state.data.clone()),
+                            Some(bloom_index_state.location.clone()),
+                            bloom_index_state.size,
+                        )
+                    } else {
+                        (None, None, 0u64)
+                    };
+
                 // new block meta.
                 let new_meta = Arc::new(BlockMeta::new(
                     row_count,
@@ -309,8 +322,8 @@ impl Processor for DeletionSource {
                     col_metas,
                     cluster_stats,
                     block_location.clone(),
-                    Some(bloom_index_state.location.clone()),
-                    bloom_index_state.size,
+                    index_location.clone(),
+                    index_size,
                     self.table_compression.into(),
                 ));
 
@@ -318,8 +331,8 @@ impl Processor for DeletionSource {
                     SerializeState {
                         block_data,
                         block_location: block_location.0,
-                        index_data: bloom_index_state.data,
-                        index_location: bloom_index_state.location.0,
+                        index_data,
+                        index_location: index_location.map(|l| l.0),
                     },
                     new_meta,
                 );
@@ -401,12 +414,12 @@ impl Processor for DeletionSource {
                 )
                 .await?;
                 // write index data.
- write_data( - &serialize_state.index_data, - &self.dal, - &serialize_state.index_location, - ) - .await?; + if let (Some(index_data), Some(index_location)) = + (serialize_state.index_data, serialize_state.index_location) + { + write_data(&index_data, &self.dal, &index_location).await?; + } + self.state = State::Generated(Deletion::Replaced(block_meta)); } _ => return Err(ErrorCode::Internal("It's a bug.")), From 3bc603c798401107ebb050c5e31c2bc9649aa9e4 Mon Sep 17 00:00:00 2001 From: baishen Date: Wed, 11 Jan 2023 20:44:56 +0800 Subject: [PATCH 5/6] fix --- Cargo.lock | 1 - src/query/storages/common/index/Cargo.toml | 1 - .../common/index/benches/build_from_block.rs | 68 +++++++------ src/query/storages/common/index/src/bloom.rs | 96 ++++++++----------- .../index/tests/it/filters/bloom_filter.rs | 2 +- .../storages/fuse/src/operations/fuse_sink.rs | 5 +- .../mutation/deletion/deletion_source.rs | 5 +- src/query/storages/fuse/src/pruning/pruner.rs | 2 +- 8 files changed, 86 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a727e1d2f15c0..f453e89531414 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7576,7 +7576,6 @@ dependencies = [ "common-exception", "common-expression", "common-functions", - "common-sql", "criterion", "match-template", "rand 0.8.5", diff --git a/src/query/storages/common/index/Cargo.toml b/src/query/storages/common/index/Cargo.toml index 911bacd568d9e..fb45152a6b98f 100644 --- a/src/query/storages/common/index/Cargo.toml +++ b/src/query/storages/common/index/Cargo.toml @@ -20,7 +20,6 @@ common-catalog = { path = "../../../catalog" } common-exception = { path = "../../../../common/exception" } common-expression = { path = "../../../expression" } common-functions = { path = "../../../functions" } -common-sql = { path = "../../../sql" } storages-common-table-meta = { path = "../table-meta" } diff --git a/src/query/storages/common/index/benches/build_from_block.rs b/src/query/storages/common/index/benches/build_from_block.rs index cba38fdb67a6a..0fded499c9fb8 100644 --- a/src/query/storages/common/index/benches/build_from_block.rs +++ b/src/query/storages/common/index/benches/build_from_block.rs @@ -17,7 +17,6 @@ extern crate criterion; use std::ops::Deref; -use common_arrow::arrow::buffer::Buffer; use common_expression::types::number::NumberColumn; use common_expression::types::string::StringColumnBuilder; use common_expression::types::DataType; @@ -25,11 +24,7 @@ use common_expression::types::NumberDataType; use common_expression::types::UInt64Type; use common_expression::types::ValueType; use common_expression::Column; -use common_expression::DataBlock; -use common_expression::Evaluator; use common_expression::FunctionContext; -use common_functions::scalars::BUILTIN_FUNCTIONS; -use common_sql::executor::PhysicalScalar; use criterion::Criterion; use rand::prelude::random; use rand::rngs::StdRng; @@ -38,6 +33,7 @@ use rand::SeedableRng; use storages_common_index::filters::Filter; use storages_common_index::filters::FilterBuilder; use storages_common_index::filters::Xor8Builder; +use storages_common_index::BlockFilter; /// Benchmark building BlockFilter from DataBlock. 
/// @@ -103,7 +99,15 @@ fn bench_u64_using_digests(c: &mut Criterion) { let column = rand_i64_column(1_000_000); let mut builder = Xor8Builder::create(); - let digests = calculate_digests(&column, &DataType::Number(NumberDataType::Int64)); + let func_ctx = FunctionContext::default(); + let col = BlockFilter::calculate_column_digest( + func_ctx, + &column, + &DataType::Number(NumberDataType::Int64), + &DataType::Boolean, + ) + .unwrap(); + let digests = UInt64Type::try_downcast_column(&col).unwrap(); builder.add_digests(digests.deref()); let filter = builder.build().unwrap(); @@ -117,7 +121,15 @@ fn bench_u64_using_digests(c: &mut Criterion) { |b| { b.iter(|| { let mut builder = Xor8Builder::create(); - let digests = calculate_digests(&column, &DataType::Number(NumberDataType::Int64)); + let func_ctx = FunctionContext::default(); + let col = BlockFilter::calculate_column_digest( + func_ctx, + &column, + &DataType::Number(NumberDataType::Int64), + &DataType::Boolean, + ) + .unwrap(); + let digests = UInt64Type::try_downcast_column(&col).unwrap(); builder.add_digests(digests.deref()); let _filter = criterion::black_box(builder.build().unwrap()); }) @@ -129,7 +141,15 @@ fn bench_string_using_digests(c: &mut Criterion) { let column = rand_str_column(1_000_000, 32); let mut builder = Xor8Builder::create(); - let digests = calculate_digests(&column, &DataType::String); + let func_ctx = FunctionContext::default(); + let col = BlockFilter::calculate_column_digest( + func_ctx, + &column, + &DataType::String, + &DataType::Boolean, + ) + .unwrap(); + let digests = UInt64Type::try_downcast_column(&col).unwrap(); builder.add_digests(digests.deref()); let filter = builder.build().unwrap(); @@ -143,7 +163,15 @@ fn bench_string_using_digests(c: &mut Criterion) { |b| { b.iter(|| { let mut builder = Xor8Builder::create(); - let digests = calculate_digests(&column, &DataType::String); + let func_ctx = FunctionContext::default(); + let col = BlockFilter::calculate_column_digest( + func_ctx, + &column, + &DataType::String, + &DataType::Boolean, + ) + .unwrap(); + let digests = UInt64Type::try_downcast_column(&col).unwrap(); builder.add_digests(digests.deref()); let _filter = criterion::black_box(builder.build().unwrap()); }) @@ -151,28 +179,6 @@ fn bench_string_using_digests(c: &mut Criterion) { ); } -fn calculate_digests(column: &Column, data_type: &DataType) -> Buffer { - let arg = PhysicalScalar::IndexedVariable { - index: 0, - data_type: data_type.clone(), - display_name: "a".to_string(), - }; - let siphash_func = PhysicalScalar::Function { - name: "siphash".to_string(), - params: vec![], - args: vec![arg], - return_type: DataType::Boolean, - }; - let expr = siphash_func.as_expr().unwrap(); - - let func_ctx = FunctionContext::default(); - let data_block = DataBlock::new_from_columns(vec![column.clone()]); - let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS); - let value = evaluator.run(&expr).unwrap(); - let col = value.convert_to_full_column(data_type, data_block.num_rows()); - UInt64Type::try_downcast_column(&col).unwrap() -} - fn rand_i64_column(n: i32) -> Column { let seed: u64 = random(); diff --git a/src/query/storages/common/index/src/bloom.rs b/src/query/storages/common/index/src/bloom.rs index fe2c5631d92a6..91beae4e56417 100644 --- a/src/query/storages/common/index/src/bloom.rs +++ b/src/query/storages/common/index/src/bloom.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use common_exception::ErrorCode; use common_exception::Result; use 
common_expression::converts::scalar_to_datavalue;
+use common_expression::eval_function;
 use common_expression::types::DataType;
 use common_expression::types::NullableType;
 use common_expression::types::Number;
 use common_expression::types::NumberDataType;
 use common_expression::types::UInt64Type;
 use common_expression::types::ValueType;
 use common_expression::BlockEntry;
 use common_expression::Column;
 use common_expression::ConstantFolder;
 use common_expression::DataBlock;
 use common_expression::DataField;
 use common_expression::Domain;
-use common_expression::Evaluator;
 use common_expression::Expr;
 use common_expression::FunctionContext;
-use common_expression::Literal;
 use common_expression::Scalar;
 use common_expression::Span;
 use common_expression::TableDataType;
@@ -43,7 +42,6 @@ use common_expression::TableSchema;
 use common_expression::TableSchemaRef;
 use common_expression::Value;
 use common_functions::scalars::BUILTIN_FUNCTIONS;
-use common_sql::executor::PhysicalScalar;
 use storages_common_table_meta::meta::V2BloomBlock;
 use storages_common_table_meta::meta::Versioned;
 
@@ -136,7 +134,6 @@ impl BlockFilter {
             return Err(ErrorCode::BadArguments("block is empty"));
         }
 
-        let mut exprs = Vec::new();
         let mut fields = Vec::new();
         let mut columns = Vec::new();
         for i in 0..blocks[0].num_columns() {
@@ -151,20 +148,6 @@ impl BlockFilter {
                 let field = DataField::new(source_field.name().as_str(), return_type.clone());
                 fields.push(field);
 
-                let arg = PhysicalScalar::IndexedVariable {
-                    index: columns.len(),
-                    data_type: data_type.clone(),
-                    display_name: source_field.name().clone(),
-                };
-                let siphash_func = PhysicalScalar::Function {
-                    name: "siphash".to_string(),
-                    params: vec![],
-                    args: vec![arg],
-                    return_type,
-                };
-                let expr = siphash_func.as_expr()?;
-                exprs.push(expr);
-
                 let source_columns = blocks
                     .iter()
                     .map(|block| {
@@ -173,7 +156,7 @@ impl BlockFilter {
                     })
                     .collect::<Vec<_>>();
                 let column = Column::concat(&source_columns);
-                columns.push(column);
+                columns.push((column, data_type.clone()));
             }
         }
         if columns.is_empty() {
@@ -183,14 +166,9 @@ impl BlockFilter {
         let mut filter_fields = vec![];
         let mut filter_columns = vec![];
         let mut column_distinct_count = HashMap::<usize, usize>::new();
-
-        let data_block = DataBlock::new_from_columns(columns);
-        let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS);
-        for (field, expr) in fields.iter().zip(exprs.iter()) {
-            let value = evaluator
-                .run(expr)
-                .map_err(|(_, e)| ErrorCode::Internal(format!("eval siphash failed: {}.", e)))?;
-            let col = value.convert_to_full_column(field.data_type(), data_block.num_rows());
+        for (field, (column, data_type)) in fields.iter().zip(columns.iter()) {
+            let col =
+                Self::calculate_column_digest(func_ctx, column, data_type, field.data_type())?;
             let (column, validity) = if field.data_type().is_nullable() {
                 let nullable_column =
                     NullableType::<UInt64Type>::try_downcast_column(&col).unwrap();
@@ -204,16 +182,12 @@ impl BlockFilter {
             let mut filter_builder = Xor8Builder::create();
             if validity.as_ref().map(|v| v.unset_bits()).unwrap_or(0) > 0 {
                 let validity = validity.unwrap();
-                let mut column_iter = column.deref().iter();
-                let mut validity_iter = validity.into_iter();
-                filter_builder.add_digests(std::iter::from_fn(move || {
-                    if let Some(validity) = validity_iter.next() {
-                        let digest = column_iter.next().unwrap();
-                        if !validity { Some(&0) } else { Some(digest) }
-                    } else {
-                        None
-                    }
-                }));
+                let it = column.deref().iter().zip(validity.iter()).map(
+                    |(v, b)| {
+                        if !b { &0 } else { v }
+                    },
+                );
+                filter_builder.add_digests(it);
             } else {
                 filter_builder.add_digests(column.deref());
             }
@@ -296,29 +270,41 @@ impl BlockFilter {
         }
     }
 
+    /// calculate digest for column
+    pub fn calculate_column_digest(
+        func_ctx: FunctionContext,
+        column: &Column,
+        data_type: &DataType,
+        target_type: &DataType,
+    ) -> Result<Column> {
+        let (value, _) = eval_function(
+            None,
+            "siphash",
+            [(Value::Column(column.clone()), data_type.clone())],
+            func_ctx,
+            column.len(),
+            &BUILTIN_FUNCTIONS,
+        )
+        .map_err(|(_, e)| ErrorCode::Internal(format!("eval siphash failed: {}.", e)))?;
+        let column = value.convert_to_full_column(target_type, column.len());
+        Ok(column)
+    }
+
     /// calculate digest for constant scalar
-    pub fn calculate_target_digest(
+    pub fn calculate_scalar_digest(
         func_ctx: FunctionContext,
         scalar: &Scalar,
         data_type: &DataType,
     ) -> Result<u64> {
-        let arg = PhysicalScalar::Constant {
-            value: Literal::try_from(scalar.clone())?,
-            data_type: data_type.clone(),
-        };
-        let siphash_func = PhysicalScalar::Function {
-            name: "siphash".to_string(),
-            params: vec![],
-            args: vec![arg],
-            return_type: DataType::Number(NumberDataType::UInt64),
-        };
-        let expr = siphash_func.as_expr()?;
-
-        let data_block = DataBlock::empty();
-        let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS);
-        let value = evaluator
-            .run(&expr)
-            .map_err(|(_, e)| ErrorCode::Internal(format!("eval siphash failed: {}.", e)))?;
+        let (value, _) = eval_function(
+            None,
+            "siphash",
+            [(Value::Scalar(scalar.clone()), data_type.clone())],
+            func_ctx,
+            1,
+            &BUILTIN_FUNCTIONS,
+        )
+        .map_err(|(_, e)| ErrorCode::Internal(format!("eval siphash failed: {}.", e)))?;
         let number_scalar = value.into_scalar().unwrap().into_number().unwrap();
         let digest = u64::try_downcast_scalar(&number_scalar).unwrap();
         Ok(digest)
diff --git a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
index 1ceccd18d46b7..0b1bfb2ddc2f5 100644
--- a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
+++ b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
@@ -144,7 +144,7 @@ fn eval_index(index: &BlockFilter, col_name: &str, val: Scalar, ty: DataType) ->
     let func_ctx = FunctionContext::default();
     for (_, scalar, ty) in point_query_cols.iter() {
         if !scalar_map.contains_key(scalar) {
-            let digest = BlockFilter::calculate_target_digest(func_ctx, scalar, ty).unwrap();
+            let digest = BlockFilter::calculate_scalar_digest(func_ctx, scalar, ty).unwrap();
             scalar_map.insert(scalar.clone(), digest);
         }
     }
diff --git a/src/query/storages/fuse/src/operations/fuse_sink.rs b/src/query/storages/fuse/src/operations/fuse_sink.rs
index 2eaa650baaedb..d3e813657f041 100644
--- a/src/query/storages/fuse/src/operations/fuse_sink.rs
+++ b/src/query/storages/fuse/src/operations/fuse_sink.rs
@@ -225,8 +225,9 @@ impl Processor for FuseTableSink {
                     &block,
                     location,
                 )?;
-                let column_distinct_count =
-                    bloom_index_state.as_ref().map(|i| i.column_distinct_count.clone());
+                let column_distinct_count = bloom_index_state
+                    .as_ref()
+                    .map(|i| i.column_distinct_count.clone());
                 let block_statistics = BlockStatistics::from(
                     &block,
                     block_location.0,
diff --git a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs b/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs
index e612cc9ccb695..2ed98ec6f4a69 100644
--- a/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs
+++ b/src/query/storages/fuse/src/operations/mutation/deletion/deletion_source.rs
@@ -287,8 +287,9 @@ impl Processor for DeletionSource {
                     &block,
                     location,
                 )?;
-                let column_distinct_count =
-                    bloom_index_state.as_ref().map(|i| i.column_distinct_count.clone());
+                let column_distinct_count = bloom_index_state
+                    .as_ref()
+                    .map(|i|
i.column_distinct_count.clone()); + let column_distinct_count = bloom_index_state + .as_ref() + .map(|i| i.column_distinct_count.clone()); let col_stats = gen_columns_statistics(&block, column_distinct_count)?; // serialize data block. diff --git a/src/query/storages/fuse/src/pruning/pruner.rs b/src/query/storages/fuse/src/pruning/pruner.rs index 28a08163a5d9f..6e0c7ab9bc79a 100644 --- a/src/query/storages/fuse/src/pruning/pruner.rs +++ b/src/query/storages/fuse/src/pruning/pruner.rs @@ -163,7 +163,7 @@ pub fn new_filter_pruner( for (col_name, scalar, ty) in point_query_cols.iter() { filter_block_cols.push(BlockFilter::build_filter_column_name(col_name)); if !scalar_map.contains_key(scalar) { - let digest = BlockFilter::calculate_target_digest(func_ctx, scalar, ty)?; + let digest = BlockFilter::calculate_scalar_digest(func_ctx, scalar, ty)?; scalar_map.insert(scalar.clone(), digest); } } From f571d23ab3de2b8f8e2c2fbd2114d67e933b3a86 Mon Sep 17 00:00:00 2001 From: baishen Date: Wed, 11 Jan 2023 21:12:54 +0800 Subject: [PATCH 6/6] fix --- Cargo.lock | 1 - src/query/storages/common/index/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8112725191eb5..383cef6bb9fa8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7581,7 +7581,6 @@ version = "0.1.0" dependencies = [ "anyerror", "cbordata", - "common-arrow", "common-catalog", "common-exception", "common-expression", diff --git a/src/query/storages/common/index/Cargo.toml b/src/query/storages/common/index/Cargo.toml index 738c81d078023..fda9759127775 100644 --- a/src/query/storages/common/index/Cargo.toml +++ b/src/query/storages/common/index/Cargo.toml @@ -15,7 +15,6 @@ test = false ignored = ["xorfilter-rs", "match-template"] [dependencies] -common-arrow = { path = "../../../../common/arrow" } common-catalog = { path = "../../../catalog" } common-exception = { path = "../../../../common/exception" } common-expression = { path = "../../../expression" }
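
Taken together, the series turns bloom index construction into a two-step digest flow: hash a whole column (or a point-query constant) to u64 siphash digests once, then feed those digests straight into the xor8 builder and probe by digest. A minimal sketch of the resulting filters API, modeled on test_xor_bitmap_from_digests above — the literal digest values stand in for real siphash output from BlockFilter::calculate_column_digest, and the function name probe_by_digest is illustrative, not part of the codebase:

    use common_exception::Result;
    use storages_common_index::filters::Filter;
    use storages_common_index::filters::FilterBuilder;
    use storages_common_index::filters::Xor8Builder;

    fn probe_by_digest() -> Result<()> {
        // Build: pre-computed 64-bit digests go straight into the builder,
        // with no per-key hashing inside the filter.
        let digests: Vec<u64> = vec![7, 11, 13]; // stand-ins for siphash digests
        let mut builder = Xor8Builder::create();
        builder.add_digests(&digests);
        let filter = builder.build()?;

        // Probe: a point query hashes its constant once (cached in scalar_map
        // via BlockFilter::calculate_scalar_digest) and checks membership by
        // digest, so pruning never re-hashes the constant per block.
        assert!(filter.contains_digest(7));
        Ok(())
    }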