Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: coldWater <[email protected]>
  • Loading branch information
forsaken628 committed Oct 12, 2024
1 parent f315c1e commit 8ec1cdf
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 112 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 16 additions & 13 deletions src/query/expression/src/utils/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
// limitations under the License.

use std::io::Cursor;
use std::io::Read;
use std::io::Seek;
use std::io::Write;

use databend_common_arrow::arrow;
use databend_common_arrow::arrow::array::Array;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_arrow::arrow::bitmap::MutableBitmap;
use databend_common_arrow::arrow::buffer::Buffer;
use databend_common_arrow::arrow::datatypes::Schema;
use databend_common_arrow::arrow::io::ipc::read::read_file_metadata;
use databend_common_arrow::arrow::io::ipc::read::FileReader;
use databend_common_arrow::arrow::io::ipc::write::Compression;
use databend_common_arrow::arrow::io::ipc::write::FileWriter;
use databend_common_arrow::arrow::io::ipc::write::WriteOptions as IpcWriteOptions;
use databend_common_arrow::arrow::io::ipc::write::WriteOptions;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

Expand Down Expand Up @@ -67,32 +71,31 @@ pub fn buffer_into_mut<T: Clone>(mut buffer: Buffer<T>) -> Vec<T> {

pub fn serialize_column(col: &Column) -> Vec<u8> {
let mut buffer = Vec::new();
serialize_column_in(col, &mut buffer).unwrap();
write_column(col, &mut buffer).unwrap();
buffer
}

pub fn serialize_column_in(
col: &Column,
w: &mut impl Write,
) -> databend_common_arrow::arrow::error::Result<()> {
pub fn write_column(col: &Column, w: &mut impl Write) -> arrow::error::Result<()> {
let schema = Schema::from(vec![col.arrow_field()]);
let mut writer = FileWriter::new(w, schema, None, IpcWriteOptions::default());
let mut writer = FileWriter::new(w, schema, None, WriteOptions {
compression: Some(Compression::LZ4),
});
writer.start()?;
writer.write(
&databend_common_arrow::arrow::chunk::Chunk::new(vec![col.as_arrow()]),
None,
)?;
writer.write(&arrow::chunk::Chunk::new(vec![col.as_arrow()]), None)?;
writer.finish()
}

pub fn deserialize_column(bytes: &[u8]) -> Result<Column> {
let mut cursor = Cursor::new(bytes);
read_column(&mut cursor)
}

let metadata = read_file_metadata(&mut cursor)?;
pub fn read_column<R: Read + Seek>(r: &mut R) -> Result<Column> {
let metadata = read_file_metadata(r)?;
let f = metadata.schema.fields[0].clone();
let data_field = DataField::try_from(&f)?;

let mut reader = FileReader::new(cursor, metadata, None, None);
let mut reader = FileReader::new(r, metadata, None, None);
let col = reader
.next()
.ok_or_else(|| ErrorCode::Internal("expected one arrow array"))??
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ async-trait = { workspace = true }
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
backon = "0.4"
base64 = "0.21.0"
buf-list = "1.0.3"
bumpalo = { workspace = true }
byte-unit = "4.0.19"
byteorder = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
config = { version = "0.13.4", features = [] }
Expand Down
21 changes: 20 additions & 1 deletion src/query/service/src/pipelines/builders/builder_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_sql::executor::physical_plans::Window;
use databend_common_sql::executor::physical_plans::WindowPartition;
use databend_storages_common_cache::TempDirManager;
use opendal::services::Fs;
use opendal::Operator;

use crate::pipelines::processors::transforms::FrameBound;
use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
Expand All @@ -34,6 +36,7 @@ use crate::pipelines::processors::transforms::WindowPartitionExchange;
use crate::pipelines::processors::transforms::WindowSpillSettings;
use crate::pipelines::processors::TransformWindow;
use crate::pipelines::PipelineBuilder;
use crate::spillers::SpillerDiskConfig;

impl PipelineBuilder {
pub(crate) fn build_window(&mut self, window: &Window) -> Result<()> {
Expand Down Expand Up @@ -173,7 +176,23 @@ impl PipelineBuilder {

let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?;
let temp_dir_manager = TempDirManager::instance();
let disk_spill = temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id());

let enable_dio = settings.get_enable_dio()?;
let disk_spill =
match temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()) {
Some(temp_dir) if !enable_dio => {
let builder = Fs::default().root(temp_dir.path().to_str().unwrap());
Some(SpillerDiskConfig {
temp_dir,
local_operator: Some(Operator::new(builder)?.finish()),
})
}
Some(temp_dir) => Some(SpillerDiskConfig {
temp_dir,
local_operator: None,
}),
None => None,
};

let window_spill_settings = WindowSpillSettings::new(&settings, num_processors)?;
let have_order_col = window_partition.after_exchange.unwrap_or(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ use databend_common_pipeline_transforms::processors::sort_merge;
use databend_common_settings::Settings;
use databend_common_storage::DataOperator;
use databend_common_storages_fuse::TableContext;
use databend_storages_common_cache::TempDir;

use super::WindowPartitionBuffer;
use super::WindowPartitionMeta;
use super::WindowSpillSettings;
use crate::sessions::QueryContext;
use crate::spillers::Spiller;
use crate::spillers::SpillerConfig;
use crate::spillers::SpillerDiskConfig;
use crate::spillers::SpillerType;

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -99,7 +99,7 @@ impl TransformWindowPartitionCollect {
num_processors: usize,
num_partitions: usize,
spill_settings: WindowSpillSettings,
disk_spill: Option<Arc<TempDir>>,
disk_spill: Option<SpillerDiskConfig>,
sort_desc: Vec<SortColumnDescription>,
schema: DataSchemaRef,
have_order_col: bool,
Expand Down
6 changes: 1 addition & 5 deletions src/query/service/src/spillers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,4 @@ mod spiller;

pub use partition_buffer::PartitionBuffer;
pub use partition_buffer::PartitionBufferFetchOption;
pub use spiller::Location;
pub use spiller::SpilledData;
pub use spiller::Spiller;
pub use spiller::SpillerConfig;
pub use spiller::SpillerType;
pub use spiller::*;
Loading

0 comments on commit 8ec1cdf

Please sign in to comment.