feat(rust, python): The 1 billion row sort #6156

Merged · 9 commits · Jan 13, 2023
1 change: 1 addition & 0 deletions Cargo.toml
@@ -30,6 +30,7 @@ xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
anyhow = "1"
hashbrown = { version = "0.13.1", features = ["rayon", "ahash"] }
bitflags = "1.3"
once_cell = "1"

[workspace.dependencies.arrow]
package = "arrow2"
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -168,7 +168,7 @@ indexmap = { version = "1", features = ["std"] }
ndarray = { version = "0.15", optional = true, default_features = false }
num.workspace = true
object_store = { version = "0.5.3", default-features = false, optional = true }
once_cell = "1"
once_cell.workspace = true
polars-arrow = { version = "0.26.1", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.26.1", path = "../polars-utils" }
rand = { version = "0.8", optional = true, features = ["small_rng", "std"] }
11 changes: 5 additions & 6 deletions polars/polars-core/src/frame/mod.rs
@@ -1780,8 +1780,6 @@ impl DataFrame {
by_column: impl IntoVec<String>,
reverse: impl IntoVec<bool>,
) -> PolarsResult<&mut Self> {
// a lot of indirection in both sorting and take
self.as_single_chunk_par();
let by_column = self.select_series(by_column)?;
let reverse = reverse.into_vec();
self.columns = self
@@ -1803,6 +1801,9 @@
if self.height() == 0 {
return Ok(self.clone());
}
// a lot of indirection in both sorting and take
let mut df = self.clone();
let df = df.as_single_chunk_par();
// note that the by_column argument also contains evaluated expressions from polars-lazy
// that may not even be present in this dataframe.

@@ -1821,7 +1822,7 @@
// fast path for a frame with a single series
// no need to compute the sort indices and then take by these indices
// simply sort and return as frame
if self.width() == 1 && self.check_name_to_idx(s.name()).is_ok() {
if df.width() == 1 && df.check_name_to_idx(s.name()).is_ok() {
let mut out = s.sort_with(options);
if let Some((offset, len)) = slice {
out = out.slice(offset, len);
@@ -1850,7 +1851,7 @@

// Safety:
// the created indices are in bounds
let mut df = unsafe { self.take_unchecked_impl(&take, parallel) };
let mut df = unsafe { df.take_unchecked_impl(&take, parallel) };
// Mark the first sort column as sorted
// if the column did not exist, that is ok, because we sorted by an expression
// not present in the dataframe
@@ -1893,8 +1894,6 @@
/// Sort the `DataFrame` by a single column with extra options.
pub fn sort_with_options(&self, by_column: &str, options: SortOptions) -> PolarsResult<Self> {
let mut df = self.clone();
// a lot of indirection in both sorting and take
df.as_single_chunk_par();
let by_column = vec![df.column(by_column)?.clone()];
let reverse = vec![options.descending];
df.columns = df
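The `sort_impl` change above stops mutating the caller's chunk layout: instead of rechunking `self` in place, it clones the frame (cheap, since columns are reference-counted) and rechunks only the clone. A minimal sketch of the pattern, assuming the 0.26-era `sort` signature and a hypothetical column `"a"`:

```rust
use polars::prelude::*;

// Sketch only: mirrors the clone-then-rechunk pattern from the diff.
fn sort_without_mutating(df: &DataFrame) -> PolarsResult<DataFrame> {
    // Cloning a DataFrame copies Arc'd column handles, not the data.
    let mut local = df.clone();
    // Rechunk the clone to a single chunk; `df` keeps its layout.
    local.as_single_chunk_par();
    local.sort(["a"], false)
}
```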
4 changes: 1 addition & 3 deletions polars/polars-core/src/lib.rs
@@ -27,7 +27,6 @@ mod tests;
pub(crate) mod vector_hasher;

use std::sync::Mutex;
#[cfg(feature = "object")]
use std::time::{SystemTime, UNIX_EPOCH};

use once_cell::sync::Lazy;
@@ -36,8 +35,7 @@ use rayon::{ThreadPool, ThreadPoolBuilder};
#[cfg(feature = "dtype-categorical")]
pub use crate::chunked_array::logical::categorical::stringcache::*;

#[cfg(feature = "object")]
pub(crate) static PROCESS_ID: Lazy<u128> = Lazy::new(|| {
pub static PROCESS_ID: Lazy<u128> = Lazy::new(|| {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
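Dropping the `object` feature gate and widening `PROCESS_ID` to `pub` lets other crates read it; a plausible use is deriving process-unique names for spill files in the new streaming sort. A hypothetical consumer, assuming the crate-root re-export:

```rust
use polars_core::PROCESS_ID;

// Hypothetical helper: Lazy<u128> derefs to the computed value on first use,
// so every spill file name is unique to this process.
fn spill_file_name(partition: usize) -> String {
    format!("polars-sort-{}-{}.ipc", *PROCESS_ID, partition)
}
```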
6 changes: 6 additions & 0 deletions polars/polars-core/src/schema.rs
@@ -125,6 +125,12 @@ impl Schema {
.ok_or_else(|| PolarsError::NotFound(name.to_string().into()))
}

pub fn try_get_full(&self, name: &str) -> PolarsResult<(usize, &String, &DataType)> {
self.inner
.get_full(name)
.ok_or_else(|| PolarsError::NotFound(name.to_string().into()))
}

pub fn get_full(&self, name: &str) -> Option<(usize, &String, &DataType)> {
self.inner.get_full(name)
}
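`try_get_full` is a fallible twin of `get_full`: same `(index, name, dtype)` tuple, but a miss becomes `PolarsError::NotFound`, so callers can use `?` directly. A small usage sketch (the `index_of` helper is illustrative, not part of the PR):

```rust
use polars_core::prelude::*;

fn index_of(schema: &Schema, name: &str) -> PolarsResult<usize> {
    // On a missing column this propagates PolarsError::NotFound.
    let (idx, _name, _dtype) = schema.try_get_full(name)?;
    Ok(idx)
}
```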
40 changes: 39 additions & 1 deletion polars/polars-io/src/ipc/ipc_file.rs
@@ -277,12 +277,26 @@ pub use write::Compression as IpcCompression;
use crate::mmap::MmapBytesReader;
use crate::RowCount;

impl<W> IpcWriter<W> {
impl<W: Write> IpcWriter<W> {
/// Set the compression used. Defaults to None.
pub fn with_compression(mut self, compression: Option<write::Compression>) -> Self {
self.compression = compression;
self
}

pub fn batched(&mut self, schema: &Schema) -> PolarsResult<BatchedWriter<&mut W>> {
let mut writer = write::FileWriter::new(
&mut self.writer,
schema.to_arrow(),
None,
WriteOptions {
compression: self.compression,
},
);
writer.start()?;

Ok(BatchedWriter { writer })
}
}

impl<W> SerWriter<W> for IpcWriter<W>
@@ -316,6 +330,30 @@
}
}

pub struct BatchedWriter<W: Write> {
writer: write::FileWriter<W>,
}

impl<W: Write> BatchedWriter<W> {
/// Write a batch to the IPC writer.
///
/// # Panics
/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
let iter = df.iter_chunks();
for batch in iter {
self.writer.write(&batch, None)?
}
Ok(())
}

/// Writes the footer of the IPC file.
pub fn finish(&mut self) -> PolarsResult<()> {
self.writer.finish()?;
Ok(())
}
}

pub struct IpcWriterOption {
compression: Option<write::Compression>,
extension: PathBuf,
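The new `batched` constructor plus `BatchedWriter` allow streaming many frames into a single IPC file: `write_batch` appends record batches and `finish` writes the footer. A hedged sketch of how a spill routine might drive it, assuming the `ipc` feature; `spill`, the path, and the input slice are hypothetical:

```rust
use std::fs::File;

use polars::prelude::*;

fn spill(dfs: &[DataFrame], schema: &Schema, path: &str) -> PolarsResult<()> {
    let file = File::create(path)?;
    let mut writer = IpcWriter::new(file);
    let mut batched = writer.batched(schema)?;
    for df in dfs {
        // Caller must keep chunks aligned across columns (see doc comment).
        batched.write_batch(df)?;
    }
    // Writes the IPC footer; the file is not readable without it.
    batched.finish()
}
```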
33 changes: 33 additions & 0 deletions polars/polars-io/src/parquet/read_impl.rs
@@ -465,4 +465,37 @@ impl BatchedParquetReader {
Ok(Some(chunks))
}
}

/// Turn the batched reader into an iterator.
pub fn iter(self, batch_size: usize) -> BatchedParquetIter {
BatchedParquetIter {
batch_size,
inner: self,
current_batch: vec![].into_iter(),
}
}
}

pub struct BatchedParquetIter {
batch_size: usize,
inner: BatchedParquetReader,
current_batch: std::vec::IntoIter<DataFrame>,
}

impl Iterator for BatchedParquetIter {
type Item = PolarsResult<DataFrame>;

fn next(&mut self) -> Option<Self::Item> {
match self.current_batch.next() {
Some(df) => Some(Ok(df)),
None => match self.inner.next_batches(self.batch_size) {
Err(e) => Some(Err(e)),
Ok(opt_batch) => {
let batch = opt_batch?;
self.current_batch = batch.into_iter();
self.current_batch.next().map(Ok)
}
},
}
}
}
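`BatchedParquetIter` adapts the pull-based `next_batches` API to a plain `Iterator` yielding `PolarsResult<DataFrame>`, buffering each returned `Vec<DataFrame>` and replaying it one frame at a time. Consumption then needs no special casing; a sketch generic over any such iterator:

```rust
use polars::prelude::*;

// Works for `reader.iter(n)` or any other PolarsResult<DataFrame> stream.
fn total_rows(batches: impl Iterator<Item = PolarsResult<DataFrame>>) -> PolarsResult<usize> {
    let mut rows = 0;
    for df in batches {
        // `?` surfaces read errors; iteration stops naturally at end of file.
        rows += df?.height();
    }
    Ok(rows)
}
```

With a `BatchedParquetReader` named `reader`, this would be called as `total_rows(reader.iter(16))`.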
6 changes: 6 additions & 0 deletions polars/polars-io/src/parquet/write.rs
@@ -62,6 +62,12 @@ pub enum ParquetCompression {
Lz4Raw,
}

impl Default for ParquetCompression {
fn default() -> Self {
ParquetCompression::Lz4Raw
}
}

impl From<ParquetCompression> for CompressionOptions {
fn from(value: ParquetCompression) -> Self {
use ParquetCompression::*;
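With the new `Default` impl, code paths that don't specify a codec fall back to `Lz4Raw`, a fast codec suited to the streaming engine's spill-to-disk writes. A one-line check, assuming `ParquetCompression` is re-exported through the prelude:

```rust
use polars::prelude::*;

fn main() {
    // Lz4Raw trades compression ratio for speed, a good fit for temp files.
    assert!(matches!(ParquetCompression::default(), ParquetCompression::Lz4Raw));
}
```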
7 changes: 1 addition & 6 deletions polars/polars-io/src/predicates.rs
@@ -29,12 +29,7 @@ pub(crate) fn arrow_schema_to_empty_df(schema: &ArrowSchema) -> DataFrame {
DataFrame::new_no_checks(columns)
}

#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "json",
feature = "ipc_streaming"
))]
#[cfg(any(feature = "parquet", feature = "json"))]
pub(crate) fn apply_predicate(
df: &mut DataFrame,
predicate: Option<&dyn PhysicalIoExpr>,
5 changes: 3 additions & 2 deletions polars/polars-lazy/polars-pipe/Cargo.toml
@@ -14,9 +14,10 @@ hashbrown.workspace = true
num.workspace = true
polars-arrow = { version = "0.26.1", path = "../../polars-arrow", default-features = false }
polars-core = { version = "0.26.1", path = "../../polars-core", features = ["lazy", "private", "zip_with", "random"], default-features = false }
polars-io = { version = "0.26.1", path = "../../polars-io", default-features = false }
polars-io = { version = "0.26.1", path = "../../polars-io", default-features = false, features = ["ipc"] }
polars-ops = { version = "0.26.1", path = "../../polars-ops", features = ["search_sorted"] }
polars-plan = { version = "0.26.1", path = "../polars-plan", default-features = false, features = ["compile"] }
polars-utils = { version = "0.26.1", path = "../../polars-utils" }
polars-utils = { version = "0.26.1", path = "../../polars-utils", features = ["sysinfo"] }
rayon.workspace = true
smartstring = { version = "1" }

@@ -3,22 +3,22 @@ use polars_core::error::PolarsResult;
use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};

#[derive(Default)]
pub struct Dummy {}
pub struct PlaceHolder {}

impl Operator for Dummy {
impl Operator for PlaceHolder {
fn execute(
&mut self,
_context: &PExecutionContext,
_chunk: &DataChunk,
) -> PolarsResult<OperatorResult> {
panic!("dummy should be replaced")
panic!("placeholder should be replaced")
}

fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
Box::new(Self {})
}

fn fmt(&self) -> &str {
"dummy"
"placeholder"
}
}
@@ -2,6 +2,6 @@ mod dummy;
mod filter;
mod projection;

pub(crate) use dummy::Dummy;
pub(crate) use dummy::PlaceHolder;
pub(crate) use filter::*;
pub(crate) use projection::*;
2 changes: 2 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/mod.rs
@@ -4,13 +4,15 @@ mod ordered;
#[cfg(feature = "parquet")]
mod parquet_sink;
mod slice;
mod sort;
mod utils;

pub(crate) use joins::*;
pub(crate) use ordered::*;
#[cfg(feature = "parquet")]
pub(crate) use parquet_sink::ParquetSink;
pub(crate) use slice::*;
pub(crate) use sort::*;

// We must strike a balance between cache coherence and resizing costs.
// Overallocation seems a lot more expensive than resizing, so we start reasonably small.