From e45ff608eff501a4a832611ae6047ca1e6429424 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 13 Jan 2023 12:14:29 +0100 Subject: [PATCH] cleanup and make reverse work --- polars/polars-core/Cargo.toml | 2 +- polars/polars-core/src/schema.rs | 4 +- polars/polars-io/src/parquet/read_impl.rs | 5 +- polars/polars-io/src/parquet/write.rs | 2 +- polars/polars-lazy/polars-pipe/Cargo.toml | 2 +- .../src/executors/sinks/sort/io.rs | 35 +++-- .../src/executors/sinks/sort/ooc.rs | 148 ++++++++---------- .../src/executors/sinks/sort/sink.rs | 92 +++++------ .../polars-pipe/src/pipeline/convert.rs | 5 +- .../polars-pipe/src/pipeline/dispatcher.rs | 13 +- .../src/dsl/function_expr/search_sorted.rs | 2 +- .../src/physical_plan/streaming/convert.rs | 2 +- polars/polars-lazy/src/tests/streaming.rs | 23 --- .../src/series/ops/search_sorted.rs | 30 ++-- 14 files changed, 169 insertions(+), 196 deletions(-) diff --git a/polars/polars-core/Cargo.toml b/polars/polars-core/Cargo.toml index 16b3c4e57fec..d1398ca72f49 100644 --- a/polars/polars-core/Cargo.toml +++ b/polars/polars-core/Cargo.toml @@ -157,7 +157,6 @@ docs-selection = [ ahash.workspace = true anyhow.workspace = true arrow.workspace = true -once_cell.workspace = true base64 = { version = "0.13", optional = true } bitflags.workspace = true chrono = { version = "0.4", optional = true } @@ -169,6 +168,7 @@ indexmap = { version = "1", features = ["std"] } ndarray = { version = "0.15", optional = true, default_features = false } num.workspace = true object_store = { version = "0.5.3", default-features = false, optional = true } +once_cell.workspace = true polars-arrow = { version = "0.26.1", path = "../polars-arrow", features = ["compute"] } polars-utils = { version = "0.26.1", path = "../polars-utils" } rand = { version = "0.8", optional = true, features = ["small_rng", "std"] } diff --git a/polars/polars-core/src/schema.rs b/polars/polars-core/src/schema.rs index bd2c0015ad34..aae30a200273 100644 --- a/polars/polars-core/src/schema.rs +++ b/polars/polars-core/src/schema.rs @@ -126,11 +126,11 @@ impl Schema { } pub fn try_get_full(&self, name: &str) -> PolarsResult<(usize, &String, &DataType)> { - self.inner.get_full(name) + self.inner + .get_full(name) .ok_or_else(|| PolarsError::NotFound(name.to_string().into())) } - pub fn get_full(&self, name: &str) -> Option<(usize, &String, &DataType)> { self.inner.get_full(name) } diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs index f039d065aa0d..849b3a4f4095 100644 --- a/polars/polars-io/src/parquet/read_impl.rs +++ b/polars/polars-io/src/parquet/read_impl.rs @@ -3,7 +3,6 @@ use std::collections::VecDeque; use std::convert::TryFrom; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::thread::current; use arrow::array::new_empty_array; use arrow::io::parquet::read; @@ -490,11 +489,11 @@ impl Iterator for BatchedParquetIter { match self.current_batch.next() { Some(df) => Some(Ok(df)), None => match self.inner.next_batches(self.batch_size) { - Err(e) => return Some(Err(e)), + Err(e) => Some(Err(e)), Ok(opt_batch) => { let batch = opt_batch?; self.current_batch = batch.into_iter(); - return self.current_batch.next().map(Ok); + self.current_batch.next().map(Ok) } }, } diff --git a/polars/polars-io/src/parquet/write.rs b/polars/polars-io/src/parquet/write.rs index ba0990f63d6b..3e5a9676ae52 100644 --- a/polars/polars-io/src/parquet/write.rs +++ b/polars/polars-io/src/parquet/write.rs @@ -64,7 +64,7 @@ pub enum ParquetCompression { impl Default for ParquetCompression { fn default() -> Self { - ParquetCompression::Lz4Raw + ParquetCompression::Lz4Raw } } diff --git a/polars/polars-lazy/polars-pipe/Cargo.toml b/polars/polars-lazy/polars-pipe/Cargo.toml index 4a945d91651e..a8e6c6b68c35 100644 --- a/polars/polars-lazy/polars-pipe/Cargo.toml +++ b/polars/polars-lazy/polars-pipe/Cargo.toml @@ -15,9 +15,9 @@ num.workspace = true polars-arrow = { version = "0.26.1", path = "../../polars-arrow", default-features = false } polars-core = { version = "0.26.1", path = "../../polars-core", features = ["lazy", "private", "zip_with", "random"], default-features = false } polars-io = { version = "0.26.1", path = "../../polars-io", default-features = false } +polars-ops = { version = "0.26.1", path = "../../polars-ops", features = ["search_sorted"] } polars-plan = { version = "0.26.1", path = "../polars-plan", default-features = false, features = ["compile"] } polars-utils = { version = "0.26.1", path = "../../polars-utils", features = ["sysinfo"] } -polars-ops = { version = "0.26.1", path = "../../polars-ops", features = ["search_sorted"] } rayon.workspace = true smartstring = { version = "1" } diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/io.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/io.rs index 81b9fc3db33b..c133c750690f 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/io.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/io.rs @@ -1,26 +1,24 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::Ordering; -use std::sync::mpsc::{Sender, SyncSender}; +use std::sync::mpsc::SyncSender; use std::sync::{Arc, Condvar, Mutex}; -use std::thread::JoinHandle; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{SystemTime, UNIX_EPOCH}; use polars_core::prelude::*; use polars_core::POOL; use polars_io::prelude::*; use polars_utils::atomic::SyncCounter; -pub(super) type DfIter = Box+ Sync + Send>; +pub(super) type DfIter = Box + Sync + Send>; // The Option are the partitions it should be written to, if any type Payload = (Option, DfIter); - pub(super) struct IOThread { sender: SyncSender, pub(super) dir: PathBuf, pub(super) sent: SyncCounter, pub(super) total: SyncCounter, - pub(super) all_processed: Arc<(Condvar, Mutex<()>)> + pub(super) all_processed: Arc<(Condvar, Mutex<()>)>, } impl IOThread { @@ -48,7 +46,6 @@ impl IOThread { std::thread::spawn(move || { let mut count = 0usize; while let Ok((partitions, iter)) = receiver.recv() { - if let Some(partitions) = partitions { for (part, df) in partitions.into_no_null_iter().zip(iter) { let mut path = dir2.clone(); @@ -63,7 +60,6 @@ impl IOThread { writer.finish().unwrap(); count += 1; } - } else { let mut path = dir2.clone(); path.push(format!("{count}.parquet")); @@ -79,13 +75,10 @@ impl IOThread { count += 1; } let total = total2.load(Ordering::Relaxed); - dbg!(total, count); if total != 0 && total == count { all_processed2.0.notify_one(); - dbg!("notified"); } } - eprintln!("kill thread"); }); Ok(Self { @@ -93,7 +86,7 @@ impl IOThread { dir, sent, total, - all_processed + all_processed, }) } @@ -103,9 +96,21 @@ impl IOThread { } pub(super) fn dump_iter(&self, partition: Option, iter: DfIter) { let add = iter.size_hint().1.unwrap(); - self.sender - .send( (partition, iter)) - .unwrap(); + self.sender.send((partition, iter)).unwrap(); self.sent.fetch_add(add, Ordering::Relaxed); } } + +pub(super) fn block_thread_until_io_thread_done(io_thread: &IOThread) { + // get number sent + let sent = io_thread.sent.load(Ordering::Relaxed); + // set total sent + io_thread.total.store(sent, Ordering::Relaxed); + + // then the io thread will check if it has written all files, and if it has + // it will set the condvar so we can continue on this thread + + // we don't really need the mutex for our case, but the condvar needs one + let cond_lock = io_thread.all_processed.1.lock().unwrap(); + let _lock = io_thread.all_processed.0.wait(cond_lock).unwrap(); +} diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/ooc.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/ooc.rs index 9a5915df0b2b..d9d174eb48e4 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/ooc.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/ooc.rs @@ -1,87 +1,76 @@ -use std::collections::VecDeque; use std::fs::DirEntry; -use std::path::{Path, PathBuf}; -use std::sync::atomic::Ordering; -use polars_core::POOL; +use std::path::PathBuf; use polars_core::prelude::*; -use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df}; +use polars_core::utils::split_df; +use polars_core::POOL; use polars_io::parquet::ParquetReader; -use polars_io::prelude::BatchedParquetReader; use polars_io::SerReader; use polars_ops::prelude::*; -use polars_plan::prelude::Context::Default; +use rayon::prelude::*; -use crate::executors::sinks::sort::io::{DfIter, IOThread}; -use crate::CHUNK_SIZE; -use crate::operators::{DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext, Source, SourceResult}; +use crate::executors::sinks::sort::io::{block_thread_until_io_thread_done, DfIter, IOThread}; +use crate::executors::sinks::sort::sink::accumulate_and_sort; +use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Source, SourceResult}; fn read_df(entry: DirEntry) -> PolarsResult { let path = entry.path(); - let file = std::fs::File::open(&path)?; + let file = std::fs::File::open(path)?; ParquetReader::new(file).set_rechunk(false).finish() } -pub(super) type ChunkFallibleIter = Box> + Send + Sync>; - -pub(super) fn sort_ooc(io_thread: &IOThread, partitions: Series, idx: usize, reverse: bool) -> PolarsResult { - +pub(super) fn sort_ooc( + io_thread: &IOThread, + partitions: Series, + idx: usize, + reverse: bool, +) -> PolarsResult { let partitions = partitions.to_physical_repr().into_owned(); - // let dir = &write_thread.dir; // we collect as I am not sure that if we write to the same directory the // iterator will read those also. // We don't want to merge files we just written to disk let dir = &io_thread.dir; let files = std::fs::read_dir(dir)?.collect::>>()?; - const BATCH_SIZE: usize = 16; for entry in files { - let df= read_df(entry)?; + let df = read_df(entry)?; let sort_col = &df.get_columns()[idx]; - let mut assigned_parts = det_partitions(sort_col, &partitions); + let assigned_parts = det_partitions(sort_col, &partitions, reverse); // partition the dataframe into proper buckets let (iter, partition) = partition_df(df, &assigned_parts)?; io_thread.dump_iter(Some(partition), iter); } - let all_processed = io_thread.all_processed.clone(); - // get number sent - let sent = io_thread.sent.load(Ordering::Acquire); - // set total sent - io_thread.total.store(sent, Ordering::Release); - - // then the io thread will check if it has written all files, and if it has - // it will set the condvar so we can continue on this thread - - // we don't really need the mutex for our case, but the condvar needs one - let cond_lock = io_thread.all_processed.1.lock().unwrap(); - all_processed.0.wait(cond_lock).unwrap(); - - let mut files = std::fs::read_dir(dir)?.flat_map(|entry| { - entry.map(|entry| { - let path = entry.path(); - if path.is_dir() { - let dirname = path.file_name().unwrap(); - let partition = dirname.to_string_lossy().parse::().unwrap(); - Some((partition, path)) - } else { - None - } - }).transpose() - }).collect::>>()?; + block_thread_until_io_thread_done(io_thread); + + let files = std::fs::read_dir(dir)? + .flat_map(|entry| { + entry + .map(|entry| { + let path = entry.path(); + if path.is_dir() { + let dirname = path.file_name().unwrap(); + let partition = dirname.to_string_lossy().parse::().unwrap(); + Some((partition, path)) + } else { + None + } + }) + .transpose() + }) + .collect::>>()?; - let source = SortSource::new(files, idx, reverse) ; + let source = SortSource::new(files, idx, reverse); Ok(FinalizedSink::Source(Box::new(source))) } -fn det_partitions(s: &Series, partitions: &Series) -> IdxCa { +fn det_partitions(s: &Series, partitions: &Series, reverse: bool) -> IdxCa { let s = s.to_physical_repr(); - search_sorted(partitions, &s, SearchSortedSide::Any).unwrap() - + search_sorted(partitions, &s, SearchSortedSide::Any, reverse).unwrap() } fn partition_df(df: DataFrame, partitions: &IdxCa) -> PolarsResult<(DfIter, IdxCa)> { @@ -91,12 +80,10 @@ fn partition_df(df: DataFrame, partitions: &IdxCa) -> PolarsResult<(DfIter, IdxC let out = match groups { GroupsProxy::Idx(idx) => { - let iter = idx - .into_iter() - .map(move |(_, group)| { - // groups are in bounds - unsafe { df._take_unchecked_slice(&group, false) } - }); + let iter = idx.into_iter().map(move |(_, group)| { + // groups are in bounds + unsafe { df._take_unchecked_slice(&group, false) } + }); Box::new(iter) as DfIter } GroupsProxy::Slice { groups, .. } => { @@ -104,7 +91,7 @@ fn partition_df(df: DataFrame, partitions: &IdxCa) -> PolarsResult<(DfIter, IdxC .into_iter() .map(move |[first, len]| df.slice(first as i64, len as usize)); Box::new(iter) as DfIter - }, + } }; Ok((out, partitions)) } @@ -114,14 +101,12 @@ pub struct SortSource { n_threads: usize, sort_idx: usize, reverse: bool, - chunk_offset: IdxSize + chunk_offset: IdxSize, } impl SortSource { fn new(mut files: Vec<(u32, PathBuf)>, sort_idx: usize, reverse: bool) -> Self { - files.sort_unstable_by_key(|entry| { - entry.0 - }); + files.sort_unstable_by_key(|entry| entry.0); let n_threads = POOL.current_num_threads(); let files = files.into_iter(); @@ -131,40 +116,41 @@ impl SortSource { n_threads, sort_idx, reverse, - chunk_offset: 0 + chunk_offset: 0, } } } impl Source for SortSource { - fn get_batches(&mut self, context: &PExecutionContext) -> PolarsResult { + fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult { match self.files.next() { None => Ok(SourceResult::Finished), Some((_, path)) => { let files = std::fs::read_dir(path)?.collect::>>()?; - let dfs = files.into_iter().map(|entry| { - dbg!(entry.path()); - let df = read_df(entry)?; - - dbg!(df.get_columns()[0].max::()); - Ok(df) - - }).collect::>>()?; - let df = accumulate_dataframes_vertical_unchecked(dfs); - let sort_column = df.get_columns()[self.sort_idx].clone(); - - let mut df = df.sort_impl(vec![sort_column], vec![self.reverse], false, None, true)?; - - let mut chunk_offset = self.chunk_offset; + // read the files in a single partition in parallel + let dfs = POOL.install(|| { + files + .into_par_iter() + .map(read_df) + .collect::>>() + })?; + // sort a single partition + let mut df = accumulate_and_sort(dfs, self.sort_idx, self.reverse)?; + + // convert to chunks + // TODO: make utility functions to save these allocations + let chunk_offset = self.chunk_offset; let dfs = split_df(&mut df, self.n_threads)?; self.chunk_offset += dfs.len() as IdxSize; - let batch = dfs.into_iter().enumerate().map(|(i, df)| { - DataChunk { + let batch = dfs + .into_iter() + .enumerate() + .map(|(i, df)| DataChunk { chunk_index: chunk_offset + i as IdxSize, - data: df - } - }).collect(); + data: df, + }) + .collect(); Ok(SourceResult::GotMoreData(batch)) } @@ -174,4 +160,4 @@ impl Source for SortSource { fn fmt(&self) -> &str { "sort_source" } -} \ No newline at end of file +} diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs index 0ccc97768919..d331acecfb74 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/sort/sink.rs @@ -2,16 +2,15 @@ use std::any::Any; use std::collections::VecDeque; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; -use std::time::Duration; use polars_core::error::PolarsResult; use polars_core::frame::DataFrame; -use polars_core::prelude::{AnyValue, SchemaRef, Series, SortOptions}; +use polars_core::prelude::{AnyValue, SchemaRef, Series}; use polars_core::utils::accumulate_dataframes_vertical_unchecked; use polars_utils::atomic::SyncCounter; use polars_utils::sys::MEMINFO; -use crate::executors::sinks::sort::io::IOThread; +use crate::executors::sinks::sort::io::{block_thread_until_io_thread_done, IOThread}; use crate::executors::sinks::sort::ooc::sort_ooc; use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult}; @@ -29,22 +28,30 @@ pub struct SortSink { sort_idx: usize, reverse: bool, // sampled values so we can find the distribution. - dist_sample: Vec> + dist_sample: Vec>, } impl SortSink { pub(crate) fn new(sort_idx: usize, reverse: bool, schema: SchemaRef) -> Self { - Self { + // for testing purposes + let ooc = std::env::var("POLARS_FORCE_OOC_SORT").is_ok(); + + let mut out = Self { schema, chunks: Default::default(), free_mem: SyncCounter::new(0), mem_total: SyncCounter::new(0), - ooc: false, + ooc, io_thread: Default::default(), sort_idx, reverse, - dist_sample: vec![] + dist_sample: vec![], + }; + if ooc { + eprintln!("Out of core sort forced"); + out.init_ooc().unwrap(); } + out } fn refresh_memory(&self) { @@ -54,6 +61,17 @@ impl SortSink { } } + fn init_ooc(&mut self) -> PolarsResult<()> { + self.ooc = true; + + // start IO thread + let mut iot = self.io_thread.lock().unwrap(); + if iot.is_none() { + *iot = Some(IOThread::try_new(self.schema.clone())?) + } + Ok(()) + } + fn store_chunk(&mut self, chunk: DataChunk) -> PolarsResult<()> { let chunk_bytes = chunk.data.estimated_size(); @@ -64,25 +82,7 @@ impl SortSink { // we need some free memory to be able to sort // so we keep 3x the sort data size before we go out of core if used * 3 > free { - self.ooc = true; - - // start IO thread - let mut iot = self.io_thread.lock().unwrap(); - if iot.is_none() { - *iot = Some(IOThread::try_new(self.schema.clone())?) - } - } - - // TODO! remove, only for testing - if used > 200_000 { - dbg!("Start ooc"); - self.ooc = true; - - // start IO thread - let mut iot = self.io_thread.lock().unwrap(); - if iot.is_none() { - *iot = Some(IOThread::try_new(self.schema.clone())?) - } + self.init_ooc()?; } } self.chunks.push_back(chunk.data); @@ -91,7 +91,7 @@ impl SortSink { fn dump(&mut self) -> PolarsResult<()> { // take from the front so that sorted data remains sorted in writing order - while let Some(mut df) = self.chunks.pop_front() { + while let Some(df) = self.chunks.pop_front() { if df.height() > 0 { // safety: we just asserted height > 0 let sample = unsafe { df.get_columns()[self.sort_idx].get_unchecked(0) }; @@ -120,10 +120,11 @@ impl Sink for SortSink { } fn combine(&mut self, mut other: Box) { - let mut other = other.as_any().downcast_mut::().unwrap(); + let other = other.as_any().downcast_mut::().unwrap(); self.chunks.extend(std::mem::take(&mut other.chunks)); self.ooc |= other.ooc; - self.dist_sample.extend(std::mem::take(&mut other.dist_sample)); + self.dist_sample + .extend(std::mem::take(&mut other.dist_sample)); if self.ooc { self.dump().unwrap() @@ -136,7 +137,7 @@ impl Sink for SortSink { chunks: Default::default(), free_mem: self.free_mem.clone(), mem_total: self.mem_total.clone(), - ooc: self.ooc.clone(), + ooc: self.ooc, io_thread: self.io_thread.clone(), sort_idx: self.sort_idx, reverse: self.reverse, @@ -148,28 +149,19 @@ impl Sink for SortSink { if self.ooc { let lock = self.io_thread.lock().unwrap(); let io_thread = lock.as_ref().unwrap(); - let all_processed = io_thread.all_processed.clone(); let dist = Series::from_any_values("", &self.dist_sample).unwrap(); let dist = dist.sort(self.reverse); - - // get number sent - let sent = io_thread.sent.load(Ordering::Relaxed); - // set total sent - io_thread.total.fetch_add(sent, Ordering::Relaxed); - - // then the io thread will check if it has written all files, and if it has - // it will set the condvar so we can continue on this thread - - // we don't really need the mutex for our case, but the condvar needs one - let cond_lock = io_thread.all_processed.1.lock().unwrap(); - all_processed.0.wait(cond_lock).unwrap(); + block_thread_until_io_thread_done(io_thread); sort_ooc(io_thread, dist, self.sort_idx, self.reverse) - } else { - let df = accumulate_dataframes_vertical_unchecked(std::mem::take(&mut self.chunks)); + let df = accumulate_and_sort( + std::mem::take(&mut self.chunks), + self.sort_idx, + self.reverse, + )?; Ok(FinalizedSink::Finished(df)) } } @@ -182,3 +174,13 @@ impl Sink for SortSink { "sort" } } + +pub(super) fn accumulate_and_sort>( + dfs: I, + sort_idx: usize, + reverse: bool, +) -> PolarsResult { + let df = accumulate_dataframes_vertical_unchecked(dfs); + let sort_column = df.get_columns()[sort_idx].clone(); + df.sort_impl(vec![sort_column], vec![reverse], false, None, true) +} diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs index 16cbc6abe164..002780145eef 100644 --- a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs +++ b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs @@ -318,6 +318,7 @@ where Ok(op) } +#[allow(clippy::too_many_arguments)] pub fn create_pipeline( sources: &[Node], operators: Vec>, @@ -326,7 +327,7 @@ pub fn create_pipeline( lp_arena: &mut Arena, expr_arena: &mut Arena, to_physical: F, - verbose: bool + verbose: bool, ) -> PolarsResult where F: Fn(Node, &Arena, Option<&SchemaRef>) -> PolarsResult>, @@ -420,7 +421,7 @@ where operator_nodes, sink_nodes, operator_offset, - verbose + verbose, )) } diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs index 9e33f7c01773..eb2be8d3e660 100644 --- a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs +++ b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs @@ -2,10 +2,10 @@ use std::collections::VecDeque; use polars_core::error::PolarsResult; use polars_core::frame::DataFrame; +use polars_core::utils::accumulate_dataframes_vertical_unchecked; use polars_core::POOL; use polars_utils::arena::Node; use rayon::prelude::*; -use polars_core::utils::accumulate_dataframes_vertical_unchecked; use crate::executors::operators::Dummy; use crate::executors::sources::DataFrameSource; @@ -25,7 +25,7 @@ pub struct PipeLine { sink_nodes: Vec, rh_sides: Vec, operator_offset: usize, - verbose: bool + verbose: bool, } impl PipeLine { @@ -35,7 +35,7 @@ impl PipeLine { operator_nodes: Vec, sink_and_nodes: Vec<(usize, Node, Box)>, operator_offset: usize, - verbose: bool + verbose: bool, ) -> PipeLine { debug_assert_eq!(operators.len(), operator_nodes.len() + operator_offset); // we don't use the power of two partition size here @@ -62,7 +62,7 @@ impl PipeLine { sink_nodes, rh_sides: vec![], operator_offset, - verbose + verbose, } } @@ -175,8 +175,7 @@ impl PipeLine { fn set_sources(&mut self, src: Box) { self.sources.clear(); - self.sources - .push(src); + self.sources.push(src); } pub fn run_pipeline(&mut self, ec: &PExecutionContext) -> PolarsResult { @@ -334,4 +333,4 @@ fn consume_source(src: &mut dyn Source, context: &PExecutionContext) -> PolarsRe frames.extend(batch.into_iter().map(|chunk| chunk.data)) } Ok(accumulate_dataframes_vertical_unchecked(frames)) -} \ No newline at end of file +} diff --git a/polars/polars-lazy/polars-plan/src/dsl/function_expr/search_sorted.rs b/polars/polars-lazy/polars-plan/src/dsl/function_expr/search_sorted.rs index 8d8b7e7e4da4..9d4566dfce65 100644 --- a/polars/polars-lazy/polars-plan/src/dsl/function_expr/search_sorted.rs +++ b/polars/polars-lazy/polars-plan/src/dsl/function_expr/search_sorted.rs @@ -6,5 +6,5 @@ pub(super) fn search_sorted_impl(s: &mut [Series], side: SearchSortedSide) -> Po let sorted_array = &s[0]; let search_value = &s[1]; - search_sorted(sorted_array, search_value, side).map(|ca| ca.into_series()) + search_sorted(sorted_array, search_value, side, false).map(|ca| ca.into_series()) } diff --git a/polars/polars-lazy/src/physical_plan/streaming/convert.rs b/polars/polars-lazy/src/physical_plan/streaming/convert.rs index 82f7c96532df..e455b44d1d41 100644 --- a/polars/polars-lazy/src/physical_plan/streaming/convert.rs +++ b/polars/polars-lazy/src/physical_plan/streaming/convert.rs @@ -399,7 +399,7 @@ pub(crate) fn insert_streaming_nodes( lp_arena, expr_arena, to_physical_piped_expr, - verbose + verbose, )?; pipelines.push_back(pipeline); } diff --git a/polars/polars-lazy/src/tests/streaming.rs b/polars/polars-lazy/src/tests/streaming.rs index cde8e2963350..68952212ea75 100644 --- a/polars/polars-lazy/src/tests/streaming.rs +++ b/polars/polars-lazy/src/tests/streaming.rs @@ -332,26 +332,3 @@ fn test_streaming_double_left_join() -> PolarsResult<()> { Ok(()) } - -#[test] -#[cfg(feature = "random")] -fn test_streaming_sort() -> PolarsResult<()> { - let ca: NoNull = (0..100_000).collect(); - let s = ca.into_inner().into_series(); - let s = s.shuffle(None); - - let q = df![ - "a" => s - ]? - .lazy(); - - let out = q - .with_streaming(true) - .sort("a", Default::default()) - // .sink_parquet("/tmp/test.parquet".into(), Default::default()); - .collect()?; - - dbg!(&out); - - Ok(()) -} diff --git a/polars/polars-ops/src/series/ops/search_sorted.rs b/polars/polars-ops/src/series/ops/search_sorted.rs index 704792cf1dd9..45f6c42a57fb 100644 --- a/polars/polars-ops/src/series/ops/search_sorted.rs +++ b/polars/polars-ops/src/series/ops/search_sorted.rs @@ -104,6 +104,7 @@ fn binary_search_array( arr: G, len: usize, search_value: I, + reverse: bool, ) where G: GetArray, I: PartialEq + Debug + Copy + PartialOrd + IsFloat, @@ -120,7 +121,13 @@ fn binary_search_array( // - `mid < size`: `mid` is limited by `[left; right)` bound. let cmp = match unsafe { arr._get_value_unchecked(mid as usize) } { None => Ordering::Less, - Some(value) => compare_fn_nan_max(&value, &search_value), + Some(value) => { + if reverse { + compare_fn_nan_max(&search_value, &value) + } else { + compare_fn_nan_max(&value, &search_value) + } + } }; // The reason why we use if/else control flow rather than match @@ -146,6 +153,7 @@ fn search_sorted_ca_array( ca: &ChunkedArray, search_values: &ChunkedArray, side: SearchSortedSide, + reverse: bool, ) -> Vec where T: PolarsNumericType, @@ -158,7 +166,9 @@ where for opt_v in search_values { match opt_v { None => out.push(0), - Some(search_value) => binary_search_array(side, &mut out, arr, ca.len(), search_value), + Some(search_value) => { + binary_search_array(side, &mut out, arr, ca.len(), search_value, reverse) + } } } out @@ -168,6 +178,7 @@ fn search_sorted_utf8_array( ca: &Utf8Chunked, search_values: &Utf8Chunked, side: SearchSortedSide, + reverse: bool, ) -> Vec { let ca = ca.rechunk(); let arr = ca.downcast_iter().next().unwrap(); @@ -178,7 +189,7 @@ fn search_sorted_utf8_array( match opt_v { None => out.push(0), Some(search_value) => { - binary_search_array(side, &mut out, arr, ca.len(), search_value); + binary_search_array(side, &mut out, arr, ca.len(), search_value, reverse); } } } @@ -189,6 +200,7 @@ pub fn search_sorted( s: &Series, search_values: &Series, side: SearchSortedSide, + reverse: bool, ) -> PolarsResult { let original_dtype = s.dtype(); let s = s.to_physical_repr(); @@ -198,7 +210,7 @@ pub fn search_sorted( DataType::Utf8 => { let ca = s.utf8().unwrap(); let search_values = search_values.utf8()?; - let idx = search_sorted_utf8_array(ca, search_values, side); + let idx = search_sorted_utf8_array(ca, search_values, side, reverse); Ok(IdxCa::new_vec(s.name(), idx)) } @@ -209,7 +221,7 @@ pub fn search_sorted( let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref(); let search_values: &ChunkedArray<$T> = search_values.as_ref().as_ref().as_ref(); - search_sorted_ca_array(ca, search_values, side) + search_sorted_ca_array(ca, search_values, side, reverse) }); Ok(IdxCa::new_vec(s.name(), idx)) } @@ -218,11 +230,3 @@ pub fn search_sorted( )), } } - -#[test] -fn test_search_sorted() { - let s = Series::new("", [1, 1, 4, 4]); - let b = Series::new("", [0, 1, 2, 4, 5]); - let out = search_sorted_array(&s, &b, SearchSortedSide::Right).unwrap(); - dbg!(out); -}