Skip to content

Commit

Permalink
Merge branch 'main' into join-order
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 11, 2023
2 parents 4f604da + 618424a commit 8591811
Show file tree
Hide file tree
Showing 11 changed files with 482 additions and 162 deletions.
30 changes: 17 additions & 13 deletions src/query/service/tests/it/storages/fuse/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<'a> BlockWriter<'a> {
col_metas,
cluster_stats,
location,
Some(bloom_filter_index_location),
bloom_filter_index_location,
bloom_filter_index_size,
Compression::Lz4Raw,
);
Expand All @@ -98,23 +98,27 @@ impl<'a> BlockWriter<'a> {
schema: TableSchemaRef,
block: &DataBlock,
block_id: Uuid,
) -> Result<(u64, Location)> {
) -> Result<(u64, Option<Location>)> {
let location = self
.location_generator
.block_bloom_index_location(&block_id);

let bloom_index =
BlockFilter::try_create(FunctionContext::default(), schema, location.1, &[block])?;
let index_block = bloom_index.filter_block;
let mut data = Vec::with_capacity(DEFAULT_BLOOM_INDEX_WRITE_BUFFER_SIZE);
let index_block_schema = &bloom_index.filter_schema;
let (size, _) = blocks_to_parquet(
index_block_schema,
vec![index_block],
&mut data,
TableCompression::None,
)?;
write_data(&data, data_accessor, &location.0).await?;
Ok((size, location))
if let Some(bloom_index) = bloom_index {
let index_block = bloom_index.filter_block;
let mut data = Vec::with_capacity(DEFAULT_BLOOM_INDEX_WRITE_BUFFER_SIZE);
let index_block_schema = &bloom_index.filter_schema;
let (size, _) = blocks_to_parquet(
index_block_schema,
vec![index_block],
&mut data,
TableCompression::None,
)?;
write_data(&data, data_accessor, &location.0).await?;
Ok((size, Some(location)))
} else {
Ok((0u64, None))
}
}
}
100 changes: 99 additions & 1 deletion src/query/storages/common/index/benches/build_from_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
#[macro_use]
extern crate criterion;

use std::ops::Deref;

use common_expression::types::number::NumberColumn;
use common_expression::types::string::StringColumnBuilder;
use common_expression::types::DataType;
use common_expression::types::NumberDataType;
use common_expression::types::UInt64Type;
use common_expression::types::ValueType;
use common_expression::Column;
use common_expression::FunctionContext;
use criterion::Criterion;
use rand::prelude::random;
use rand::rngs::StdRng;
Expand All @@ -28,6 +35,7 @@ use rand::SeedableRng;
use storages_common_index::filters::Filter;
use storages_common_index::filters::FilterBuilder;
use storages_common_index::filters::Xor8Builder;
use storages_common_index::BlockFilter;

/// Benchmark building BlockFilter from DataBlock.
///
Expand Down Expand Up @@ -89,6 +97,90 @@ fn bench_string(c: &mut Criterion) {
);
}

/// Benchmarks building an xor8 filter from the digests of the column returned by
/// `rand_i64_column(1_000_000)`.
///
/// Digests are computed with `BlockFilter::calculate_column_digest` and handed to
/// `Xor8Builder::add_digests`. Before the timed section a filter is built once and
/// checked to contain every digest it was built from.
///
/// Note that each timed iteration covers both the digest calculation and the
/// filter build, since both happen inside the `b.iter` closure.
fn bench_u64_using_digests(c: &mut Criterion) {
    let column = rand_i64_column(1_000_000);

    let mut builder = Xor8Builder::create();
    let func_ctx = FunctionContext::default();
    let col = BlockFilter::calculate_column_digest(
        func_ctx,
        &column,
        &DataType::Number(NumberDataType::Int64),
        &DataType::Boolean,
    )
    .unwrap();
    let digests = UInt64Type::try_downcast_column(&col).unwrap();
    builder.add_digests(digests.deref());
    let filter = builder.build().unwrap();

    // Sanity check outside the timed loop: every digest used to build the filter
    // must be reported as present. Plain iteration replaces unchecked indexing,
    // so no `unsafe` is required here.
    for digest in digests.deref().iter() {
        assert!(filter.contains_digest(*digest), "digest {} present", digest);
    }

    c.bench_function(
        "xor8_filter_u64_1m_rows_build_from_column_to_digests",
        |b| {
            b.iter(|| {
                let mut builder = Xor8Builder::create();
                let func_ctx = FunctionContext::default();
                let col = BlockFilter::calculate_column_digest(
                    func_ctx,
                    &column,
                    &DataType::Number(NumberDataType::Int64),
                    &DataType::Boolean,
                )
                .unwrap();
                let digests = UInt64Type::try_downcast_column(&col).unwrap();
                builder.add_digests(digests.deref());
                // `black_box` keeps the optimizer from discarding the built filter.
                let _filter = criterion::black_box(builder.build().unwrap());
            })
        },
    );
}

/// Benchmarks building an xor8 filter from the digests of the column returned by
/// `rand_str_column(1_000_000, 32)`.
///
/// Digests are computed with `BlockFilter::calculate_column_digest` and handed to
/// `Xor8Builder::add_digests`. Before the timed section a filter is built once and
/// checked to contain every digest it was built from.
///
/// Note that each timed iteration covers both the digest calculation and the
/// filter build, since both happen inside the `b.iter` closure.
fn bench_string_using_digests(c: &mut Criterion) {
    let column = rand_str_column(1_000_000, 32);

    let mut builder = Xor8Builder::create();
    let func_ctx = FunctionContext::default();
    let col = BlockFilter::calculate_column_digest(
        func_ctx,
        &column,
        &DataType::String,
        &DataType::Boolean,
    )
    .unwrap();
    let digests = UInt64Type::try_downcast_column(&col).unwrap();
    builder.add_digests(digests.deref());
    let filter = builder.build().unwrap();

    // Sanity check outside the timed loop: every digest used to build the filter
    // must be reported as present. Plain iteration replaces unchecked indexing,
    // so no `unsafe` is required here.
    for digest in digests.deref().iter() {
        assert!(filter.contains_digest(*digest), "digest {} present", digest);
    }

    c.bench_function(
        "xor8_filter_string16to32_1m_rows_build_from_column_to_digests",
        |b| {
            b.iter(|| {
                let mut builder = Xor8Builder::create();
                let func_ctx = FunctionContext::default();
                let col = BlockFilter::calculate_column_digest(
                    func_ctx,
                    &column,
                    &DataType::String,
                    &DataType::Boolean,
                )
                .unwrap();
                let digests = UInt64Type::try_downcast_column(&col).unwrap();
                builder.add_digests(digests.deref());
                // `black_box` keeps the optimizer from discarding the built filter.
                let _filter = criterion::black_box(builder.build().unwrap());
            })
        },
    );
}

fn rand_i64_column(n: i32) -> Column {
let seed: u64 = random();

Expand Down Expand Up @@ -118,5 +210,11 @@ fn rand_str_column(n: i32, len: i32) -> Column {
Column::String(builder.build())
}

criterion_group!(benches, bench_u64, bench_string);
// Benchmark group `benches`: lists every benchmark function in this file that
// should run, covering both the plain and the digest-based variants for the
// u64 and string benchmarks.
criterion_group!(
benches,
bench_u64,
bench_u64_using_digests,
bench_string,
bench_string_using_digests
);
// Hands the `benches` group to criterion's entry-point macro.
criterion_main!(benches);
Loading

0 comments on commit 8591811

Please sign in to comment.