feat(rust, python): The 1 billion row sort (#6156)
ritchie46 authored Jan 13, 2023
1 parent 5b310e2 commit 1a51f9e
Showing 32 changed files with 820 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -30,6 +30,7 @@ xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
 anyhow = "1"
 hashbrown = { version = "0.13.1", features = ["rayon", "ahash"] }
 bitflags = "1.3"
+once_cell = "1"

 [workspace.dependencies.arrow]
 package = "arrow2"
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -168,7 +168,7 @@ indexmap = { version = "1", features = ["std"] }
 ndarray = { version = "0.15", optional = true, default_features = false }
 num.workspace = true
 object_store = { version = "0.5.3", default-features = false, optional = true }
-once_cell = "1"
+once_cell.workspace = true
 polars-arrow = { version = "0.26.1", path = "../polars-arrow", features = ["compute"] }
 polars-utils = { version = "0.26.1", path = "../polars-utils" }
 rand = { version = "0.8", optional = true, features = ["small_rng", "std"] }
11 changes: 5 additions & 6 deletions polars/polars-core/src/frame/mod.rs
@@ -1780,8 +1780,6 @@ impl DataFrame {
         by_column: impl IntoVec<String>,
         reverse: impl IntoVec<bool>,
     ) -> PolarsResult<&mut Self> {
-        // a lot of indirection in both sorting and take
-        self.as_single_chunk_par();
         let by_column = self.select_series(by_column)?;
         let reverse = reverse.into_vec();
         self.columns = self
@@ -1803,6 +1801,9 @@
         if self.height() == 0 {
             return Ok(self.clone());
         }
+        // a lot of indirection in both sorting and take
+        let mut df = self.clone();
+        let df = df.as_single_chunk_par();
         // note that the by_column argument also contains evaluated expression from polars-lazy
         // that may not even be present in this dataframe.

@@ -1821,7 +1822,7 @@
         // fast path for a frame with a single series
         // no need to compute the sort indices and then take by these indices
         // simply sort and return as frame
-        if self.width() == 1 && self.check_name_to_idx(s.name()).is_ok() {
+        if df.width() == 1 && df.check_name_to_idx(s.name()).is_ok() {
             let mut out = s.sort_with(options);
             if let Some((offset, len)) = slice {
                 out = out.slice(offset, len);
@@ -1850,7 +1851,7 @@

         // Safety:
         // the created indices are in bounds
-        let mut df = unsafe { self.take_unchecked_impl(&take, parallel) };
+        let mut df = unsafe { df.take_unchecked_impl(&take, parallel) };
         // Mark the first sort column as sorted
         // if the column did not exists it is ok, because we sorted by an expression
         // not present in the dataframe
@@ -1893,8 +1894,6 @@
     /// Sort the `DataFrame` by a single column with extra options.
     pub fn sort_with_options(&self, by_column: &str, options: SortOptions) -> PolarsResult<Self> {
         let mut df = self.clone();
-        // a lot of indirection in both sorting and take
-        df.as_single_chunk_par();
         let by_column = vec![df.column(by_column)?.clone()];
         let reverse = vec![options.descending];
         df.columns = df
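Taken together, these hunks move the rechunking out of the mutating entry points and into `sort_impl`, which now clones first and calls `as_single_chunk_par` on the clone, after the empty-frame early return. A minimal sketch of the resulting behavior, assuming the 0.26-era API (`df!`, `vstack_mut`, `sort`, and `n_chunks` are real; the column name is made up):

use polars_core::prelude::*;

fn main() -> PolarsResult<()> {
    let mut df = df!("a" => [3i64, 1, 2])?;
    let extra = df.clone();
    df.vstack_mut(&extra)?; // "a" is now backed by two chunks

    // `sort` takes `&self` and rechunks an internal clone, so the
    // caller's chunk layout survives the call.
    let sorted = df.sort(["a"], false)?;
    assert_eq!(df.column("a")?.n_chunks(), 2);
    assert_eq!(sorted.height(), 6);
    Ok(())
}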
4 changes: 1 addition & 3 deletions polars/polars-core/src/lib.rs
@@ -27,7 +27,6 @@ mod tests;
 pub(crate) mod vector_hasher;

 use std::sync::Mutex;
-#[cfg(feature = "object")]
 use std::time::{SystemTime, UNIX_EPOCH};

 use once_cell::sync::Lazy;
@@ -36,8 +35,7 @@ use rayon::{ThreadPool, ThreadPoolBuilder};
 #[cfg(feature = "dtype-categorical")]
 pub use crate::chunked_array::logical::categorical::stringcache::*;

-#[cfg(feature = "object")]
-pub(crate) static PROCESS_ID: Lazy<u128> = Lazy::new(|| {
+pub static PROCESS_ID: Lazy<u128> = Lazy::new(|| {
     SystemTime::now()
         .duration_since(UNIX_EPOCH)
         .unwrap()
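`PROCESS_ID` loses its `object`-feature gate and becomes `pub`, presumably so the streaming machinery added in this commit can reach it (for example to build unique spill-file names). A self-contained sketch of the `Lazy` pattern it uses — the `.as_nanos()` tail is an assumption, since the hunk is cut off after `.unwrap()`:

use std::time::{SystemTime, UNIX_EPOCH};

use once_cell::sync::Lazy;

// Same shape as PROCESS_ID above: the closure runs once, on first
// dereference, and the result is shared for the process lifetime.
static PROCESS_ID: Lazy<u128> = Lazy::new(|| {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos() // assumed tail; the diff ends at `.unwrap()`
});

fn main() {
    assert_eq!(*PROCESS_ID, *PROCESS_ID); // stable across dereferences
    println!("process id: {}", *PROCESS_ID);
}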
6 changes: 6 additions & 0 deletions polars/polars-core/src/schema.rs
@@ -125,6 +125,12 @@ impl Schema {
             .ok_or_else(|| PolarsError::NotFound(name.to_string().into()))
     }

+    pub fn try_get_full(&self, name: &str) -> PolarsResult<(usize, &String, &DataType)> {
+        self.inner
+            .get_full(name)
+            .ok_or_else(|| PolarsError::NotFound(name.to_string().into()))
+    }
+
     pub fn get_full(&self, name: &str) -> Option<(usize, &String, &DataType)> {
         self.inner.get_full(name)
     }
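`try_get_full` is the error-returning twin of `get_full`: a miss becomes `PolarsError::NotFound` instead of `None`, so call sites can use `?`. A small sketch (made-up column names):

use polars_core::prelude::*;

fn main() -> PolarsResult<()> {
    let df = df!("a" => [1i64, 2], "b" => ["x", "y"])?;
    let schema = df.schema();

    // Same (index, name, dtype) tuple that `get_full` wraps in Option.
    let (idx, name, dtype) = schema.try_get_full("b")?;
    println!("{idx}: {name} ({dtype})");

    // A missing column surfaces as an error rather than a bare None.
    assert!(schema.try_get_full("missing").is_err());
    Ok(())
}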
40 changes: 39 additions & 1 deletion polars/polars-io/src/ipc/ipc_file.rs
@@ -277,12 +277,26 @@ pub use write::Compression as IpcCompression;
 use crate::mmap::MmapBytesReader;
 use crate::RowCount;

-impl<W> IpcWriter<W> {
+impl<W: Write> IpcWriter<W> {
     /// Set the compression used. Defaults to None.
     pub fn with_compression(mut self, compression: Option<write::Compression>) -> Self {
         self.compression = compression;
         self
     }
+
+    pub fn batched(&mut self, schema: &Schema) -> PolarsResult<BatchedWriter<&mut W>> {
+        let mut writer = write::FileWriter::new(
+            &mut self.writer,
+            schema.to_arrow(),
+            None,
+            WriteOptions {
+                compression: self.compression,
+            },
+        );
+        writer.start()?;
+
+        Ok(BatchedWriter { writer })
+    }
 }

 impl<W> SerWriter<W> for IpcWriter<W>
@@ -316,6 +330,30 @@ where
     }
 }

+pub struct BatchedWriter<W: Write> {
+    writer: write::FileWriter<W>,
+}
+
+impl<W: Write> BatchedWriter<W> {
+    /// Write a batch to the IPC writer.
+    ///
+    /// # Panics
+    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
+    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
+        let iter = df.iter_chunks();
+        for batch in iter {
+            self.writer.write(&batch, None)?
+        }
+        Ok(())
+    }
+
+    /// Writes the footer of the IPC file.
+    pub fn finish(&mut self) -> PolarsResult<()> {
+        self.writer.finish()?;
+        Ok(())
+    }
+}
+
 pub struct IpcWriterOption {
     compression: Option<write::Compression>,
     extension: PathBuf,
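A sketch of driving the new batched IPC writer (made-up file name; `PolarsError`'s `From<std::io::Error>` handles the `File::create` error):

use std::fs::File;

use polars_core::prelude::*;
use polars_io::prelude::*;

fn main() -> PolarsResult<()> {
    let df1 = df!("a" => [1i64, 2, 3])?;
    let df2 = df!("a" => [4i64, 5, 6])?;

    let file = File::create("frames.ipc")?;
    let mut writer = IpcWriter::new(file);

    // The schema is written once up front; frames are then appended
    // one batch at a time, which is what lets a streaming sink spill
    // data without buffering the whole result.
    let mut batched = writer.batched(&df1.schema())?;
    batched.write_batch(&df1)?;
    batched.write_batch(&df2)?;
    // `finish` writes the IPC footer; the file is unreadable without it.
    batched.finish()?;
    Ok(())
}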
33 changes: 33 additions & 0 deletions polars/polars-io/src/parquet/read_impl.rs
@@ -465,4 +465,37 @@ impl BatchedParquetReader {
             Ok(Some(chunks))
         }
     }
+
+    /// Turn the batched reader into an iterator.
+    pub fn iter(self, batch_size: usize) -> BatchedParquetIter {
+        BatchedParquetIter {
+            batch_size,
+            inner: self,
+            current_batch: vec![].into_iter(),
+        }
+    }
 }
+
+pub struct BatchedParquetIter {
+    batch_size: usize,
+    inner: BatchedParquetReader,
+    current_batch: std::vec::IntoIter<DataFrame>,
+}
+
+impl Iterator for BatchedParquetIter {
+    type Item = PolarsResult<DataFrame>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.current_batch.next() {
+            Some(df) => Some(Ok(df)),
+            None => match self.inner.next_batches(self.batch_size) {
+                Err(e) => Some(Err(e)),
+                Ok(opt_batch) => {
+                    let batch = opt_batch?;
+                    self.current_batch = batch.into_iter();
+                    self.current_batch.next().map(Ok)
+                }
+            },
+        }
+    }
+}
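`BatchedParquetIter` buffers one `Vec<DataFrame>` at a time and refills it from `next_batches` when drained, surfacing read errors through the `PolarsResult` item type. Stripped of I/O and error handling, the same flattening pattern looks like this self-contained sketch (`Counter` is a made-up stand-in for the reader):

struct Counter {
    next: usize,
    max: usize,
}

impl Counter {
    // Stand-in for `BatchedParquetReader::next_batches`: up to `n`
    // items per call, `None` once the source is exhausted.
    fn next_batches(&mut self, n: usize) -> Option<Vec<usize>> {
        if self.next >= self.max {
            return None;
        }
        let end = (self.next + n).min(self.max);
        let batch: Vec<usize> = (self.next..end).collect();
        self.next = end;
        Some(batch)
    }
}

struct BatchedIter {
    batch_size: usize,
    inner: Counter,
    current_batch: std::vec::IntoIter<usize>,
}

impl Iterator for BatchedIter {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        // Drain the buffered batch first; refill only when it runs dry.
        self.current_batch.next().or_else(|| {
            let batch = self.inner.next_batches(self.batch_size)?;
            self.current_batch = batch.into_iter();
            self.current_batch.next()
        })
    }
}

fn main() {
    let iter = BatchedIter {
        batch_size: 4,
        inner: Counter { next: 0, max: 10 },
        current_batch: Vec::new().into_iter(),
    };
    assert_eq!(iter.collect::<Vec<_>>(), (0..10usize).collect::<Vec<_>>());
}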
6 changes: 6 additions & 0 deletions polars/polars-io/src/parquet/write.rs
@@ -62,6 +62,12 @@ pub enum ParquetCompression {
     Lz4Raw,
 }

+impl Default for ParquetCompression {
+    fn default() -> Self {
+        ParquetCompression::Lz4Raw
+    }
+}
+
 impl From<ParquetCompression> for CompressionOptions {
     fn from(value: ParquetCompression) -> Self {
         use ParquetCompression::*;
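With the new impl, a default-constructed setting is `Lz4Raw` — a cheap-to-encode codec, which suits the intermediate spill files a streaming engine writes. A quick check (assuming the `parquet` feature is enabled):

use polars_io::parquet::ParquetCompression;

fn main() {
    assert!(matches!(
        ParquetCompression::default(),
        ParquetCompression::Lz4Raw
    ));
}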
7 changes: 1 addition & 6 deletions polars/polars-io/src/predicates.rs
@@ -29,12 +29,7 @@ pub(crate) fn arrow_schema_to_empty_df(schema: &ArrowSchema) -> DataFrame {
     DataFrame::new_no_checks(columns)
 }

-#[cfg(any(
-    feature = "ipc",
-    feature = "parquet",
-    feature = "json",
-    feature = "ipc_streaming"
-))]
+#[cfg(any(feature = "parquet", feature = "json",))]
 pub(crate) fn apply_predicate(
     df: &mut DataFrame,
     predicate: Option<&dyn PhysicalIoExpr>,
5 changes: 3 additions & 2 deletions polars/polars-lazy/polars-pipe/Cargo.toml
@@ -14,9 +14,10 @@ hashbrown.workspace = true
 num.workspace = true
 polars-arrow = { version = "0.26.1", path = "../../polars-arrow", default-features = false }
 polars-core = { version = "0.26.1", path = "../../polars-core", features = ["lazy", "private", "zip_with", "random"], default-features = false }
-polars-io = { version = "0.26.1", path = "../../polars-io", default-features = false }
+polars-io = { version = "0.26.1", path = "../../polars-io", default-features = false, features = ["ipc"] }
 polars-ops = { version = "0.26.1", path = "../../polars-ops", features = ["search_sorted"] }
 polars-plan = { version = "0.26.1", path = "../polars-plan", default-features = false, features = ["compile"] }
-polars-utils = { version = "0.26.1", path = "../../polars-utils" }
+polars-utils = { version = "0.26.1", path = "../../polars-utils", features = ["sysinfo"] }
 rayon.workspace = true
+smartstring = { version = "1" }

8 changes: 4 additions & 4 deletions polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs
@@ -3,22 +3,22 @@ use polars_core::error::PolarsResult;
 use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};

 #[derive(Default)]
-pub struct Dummy {}
+pub struct PlaceHolder {}

-impl Operator for Dummy {
+impl Operator for PlaceHolder {
     fn execute(
         &mut self,
         _context: &PExecutionContext,
         _chunk: &DataChunk,
     ) -> PolarsResult<OperatorResult> {
-        panic!("dummy should be replaced")
+        panic!("placeholder should be replaced")
     }

     fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
         Box::new(Self {})
     }

     fn fmt(&self) -> &str {
-        "dummy"
+        "placeholder"
     }
 }
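The rename makes the intent explicit: this operator is a slot that must be swapped for a real operator before execution, and it panics if it is ever run. The pipeline code that performs the swap is not shown in this diff, but the pattern itself is simple — a minimal, self-contained sketch (simplified `Operator` trait and made-up `Pipeline`, not the real polars-pipe types):

trait Operator {
    fn fmt(&self) -> &str;
}

#[derive(Default)]
struct PlaceHolder {}

impl Operator for PlaceHolder {
    fn fmt(&self) -> &str {
        "placeholder"
    }
}

struct Real;

impl Operator for Real {
    fn fmt(&self) -> &str {
        "real"
    }
}

struct Pipeline {
    ops: Vec<Box<dyn Operator>>,
}

impl Pipeline {
    // Swap the placeholder for the real operator once it exists,
    // e.g. after another part of the plan has been built.
    fn replace(&mut self, idx: usize, op: Box<dyn Operator>) {
        self.ops[idx] = op;
    }
}

fn main() {
    let mut pipeline = Pipeline {
        ops: vec![Box::new(PlaceHolder::default())],
    };
    pipeline.replace(0, Box::new(Real));
    assert_eq!(pipeline.ops[0].fmt(), "real");
}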
2 changes: 1 addition & 1 deletion polars/polars-lazy/polars-pipe/src/executors/operators/mod.rs
@@ -2,6 +2,6 @@ mod dummy;
 mod filter;
 mod projection;

-pub(crate) use dummy::Dummy;
+pub(crate) use dummy::PlaceHolder;
 pub(crate) use filter::*;
 pub(crate) use projection::*;
2 changes: 2 additions & 0 deletions polars/polars-lazy/polars-pipe/src/executors/sinks/mod.rs
@@ -4,13 +4,15 @@ mod ordered;
 #[cfg(feature = "parquet")]
 mod parquet_sink;
 mod slice;
+mod sort;
 mod utils;

 pub(crate) use joins::*;
 pub(crate) use ordered::*;
 #[cfg(feature = "parquet")]
 pub(crate) use parquet_sink::ParquetSink;
 pub(crate) use slice::*;
+pub(crate) use sort::*;

 // We must strike a balance between cache coherence and resizing costs.
 // Overallocation seems a lot more expensive than resizing so we start reasonable small.