diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs
index 5fafdceee2f..f4188d2816a 100644
--- a/src/io/parquet/read/deserialize/nested_utils.rs
+++ b/src/io/parquet/read/deserialize/nested_utils.rs
@@ -455,7 +455,7 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
 
         let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0);
 
-        if next_rep == 0 && rows == additional.saturating_add(1) {
+        if next_rep == 0 && rows == additional {
             break;
         }
     }
diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs
index 9f866a8dce5..474f29f7de6 100644
--- a/src/io/parquet/read/file.rs
+++ b/src/io/parquet/read/file.rs
@@ -240,6 +240,7 @@ impl RowGroupReader {
             row_group,
             self.schema.fields.clone(),
             self.chunk_size,
+            Some(self.remaining_rows),
         )?;
 
         let result = RowGroupDeserializer::new(
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index deefaea3644..8e9a6a97c64 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -65,14 +65,7 @@ impl Iterator for RowGroupDeserializer {
         let chunk = self
             .column_chunks
             .iter_mut()
-            .map(|iter| {
-                let array = iter.next().unwrap()?;
-                Ok(if array.len() > self.remaining_rows {
-                    array.slice(0, array.len() - self.remaining_rows)
-                } else {
-                    array
-                })
-            })
+            .map(|iter| iter.next().unwrap())
             .collect::<Result<Vec<_>>>()
             .and_then(Chunk::try_new);
         self.remaining_rows = self.remaining_rows.saturating_sub(
@@ -218,7 +211,11 @@ pub fn read_columns_many<'a, R: Read + Seek>(
     row_group: &RowGroupMetaData,
     fields: Vec<Field>,
     chunk_size: Option<usize>,
+    limit: Option<usize>,
 ) -> Result<Vec<ArrayIter<'a>>> {
+    let num_rows = row_group.num_rows();
+    let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows);
+
     // reads all the necessary columns for all fields from the row group
     // This operation is IO-bounded `O(C)` where C is the number of columns in the row group
     let field_columns = fields
@@ -229,9 +226,7 @@ pub fn read_columns_many<'a, R: Read + Seek>(
 
     field_columns
         .into_iter()
         .zip(fields.into_iter())
-        .map(|(columns, field)| {
-            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
-        })
+        .map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size))
         .collect()
 }
diff --git a/src/io/print.rs b/src/io/print.rs
index 08cc3e3efb8..9cb0438f645 100644
--- a/src/io/print.rs
+++ b/src/io/print.rs
@@ -8,27 +8,27 @@ use crate::{
 use comfy_table::{Cell, Table};
 
 /// Returns a visual representation of [`Chunk`]
-pub fn write<A: AsRef<dyn Array>, N: AsRef<str>>(batches: &[Chunk<A>], names: &[N]) -> String {
+pub fn write<A: AsRef<dyn Array>, N: AsRef<str>>(chunks: &[Chunk<A>], names: &[N]) -> String {
     let mut table = Table::new();
     table.load_preset("||--+-++| ++++++");
-    if batches.is_empty() {
+    if chunks.is_empty() {
         return table.to_string();
     }
 
     let header = names.iter().map(|name| Cell::new(name.as_ref()));
     table.set_header(header);
 
-    for batch in batches {
-        let displayes = batch
+    for chunk in chunks {
+        let displayes = chunk
             .arrays()
             .iter()
             .map(|array| get_display(array.as_ref(), ""))
             .collect::<Vec<_>>();
 
-        for row in 0..batch.len() {
+        for row in 0..chunk.len() {
             let mut cells = Vec::new();
-            (0..batch.arrays().len()).for_each(|col| {
+            (0..chunk.arrays().len()).for_each(|col| {
                 let mut string = String::new();
                 displayes[col](&mut string, row).unwrap();
                 cells.push(Cell::new(string));