parquet: allow writing smaller row groups #3852

Merged · 1 commit · Jun 29, 2022
11 changes: 11 additions & 0 deletions polars/polars-core/src/frame/mod.rs
@@ -2161,6 +2161,17 @@ impl DataFrame {
DataFrame::new_no_checks(col)
}

/// Slice the `DataFrame` along the rows, slicing every column in parallel on the thread pool.
#[must_use]
pub fn slice_par(&self, offset: i64, length: usize) -> Self {
let col = POOL.install(|| {
self.columns
.par_iter()
.map(|s| s.slice(offset, length))
.collect::<Vec<_>>()
});
DataFrame::new_no_checks(col)
}

/// Get the head of the `DataFrame`.
///
/// # Example
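For context, a minimal sketch of what the new `slice_par` does — it is the parallel counterpart of `DataFrame::slice`, slicing each column on the global thread pool. The data and setup below are made up for illustration and are not part of this PR:

```rust
use polars::prelude::*;

fn main() {
    // Hypothetical example frame, just to show the call.
    let df = df![
        "a" => (0..10_000i64).collect::<Vec<_>>(),
        "b" => (0..10_000i64).map(|v| v as f64).collect::<Vec<_>>(),
    ]
    .unwrap();

    // `slice_par` takes the same (offset, length) arguments as `slice`,
    // but each column is sliced on the rayon pool.
    let serial = df.slice(2_500, 5_000);
    let parallel = df.slice_par(2_500, 5_000);
    assert!(serial.frame_equal(&parallel));
}
```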
4 changes: 2 additions & 2 deletions polars/polars-core/src/utils/mod.rs
@@ -785,7 +785,7 @@ fn _get_supertype(l: &DataType, r: &DataType) -> Option<DataType> {

/// This takes ownership of the DataFrame so that drop is called earlier.
/// Does not check if schema is correct
pub fn accumulate_dataframes_vertical_unchecked<I>(dfs: I) -> Result<DataFrame>
pub fn accumulate_dataframes_vertical_unchecked<I>(dfs: I) -> DataFrame
where
I: IntoIterator<Item = DataFrame>,
{
@@ -794,7 +794,7 @@ where
for df in iter {
acc_df.vstack_mut_unchecked(&df);
}
Ok(acc_df)
acc_df
}

/// This takes ownership of the DataFrame so that drop is called earlier.
14 changes: 2 additions & 12 deletions polars/polars-io/src/parquet/mmap.rs
@@ -1,20 +1,10 @@
use super::*;
use arrow::datatypes::Field;
use arrow::io::parquet::read::{
column_iter_to_arrays, ArrayIter, BasicDecompressor, ColumnChunkMetaData, PageReader,
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
};

// TODO! make public in arrow2?
pub(super) fn get_field_columns<'a>(
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Vec<&'a ColumnChunkMetaData> {
columns
.iter()
.filter(|x| x.descriptor().path_in_schema[0] == field_name)
.collect()
}

/// memory maps all columns that are part of the parquet field `field_name`
pub(super) fn mmap_columns<'a>(
file: &'a [u8],
17 changes: 17 additions & 0 deletions polars/polars-io/src/parquet/write.rs
@@ -12,6 +12,7 @@ use rayon::prelude::*;
use std::collections::VecDeque;
use std::io::Write;

use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
pub use write::{BrotliLevel, CompressionOptions as ParquetCompression, GzipLevel, ZstdLevel};

struct Bla {
@@ -52,6 +53,7 @@ pub struct ParquetWriter<W> {
writer: W,
compression: write::CompressionOptions,
statistics: bool,
row_group_size: Option<usize>,
}

impl<W> ParquetWriter<W>
@@ -67,6 +69,7 @@ where
writer,
compression: write::CompressionOptions::Lz4Raw,
statistics: false,
row_group_size: None,
}
}

@@ -76,14 +79,28 @@ where
self
}

/// Compute and write statistics.
pub fn with_statistics(mut self, statistics: bool) -> Self {
self.statistics = statistics;
self
}

/// Set the row group size during writing. This can reduce memory pressure and improve
/// writing performance.
pub fn with_row_group_size(mut self, size: Option<usize>) -> Self {
self.row_group_size = size;
self
}

/// Write the given DataFrame to the writer `W`.
pub fn finish(mut self, df: &mut DataFrame) -> Result<()> {
// ensures all chunks are aligned.
df.rechunk();

if let Some(n) = self.row_group_size {
*df = accumulate_dataframes_vertical_unchecked(split_df(df, df.height() / n)?);
};

let fields = df.schema().to_arrow().fields;
let rb_iter = df.iter_chunks();

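For context, a minimal sketch of how the new builder option is meant to be used from the Rust side. The file name, data, and the row group size of 100_000 are illustrative only, and the snippet assumes the `polars` crate built with the `parquet` feature:

```rust
use polars::prelude::*;

fn main() -> Result<()> {
    // Hypothetical frame, just to exercise the writer.
    let mut df = df![
        "a" => (0..1_000_000i64).collect::<Vec<_>>(),
    ]?;

    let file = std::fs::File::create("example.parquet")?;
    ParquetWriter::new(file)
        .with_statistics(true)
        // Ask for row groups of roughly 100_000 rows instead of one row group
        // per chunk; `finish` splits and re-chunks the frame before writing.
        .with_row_group_size(Some(100_000))
        .finish(&mut df)?;
    Ok(())
}
```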
9 changes: 8 additions & 1 deletion py-polars/polars/internals/frame.py
@@ -1372,6 +1372,7 @@ def write_parquet(
] = "lz4",
compression_level: Optional[int] = None,
statistics: bool = False,
row_group_size: Optional[int] = None,
use_pyarrow: bool = False,
**kwargs: Any,
) -> None:
@@ -1404,6 +1405,10 @@
* max-level: 22
statistics
Write statistics to the parquet headers. This requires extra compute.
row_group_size
Size of the row groups. If None, the chunks of the `DataFrame` are used.
Writing in smaller chunks may reduce memory pressure and improve writing speed.
This argument has no effect if 'pyarrow' is used.
use_pyarrow
Use C++ parquet implementation vs rust parquet implementation.
At the moment C++ supports more features.
@@ -1443,7 +1448,9 @@
**kwargs,
)
else:
self._df.to_parquet(file, compression, compression_level, statistics)
self._df.to_parquet(
file, compression, compression_level, statistics, row_group_size
)

def to_parquet(
self,
6 changes: 3 additions & 3 deletions py-polars/src/dataframe.rs
@@ -290,7 +290,6 @@ impl PyDataFrame {

if let Ok(s) = py_f.extract::<&str>(py) {
let f = std::fs::File::create(s).unwrap();
let f = BufWriter::new(f);
AvroWriter::new(f)
.with_compression(compression)
.finish(&mut self.df)
@@ -460,7 +459,6 @@ impl PyDataFrame {

if let Ok(s) = py_f.extract::<&str>(py) {
let f = std::fs::File::create(s).unwrap();
let f = BufWriter::new(f);
IpcWriter::new(f)
.with_compression(compression)
.finish(&mut self.df)
@@ -570,6 +568,7 @@ impl PyDataFrame {
compression: &str,
compression_level: Option<i32>,
statistics: bool,
row_group_size: Option<usize>,
) -> PyResult<()> {
let compression = match compression {
"uncompressed" => ParquetCompression::Uncompressed,
@@ -605,17 +604,18 @@

if let Ok(s) = py_f.extract::<&str>(py) {
let f = std::fs::File::create(s).unwrap();
let f = BufWriter::new(f);
ParquetWriter::new(f)
.with_compression(compression)
.with_statistics(statistics)
.with_row_group_size(row_group_size)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)?;
} else {
let buf = get_file_like(py_f, true)?;
ParquetWriter::new(buf)
.with_compression(compression)
.with_statistics(statistics)
.with_row_group_size(row_group_size)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)?;
}