diff --git a/src/common/hashtable/src/dictionary_string_hashtable.rs b/src/common/hashtable/src/dictionary_string_hashtable.rs index 5ae808a11b8f8..cfde4dc66c2b0 100644 --- a/src/common/hashtable/src/dictionary_string_hashtable.rs +++ b/src/common/hashtable/src/dictionary_string_hashtable.rs @@ -20,7 +20,7 @@ use std::ptr::NonNull; use std::sync::Arc; use bumpalo::Bump; -use databend_common_base::mem_allocator::MmapAllocator; +use databend_common_base::mem_allocator::DefaultAllocator; use crate::container::Container; use crate::container::HeapContainer; @@ -102,7 +102,7 @@ pub struct DictionaryStringHashTable { arena: Arc, dict_keys: usize, entries_len: usize, - pub(crate) entries: HeapContainer, MmapAllocator>, + pub(crate) entries: HeapContainer, DefaultAllocator>, pub dictionary_hashset: StringHashSet<[u8]>, } @@ -116,7 +116,7 @@ impl DictionaryStringHashTable { arena: bump.clone(), dict_keys, entries_len: 0, - entries: unsafe { HeapContainer::new_zeroed(256, MmapAllocator::default()) }, + entries: unsafe { HeapContainer::new_zeroed(256, DefaultAllocator {}) }, dictionary_hashset: StringHashSet::new(bump), } } @@ -452,7 +452,7 @@ impl HashtableLike for DictionaryStringHashTable { } } - self.entries = HeapContainer::new_zeroed(0, MmapAllocator::default()); + self.entries = HeapContainer::new_zeroed(0, DefaultAllocator {}); } self.dictionary_hashset.clear(); @@ -615,7 +615,7 @@ impl<'a, V> Iterator for DictionaryTableMutIter<'a, V> { } pub struct DictionarySlotIter<'a> { - empty: Option<&'a TableEmpty<(), MmapAllocator>>, + empty: Option<&'a TableEmpty<(), DefaultAllocator>>, entities_slice: &'a [Entry], i: usize, } diff --git a/src/common/hashtable/src/hashjoin_hashtable.rs b/src/common/hashtable/src/hashjoin_hashtable.rs index 12a4c1ac20c9c..0fec13dc8a4d2 100644 --- a/src/common/hashtable/src/hashjoin_hashtable.rs +++ b/src/common/hashtable/src/hashjoin_hashtable.rs @@ -17,7 +17,7 @@ use std::marker::PhantomData; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use databend_common_base::mem_allocator::MmapAllocator; +use databend_common_base::mem_allocator::DefaultAllocator; use databend_common_column::bitmap::Bitmap; use super::traits::HashJoinHashtableLike; @@ -101,7 +101,7 @@ pub fn hash_bits() -> u32 { } } -pub struct HashJoinHashTable { +pub struct HashJoinHashTable { pub(crate) pointers: Box<[u64], A>, pub(crate) atomic_pointers: *mut AtomicU64, pub(crate) hash_shift: usize, diff --git a/src/common/hashtable/src/hashjoin_string_hashtable.rs b/src/common/hashtable/src/hashjoin_string_hashtable.rs index 8fdc7aa13208f..2779bb08f9964 100644 --- a/src/common/hashtable/src/hashjoin_string_hashtable.rs +++ b/src/common/hashtable/src/hashjoin_string_hashtable.rs @@ -16,7 +16,7 @@ use std::alloc::Allocator; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use databend_common_base::mem_allocator::MmapAllocator; +use databend_common_base::mem_allocator::DefaultAllocator; use databend_common_column::bitmap::Bitmap; use super::traits::HashJoinHashtableLike; @@ -37,7 +37,7 @@ pub struct StringRawEntry { pub next: u64, } -pub struct HashJoinStringHashTable { +pub struct HashJoinStringHashTable { pub(crate) pointers: Box<[u64], A>, pub(crate) atomic_pointers: *mut AtomicU64, pub(crate) hash_shift: usize, diff --git a/src/common/hashtable/src/hashtable.rs b/src/common/hashtable/src/hashtable.rs index 3f5f3055bbf63..6591217a2447c 100644 --- a/src/common/hashtable/src/hashtable.rs +++ b/src/common/hashtable/src/hashtable.rs @@ -17,7 +17,7 @@ use std::intrinsics::unlikely; use std::iter::TrustedLen; use std::mem::MaybeUninit; -use databend_common_base::mem_allocator::MmapAllocator; +use databend_common_base::mem_allocator::DefaultAllocator; use super::container::HeapContainer; use super::table0::Entry; @@ -29,7 +29,7 @@ use super::traits::Keyable; use super::utils::ZeroEntry; use crate::FastHash; -pub struct Hashtable +pub struct Hashtable where K: Keyable, A: Allocator + Clone, diff --git a/src/common/hashtable/src/lookup_hashtable.rs b/src/common/hashtable/src/lookup_hashtable.rs index bcaf6f44ded4a..19be8634c7983 100644 --- a/src/common/hashtable/src/lookup_hashtable.rs +++ b/src/common/hashtable/src/lookup_hashtable.rs @@ -17,13 +17,17 @@ use std::iter::TrustedLen; use std::mem; use std::mem::MaybeUninit; -use databend_common_base::mem_allocator::MmapAllocator; +use databend_common_base::mem_allocator::DefaultAllocator; use crate::table0::Entry; use crate::HashtableLike; -pub struct LookupHashtable -{ +pub struct LookupHashtable< + K: Sized, + const CAPACITY: usize, + V, + A: Allocator + Clone = DefaultAllocator, +> { flags: Box<[bool; CAPACITY], A>, data: Box<[Entry; CAPACITY], A>, len: usize, diff --git a/src/common/hashtable/src/short_string_hashtable.rs b/src/common/hashtable/src/short_string_hashtable.rs index 9dacd1d424d7a..ab2b95b756c8c 100644 --- a/src/common/hashtable/src/short_string_hashtable.rs +++ b/src/common/hashtable/src/short_string_hashtable.rs @@ -21,7 +21,7 @@ use std::ptr::NonNull; use std::sync::Arc; use bumpalo::Bump; -use databend_common_base::mem_allocator::MmapAllocator; +use databend_common_base::mem_allocator::DefaultAllocator; use super::container::HeapContainer; use super::table0::Entry; @@ -39,7 +39,7 @@ use crate::table_empty::TableEmpty; use crate::table_empty::TableEmptyIter; use crate::table_empty::TableEmptyIterMut; -pub struct ShortStringHashtable +pub struct ShortStringHashtable where K: UnsizedKeyable + ?Sized, A: Allocator + Clone, diff --git a/src/common/hashtable/src/stack_hashtable.rs b/src/common/hashtable/src/stack_hashtable.rs index 36dc1d070ae25..5be8ebbb649b0 100644 --- a/src/common/hashtable/src/stack_hashtable.rs +++ b/src/common/hashtable/src/stack_hashtable.rs @@ -16,7 +16,7 @@ use std::alloc::Allocator; use std::intrinsics::unlikely; use std::mem::MaybeUninit; -use databend_common_base::mem_allocator::MmapAllocator; +use databend_common_base::mem_allocator::DefaultAllocator; use super::container::StackContainer; use super::table0::Entry; @@ -25,7 +25,7 @@ use super::table0::Table0Iter; use super::traits::Keyable; use super::utils::ZeroEntry; -pub struct StackHashtable +pub struct StackHashtable where K: Keyable, A: Allocator + Clone, diff --git a/src/common/hashtable/src/string_hashtable.rs b/src/common/hashtable/src/string_hashtable.rs index 076c700bfbb32..83a86106f500c 100644 --- a/src/common/hashtable/src/string_hashtable.rs +++ b/src/common/hashtable/src/string_hashtable.rs @@ -19,7 +19,7 @@ use std::mem::MaybeUninit; use std::sync::Arc; use bumpalo::Bump; -use databend_common_base::mem_allocator::MmapAllocator; +use databend_common_base::mem_allocator::DefaultAllocator; use super::container::HeapContainer; use super::table0::Entry; @@ -38,7 +38,7 @@ use crate::table_empty::TableEmptyIterMut; /// Simple unsized hashtable is used for storing unsized keys in arena. It can be worked with HashMethodSerializer. /// Different from `ShortStringHashTable`, it doesn't use adaptive sub hashtable to store key values via key size. /// It can be considered as a minimal hashtable implementation of ShortStringHashTable -pub struct StringHashtable +pub struct StringHashtable where K: UnsizedKeyable + ?Sized, A: Allocator + Clone, diff --git a/src/query/expression/src/aggregate/payload.rs b/src/query/expression/src/aggregate/payload.rs index f08e61280f5cc..ad76f6dfe5927 100644 --- a/src/query/expression/src/aggregate/payload.rs +++ b/src/query/expression/src/aggregate/payload.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use bumpalo::Bump; use databend_common_base::runtime::drop_guard; use itertools::Itertools; +use log::info; use strength_reduce::StrengthReducedU64; use super::payload_row::rowformat_size; @@ -78,10 +79,18 @@ unsafe impl Sync for Payload {} pub struct Page { pub(crate) data: Vec>, pub(crate) rows: usize, - pub(crate) state_rows: usize, + // state_offset = state_rows * agg_len + // which mark that the offset to clean the agg states + pub(crate) state_offsets: usize, pub(crate) capacity: usize, } +impl Page { + pub fn is_partial_state(&self, agg_len: usize) -> bool { + self.rows * agg_len != self.state_offsets + } +} + pub type Pages = Vec; // TODO FIXME @@ -179,7 +188,7 @@ impl Payload { self.pages.push(Page { data, rows: 0, - state_rows: 0, + state_offsets: 0, capacity: self.row_per_page, }); } @@ -300,10 +309,18 @@ impl Payload { } let place = StateAddr::from(place); + let page = &mut self.pages[page_index[idx]]; for (aggr, offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) { aggr.init_state(place.next(*offset)); + page.state_offsets += 1; + } + } + + #[cfg(debug_assertions)] + { + for page in self.pages.iter() { + assert_eq!(page.rows * self.aggrs.len(), page.state_offsets); } - self.pages[page_index[idx]].state_rows += 1; } } @@ -338,6 +355,7 @@ impl Payload { address: &[*const u8], ) { let tuple_size = self.tuple_size; + let agg_len = self.aggrs.len(); let (mut page, _) = self.writable_page(); for i in 0..row_count { let index = select_vector[i]; @@ -350,7 +368,7 @@ impl Payload { ) } page.rows += 1; - page.state_rows += 1; + page.state_offsets += agg_len; if page.rows == page.capacity { (page, _) = self.writable_page(); @@ -421,10 +439,26 @@ impl Drop for Payload { drop_guard(move || { // drop states if !self.state_move_out { - for (aggr, addr_offset) in self.aggrs.iter().zip(self.state_addr_offsets.iter()) { + 'FOR: for (idx, (aggr, addr_offset)) in self + .aggrs + .iter() + .zip(self.state_addr_offsets.iter()) + .enumerate() + { if aggr.need_manual_drop_state() { for page in self.pages.iter() { - for row in 0..page.state_rows { + let is_partial_state = page.is_partial_state(self.aggrs.len()); + + if is_partial_state && idx == 0 { + info!("Cleaning partial page, state_offsets: {}, row: {}, agg length: {}", page.state_offsets, page.rows, self.aggrs.len()); + } + for row in 0..page.state_offsets.div_ceil(self.aggrs.len()) { + // When OOM, some states are not initialized, we don't need to destroy them + if is_partial_state + && row * self.aggrs.len() + idx >= page.state_offsets + { + continue 'FOR; + } let ptr = self.data_ptr(page, row); unsafe { let state_addr = diff --git a/src/query/functions/src/aggregates/aggregate_distinct_state.rs b/src/query/functions/src/aggregates/aggregate_distinct_state.rs index 2c825d2bcc258..f15b4200749e8 100644 --- a/src/query/functions/src/aggregates/aggregate_distinct_state.rs +++ b/src/query/functions/src/aggregates/aggregate_distinct_state.rs @@ -41,6 +41,7 @@ use databend_common_hashtable::HashSet as CommonHashSet; use databend_common_hashtable::HashtableKeyable; use databend_common_hashtable::HashtableLike; use databend_common_hashtable::ShortStringHashSet; +use databend_common_hashtable::StackHashSet; use databend_common_io::prelude::*; use siphasher::sip128::Hasher128; use siphasher::sip128::SipHasher24; @@ -318,13 +319,13 @@ where T: Number + BorshSerialize + BorshDeserialize + HashtableKeyable // For count(distinct string) and uniq(string) pub struct AggregateUniqStringState { - set: CommonHashSet, + set: StackHashSet, } impl DistinctStateFunc for AggregateUniqStringState { fn new() -> Self { AggregateUniqStringState { - set: CommonHashSet::new(), + set: StackHashSet::new(), } } @@ -338,7 +339,7 @@ impl DistinctStateFunc for AggregateUniqStringState { fn deserialize(reader: &mut &[u8]) -> Result { let size = reader.read_uvarint()?; - let mut set = CommonHashSet::with_capacity(size as usize); + let mut set = StackHashSet::with_capacity(size as usize); for _ in 0..size { let e = borsh_deserialize_state(reader)?; let _ = set.set_insert(e).is_ok();