From 26028ef777b5a7335a83f8475bea0ddb7114ab13 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Thu, 2 Sep 2021 23:10:20 +0800 Subject: [PATCH 1/4] Fixed send and sync for mutex and rwlock --- common/infallible/src/mutex.rs | 7 ++----- common/infallible/src/rwlock.rs | 4 ++-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/common/infallible/src/mutex.rs b/common/infallible/src/mutex.rs index 8e7a7355b378d..a3f28431c0065 100644 --- a/common/infallible/src/mutex.rs +++ b/common/infallible/src/mutex.rs @@ -19,11 +19,8 @@ use parking_lot::MutexGuard; #[derive(Debug)] pub struct Mutex(ParkingMutex); -/// Mutex is Send -unsafe impl Send for Mutex {} - -/// Mutex is Sync -unsafe impl Sync for Mutex {} +unsafe impl Send for Mutex where ParkingMutex: Send {} +unsafe impl Sync for Mutex where ParkingMutex: Sync {} impl Mutex { /// creates mutex diff --git a/common/infallible/src/rwlock.rs b/common/infallible/src/rwlock.rs index 00c538229bcb2..c71609ad90944 100644 --- a/common/infallible/src/rwlock.rs +++ b/common/infallible/src/rwlock.rs @@ -21,8 +21,8 @@ use parking_lot::RwLockWriteGuard; #[derive(Debug, Default)] pub struct RwLock(ParkingRwLock); -unsafe impl Send for RwLock {} -unsafe impl Sync for RwLock {} +unsafe impl Send for RwLock where ParkingRwLock: Send {} +unsafe impl Sync for RwLock where ParkingRwLock: Sync {} impl RwLock { /// creates a read-write lock From f600ce92da4706ba1dcfea5d8ef5d55f177b0fbf Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 3 Sep 2021 09:47:44 +0800 Subject: [PATCH 2/4] Implement Send and Sync for AggregatorState --- common/streams/src/sources/source_csv.rs | 4 +- common/streams/src/sources/source_values.rs | 4 +- .../transforms/group_by/aggregator.rs | 12 ++-- .../transforms/group_by/aggregator_state.rs | 70 +++++++++++++------ .../transforms/transform_group_by_partial.rs | 5 +- 5 files changed, 59 insertions(+), 36 deletions(-) diff --git a/common/streams/src/sources/source_csv.rs 
b/common/streams/src/sources/source_csv.rs index a9d2e8f1b996f..53a94d4660863 100644 --- a/common/streams/src/sources/source_csv.rs +++ b/common/streams/src/sources/source_csv.rs @@ -35,7 +35,7 @@ pub struct CsvSource { } impl CsvSource -where R: io::Read +where R: io::Read + Sync + Send { pub fn new(reader: R, schema: DataSchemaRef, block_size: usize) -> Self { let reader = ReaderBuilder::new().has_headers(false).from_reader(reader); @@ -50,7 +50,7 @@ where R: io::Read } impl Source for CsvSource -where R: io::Read +where R: io::Read + Sync + Send { fn read(&mut self) -> Result> { let mut reader = self.reader.write(); diff --git a/common/streams/src/sources/source_values.rs b/common/streams/src/sources/source_values.rs index 05668f5f74b73..e44f5c3b87863 100644 --- a/common/streams/src/sources/source_values.rs +++ b/common/streams/src/sources/source_values.rs @@ -32,7 +32,7 @@ pub struct ValueSource { } impl ValueSource -where R: io::Read +where R: io::Read + Send + Sync { pub fn new(reader: R, schema: DataSchemaRef, block_size: usize) -> Self { let reader = BufReader::new(reader); @@ -46,7 +46,7 @@ where R: io::Read } impl Source for ValueSource -where R: io::Read +where R: io::Read + Send + Sync { fn read(&mut self) -> Result> { let mut reader = self.reader.write(); diff --git a/query/src/pipelines/transforms/group_by/aggregator.rs b/query/src/pipelines/transforms/group_by/aggregator.rs index 6130e296e4559..f084a412f360f 100644 --- a/query/src/pipelines/transforms/group_by/aggregator.rs +++ b/query/src/pipelines/transforms/group_by/aggregator.rs @@ -23,7 +23,6 @@ use common_datavalues::DataSchemaRefExt; use common_exception::Result; use common_functions::aggregates::StateAddr; use common_functions::aggregates::StateAddrs; -use common_infallible::Mutex; use common_io::prelude::BytesMut; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; @@ -53,17 +52,16 @@ impl> Aggregator { &self, group_cols: Vec, mut stream: 
SendableDataBlockStream, - ) -> Result> { + ) -> Result { // This may be confusing // It will help us improve performance ~10% when we declare local references for them. let hash_method = &self.method; let aggregator_params = self.params.as_ref(); - let aggregate_state = Mutex::new(hash_method.aggregate_state()); + let mut state = hash_method.aggregate_state(); while let Some(block) = stream.next().await { let block = block?; - let mut groups = aggregate_state.lock(); // 1.1 and 1.2. let group_columns = Self::group_columns(&group_cols, &block)?; @@ -73,13 +71,13 @@ impl> Aggregator { // In fact, the rust compiler will help us do this(optimize the while match to match while), // but we need to ensure that the match is simple enough(otherwise there will be performance degradation). let places: StateAddrs = match aggregator_params.aggregate_functions.is_empty() { - true => self.lookup_key(group_keys, &mut groups), - false => self.lookup_state(group_keys, &mut groups), + true => self.lookup_key(group_keys, &mut state), + false => self.lookup_state(group_keys, &mut state), }; Self::execute(aggregator_params, &block, &places)?; } - Ok(aggregate_state) + Ok(state) } #[inline(always)] diff --git a/query/src/pipelines/transforms/group_by/aggregator_state.rs b/query/src/pipelines/transforms/group_by/aggregator_state.rs index 7012fe3c70504..e25f6a6ef513c 100644 --- a/query/src/pipelines/transforms/group_by/aggregator_state.rs +++ b/query/src/pipelines/transforms/group_by/aggregator_state.rs @@ -40,7 +40,7 @@ use crate::pipelines::transforms::group_by::AggregatorParams; /// - Aggregate data(HashMap or MergeSort set in future) /// - Aggregate function state data memory pool /// - Group by key data memory pool (if necessary) -pub trait AggregatorState { +pub trait AggregatorState: Sync + Send { type Key; type Entity: StateEntity; type Iterator: Iterator; @@ -62,6 +62,16 @@ pub struct ShortFixedKeysAggregatorState { data: *mut ShortFixedKeysStateEntity, } +// TODO:(Winter) Hack: 
+// The *mut ShortFixedKeysStateEntity needs to be used externally, but we can ensure that *mut +// ShortFixedKeysStateEntity will not be used across async, so ShortFixedKeysAggregatorState is Send +unsafe impl Send for ShortFixedKeysAggregatorState {} + +// TODO:(Winter) Hack: +// The *mut ShortFixedKeysStateEntity needs to be used externally, but we can ensure that &*mut +// ShortFixedKeysStateEntity will not be used across async, so ShortFixedKeysAggregatorState is Sync +unsafe impl Sync for ShortFixedKeysAggregatorState {} + impl ShortFixedKeysAggregatorState { pub fn create(max_size: usize) -> Self { unsafe { @@ -150,6 +160,16 @@ pub struct LongerFixedKeysAggregatorState { pub data: HashMap, } +// TODO:(Winter) Hack: +// The *mut KeyValueEntity needs to be used externally, but we can ensure that *mut KeyValueEntity +// will not be used across async, so KeyValueEntity is Send +unsafe impl Send for LongerFixedKeysAggregatorState {} + +// TODO:(Winter) Hack: +// The *mut KeyValueEntity needs to be used externally, but we can ensure that &*mut KeyValueEntity +// will not be used across async, so KeyValueEntity is Sync +unsafe impl Sync for LongerFixedKeysAggregatorState {} + impl AggregatorState> for LongerFixedKeysAggregatorState where T: DFPrimitiveType, @@ -170,11 +190,6 @@ where self.data.iter() } - #[inline(always)] - fn entity(&mut self, key: &Self::Key, inserted: &mut bool) -> *mut Self::Entity { - self.data.insert_key(key, inserted) - } - #[inline(always)] fn alloc_layout(&self, params: &AggregatorParams) -> StateAddr { let place: StateAddr = self.area.alloc_layout(params.layout).into(); @@ -187,6 +202,11 @@ where place } + + #[inline(always)] + fn entity(&mut self, key: &Self::Key, inserted: &mut bool) -> *mut Self::Entity { + self.data.insert_key(key, inserted) + } } pub struct SerializedKeysAggregatorState { @@ -195,6 +215,16 @@ pub struct SerializedKeysAggregatorState { pub data_state_map: HashMap, } +// TODO:(Winter) Hack: +// The *mut KeyValueEntity 
needs to be used externally, but we can ensure that *mut KeyValueEntity +// will not be used across async, so KeyValueEntity is Send +unsafe impl Send for SerializedKeysAggregatorState {} + +// TODO:(Winter) Hack: +// The *mut KeyValueEntity needs to be used externally, but we can ensure that &*mut KeyValueEntity +// will not be used across async, so KeyValueEntity is Sync +unsafe impl Sync for SerializedKeysAggregatorState {} + impl AggregatorState for SerializedKeysAggregatorState { type Key = KeysRef; type Entity = KeyValueEntity; @@ -208,9 +238,18 @@ impl AggregatorState for SerializedKeysAggregatorState { self.data_state_map.iter() } - // fn alloc_layout(&self, memory_layout: &AggregatorLayout) -> NonNull { - // self.state_area.alloc_layout(memory_layout.layout) - // } + #[inline(always)] + fn alloc_layout(&self, params: &AggregatorParams) -> StateAddr { + let place: StateAddr = self.state_area.alloc_layout(params.layout).into(); + + for idx in 0..params.offsets_aggregate_states.len() { + let aggr_state = params.offsets_aggregate_states[idx]; + let aggr_state_place = place.next(aggr_state); + params.aggregate_functions[idx].init_state(aggr_state_place); + } + + place + } fn entity(&mut self, keys: &Vec, inserted: &mut bool) -> *mut Self::Entity { let mut keys_ref = KeysRef::create(keys.as_ptr() as usize, keys.len()); @@ -229,17 +268,4 @@ impl AggregatorState for SerializedKeysAggregatorState { state_entity } - - #[inline(always)] - fn alloc_layout(&self, params: &AggregatorParams) -> StateAddr { - let place: StateAddr = self.state_area.alloc_layout(params.layout).into(); - - for idx in 0..params.offsets_aggregate_states.len() { - let aggr_state = params.offsets_aggregate_states[idx]; - let aggr_state_place = place.next(aggr_state); - params.aggregate_functions[idx].init_state(aggr_state_place); - } - - place - } } diff --git a/query/src/pipelines/transforms/transform_group_by_partial.rs b/query/src/pipelines/transforms/transform_group_by_partial.rs index 
244dbc074dd52..2c3e433628cb0 100644 --- a/query/src/pipelines/transforms/transform_group_by_partial.rs +++ b/query/src/pipelines/transforms/transform_group_by_partial.rs @@ -77,14 +77,13 @@ impl GroupByPartialTransform { let aggregator_params = AggregatorParams::try_create(schema, aggr_exprs)?; let aggregator = Aggregator::create(method, aggregator_params); - let groups_locker = aggregator.aggregate(group_cols, stream).await?; + let state = aggregator.aggregate(group_cols, stream).await?; let delta = start.elapsed(); tracing::debug!("Group by partial cost: {:?}", delta); - let groups = groups_locker.lock(); let finalized_schema = self.schema.clone(); - aggregator.aggregate_finalized(&groups, finalized_schema) + aggregator.aggregate_finalized(&state, finalized_schema) } } From cb14e6f60d1a416b3124fca0759f97a1daf62347 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 3 Sep 2021 10:14:26 +0800 Subject: [PATCH 3/4] Fix performance degradation --- .../transforms/group_by/aggregator.rs | 44 +++++++++++-------- .../transforms/group_by/aggregator_state.rs | 12 ++--- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/query/src/pipelines/transforms/group_by/aggregator.rs b/query/src/pipelines/transforms/group_by/aggregator.rs index f084a412f360f..218afe1b123d4 100644 --- a/query/src/pipelines/transforms/group_by/aggregator.rs +++ b/query/src/pipelines/transforms/group_by/aggregator.rs @@ -60,23 +60,31 @@ impl> Aggregator { let mut state = hash_method.aggregate_state(); - while let Some(block) = stream.next().await { - let block = block?; - - // 1.1 and 1.2. - let group_columns = Self::group_columns(&group_cols, &block)?; - let group_keys = hash_method.build_keys(&group_columns, block.num_rows())?; - - // TODO: This can be moved outside the while - // In fact, the rust compiler will help us do this(optimize the while match to match while), - // but we need to ensure that the match is simple enough(otherwise there will be performance degradation). 
- let places: StateAddrs = match aggregator_params.aggregate_functions.is_empty() { - true => self.lookup_key(group_keys, &mut state), - false => self.lookup_state(group_keys, &mut state), - }; - - Self::execute(aggregator_params, &block, &places)?; + match aggregator_params.aggregate_functions.is_empty() { + true => { + while let Some(block) = stream.next().await { + let block = block?; + + // 1.1 and 1.2. + let group_columns = Self::group_columns(&group_cols, &block)?; + let group_keys = hash_method.build_keys(&group_columns, block.num_rows())?; + self.lookup_key(group_keys, &mut state); + } + } + false => { + while let Some(block) = stream.next().await { + let block = block?; + + // 1.1 and 1.2. + let group_columns = Self::group_columns(&group_cols, &block)?; + let group_keys = hash_method.build_keys(&group_columns, block.num_rows())?; + + let places = self.lookup_state(group_keys, &mut state); + Self::execute(aggregator_params, &block, &places)?; + } + } } + Ok(state) } @@ -103,13 +111,11 @@ impl> Aggregator { } #[inline(always)] - fn lookup_key(&self, keys: Vec, state: &mut Method::State) -> StateAddrs { + fn lookup_key(&self, keys: Vec, state: &mut Method::State) { let mut inserted = true; for key in keys.iter() { state.entity(key, &mut inserted); } - - vec![0_usize.into(); keys.len()] } /// Allocate aggregation function state for each key(the same key can always get the same state) diff --git a/query/src/pipelines/transforms/group_by/aggregator_state.rs b/query/src/pipelines/transforms/group_by/aggregator_state.rs index e25f6a6ef513c..1f979fb52087b 100644 --- a/query/src/pipelines/transforms/group_by/aggregator_state.rs +++ b/query/src/pipelines/transforms/group_by/aggregator_state.rs @@ -64,12 +64,12 @@ pub struct ShortFixedKeysAggregatorState { // TODO:(Winter) Hack: // The *mut ShortFixedKeysStateEntity needs to be used externally, but we can ensure that *mut -// ShortFixedKeysStateEntity will not be used across async, so ShortFixedKeysAggregatorState is 
Send +// ShortFixedKeysStateEntity will not be used across multiple async tasks, so ShortFixedKeysAggregatorState is Send unsafe impl Send for ShortFixedKeysAggregatorState {} // TODO:(Winter) Hack: // The *mut ShortFixedKeysStateEntity needs to be used externally, but we can ensure that &*mut -// ShortFixedKeysStateEntity will not be used across async, so ShortFixedKeysAggregatorState is Sync +// ShortFixedKeysStateEntity will not be used across multiple async tasks, so ShortFixedKeysAggregatorState is Sync unsafe impl Sync for ShortFixedKeysAggregatorState {} impl ShortFixedKeysAggregatorState { @@ -162,12 +162,12 @@ pub struct LongerFixedKeysAggregatorState { // TODO:(Winter) Hack: // The *mut KeyValueEntity needs to be used externally, but we can ensure that *mut KeyValueEntity -// will not be used across async, so KeyValueEntity is Send +// will not be used across multiple async tasks, so LongerFixedKeysAggregatorState is Send unsafe impl Send for LongerFixedKeysAggregatorState {} // TODO:(Winter) Hack: // The *mut KeyValueEntity needs to be used externally, but we can ensure that &*mut KeyValueEntity -// will not be used across async, so KeyValueEntity is Sync +// will not be used across multiple async tasks, so LongerFixedKeysAggregatorState is Sync unsafe impl Sync for LongerFixedKeysAggregatorState {} impl AggregatorState> for LongerFixedKeysAggregatorState @@ -217,12 +217,12 @@ pub struct SerializedKeysAggregatorState { // TODO:(Winter) Hack: // The *mut KeyValueEntity needs to be used externally, but we can ensure that *mut KeyValueEntity -// will not be used across async, so KeyValueEntity is Send +// will not be used across multiple async tasks, so SerializedKeysAggregatorState is Send unsafe impl Send for SerializedKeysAggregatorState {} // TODO:(Winter) Hack: // The *mut KeyValueEntity needs to be used externally, but we can ensure that &*mut KeyValueEntity -// will not be used across async, so KeyValueEntity is Sync +// will not be used across multiple async tasks, so SerializedKeysAggregatorState is Sync unsafe impl Sync for SerializedKeysAggregatorState {} impl AggregatorState for 
SerializedKeysAggregatorState { From e66bacfecab4d70750e1982d4cce156518b1604d Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 3 Sep 2021 11:18:36 +0800 Subject: [PATCH 4/4] Better code --- common/streams/src/sources/source_csv.rs | 10 ++++------ common/streams/src/sources/source_values.rs | 8 +++----- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/common/streams/src/sources/source_csv.rs b/common/streams/src/sources/source_csv.rs index 53a94d4660863..475e8945d8344 100644 --- a/common/streams/src/sources/source_csv.rs +++ b/common/streams/src/sources/source_csv.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::io; -use std::sync::Arc; use common_arrow::arrow::io::csv::read::ByteRecord; use common_arrow::arrow::io::csv::read::Reader; @@ -23,12 +22,11 @@ use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; use common_exception::ToErrorCode; -use common_infallible::RwLock; use crate::Source; pub struct CsvSource { - reader: Arc>>, + reader: Reader, schema: DataSchemaRef, block_size: usize, rows: usize, @@ -41,7 +39,7 @@ where R: io::Read + Sync + Send let reader = ReaderBuilder::new().has_headers(false).from_reader(reader); Self { - reader: Arc::new(RwLock::new(reader)), + reader, block_size, schema, rows: 0, @@ -53,7 +51,6 @@ impl Source for CsvSource where R: io::Read + Sync + Send { fn read(&mut self) -> Result> { - let mut reader = self.reader.write(); let mut record = ByteRecord::new(); let mut desers = self .schema @@ -63,7 +60,8 @@ where R: io::Read + Sync + Send .collect::>>()?; for row in 0..self.block_size { - let v = reader + let v = self + .reader .read_byte_record(&mut record) .map_err_to_code(ErrorCode::BadBytes, || { format!("Parse csv error at line {}", self.rows) diff --git a/common/streams/src/sources/source_values.rs b/common/streams/src/sources/source_values.rs index e44f5c3b87863..5aec378a31703 100644 --- a/common/streams/src/sources/source_values.rs +++ 
b/common/streams/src/sources/source_values.rs @@ -14,18 +14,16 @@ use std::io; use std::io::BufReader; -use std::sync::Arc; use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::Result; -use common_infallible::RwLock; use common_io::prelude::*; use crate::Source; pub struct ValueSource { - reader: Arc>>, + reader: BufReader, schema: DataSchemaRef, block_size: usize, rows: usize, @@ -37,7 +35,7 @@ where R: io::Read + Send + Sync pub fn new(reader: R, schema: DataSchemaRef, block_size: usize) -> Self { let reader = BufReader::new(reader); Self { - reader: Arc::new(RwLock::new(reader)), + reader, block_size, schema, rows: 0, @@ -49,7 +47,7 @@ impl Source for ValueSource where R: io::Read + Send + Sync { fn read(&mut self) -> Result> { - let mut reader = self.reader.write(); + let reader = &mut self.reader; let mut buf = Vec::new(); let mut temp = Vec::new();