From 5de1d5416eaea1ca8ba405191cf0f5dd1df472a6 Mon Sep 17 00:00:00 2001 From: coldWater Date: Wed, 9 Oct 2024 11:03:52 +0800 Subject: [PATCH 01/12] set_size Signed-off-by: coldWater --- Cargo.lock | 15 ++++++----- src/query/service/Cargo.toml | 1 + src/query/service/src/spillers/spiller.rs | 1 + .../storages/common/cache/src/temp_dir.rs | 26 ++++++++++++++++++- 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c02ddbbcf4635..0e016c432af83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5279,6 +5279,7 @@ dependencies = [ "time", "tokio", "tokio-stream", + "tokio-util", "toml 0.8.19", "tonic 0.11.0", "tower", @@ -8543,7 +8544,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -9575,7 +9576,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -12115,7 +12116,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.58", @@ -12259,7 +12260,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.3", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -15305,9 +15306,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", @@ -15628,7 +15629,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.7.3", + "rand 0.8.5", "static_assertions", ] diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 5f0bd0831a480..982e825f6d5e6 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -170,6 +170,7 @@ tempfile = "3.4.0" time = "0.3.14" tokio = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } +tokio-util = "0.7.12" toml = { version = "0.8", default-features = false } tonic = { workspace = true } typetag = { workspace = true } diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index d13229b65df66..ae2cc9b912c03 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -36,6 +36,7 @@ use databend_common_expression::DataBlock; use databend_storages_common_cache::TempDir; use databend_storages_common_cache::TempPath; use opendal::Operator; +use tokio_util::io::SyncIoBridge; use crate::sessions::QueryContext; diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs index 5493e9faf4002..e00318e8ebb2b 100644 --- a/src/query/storages/common/cache/src/temp_dir.rs +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -200,6 +200,8 @@ pub struct TempDir { } impl TempDir { + // It should be ensured that the actual size is less than or equal to + // the reserved size as much as possible, otherwise the limit may be exceeded. pub fn new_file_with_size(&self, size: usize) -> Result> { let path = self.path.join(GlobalUniqName::unique()).into_boxed_path(); @@ -306,6 +308,27 @@ impl TempPath { pub fn size(&self) -> usize { self.0.size } + + pub fn set_size(&mut self, size: usize) -> std::result::Result<(), &'static str> { + use std::cmp::Ordering; + + let Some(path) = Arc::get_mut(&mut self.0) else { + return Err("can't set size after share"); + }; + match size.cmp(&path.size) { + Ordering::Equal => {} + Ordering::Greater => { + let mut dir = path.dir_info.size.lock().unwrap(); + *dir += size - path.size; + } + Ordering::Less => { + let mut dir = path.dir_info.size.lock().unwrap(); + *dir -= path.size - size; + } + } + path.size = size; + Ok(()) + } } struct InnerPath { @@ -348,11 +371,12 @@ mod tests { let mgr = TempDirManager::instance(); let dir = mgr.get_disk_spill_dir(1 << 30, "some_query").unwrap(); - let path = dir.new_file_with_size(100)?.unwrap(); + let mut path = dir.new_file_with_size(110)?.unwrap(); println!("{:?}", &path); fs::write(&path, vec![b'a'; 100])?; + path.set_size(100).unwrap(); assert_eq!(1, dir.dir_info.count.load(Ordering::Relaxed)); assert_eq!(100, *dir.dir_info.size.lock().unwrap()); From 5227c7647ee454c716b29f7ddd3a264ac2fd7e2e Mon Sep 17 00:00:00 2001 From: coldWater Date: Wed, 9 Oct 2024 16:16:30 +0800 Subject: [PATCH 02/12] TempDirManager alignment Signed-off-by: coldWater --- src/query/storages/common/cache/src/lib.rs | 1 + .../storages/common/cache/src/temp_dir.rs | 24 +++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/query/storages/common/cache/src/lib.rs b/src/query/storages/common/cache/src/lib.rs index 8c70b627aeacd..d4d39a65cea36 100644 --- a/src/query/storages/common/cache/src/lib.rs +++ b/src/query/storages/common/cache/src/lib.rs @@ -15,6 +15,7 @@ #![feature(write_all_vectored)] #![feature(associated_type_defaults)] #![feature(assert_matches)] +#![feature(ptr_alignment_type)] mod cache; mod caches; diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs index e00318e8ebb2b..7b69693b97e1e 100644 --- a/src/query/storages/common/cache/src/temp_dir.rs +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -24,6 +24,7 @@ use std::ops::Deref; use std::ops::Drop; use std::path::Path; use std::path::PathBuf; +use std::ptr::Alignment; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -44,14 +45,15 @@ pub struct TempDirManager { global_limit: usize, // Reserved disk space in blocks reserved: u64, + alignment: Alignment, group: Mutex, } impl TempDirManager { pub fn init(config: &SpillConfig, tenant_id: &str) -> Result<()> { - let (root, reserved) = if config.path.is_empty() { - (None, 0) + let (root, reserved, alignment) = if config.path.is_empty() { + (None, 0, Alignment::MIN) } else { let path = PathBuf::from(&config.path) .join(tenant_id) @@ -66,13 +68,16 @@ impl TempDirManager { } if create_dir_all(&path).is_err() { - (None, 0) + (None, 0, Alignment::MIN) } else { let stat = statvfs(path.as_ref()).map_err(|e| ErrorCode::StorageOther(e.to_string()))?; - let reserved = (stat.f_blocks as f64 * *config.reserved_disk_ratio) as u64; - (Some(path), reserved) + ( + Some(path), + (stat.f_blocks as f64 * *config.reserved_disk_ratio) as u64, + Alignment::new(stat.f_bsize.max(512) as usize).unwrap(), + ) } }; @@ -80,6 +85,7 @@ impl TempDirManager { root, global_limit: config.global_bytes_limit as usize, reserved, + alignment, group: Mutex::new(Group { dirs: HashMap::new(), }), @@ -175,6 +181,10 @@ impl TempDirManager { } } + pub fn block_alignment(&self) -> Alignment { + self.alignment + } + fn insufficient_disk(&self, size: u64) -> Result { let stat = statvfs(self.root.as_ref().unwrap().as_ref()) .map_err(|e| ErrorCode::Internal(e.to_string()))?; @@ -243,6 +253,10 @@ impl TempDir { }); Ok(rt?) } + + pub fn block_alignment(&self) -> Alignment { + self.manager.alignment + } } struct DirInfo { From b5a3e50400662c789ac54649f77eee77b6c96d0a Mon Sep 17 00:00:00 2001 From: coldWater Date: Wed, 9 Oct 2024 19:12:52 +0800 Subject: [PATCH 03/12] BlocksEncoder Signed-off-by: coldWater --- Cargo.lock | 1 - src/common/base/src/base/dma.rs | 1 + src/common/base/src/base/mod.rs | 1 + src/query/expression/src/utils/arrow.rs | 26 ++-- src/query/service/Cargo.toml | 1 - src/query/service/src/lib.rs | 2 + src/query/service/src/spillers/spiller.rs | 156 +++++++++++++++++----- 7 files changed, 139 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e016c432af83..604e95d5447d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5279,7 +5279,6 @@ dependencies = [ "time", "tokio", "tokio-stream", - "tokio-util", "toml 0.8.19", "tonic 0.11.0", "tower", diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs index 192a6ecc5e46b..1e5d264a0b436 100644 --- a/src/common/base/src/base/dma.rs +++ b/src/common/base/src/base/dma.rs @@ -33,6 +33,7 @@ use crate::runtime::spawn_blocking; unsafe impl Send for DmaAllocator {} +#[derive(Clone, Copy)] pub struct DmaAllocator(Alignment); impl DmaAllocator { diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index 5ac11ea7a46cf..ef46456a3e27e 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -32,6 +32,7 @@ pub use dma::dma_buffer_as_vec; pub use dma::dma_read_file; pub use dma::dma_read_file_range; pub use dma::dma_write_file_vectored; +pub use dma::DmaAllocator; pub use net::get_free_tcp_port; pub use net::get_free_udp_port; pub use ordered_float::OrderedFloat; diff --git a/src/query/expression/src/utils/arrow.rs b/src/query/expression/src/utils/arrow.rs index a57a871c09a6a..6dbadbf1730f2 100644 --- a/src/query/expression/src/utils/arrow.rs +++ b/src/query/expression/src/utils/arrow.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::io::Cursor; +use std::io::Write; use databend_common_arrow::arrow::array::Array; use databend_common_arrow::arrow::bitmap::Bitmap; @@ -66,19 +67,22 @@ pub fn buffer_into_mut(mut buffer: Buffer) -> Vec { pub fn serialize_column(col: &Column) -> Vec { let mut buffer = Vec::new(); + serialize_column_in(col, &mut buffer).unwrap(); + buffer +} +pub fn serialize_column_in( + col: &Column, + w: &mut impl Write, +) -> databend_common_arrow::arrow::error::Result<()> { let schema = Schema::from(vec![col.arrow_field()]); - let mut writer = FileWriter::new(&mut buffer, schema, None, IpcWriteOptions::default()); - writer.start().unwrap(); - writer - .write( - &databend_common_arrow::arrow::chunk::Chunk::new(vec![col.as_arrow()]), - None, - ) - .unwrap(); - writer.finish().unwrap(); - - buffer + let mut writer = FileWriter::new(w, schema, None, IpcWriteOptions::default()); + writer.start()?; + writer.write( + &databend_common_arrow::arrow::chunk::Chunk::new(vec![col.as_arrow()]), + None, + )?; + writer.finish() } pub fn deserialize_column(bytes: &[u8]) -> Result { diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 982e825f6d5e6..5f0bd0831a480 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -170,7 +170,6 @@ tempfile = "3.4.0" time = "0.3.14" tokio = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } -tokio-util = "0.7.12" toml = { version = "0.8", default-features = false } tonic = { workspace = true } typetag = { workspace = true } diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs index b9d6d07fd6778..63ea6a688e06d 100644 --- a/src/query/service/src/lib.rs +++ b/src/query/service/src/lib.rs @@ -16,6 +16,8 @@ #![allow(internal_features)] #![allow(clippy::useless_asref)] #![allow(clippy::uninlined_format_args)] +#![feature(ptr_alignment_type)] +#![feature(allocator_api)] #![feature(hash_raw_entry)] #![feature(core_intrinsics)] #![feature(arbitrary_self_types)] diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index ae2cc9b912c03..f9d55911d8c1d 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -17,13 +17,16 @@ use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; use std::io; +use std::io::Write; use std::ops::Range; +use std::ptr::Alignment; use std::sync::Arc; use std::time::Instant; use databend_common_base::base::dma_buffer_as_vec; use databend_common_base::base::dma_read_file_range; use databend_common_base::base::dma_write_file_vectored; +use databend_common_base::base::DmaAllocator; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; @@ -31,12 +34,11 @@ use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::arrow::deserialize_column; -use databend_common_expression::arrow::serialize_column; +use databend_common_expression::arrow::serialize_column_in; use databend_common_expression::DataBlock; use databend_storages_common_cache::TempDir; use databend_storages_common_cache::TempPath; use opendal::Operator; -use tokio_util::io::SyncIoBridge; use crate::sessions::QueryContext; @@ -127,10 +129,16 @@ impl Spiller { let instant = Instant::now(); // Spill data to storage. - let encoded = EncodedBlock::from_block(data_block); - let columns_layout = encoded.columns_layout(); - let data_size = encoded.size(); - let location = self.write_encodes(data_size, vec![encoded]).await?; + let mut encoder = self.block_encoder(); + encoder.add_block(data_block); + let data_size = encoder.size(); + let BlocksEncoder { + data, + mut columns_layout, + .. + } = encoder; + + let location = self.write_encodes(data_size, data).await?; // Record statistics. match location { @@ -139,7 +147,8 @@ impl Spiller { } // Record columns layout for spilled data. - self.columns_layout.insert(location.clone(), columns_layout); + self.columns_layout + .insert(location.clone(), columns_layout.pop().unwrap()); Ok(location) } @@ -180,24 +189,35 @@ impl Spiller { partitioned_data: Vec<(usize, DataBlock)>, ) -> Result { // Serialize data block. - let mut write_bytes = 0; - let mut write_data = Vec::with_capacity(partitioned_data.len()); - let mut spilled_partitions = Vec::with_capacity(partitioned_data.len()); + let mut encoder = self.block_encoder(); + let mut partition_ids = Vec::new(); for (partition_id, data_block) in partitioned_data.into_iter() { - let begin = write_bytes; - - let encoded = EncodedBlock::from_block(data_block); - let columns_layout = encoded.columns_layout(); - let data_size = encoded.size(); - - write_bytes += data_size; - write_data.push(encoded); - spilled_partitions.push((partition_id, begin..write_bytes, columns_layout)); + partition_ids.push(partition_id); + encoder.add_block(data_block); } + let write_bytes = encoder.size(); + let BlocksEncoder { + data, + offsets, + columns_layout, + .. + } = encoder; + + let partitions = partition_ids + .into_iter() + .zip( + offsets + .windows(2) + .map(|x| x[0]..x[1]) + .zip(columns_layout.into_iter()), + ) + .map(|(id, (range, layout))| (id, range, layout)) + .collect(); + // Spill data to storage. let instant = Instant::now(); - let location = self.write_encodes(write_bytes, write_data).await?; + let location = self.write_encodes(write_bytes, data).await?; // Record statistics. match location { @@ -207,7 +227,7 @@ impl Spiller { Ok(SpilledData::MergedPartition { location, - partitions: spilled_partitions, + partitions, }) } @@ -338,7 +358,11 @@ impl Spiller { } } - async fn write_encodes(&mut self, size: usize, blocks: Vec) -> Result { + async fn write_encodes( + &mut self, + size: usize, + data: Vec>, + ) -> Result { let location = match &self.disk_spill { None => None, Some(disk) => disk.new_file_with_size(size)?.map(Location::Local), @@ -358,18 +382,17 @@ impl Spiller { .await?; let mut written = 0; - for data in blocks.into_iter().flat_map(|x| x.0) { + for data in data.into_iter() { written += data.len(); - writer.write(data).await?; + writer.write(dma_buffer_as_vec(data)).await?; } writer.close().await?; written } Location::Local(path) => { - let bufs = blocks + let bufs = data .iter() - .flat_map(|x| &x.0) .map(|data| io::IoSlice::new(data)) .collect::>(); @@ -380,6 +403,15 @@ impl Spiller { Ok(location) } + fn block_encoder(&self) -> BlocksEncoder { + let align = self + .disk_spill + .as_ref() + .map(|dir| dir.block_alignment()) + .unwrap_or(Alignment::MIN); + BlocksEncoder::new(align, 8 * 1024 * 1024) + } + pub(crate) fn spilled_files(&self) -> Vec { self.columns_layout.keys().cloned().collect() } @@ -399,29 +431,81 @@ pub enum Location { Local(TempPath), } -pub struct EncodedBlock(pub Vec>); +struct BlocksEncoder { + allocator: DmaAllocator, + data: Vec>, + offsets: Vec, + columns_layout: Vec>, + chunk: usize, +} + +impl BlocksEncoder { + fn new(align: Alignment, chunk: usize) -> Self { + Self { + allocator: DmaAllocator::new(align), + data: Vec::new(), + offsets: vec![0], + columns_layout: Vec::new(), + chunk: align_up(align, chunk), + } + } -impl EncodedBlock { - pub fn from_block(block: DataBlock) -> Self { - let data = block + fn add_block(&mut self, block: DataBlock) { + let start = self.size(); + let columns_layout = block .columns() .iter() .map(|entry| { let column = entry .value .convert_to_full_column(&entry.data_type, block.num_rows()); - serialize_column(&column) + serialize_column_in(&column, self).unwrap(); + self.size() - start }) .collect(); - EncodedBlock(data) + self.columns_layout.push(columns_layout); + self.offsets.push(self.size()) } - pub fn columns_layout(&self) -> Vec { - self.0.iter().map(|data| data.len()).collect() + fn size(&self) -> usize { + self.data.iter().map(|x| x.len()).sum() + } +} + +fn align_up(alignment: Alignment, value: usize) -> usize { + (value + alignment.as_usize() - 1) & alignment.mask() +} + +impl Write for BlocksEncoder { + fn write(&mut self, mut buf: &[u8]) -> io::Result { + let n = buf.len(); + while !buf.is_empty() { + let (dst, remain) = match self.data.last_mut() { + Some(dst) if dst.len() < dst.capacity() => { + let remian = dst.capacity() - dst.len(); + (dst, remian) + } + _ => { + self.data + .push(Vec::with_capacity_in(self.chunk, self.allocator)); + (self.data.last_mut().unwrap(), self.chunk) + } + }; + + if buf.len() <= remain { + dst.extend_from_slice(buf); + buf = &buf[buf.len()..] + } else { + let (left, right) = buf.split_at(remain); + dst.extend_from_slice(left); + buf = right + } + } + Ok(n) } - pub fn size(&self) -> usize { - self.0.iter().map(|data| data.len()).sum() + fn flush(&mut self) -> io::Result<()> { + Ok(()) } } From 7cec4dc3dba8f4f727ebfb588dd150f2dc7b94ef Mon Sep 17 00:00:00 2001 From: coldWater Date: Wed, 9 Oct 2024 22:39:31 +0800 Subject: [PATCH 04/12] DmaWriteBuf Signed-off-by: coldWater --- src/common/base/src/base/dma.rs | 154 +++++++++++++++++++--- src/common/base/src/base/mod.rs | 1 + src/query/service/src/spillers/spiller.rs | 79 ++--------- 3 files changed, 146 insertions(+), 88 deletions(-) diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs index 1e5d264a0b436..12db38c42e274 100644 --- a/src/common/base/src/base/dma.rs +++ b/src/common/base/src/base/dma.rs @@ -19,6 +19,7 @@ use std::alloc::Layout; use std::io; use std::io::IoSlice; use std::io::SeekFrom; +use std::io::Write; use std::ops::Range; use std::os::fd::BorrowedFd; use std::os::unix::io::AsRawFd; @@ -116,28 +117,22 @@ pub mod linux { use super::*; impl DmaFile { - /// Attempts to open a file in read-only mode. - pub(super) async fn open(path: impl AsRef) -> io::Result { - let file = File::options() + pub(super) async fn open_raw(path: impl AsRef) -> io::Result { + File::options() .read(true) .custom_flags(OFlags::DIRECT.bits() as i32) .open(path) - .await?; - - open_dma(file).await + .await } - /// Opens a file in write-only mode. - pub(super) async fn create(path: impl AsRef) -> io::Result { - let file = File::options() + pub(super) async fn create_raw(path: impl AsRef) -> io::Result { + File::options() .write(true) .create(true) .truncate(true) .custom_flags((OFlags::DIRECT | OFlags::EXCL).bits() as i32) .open(path) - .await?; - - open_dma(file).await + .await } } } @@ -150,28 +145,36 @@ pub mod not_linux { impl DmaFile { /// Attempts to open a file in read-only mode. - pub(super) async fn open(path: impl AsRef) -> io::Result { - let file = File::options().read(true).open(path).await?; - - open_dma(file).await + pub(super) async fn open_raw(path: impl AsRef) -> io::Result { + File::options().read(true).open(path).await } /// Opens a file in write-only mode. - pub(super) async fn create(path: impl AsRef) -> io::Result { - let file = File::options() + pub(super) async fn create_raw(path: impl AsRef) -> io::Result { + File::options() .write(true) .create(true) .truncate(true) .custom_flags(OFlags::EXCL.bits() as i32) .open(path) - .await?; - - open_dma(file).await + .await } } } impl DmaFile { + /// Attempts to open a file in read-only mode. + async fn open(path: impl AsRef) -> io::Result { + let file = DmaFile::open_raw(path).await?; + open_dma(file).await + } + + /// Opens a file in write-only mode. + async fn create(path: impl AsRef) -> io::Result { + let file = DmaFile::create_raw(path).await?; + open_dma(file).await + } + fn set_buffer(&mut self, buf: DmaBuffer) { self.buf = Some(buf) } @@ -283,6 +286,103 @@ where } } +pub struct DmaWriteBuf { + allocator: DmaAllocator, + data: Vec>, + chunk: usize, +} + +impl DmaWriteBuf { + pub fn new(align: Alignment, chunk: usize) -> DmaWriteBuf { + DmaWriteBuf { + allocator: DmaAllocator::new(align), + data: Vec::new(), + chunk: align_up(align, chunk), + } + } + + pub fn size(&self) -> usize { + if self.data.is_empty() { + return 0; + } + + (self.data.len() - 1) * self.chunk + self.data.last().unwrap().len() + } + + pub async fn into_file(mut self, path: impl AsRef) -> io::Result { + let mut file = DmaFile { + fd: DmaFile::create_raw(path).await?, + alignment: self.allocator.0, + buf: None, + }; + + let file_length = self.size(); + + let Some(mut last) = self.data.pop() else { + return Ok(0); + }; + + for buf in self.data { + debug_assert_eq!(buf.len(), buf.capacity()); + file.set_buffer(buf); + file = asyncify(move || file.write_direct().map(|_| file)).await?; + } + + let len = last.len(); + let align_up = file.align_up(len); + if align_up == len { + file.set_buffer(last); + asyncify(move || file.write_direct()).await?; + } else { + unsafe { last.set_len(align_up) } + file.set_buffer(last); + asyncify(move || { + file.write_direct()?; + file.truncate(file_length) + }) + .await?; + } + Ok(file_length) + } + + pub fn into_data(self) -> Vec { + self.data + } +} + +impl Write for DmaWriteBuf { + fn write(&mut self, mut buf: &[u8]) -> io::Result { + let n = buf.len(); + while !buf.is_empty() { + let (dst, remain) = match self.data.last_mut() { + Some(dst) if dst.len() < dst.capacity() => { + let remian = dst.capacity() - dst.len(); + (dst, remian) + } + _ => { + self.data + .push(Vec::with_capacity_in(self.chunk, self.allocator)); + (self.data.last_mut().unwrap(), self.chunk) + } + }; + + if buf.len() <= remain { + dst.extend_from_slice(buf); + buf = &buf[buf.len()..] + } else { + let (left, right) = buf.split_at(remain); + dst.extend_from_slice(left); + buf = right + } + } + Ok(n) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + pub async fn dma_write_file_vectored<'a>( path: impl AsRef, bufs: &'a [IoSlice<'a>], @@ -458,6 +558,18 @@ mod tests { assert_eq!(length, want.len()); assert_eq!(got, want); + let file = DmaFile::open(filename).await?; + let align = file.alignment; + drop(file); + + std::fs::remove_file(filename)?; + + let mut buf = DmaWriteBuf::new(align, align.as_usize()); + buf.write_all(&want)?; + let length = buf.into_file(filename).await?; + + assert_eq!(length, want.len()); + let (buf, range) = dma_read_file_range(filename, 0..length as u64).await?; assert_eq!(&buf[range], &want); diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index ef46456a3e27e..62cfb9f7712b8 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -33,6 +33,7 @@ pub use dma::dma_read_file; pub use dma::dma_read_file_range; pub use dma::dma_write_file_vectored; pub use dma::DmaAllocator; +pub use dma::DmaWriteBuf; pub use net::get_free_tcp_port; pub use net::get_free_udp_port; pub use ordered_float::OrderedFloat; diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index f9d55911d8c1d..ceea6358f2328 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -16,8 +16,6 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; -use std::io; -use std::io::Write; use std::ops::Range; use std::ptr::Alignment; use std::sync::Arc; @@ -25,8 +23,7 @@ use std::time::Instant; use databend_common_base::base::dma_buffer_as_vec; use databend_common_base::base::dma_read_file_range; -use databend_common_base::base::dma_write_file_vectored; -use databend_common_base::base::DmaAllocator; +use databend_common_base::base::DmaWriteBuf; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; @@ -133,12 +130,12 @@ impl Spiller { encoder.add_block(data_block); let data_size = encoder.size(); let BlocksEncoder { - data, + buf, mut columns_layout, .. } = encoder; - let location = self.write_encodes(data_size, data).await?; + let location = self.write_encodes(data_size, buf).await?; // Record statistics. match location { @@ -198,7 +195,7 @@ impl Spiller { let write_bytes = encoder.size(); let BlocksEncoder { - data, + buf, offsets, columns_layout, .. @@ -217,7 +214,7 @@ impl Spiller { // Spill data to storage. let instant = Instant::now(); - let location = self.write_encodes(write_bytes, data).await?; + let location = self.write_encodes(write_bytes, buf).await?; // Record statistics. match location { @@ -358,11 +355,7 @@ impl Spiller { } } - async fn write_encodes( - &mut self, - size: usize, - data: Vec>, - ) -> Result { + async fn write_encodes(&mut self, size: usize, buf: DmaWriteBuf) -> Result { let location = match &self.disk_spill { None => None, Some(disk) => disk.new_file_with_size(size)?.map(Location::Local), @@ -382,7 +375,7 @@ impl Spiller { .await?; let mut written = 0; - for data in data.into_iter() { + for data in buf.into_data() { written += data.len(); writer.write(dma_buffer_as_vec(data)).await?; } @@ -390,14 +383,7 @@ impl Spiller { writer.close().await?; written } - Location::Local(path) => { - let bufs = data - .iter() - .map(|data| io::IoSlice::new(data)) - .collect::>(); - - dma_write_file_vectored(path.as_ref(), &bufs).await? - } + Location::Local(path) => buf.into_file(path).await?, }; debug_assert_eq!(size, written); Ok(location) @@ -432,21 +418,17 @@ pub enum Location { } struct BlocksEncoder { - allocator: DmaAllocator, - data: Vec>, + buf: DmaWriteBuf, offsets: Vec, columns_layout: Vec>, - chunk: usize, } impl BlocksEncoder { fn new(align: Alignment, chunk: usize) -> Self { Self { - allocator: DmaAllocator::new(align), - data: Vec::new(), + buf: DmaWriteBuf::new(align, chunk), offsets: vec![0], columns_layout: Vec::new(), - chunk: align_up(align, chunk), } } @@ -459,7 +441,7 @@ impl BlocksEncoder { let column = entry .value .convert_to_full_column(&entry.data_type, block.num_rows()); - serialize_column_in(&column, self).unwrap(); + serialize_column_in(&column, &mut self.buf).unwrap(); self.size() - start }) .collect(); @@ -468,44 +450,7 @@ impl BlocksEncoder { } fn size(&self) -> usize { - self.data.iter().map(|x| x.len()).sum() - } -} - -fn align_up(alignment: Alignment, value: usize) -> usize { - (value + alignment.as_usize() - 1) & alignment.mask() -} - -impl Write for BlocksEncoder { - fn write(&mut self, mut buf: &[u8]) -> io::Result { - let n = buf.len(); - while !buf.is_empty() { - let (dst, remain) = match self.data.last_mut() { - Some(dst) if dst.len() < dst.capacity() => { - let remian = dst.capacity() - dst.len(); - (dst, remian) - } - _ => { - self.data - .push(Vec::with_capacity_in(self.chunk, self.allocator)); - (self.data.last_mut().unwrap(), self.chunk) - } - }; - - if buf.len() <= remain { - dst.extend_from_slice(buf); - buf = &buf[buf.len()..] - } else { - let (left, right) = buf.split_at(remain); - dst.extend_from_slice(left); - buf = right - } - } - Ok(n) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) + self.buf.size() } } From 64488ec2b3aff047369d6e75307aa71719ee478b Mon Sep 17 00:00:00 2001 From: coldWater Date: Wed, 9 Oct 2024 23:08:05 +0800 Subject: [PATCH 05/12] fix Signed-off-by: coldWater --- src/common/base/src/base/dma.rs | 4 ++-- src/query/service/src/lib.rs | 2 +- src/query/service/src/spillers/spiller.rs | 13 ++++++------- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs index 12db38c42e274..c5c183e99d8dc 100644 --- a/src/common/base/src/base/dma.rs +++ b/src/common/base/src/base/dma.rs @@ -356,8 +356,8 @@ impl Write for DmaWriteBuf { while !buf.is_empty() { let (dst, remain) = match self.data.last_mut() { Some(dst) if dst.len() < dst.capacity() => { - let remian = dst.capacity() - dst.len(); - (dst, remian) + let remain = dst.capacity() - dst.len(); + (dst, remain) } _ => { self.data diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs index 63ea6a688e06d..5d7443bdd0d8b 100644 --- a/src/query/service/src/lib.rs +++ b/src/query/service/src/lib.rs @@ -17,7 +17,7 @@ #![allow(clippy::useless_asref)] #![allow(clippy::uninlined_format_args)] #![feature(ptr_alignment_type)] -#![feature(allocator_api)] +#![feature(iter_map_windows)] #![feature(hash_raw_entry)] #![feature(core_intrinsics)] #![feature(arbitrary_self_types)] diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index ceea6358f2328..a33b0a9a704eb 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -433,18 +433,17 @@ impl BlocksEncoder { } fn add_block(&mut self, block: DataBlock) { - let start = self.size(); - let columns_layout = block - .columns() - .iter() - .map(|entry| { + let columns_layout = std::iter::once(self.size()) + .chain(block.columns().iter().map(|entry| { let column = entry .value .convert_to_full_column(&entry.data_type, block.num_rows()); serialize_column_in(&column, &mut self.buf).unwrap(); - self.size() - start - }) + self.size() + })) + .map_windows(|x: &[_; 2]| x[1] - x[0]) .collect(); + self.columns_layout.push(columns_layout); self.offsets.push(self.size()) } From bb7aca7432b81e3c49075b091104e2f423733257 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 11 Oct 2024 13:38:53 +0800 Subject: [PATCH 06/12] enable_dio Signed-off-by: coldWater --- src/common/base/src/base/dma.rs | 127 ++++++++---------- src/query/service/src/spillers/spiller.rs | 8 +- src/query/settings/src/settings_default.rs | 6 + .../settings/src/settings_getter_setter.rs | 4 + .../storages/common/cache/src/temp_dir.rs | 5 +- 5 files changed, 75 insertions(+), 75 deletions(-) diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs index c5c183e99d8dc..87a9c92e4f86e 100644 --- a/src/common/base/src/base/dma.rs +++ b/src/common/base/src/base/dma.rs @@ -27,6 +27,7 @@ use std::path::Path; use std::ptr::Alignment; use std::ptr::NonNull; +use rustix::fs::OFlags; use tokio::fs::File; use tokio::io::AsyncSeekExt; @@ -43,7 +44,11 @@ impl DmaAllocator { } fn real_layout(&self, layout: Layout) -> Layout { - Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap() + if layout.align() >= self.0.as_usize() { + layout + } else { + Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap() + } } fn real_cap(&self, cap: usize) -> usize { @@ -110,68 +115,46 @@ struct DmaFile { buf: Option, } -#[cfg(target_os = "linux")] -pub mod linux { - use rustix::fs::OFlags; - - use super::*; - - impl DmaFile { - pub(super) async fn open_raw(path: impl AsRef) -> io::Result { - File::options() - .read(true) - .custom_flags(OFlags::DIRECT.bits() as i32) - .open(path) - .await +impl DmaFile { + async fn open_raw(path: impl AsRef, dio: bool) -> io::Result { + let mut flags = 0; + #[cfg(target_os = "linux")] + if dio { + flags = OFlags::DIRECT.bits() as i32 } - pub(super) async fn create_raw(path: impl AsRef) -> io::Result { - File::options() - .write(true) - .create(true) - .truncate(true) - .custom_flags((OFlags::DIRECT | OFlags::EXCL).bits() as i32) - .open(path) - .await - } + File::options() + .read(true) + .custom_flags(flags) + .open(path) + .await } -} -#[cfg(not(target_os = "linux"))] -pub mod not_linux { - use rustix::fs::OFlags; - - use super::*; - - impl DmaFile { - /// Attempts to open a file in read-only mode. - pub(super) async fn open_raw(path: impl AsRef) -> io::Result { - File::options().read(true).open(path).await + async fn create_raw(path: impl AsRef, dio: bool) -> io::Result { + let mut flags = OFlags::EXCL; + #[cfg(target_os = "linux")] + if dio { + flags |= OFlags::DIRECT; } - /// Opens a file in write-only mode. - pub(super) async fn create_raw(path: impl AsRef) -> io::Result { - File::options() - .write(true) - .create(true) - .truncate(true) - .custom_flags(OFlags::EXCL.bits() as i32) - .open(path) - .await - } + File::options() + .write(true) + .create(true) + .truncate(true) + .custom_flags(flags.bits() as i32) + .open(path) + .await } -} -impl DmaFile { /// Attempts to open a file in read-only mode. - async fn open(path: impl AsRef) -> io::Result { - let file = DmaFile::open_raw(path).await?; + async fn open(path: impl AsRef, dio: bool) -> io::Result { + let file = DmaFile::open_raw(path, dio).await?; open_dma(file).await } /// Opens a file in write-only mode. - async fn create(path: impl AsRef) -> io::Result { - let file = DmaFile::create_raw(path).await?; + async fn create(path: impl AsRef, dio: bool) -> io::Result { + let file = DmaFile::create_raw(path, dio).await?; open_dma(file).await } @@ -309,9 +292,9 @@ impl DmaWriteBuf { (self.data.len() - 1) * self.chunk + self.data.last().unwrap().len() } - pub async fn into_file(mut self, path: impl AsRef) -> io::Result { + pub async fn into_file(mut self, path: impl AsRef, dio: bool) -> io::Result { let mut file = DmaFile { - fd: DmaFile::create_raw(path).await?, + fd: DmaFile::create_raw(path, dio).await?, alignment: self.allocator.0, buf: None, }; @@ -387,7 +370,7 @@ pub async fn dma_write_file_vectored<'a>( path: impl AsRef, bufs: &'a [IoSlice<'a>], ) -> io::Result { - let mut file = DmaFile::create(path.as_ref()).await?; + let mut file = DmaFile::create(path.as_ref(), true).await?; let file_length = bufs.iter().map(|buf| buf.len()).sum(); if file_length == 0 { @@ -445,7 +428,7 @@ pub async fn dma_read_file( mut writer: impl io::Write, ) -> io::Result { const BUFFER_SIZE: usize = 1024 * 1024; - let mut file = DmaFile::open(path.as_ref()).await?; + let mut file = DmaFile::open(path.as_ref(), true).await?; let buf = Vec::with_capacity_in( file.align_up(BUFFER_SIZE), DmaAllocator::new(file.alignment), @@ -480,7 +463,7 @@ pub async fn dma_read_file_range( path: impl AsRef, range: Range, ) -> io::Result<(DmaBuffer, Range)> { - let mut file = DmaFile::open(path.as_ref()).await?; + let mut file = DmaFile::open(path.as_ref(), true).await?; let align_start = file.align_down(range.start as usize); let align_end = file.align_up(range.end as usize); @@ -526,24 +509,26 @@ mod tests { async fn test_read_write() { let _ = std::fs::remove_file("test_file"); - run_test(0).await.unwrap(); - run_test(100).await.unwrap(); - run_test(200).await.unwrap(); + for dio in [true, false] { + run_test(0, dio).await.unwrap(); + run_test(100, dio).await.unwrap(); + run_test(200, dio).await.unwrap(); - run_test(4096 - 1).await.unwrap(); - run_test(4096).await.unwrap(); - run_test(4096 + 1).await.unwrap(); + run_test(4096 - 1, dio).await.unwrap(); + run_test(4096, dio).await.unwrap(); + run_test(4096 + 1, dio).await.unwrap(); - run_test(4096 * 2 - 1).await.unwrap(); - run_test(4096 * 2).await.unwrap(); - run_test(4096 * 2 + 1).await.unwrap(); + run_test(4096 * 2 - 1, dio).await.unwrap(); + run_test(4096 * 2, dio).await.unwrap(); + run_test(4096 * 2 + 1, dio).await.unwrap(); - run_test(1024 * 1024 * 3 - 1).await.unwrap(); - run_test(1024 * 1024 * 3).await.unwrap(); - run_test(1024 * 1024 * 3 + 1).await.unwrap(); + run_test(1024 * 1024 * 3 - 1, dio).await.unwrap(); + run_test(1024 * 1024 * 3, dio).await.unwrap(); + run_test(1024 * 1024 * 3 + 1, dio).await.unwrap(); + } } - async fn run_test(n: usize) -> io::Result<()> { + async fn run_test(n: usize, dio: bool) -> io::Result<()> { let filename = "test_file"; let want = (0..n).map(|i| (i % 256) as u8).collect::>(); @@ -558,7 +543,7 @@ mod tests { assert_eq!(length, want.len()); assert_eq!(got, want); - let file = DmaFile::open(filename).await?; + let file = DmaFile::open(filename, dio).await?; let align = file.alignment; drop(file); @@ -566,7 +551,7 @@ mod tests { let mut buf = DmaWriteBuf::new(align, align.as_usize()); buf.write_all(&want)?; - let length = buf.into_file(filename).await?; + let length = buf.into_file(filename, dio).await?; assert_eq!(length, want.len()); @@ -628,7 +613,7 @@ mod tests { let bufs = vec![IoSlice::new(&want)]; dma_write_file_vectored(filename, &bufs).await.unwrap(); - let mut file = DmaFile::open(filename).await.unwrap(); + let mut file = DmaFile::open(filename, true).await.unwrap(); let buf = Vec::with_capacity_in(file_size, DmaAllocator::new(file.alignment)); file.set_buffer(buf); diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index a33b0a9a704eb..0542882ac8ea2 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -81,6 +81,7 @@ pub struct Spiller { operator: Operator, location_prefix: String, disk_spill: Option>, + use_dio: bool, _spiller_type: SpillerType, pub join_spilling_partition_bits: usize, /// 1 partition -> N partition files @@ -98,7 +99,7 @@ impl Spiller { operator: Operator, config: SpillerConfig, ) -> Result { - let join_spilling_partition_bits = ctx.get_settings().get_join_spilling_partition_bits()?; + let settings = ctx.get_settings(); let SpillerConfig { location_prefix, disk_spill, @@ -109,8 +110,9 @@ impl Spiller { operator, location_prefix, disk_spill, + use_dio: settings.get_enable_dio()?, _spiller_type: spiller_type, - join_spilling_partition_bits, + join_spilling_partition_bits: settings.get_join_spilling_partition_bits()?, partition_location: Default::default(), columns_layout: Default::default(), partition_spilled_bytes: Default::default(), @@ -383,7 +385,7 @@ impl Spiller { writer.close().await?; written } - Location::Local(path) => buf.into_file(path).await?, + Location::Local(path) => buf.into_file(path, self.use_dio).await?, }; debug_assert_eq!(size, written); Ok(location) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 50f9aeb27e5bf..3600ee150d5a4 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -273,6 +273,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("enable_dio", DefaultSettingValue{ + value: UserSettingValue::UInt64(1), + desc: "Enables Direct IO.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("disable_join_reorder", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Disable join reorder optimization.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 55c2e46cd7e57..cb45b3d00f347 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -265,6 +265,10 @@ impl Settings { Ok(self.try_get_u64("enable_cbo")? != 0) } + pub fn get_enable_dio(&self) -> Result { + Ok(self.try_get_u64("enable_dio")? != 0) + } + /// # Safety pub unsafe fn get_disable_join_reorder(&self) -> Result { Ok(self.unchecked_try_get_u64("disable_join_reorder")? != 0) diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs index 7b69693b97e1e..f97ebc0af84c8 100644 --- a/src/query/storages/common/cache/src/temp_dir.rs +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -188,7 +188,10 @@ impl TempDirManager { fn insufficient_disk(&self, size: u64) -> Result { let stat = statvfs(self.root.as_ref().unwrap().as_ref()) .map_err(|e| ErrorCode::Internal(e.to_string()))?; - Ok(stat.f_bavail < self.reserved + (size + stat.f_frsize - 1) / stat.f_frsize) + + debug_assert_eq!(stat.f_frsize, self.alignment.as_usize()); + let n = (size + self.alignment.as_usize() as u64 - 1) >> self.alignment.log2(); + Ok(stat.f_bavail < self.reserved + n) } } From 703b556ee940de4e978c943130c552d40bfce0b5 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 11 Oct 2024 14:26:54 +0800 Subject: [PATCH 07/12] Alignment Signed-off-by: coldWater --- src/common/base/src/base/dma.rs | 94 ++++++++++++++++--- src/common/base/src/base/mod.rs | 1 + src/query/service/src/lib.rs | 1 - src/query/service/src/spillers/spiller.rs | 2 +- src/query/storages/common/cache/src/lib.rs | 1 - .../storages/common/cache/src/temp_dir.rs | 6 +- 6 files changed, 86 insertions(+), 19 deletions(-) diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs index 87a9c92e4f86e..6f92a2043a627 100644 --- a/src/common/base/src/base/dma.rs +++ b/src/common/base/src/base/dma.rs @@ -16,6 +16,7 @@ use std::alloc::AllocError; use std::alloc::Allocator; use std::alloc::Global; use std::alloc::Layout; +use std::fmt; use std::io; use std::io::IoSlice; use std::io::SeekFrom; @@ -24,7 +25,7 @@ use std::ops::Range; use std::os::fd::BorrowedFd; use std::os::unix::io::AsRawFd; use std::path::Path; -use std::ptr::Alignment; +use std::ptr; use std::ptr::NonNull; use rustix::fs::OFlags; @@ -33,6 +34,71 @@ use tokio::io::AsyncSeekExt; use crate::runtime::spawn_blocking; +#[derive(Copy, Clone, PartialEq, Eq)] + +pub struct Alignment(ptr::Alignment); + +impl Alignment { + pub const MIN: Self = Self(ptr::Alignment::MIN); + + #[inline] + pub const fn new(align: usize) -> Option { + match ptr::Alignment::new(align) { + Some(a) => Some(Alignment(a)), + None => None, + } + } + + #[inline] + pub const fn as_usize(self) -> usize { + self.0.as_usize() + } + + #[inline] + pub const fn align_up(self, value: usize) -> usize { + (value + self.as_usize() - 1) & self.mask() + } + + #[inline] + pub const fn align_down(self, value: usize) -> usize { + value & self.mask() + } + + #[inline] + pub const fn align_up_count(self, value: usize) -> usize { + (value + self.as_usize() - 1) >> self.log2() + } + + #[inline] + pub const fn align_down_count(self, value: usize) -> usize { + value >> self.log2() + } + + #[inline] + pub const fn mask(self) -> usize { + self.0.mask() + } + + #[inline] + pub const fn log2(self) -> u32 { + self.0.log2() + } +} + +impl fmt::Debug for Alignment { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.0) + } +} + +impl TryFrom for Alignment { + type Error = std::num::TryFromIntError; + + fn try_from(value: usize) -> Result { + Ok(Alignment(value.try_into()?)) + } +} + unsafe impl Send for DmaAllocator {} #[derive(Clone, Copy)] @@ -52,7 +118,7 @@ impl DmaAllocator { } fn real_cap(&self, cap: usize) -> usize { - align_up(self.0, cap) + self.0.align_up(cap) } } @@ -164,12 +230,12 @@ impl DmaFile { /// Aligns `value` up to the memory alignment requirement for this file. pub fn align_up(&self, value: usize) -> usize { - align_up(self.alignment, value) + self.alignment.align_up(value) } /// Aligns `value` down to the memory alignment requirement for this file. pub fn align_down(&self, value: usize) -> usize { - align_down(self.alignment, value) + self.alignment.align_down(value) } /// Return the alignment requirement for this file. The returned alignment value can be used @@ -227,14 +293,6 @@ impl DmaFile { } } -pub fn align_up(alignment: Alignment, value: usize) -> usize { - (value + alignment.as_usize() - 1) & alignment.mask() -} - -pub fn align_down(alignment: Alignment, value: usize) -> usize { - value & alignment.mask() -} - async fn open_dma(file: File) -> io::Result { let stat = fstatvfs(&file).await?; let alignment = Alignment::new(stat.f_bsize.max(512) as usize).unwrap(); @@ -280,7 +338,7 @@ impl DmaWriteBuf { DmaWriteBuf { allocator: DmaAllocator::new(align), data: Vec::new(), - chunk: align_up(align, chunk), + chunk: align.align_up(chunk), } } @@ -505,6 +563,16 @@ pub async fn dma_read_file_range( mod tests { use super::*; + #[test] + fn test_alignment() { + let a = Alignment::new(4).unwrap(); + + assert_eq!(8, a.align_up(5)); + assert_eq!(4, a.align_down(5)); + assert_eq!(2, a.align_up_count(5)); + assert_eq!(1, a.align_down_count(5)); + } + #[tokio::test] async fn test_read_write() { let _ = std::fs::remove_file("test_file"); diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index 62cfb9f7712b8..72e96459220cd 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -32,6 +32,7 @@ pub use dma::dma_buffer_as_vec; pub use dma::dma_read_file; pub use dma::dma_read_file_range; pub use dma::dma_write_file_vectored; +pub use dma::Alignment; pub use dma::DmaAllocator; pub use dma::DmaWriteBuf; pub use net::get_free_tcp_port; diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs index 5d7443bdd0d8b..d0ea62933783b 100644 --- a/src/query/service/src/lib.rs +++ b/src/query/service/src/lib.rs @@ -16,7 +16,6 @@ #![allow(internal_features)] #![allow(clippy::useless_asref)] #![allow(clippy::uninlined_format_args)] -#![feature(ptr_alignment_type)] #![feature(iter_map_windows)] #![feature(hash_raw_entry)] #![feature(core_intrinsics)] diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 0542882ac8ea2..46cd88da282fd 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -17,12 +17,12 @@ use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; use std::ops::Range; -use std::ptr::Alignment; use std::sync::Arc; use std::time::Instant; use databend_common_base::base::dma_buffer_as_vec; use databend_common_base::base::dma_read_file_range; +use databend_common_base::base::Alignment; use databend_common_base::base::DmaWriteBuf; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; diff --git a/src/query/storages/common/cache/src/lib.rs b/src/query/storages/common/cache/src/lib.rs index d4d39a65cea36..8c70b627aeacd 100644 --- a/src/query/storages/common/cache/src/lib.rs +++ b/src/query/storages/common/cache/src/lib.rs @@ -15,7 +15,6 @@ #![feature(write_all_vectored)] #![feature(associated_type_defaults)] #![feature(assert_matches)] -#![feature(ptr_alignment_type)] mod cache; mod caches; diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs index f97ebc0af84c8..d1f906a5225ef 100644 --- a/src/query/storages/common/cache/src/temp_dir.rs +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -24,13 +24,13 @@ use std::ops::Deref; use std::ops::Drop; use std::path::Path; use std::path::PathBuf; -use std::ptr::Alignment; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::sync::Once; +use databend_common_base::base::Alignment; use databend_common_base::base::GlobalInstance; use databend_common_base::base::GlobalUniqName; use databend_common_config::SpillConfig; @@ -189,8 +189,8 @@ impl TempDirManager { let stat = statvfs(self.root.as_ref().unwrap().as_ref()) .map_err(|e| ErrorCode::Internal(e.to_string()))?; - debug_assert_eq!(stat.f_frsize, self.alignment.as_usize()); - let n = (size + self.alignment.as_usize() as u64 - 1) >> self.alignment.log2(); + debug_assert_eq!(stat.f_frsize, self.alignment.as_usize() as u64); + let n = self.alignment.align_up_count(size as usize) as u64; Ok(stat.f_bavail < self.reserved + n) } } From a22fa300d0140a81cb4cc65cf5895c8f27fc5b99 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 11 Oct 2024 17:33:19 +0800 Subject: [PATCH 08/12] no dio read Signed-off-by: coldWater --- src/query/service/src/spillers/spiller.rs | 42 +++++++++++++++++------ 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 46cd88da282fd..4f08df851dc3d 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; +use std::io; use std::ops::Range; use std::sync::Arc; use std::time::Instant; @@ -36,6 +37,8 @@ use databend_common_expression::DataBlock; use databend_storages_common_cache::TempDir; use databend_storages_common_cache::TempPath; use opendal::Operator; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; use crate::sessions::QueryContext; @@ -244,12 +247,18 @@ impl Spiller { deserialize_block(columns_layout, &data) } Location::Local(path) => { - let file_size = path.size(); - debug_assert_eq!(file_size, columns_layout.iter().sum::()); - let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - let data = &buf[range]; - record_local_read_profile(&instant, data.len()); - deserialize_block(columns_layout, data) + debug_assert_eq!(path.size(), columns_layout.iter().sum::()); + if !self.use_dio { + let data = tokio::fs::read(path).await?; + record_local_read_profile(&instant, data.len()); + deserialize_block(columns_layout, &data) + } else { + let file_size = path.size(); + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + let data = &buf[range]; + record_local_read_profile(&instant, data.len()); + deserialize_block(columns_layout, data) + } } }; @@ -349,10 +358,23 @@ impl Spiller { Ok(deserialize_block(columns_layout, &data)) } Location::Local(path) => { - let (buf, range) = dma_read_file_range(path, data_range).await?; - let data = &buf[range]; - record_local_read_profile(&instant, data.len()); - Ok(deserialize_block(columns_layout, data)) + if !self.use_dio { + let mut file = tokio::fs::File::open(path).await?; + file.seek(io::SeekFrom::Start(data_range.start)).await?; + + let mut data = Vec::new(); + file.take(data_range.count() as u64) + .read_to_end(&mut data) + .await?; + + record_local_read_profile(&instant, data.len()); + Ok(deserialize_block(columns_layout, &data)) + } else { + let (buf, range) = dma_read_file_range(path, data_range).await?; + let data = &buf[range]; + record_local_read_profile(&instant, data.len()); + Ok(deserialize_block(columns_layout, data)) + } } } } From ded1977f21aef6830f1403b53ce805f76904a089 Mon Sep 17 00:00:00 2001 From: coldWater Date: Fri, 11 Oct 2024 17:51:32 +0800 Subject: [PATCH 09/12] test Signed-off-by: coldWater --- .../window_partition_spill.test | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test index fccbd097486af..031a0e8778b06 100644 --- a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test +++ b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test @@ -10,6 +10,9 @@ set window_partition_spilling_bytes_threshold_per_proc = 1024 * 1024 * 1; statement ok set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; +statement ok +set enable_dio = 1; + query T SELECT SUM(number + a + b) FROM ( @@ -35,6 +38,37 @@ FROM ( ---- 1499998499576 +statement ok +set enable_dio = 0; + +query T +SELECT SUM(number + a + b) +FROM ( + SELECT + number, + LEAD(number, 1, 0) OVER (PARTITION BY number % 16 ORDER BY number + 1) AS a, + LEAD(number, 2, 0) OVER (PARTITION BY number % 16 ORDER BY number + 1) AS b + FROM numbers(5000000) +); +---- +37499992499384 + +query I +SELECT SUM(a + b + c) +FROM ( + SELECT + number, + LEAD(number, 1, 0) OVER (PARTITION BY number % 8 ORDER BY number + 2) a, + LEAD(number, 2, 0) OVER (PARTITION BY number % 8 ORDER BY number + 2) b, + LEAD(number, 3, 0) OVER (PARTITION BY number % 8 ORDER BY number + 2) c + FROM numbers(1000000) +); +---- +1499998499576 + +statement ok +unset enable_dio; + statement ok DROP TABLE IF EXISTS customers; From f315c1ec96574cb1fcb9cd3dd39177943a7720c3 Mon Sep 17 00:00:00 2001 From: coldWater Date: Sat, 12 Oct 2024 09:55:10 +0800 Subject: [PATCH 10/12] one read Signed-off-by: coldWater --- src/query/service/src/spillers/spiller.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 4f08df851dc3d..6c2ecd76aacdc 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -362,10 +362,10 @@ impl Spiller { let mut file = tokio::fs::File::open(path).await?; file.seek(io::SeekFrom::Start(data_range.start)).await?; - let mut data = Vec::new(); - file.take(data_range.count() as u64) - .read_to_end(&mut data) - .await?; + let n = data_range.count(); + let mut data = Vec::with_capacity(n); + unsafe { data.set_len(n) }; + file.read_exact(&mut data).await?; record_local_read_profile(&instant, data.len()); Ok(deserialize_block(columns_layout, &data)) From 49dde3e01013c4c573a111e0b7ecedf6c355b65d Mon Sep 17 00:00:00 2001 From: coldWater Date: Sat, 12 Oct 2024 19:14:56 +0800 Subject: [PATCH 11/12] refactor Signed-off-by: coldWater --- Cargo.lock | 11 ++ src/query/expression/src/utils/arrow.rs | 29 +-- src/query/service/Cargo.toml | 2 + .../src/pipelines/builders/builder_window.rs | 21 +- .../transform_window_partition_collect.rs | 4 +- src/query/service/src/spillers/mod.rs | 6 +- src/query/service/src/spillers/spiller.rs | 186 +++++++++--------- .../storages/common/cache/src/temp_dir.rs | 13 +- 8 files changed, 158 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 604e95d5447d9..c0c2037b3cd02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1807,6 +1807,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "buf-list" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f56bd1685d994a3e2a3ed802eb1ecee8cb500b0ad4df48cb4d5d1a2f04749c3a" +dependencies = [ + "bytes", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -5141,9 +5150,11 @@ dependencies = [ "backoff", "backon 0.4.4", "base64 0.21.7", + "buf-list", "bumpalo", "byte-unit", "byteorder", + "bytes", "chrono", "chrono-tz 0.8.6", "config", diff --git a/src/query/expression/src/utils/arrow.rs b/src/query/expression/src/utils/arrow.rs index 6dbadbf1730f2..7d3f20dc01277 100644 --- a/src/query/expression/src/utils/arrow.rs +++ b/src/query/expression/src/utils/arrow.rs @@ -13,8 +13,11 @@ // limitations under the License. use std::io::Cursor; +use std::io::Read; +use std::io::Seek; use std::io::Write; +use databend_common_arrow::arrow; use databend_common_arrow::arrow::array::Array; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::bitmap::MutableBitmap; @@ -22,8 +25,9 @@ use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::datatypes::Schema; use databend_common_arrow::arrow::io::ipc::read::read_file_metadata; use databend_common_arrow::arrow::io::ipc::read::FileReader; +use databend_common_arrow::arrow::io::ipc::write::Compression; use databend_common_arrow::arrow::io::ipc::write::FileWriter; -use databend_common_arrow::arrow::io::ipc::write::WriteOptions as IpcWriteOptions; +use databend_common_arrow::arrow::io::ipc::write::WriteOptions; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -67,32 +71,31 @@ pub fn buffer_into_mut(mut buffer: Buffer) -> Vec { pub fn serialize_column(col: &Column) -> Vec { let mut buffer = Vec::new(); - serialize_column_in(col, &mut buffer).unwrap(); + write_column(col, &mut buffer).unwrap(); buffer } -pub fn serialize_column_in( - col: &Column, - w: &mut impl Write, -) -> databend_common_arrow::arrow::error::Result<()> { +pub fn write_column(col: &Column, w: &mut impl Write) -> arrow::error::Result<()> { let schema = Schema::from(vec![col.arrow_field()]); - let mut writer = FileWriter::new(w, schema, None, IpcWriteOptions::default()); + let mut writer = FileWriter::new(w, schema, None, WriteOptions { + compression: Some(Compression::LZ4), + }); writer.start()?; - writer.write( - &databend_common_arrow::arrow::chunk::Chunk::new(vec![col.as_arrow()]), - None, - )?; + writer.write(&arrow::chunk::Chunk::new(vec![col.as_arrow()]), None)?; writer.finish() } pub fn deserialize_column(bytes: &[u8]) -> Result { let mut cursor = Cursor::new(bytes); + read_column(&mut cursor) +} - let metadata = read_file_metadata(&mut cursor)?; +pub fn read_column(r: &mut R) -> Result { + let metadata = read_file_metadata(r)?; let f = metadata.schema.fields[0].clone(); let data_field = DataField::try_from(&f)?; - let mut reader = FileReader::new(cursor, metadata, None, None); + let mut reader = FileReader::new(r, metadata, None, None); let col = reader .next() .ok_or_else(|| ErrorCode::Internal("expected one arrow array"))?? diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 5f0bd0831a480..a13b5364b33f2 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -44,9 +44,11 @@ async-trait = { workspace = true } backoff = { version = "0.4.0", features = ["futures", "tokio"] } backon = "0.4" base64 = "0.21.0" +buf-list = "1.0.3" bumpalo = { workspace = true } byte-unit = "4.0.19" byteorder = { workspace = true } +bytes = { workspace = true } chrono = { workspace = true } chrono-tz = { workspace = true } config = { version = "0.13.4", features = [] } diff --git a/src/query/service/src/pipelines/builders/builder_window.rs b/src/query/service/src/pipelines/builders/builder_window.rs index 0bddf1fb2aede..c62a09092cfbb 100644 --- a/src/query/service/src/pipelines/builders/builder_window.rs +++ b/src/query/service/src/pipelines/builders/builder_window.rs @@ -26,6 +26,8 @@ use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_sql::executor::physical_plans::Window; use databend_common_sql::executor::physical_plans::WindowPartition; use databend_storages_common_cache::TempDirManager; +use opendal::services::Fs; +use opendal::Operator; use crate::pipelines::processors::transforms::FrameBound; use crate::pipelines::processors::transforms::TransformWindowPartitionCollect; @@ -34,6 +36,7 @@ use crate::pipelines::processors::transforms::WindowPartitionExchange; use crate::pipelines::processors::transforms::WindowSpillSettings; use crate::pipelines::processors::TransformWindow; use crate::pipelines::PipelineBuilder; +use crate::spillers::SpillerDiskConfig; impl PipelineBuilder { pub(crate) fn build_window(&mut self, window: &Window) -> Result<()> { @@ -173,7 +176,23 @@ impl PipelineBuilder { let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?; let temp_dir_manager = TempDirManager::instance(); - let disk_spill = temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()); + + let enable_dio = settings.get_enable_dio()?; + let disk_spill = + match temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()) { + Some(temp_dir) if !enable_dio => { + let builder = Fs::default().root(temp_dir.path().to_str().unwrap()); + Some(SpillerDiskConfig { + temp_dir, + local_operator: Some(Operator::new(builder)?.finish()), + }) + } + Some(temp_dir) => Some(SpillerDiskConfig { + temp_dir, + local_operator: None, + }), + None => None, + }; let window_spill_settings = WindowSpillSettings::new(&settings, num_processors)?; let have_order_col = window_partition.after_exchange.unwrap_or(false); diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index f983b6208e650..e73dd63f6056a 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -34,7 +34,6 @@ use databend_common_pipeline_transforms::processors::sort_merge; use databend_common_settings::Settings; use databend_common_storage::DataOperator; use databend_common_storages_fuse::TableContext; -use databend_storages_common_cache::TempDir; use super::WindowPartitionBuffer; use super::WindowPartitionMeta; @@ -42,6 +41,7 @@ use super::WindowSpillSettings; use crate::sessions::QueryContext; use crate::spillers::Spiller; use crate::spillers::SpillerConfig; +use crate::spillers::SpillerDiskConfig; use crate::spillers::SpillerType; #[derive(Debug, Clone, Copy)] @@ -99,7 +99,7 @@ impl TransformWindowPartitionCollect { num_processors: usize, num_partitions: usize, spill_settings: WindowSpillSettings, - disk_spill: Option>, + disk_spill: Option, sort_desc: Vec, schema: DataSchemaRef, have_order_col: bool, diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index 31d4208490412..f867e56749f93 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -17,8 +17,4 @@ mod spiller; pub use partition_buffer::PartitionBuffer; pub use partition_buffer::PartitionBufferFetchOption; -pub use spiller::Location; -pub use spiller::SpilledData; -pub use spiller::Spiller; -pub use spiller::SpillerConfig; -pub use spiller::SpillerType; +pub use spiller::*; diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 6c2ecd76aacdc..7939a9b5e01ac 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -16,11 +16,14 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; -use std::io; use std::ops::Range; use std::sync::Arc; use std::time::Instant; +use buf_list::BufList; +use buf_list::Cursor; +use bytes::Buf; +use bytes::Bytes; use databend_common_base::base::dma_buffer_as_vec; use databend_common_base::base::dma_read_file_range; use databend_common_base::base::Alignment; @@ -31,14 +34,13 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; -use databend_common_expression::arrow::deserialize_column; -use databend_common_expression::arrow::serialize_column_in; +use databend_common_expression::arrow::read_column; +use databend_common_expression::arrow::write_column; use databend_common_expression::DataBlock; use databend_storages_common_cache::TempDir; use databend_storages_common_cache::TempPath; +use opendal::Buffer; use opendal::Operator; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeekExt; use crate::sessions::QueryContext; @@ -68,10 +70,16 @@ impl Display for SpillerType { #[derive(Clone)] pub struct SpillerConfig { pub location_prefix: String, - pub disk_spill: Option>, + pub disk_spill: Option, pub spiller_type: SpillerType, } +#[derive(Clone)] +pub struct SpillerDiskConfig { + pub temp_dir: Arc, + pub local_operator: Option, +} + /// Spiller is a unified framework for operators which need to spill data from memory. /// It provides the following features: /// 1. Collection data that needs to be spilled. @@ -83,8 +91,9 @@ pub struct Spiller { ctx: Arc, operator: Operator, location_prefix: String, - disk_spill: Option>, - use_dio: bool, + temp_dir: Option>, + // for dio disabled + local_operator: Option, _spiller_type: SpillerType, pub join_spilling_partition_bits: usize, /// 1 partition -> N partition files @@ -108,12 +117,21 @@ impl Spiller { disk_spill, spiller_type, } = config; + + let (temp_dir, local_operator) = match disk_spill { + Some(SpillerDiskConfig { + temp_dir, + local_operator, + }) => (Some(temp_dir), local_operator), + None => (None, None), + }; + Ok(Self { ctx, operator, location_prefix, - disk_spill, - use_dio: settings.get_enable_dio()?, + temp_dir, + local_operator, _spiller_type: spiller_type, join_spilling_partition_bits: settings.get_join_spilling_partition_bits()?, partition_location: Default::default(), @@ -240,28 +258,23 @@ impl Spiller { // Read spilled data from storage. let instant = Instant::now(); - let block = match location { - Location::Remote(loc) => { - let data = self.operator.read(loc).await?.to_bytes(); - record_remote_read_profile(&instant, data.len()); - deserialize_block(columns_layout, &data) + let data = match (location, &self.local_operator) { + (Location::Local(path), None) => { + debug_assert_eq!(path.size(), columns_layout.iter().sum::()); + let file_size = path.size(); + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) } - Location::Local(path) => { + (Location::Local(path), Some(ref local)) => { debug_assert_eq!(path.size(), columns_layout.iter().sum::()); - if !self.use_dio { - let data = tokio::fs::read(path).await?; - record_local_read_profile(&instant, data.len()); - deserialize_block(columns_layout, &data) - } else { - let file_size = path.size(); - let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - let data = &buf[range]; - record_local_read_profile(&instant, data.len()); - deserialize_block(columns_layout, data) - } + local + .read(path.file_name().unwrap().to_str().unwrap()) + .await? } + (Location::Remote(loc), _) => self.operator.read(loc).await?, }; - + record_remote_read_profile(&instant, data.len()); + let block = deserialize_block(columns_layout, data); Ok(block) } @@ -295,9 +308,8 @@ impl Spiller { // Read spilled data from storage. let instant = Instant::now(); - let data = match location { - Location::Remote(loc) => self.operator.read(loc).await?.to_bytes(), - Location::Local(path) => { + let data = match (location, &self.local_operator) { + (Location::Local(path), None) => { let file_size = path.size(); debug_assert_eq!( file_size, @@ -308,12 +320,15 @@ impl Spiller { } ); - let (mut buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - assert_eq!(range.start, 0); - buf.truncate(range.end); - - dma_buffer_as_vec(buf).into() + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) } + (Location::Local(path), Some(ref local)) => { + local + .read(path.file_name().unwrap().to_str().unwrap()) + .await? + } + (Location::Remote(loc), _) => self.operator.read(loc).await?, }; // Record statistics. @@ -326,7 +341,7 @@ impl Spiller { let partitioned_data = partitions .iter() .map(|(partition_id, range, columns_layout)| { - let block = deserialize_block(columns_layout, &data[range.clone()]); + let block = deserialize_block(columns_layout, data.slice(range.clone())); (*partition_id, block) }) .collect(); @@ -346,41 +361,25 @@ impl Spiller { let instant = Instant::now(); let data_range = data_range.start as u64..data_range.end as u64; - match location { - Location::Remote(loc) => { - let data = self - .operator - .read_with(loc) + let data = match (location, &self.local_operator) { + (Location::Local(path), None) => { + let (buf, range) = dma_read_file_range(path, data_range).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) + } + (Location::Local(path), Some(ref local)) => { + local + .read_with(path.file_name().unwrap().to_str().unwrap()) .range(data_range) .await? - .to_bytes(); - record_remote_read_profile(&instant, data.len()); - Ok(deserialize_block(columns_layout, &data)) - } - Location::Local(path) => { - if !self.use_dio { - let mut file = tokio::fs::File::open(path).await?; - file.seek(io::SeekFrom::Start(data_range.start)).await?; - - let n = data_range.count(); - let mut data = Vec::with_capacity(n); - unsafe { data.set_len(n) }; - file.read_exact(&mut data).await?; - - record_local_read_profile(&instant, data.len()); - Ok(deserialize_block(columns_layout, &data)) - } else { - let (buf, range) = dma_read_file_range(path, data_range).await?; - let data = &buf[range]; - record_local_read_profile(&instant, data.len()); - Ok(deserialize_block(columns_layout, data)) - } } - } + (Location::Remote(loc), _) => self.operator.read_with(loc).range(data_range).await?, + }; + record_remote_read_profile(&instant, data.len()); + Ok(deserialize_block(columns_layout, data)) } async fn write_encodes(&mut self, size: usize, buf: DmaWriteBuf) -> Result { - let location = match &self.disk_spill { + let location = match &self.temp_dir { None => None, Some(disk) => disk.new_file_with_size(size)?.map(Location::Local), } @@ -390,32 +389,36 @@ impl Spiller { GlobalUniqName::unique(), ))); - let written = match &location { - Location::Remote(loc) => { - let mut writer = self - .operator - .writer_with(loc) - .chunk(8 * 1024 * 1024) - .await?; - - let mut written = 0; - for data in buf.into_data() { - written += data.len(); - writer.write(dma_buffer_as_vec(data)).await?; - } - - writer.close().await?; - written + let mut writer = match (&location, &self.local_operator) { + (Location::Local(path), None) => { + let written = buf.into_file(path, true).await?; + debug_assert_eq!(size, written); + return Ok(location); } - Location::Local(path) => buf.into_file(path, self.use_dio).await?, - }; + (Location::Local(path), Some(local)) => { + local.writer_with(path.file_name().unwrap().to_str().unwrap()) + } + (Location::Remote(loc), _) => self.operator.writer_with(loc), + } + .chunk(8 * 1024 * 1024) + .await?; + + let buf = buf + .into_data() + .into_iter() + .map(|x| Bytes::from(dma_buffer_as_vec(x))) + .collect::(); + let written = buf.len(); + writer.write(buf).await?; + writer.close().await?; + debug_assert_eq!(size, written); Ok(location) } fn block_encoder(&self) -> BlocksEncoder { let align = self - .disk_spill + .temp_dir .as_ref() .map(|dir| dir.block_alignment()) .unwrap_or(Alignment::MIN); @@ -462,7 +465,7 @@ impl BlocksEncoder { let column = entry .value .convert_to_full_column(&entry.data_type, block.num_rows()); - serialize_column_in(&column, &mut self.buf).unwrap(); + write_column(&column, &mut self.buf).unwrap(); self.size() })) .map_windows(|x: &[_; 2]| x[1] - x[0]) @@ -477,13 +480,14 @@ impl BlocksEncoder { } } -pub fn deserialize_block(columns_layout: &[usize], mut data: &[u8]) -> DataBlock { +pub fn deserialize_block(columns_layout: &[usize], mut data: Buffer) -> DataBlock { let columns = columns_layout .iter() - .map(|layout| { - let (cur, remain) = data.split_at(*layout); - data = remain; - deserialize_column(cur).unwrap() + .map(|&layout| { + let ls = BufList::from_iter(data.slice(0..layout)); + data.advance(layout); + let mut cursor = Cursor::new(ls); + read_column(&mut cursor).unwrap() }) .collect::>(); diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs index d1f906a5225ef..54f0465e0c94d 100644 --- a/src/query/storages/common/cache/src/temp_dir.rs +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -70,6 +70,8 @@ impl TempDirManager { if create_dir_all(&path).is_err() { (None, 0, Alignment::MIN) } else { + let path = path.canonicalize()?.into_boxed_path(); + let stat = statvfs(path.as_ref()).map_err(|e| ErrorCode::StorageOther(e.to_string()))?; @@ -260,6 +262,10 @@ impl TempDir { pub fn block_alignment(&self) -> Alignment { self.manager.alignment } + + pub fn path(&self) -> &Path { + &self.path + } } struct DirInfo { @@ -436,10 +442,13 @@ mod tests { deleted.sort(); + let pwd = std::env::current_dir()?.canonicalize()?; assert_eq!( vec![ - PathBuf::from("test_data2/test_tenant/unknown_query1").into_boxed_path(), - PathBuf::from("test_data2/test_tenant/unknown_query2").into_boxed_path(), + pwd.join("test_data2/test_tenant/unknown_query1") + .into_boxed_path(), + pwd.join("test_data2/test_tenant/unknown_query2") + .into_boxed_path(), ], deleted ); From 5150460f8344bcf3db8254edfaca138fdafd0264 Mon Sep 17 00:00:00 2001 From: coldWater Date: Sun, 13 Oct 2024 01:59:21 +0800 Subject: [PATCH 12/12] fix Signed-off-by: coldWater --- Makefile | 4 ++++ src/query/service/src/spillers/spiller.rs | 13 +++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index c2841aae4b542..eeab809c33455 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,10 @@ run-debug: build run-debug-management: build bash ./scripts/ci/deploy/databend-query-management-mode.sh +kill: + killall databend-query + killall databend-meta + build: bash ./scripts/build/build-debug.sh diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 7939a9b5e01ac..964b7ff4b04bf 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -273,7 +273,12 @@ impl Spiller { } (Location::Remote(loc), _) => self.operator.read(loc).await?, }; - record_remote_read_profile(&instant, data.len()); + + match location { + Location::Remote(_) => record_remote_read_profile(&instant, data.len()), + Location::Local(_) => record_local_read_profile(&instant, data.len()), + } + let block = deserialize_block(columns_layout, data); Ok(block) } @@ -374,7 +379,11 @@ impl Spiller { } (Location::Remote(loc), _) => self.operator.read_with(loc).range(data_range).await?, }; - record_remote_read_profile(&instant, data.len()); + + match location { + Location::Remote(_) => record_remote_read_profile(&instant, data.len()), + Location::Local(_) => record_local_read_profile(&instant, data.len()), + } Ok(deserialize_block(columns_layout, data)) }