test(batch): add benchmark for sort merge join (risingwavelabs#3797)
* add gen_sorted_data

* modify the visibility of modules to allow benchmarking

* remove the data_type parameter from gen_data and gen_sorted_data

* add benchmark for sort merge join

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
wzzzzd and mergify[bot] authored Jul 12, 2022
1 parent 7b544ad commit ead67f7
Showing 8 changed files with 153 additions and 10 deletions.
4 changes: 4 additions & 0 deletions src/batch/Cargo.toml
@@ -74,3 +74,7 @@ harness = false
name = "nested_loop_join"
harness = false

[[bench]]
name = "sort_merge_join"
harness = false
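
With this entry registered (`harness = false` hands the benchmark's main function to Criterion), the new benchmark should be runnable on its own with a standard Cargo invocation such as `cargo bench -p risingwave_batch --bench sort_merge_join`; the package name here is inferred from the `risingwave_batch` imports later in this diff.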

2 changes: 1 addition & 1 deletion src/batch/benches/filter.rs
@@ -32,7 +32,7 @@ use tokio::runtime::Runtime;
static GLOBAL: Jemalloc = Jemalloc;

fn create_filter_executor(chunk_size: usize, chunk_num: usize) -> BoxedExecutor {
let input_data = gen_data(DataType::Int64, chunk_size, chunk_num);
let input_data = gen_data(chunk_size, chunk_num);

let mut mock_executor = MockExecutor::new(field_n::<1>(DataType::Int64));
input_data.into_iter().for_each(|c| mock_executor.add(c));
4 changes: 2 additions & 2 deletions src/batch/benches/nested_loop_join.rs
@@ -38,8 +38,8 @@ fn create_nested_loop_join_executor(
right_chunk_size: usize,
right_chunk_num: usize,
) -> BoxedExecutor {
let left_input = gen_data(DataType::Int64, left_chunk_size, left_chunk_num);
let right_input = gen_data(DataType::Int64, right_chunk_size, right_chunk_num);
let left_input = gen_data(left_chunk_size, left_chunk_num);
let right_input = gen_data(right_chunk_size, right_chunk_num);

let mut left_child = Box::new(MockExecutor::new(field_n::<1>(DataType::Int64)));
left_input.into_iter().for_each(|c| left_child.add(c));
99 changes: 99 additions & 0 deletions src/batch/benches/sort_merge_join.rs
@@ -0,0 +1,99 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use futures::StreamExt;
use risingwave_batch::executor::row_level_iter::RowLevelIter;
use risingwave_batch::executor::test_utils::{gen_sorted_data, MockExecutor};
use risingwave_batch::executor::{BoxedExecutor, Executor, JoinType, SortMergeJoinExecutor};
use risingwave_common::catalog::schema_test_utils::field_n;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use tikv_jemallocator::Jemalloc;
use tokio::runtime::Runtime;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn create_sort_merge_join_executor(
left_chunk_size: usize,
left_chunk_num: usize,
right_chunk_size: usize,
right_chunk_num: usize,
) -> BoxedExecutor {
let left_input = gen_sorted_data(left_chunk_size, left_chunk_num, "0".into(), 10);
let right_input = gen_sorted_data(right_chunk_size, right_chunk_num, "1000".into(), 20);

let mut left_child = Box::new(MockExecutor::new(field_n::<1>(DataType::Int64)));
left_input.into_iter().for_each(|c| left_child.add(c));

let mut right_child = Box::new(MockExecutor::new(field_n::<1>(DataType::Int64)));
right_input.into_iter().for_each(|c| right_child.add(c));

Box::new(SortMergeJoinExecutor::new(
JoinType::Inner,
Schema::from_iter(
left_child
.schema()
.fields()
.iter()
.chain(right_child.schema().fields().iter())
.cloned(),
),
vec![0, 1],
RowLevelIter::new(left_child),
RowLevelIter::new(right_child),
vec![0],
vec![0],
"SortMergeJoinExecutor".into(),
))
}

async fn execute_sort_merge_join_executor(executor: BoxedExecutor) {
let mut stream = executor.execute();
while let Some(ret) = stream.next().await {
black_box(ret.unwrap());
}
}

fn bench_sort_merge_join(c: &mut Criterion) {
const LEFT_SIZE: usize = 2 * 1024;
const RIGHT_SIZE: usize = 2 * 1024;
let rt = Runtime::new().unwrap();
for chunk_size in &[32, 128, 512, 1024] {
c.bench_with_input(
BenchmarkId::new("SortMergeJoinExecutor", format!("{}", chunk_size)),
chunk_size,
|b, &chunk_size| {
let left_chunk_num = LEFT_SIZE / chunk_size;
let right_chunk_num = RIGHT_SIZE / chunk_size;
b.to_async(&rt).iter_batched(
|| {
create_sort_merge_join_executor(
chunk_size,
left_chunk_num,
chunk_size,
right_chunk_num,
)
},
|e| execute_sort_merge_join_executor(e),
BatchSize::SmallInput,
);
},
);
}
}

criterion_group!(benches, bench_sort_merge_join);
criterion_main!(benches);
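
A note on the data generated above: the left input walks the keys 0, 10, 20, … and the right input 1000, 1020, 1040, …, so only keys in the overlapping range that are multiples of 20 can match. The standalone sketch below (plain Rust, no RisingWave dependencies; it assumes `gen_sorted_data(batch_size, batch_num, start, step)` yields the arithmetic sequence start, start + step, … with batch_size * batch_num values in total) estimates the inner-join output size for the benchmark's parameters:

fn main() {
    // Mirror the benchmark's inputs: 2 * 1024 keys per side (LEFT_SIZE / RIGHT_SIZE).
    let left: Vec<i64> = (0..2048i64).map(|i| 10 * i).collect(); // gen_sorted_data(.., "0".into(), 10)
    let right: Vec<i64> = (0..2048i64).map(|i| 1000 + 20 * i).collect(); // gen_sorted_data(.., "1000".into(), 20)

    // Both key sequences are strictly increasing and duplicate-free, so the
    // inner join emits exactly one row per key that appears on both sides.
    let right_keys: std::collections::HashSet<i64> = right.into_iter().collect();
    let matches = left.into_iter().filter(|k| right_keys.contains(k)).count();
    println!("expected inner-join output rows: {matches}"); // 974 for these parameters
}
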
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/mod.rs
@@ -17,7 +17,7 @@ pub mod hash_join
mod hash_join_state;
pub mod lookup_join;
pub mod nested_loop_join;
mod row_level_iter;
pub mod row_level_iter;
mod sort_merge_join;

use std::sync::Arc;
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/row_level_iter.rs
@@ -23,7 +23,7 @@ use crate::executor::BoxedExecutor;

/// `inner_table` is a buffer holding all of the inner side's data. For each probe key, rows are
/// fetched directly from `inner_table` without calling the executor; the executor is only called when building `inner_table`.
pub(crate) struct RowLevelIter {
pub struct RowLevelIter {
data_source: Option<BoxedExecutor>,
/// Buffering of inner table. TODO: Spill to disk or more fine-grained memory management to
/// avoid OOM.
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/sort_merge_join.rs
@@ -202,7 +202,7 @@ impl SortMergeJoinExecutor

impl SortMergeJoinExecutor {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
pub fn new(
join_type: JoinType,
schema: Schema,
output_indices: Vec<usize>,
48 changes: 44 additions & 4 deletions src/batch/src/executor/test_utils.rs
@@ -35,14 +35,15 @@ use crate::task::BatchTaskContext;

const SEED: u64 = 0xFF67FEABBAEF76FF;

/// Generate `batch_num` data chunks, each data chunk has cardinality of `batch_size`.
pub fn gen_data(data_type: DataType, batch_size: usize, batch_num: usize) -> Vec<DataChunk> {
/// Generate `batch_num` data chunks of type `Int64`, each with a cardinality of
/// `batch_size`.
pub fn gen_data(batch_size: usize, batch_num: usize) -> Vec<DataChunk> {
let mut data_gen =
FieldGeneratorImpl::with_random(data_type.clone(), None, None, None, None, SEED).unwrap();
FieldGeneratorImpl::with_random(DataType::Int64, None, None, None, None, SEED).unwrap();
let mut ret = Vec::<DataChunk>::with_capacity(batch_num);

for i in 0..batch_num {
let mut array_builder = data_type.create_array_builder(batch_size);
let mut array_builder = DataType::Int64.create_array_builder(batch_size);

for j in 0..batch_size {
array_builder
@@ -67,6 +68,45 @@ pub fn gen_data(data_type: DataType, batch_size: usize, batch_num: usize) -> Vec
ret
}

/// Generate `batch_num` sorted data chunks of type `Int64`, each with a cardinality of
/// `batch_size`.
pub fn gen_sorted_data(
batch_size: usize,
batch_num: usize,
start: String,
step: u64,
) -> Vec<DataChunk> {
let mut data_gen = FieldGeneratorImpl::with_sequence(
DataType::Int64,
Some(start),
Some(i64::MAX.to_string()),
0,
step,
)
.unwrap();
let mut ret = Vec::<DataChunk>::with_capacity(batch_num);

for _ in 0..batch_num {
let mut array_builder = DataType::Int64.create_array_builder(batch_size);

for _ in 0..batch_size {
array_builder
.append_datum(&Some(ScalarImpl::Int64(
data_gen.generate(0).as_i64().unwrap(),
)))
.unwrap();
}

let array = array_builder.finish().unwrap();
ret.push(DataChunk::new(
vec![Column::new(Arc::new(array))],
batch_size,
));
}

ret
}

/// Mock executor input.
/// You can bind one or more `MockExecutor`s as the children of the executor under test
/// (e.g. `HashAgg`), allowing tests to run without instantiating real `SeqScan`s or real storage.
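
As a usage sketch of the two helpers touched in this file (reusing only calls that appear in this diff, and assuming the sequence generator continues across chunk boundaries as the loop above suggests), benchmark and test code can pre-generate sorted chunks and bind them to a `MockExecutor` so that nothing touches real storage:

use risingwave_batch::executor::test_utils::{gen_sorted_data, MockExecutor};
use risingwave_common::catalog::schema_test_utils::field_n;
use risingwave_common::types::DataType;

fn build_sorted_input() -> MockExecutor {
    // Two chunks of four Int64 rows each, stepping by 10 from 0:
    // chunk 0 holds [0, 10, 20, 30] and chunk 1 holds [40, 50, 60, 70].
    let chunks = gen_sorted_data(4, 2, "0".into(), 10);

    // Feed the pre-generated chunks to a MockExecutor; the executor under test
    // consumes it like any other child executor.
    let mut mock = MockExecutor::new(field_n::<1>(DataType::Int64));
    chunks.into_iter().for_each(|c| mock.add(c));
    mock
}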
