diff --git a/examples/extension.rs b/examples/extension.rs index 158b9bbd438..c0aea843166 100644 --- a/examples/extension.rs +++ b/examples/extension.rs @@ -51,6 +51,6 @@ fn write_ipc(writer: W, array: impl Array + 'static) -> Result< fn read_ipc(buf: &[u8]) -> Result>> { let mut cursor = Cursor::new(buf); let metadata = read::read_file_metadata(&mut cursor)?; - let mut reader = read::FileReader::new(cursor, metadata, None); + let mut reader = read::FileReader::new(cursor, metadata, None, None); reader.next().unwrap() } diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 18ceed0ba84..c171a5d18db 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -9,7 +9,7 @@ use arrow2::io::print; /// Simplest way: read all record batches from the file. This can be used e.g. for random access. #[allow(clippy::type_complexity)] -fn read_batches(path: &str) -> Result<(Schema, Vec>>)> { +fn read_chunks(path: &str) -> Result<(Schema, Vec>>)> { let mut file = File::open(path)?; // read the files' metadata. At this point, we can distribute the read whatever we like. @@ -18,10 +18,10 @@ fn read_batches(path: &str) -> Result<(Schema, Vec>>)> { let schema = metadata.schema.clone(); // Simplest way: use the reader, an iterator over batches. - let reader = read::FileReader::new(file, metadata, None); + let reader = read::FileReader::new(file, metadata, None, None); - let columns = reader.collect::>>()?; - Ok((schema, columns)) + let chunks = reader.collect::>>()?; + Ok((schema, chunks)) } /// Random access way: read a single record batch from the file. This can be used e.g. for random access. @@ -36,6 +36,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { // advanced way: read the dictionary let dictionaries = read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?; + // and the chunk let chunk_index = 0; let chunk = read::read_batch( @@ -43,6 +44,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { &dictionaries, &metadata, None, + None, chunk_index, &mut Default::default(), &mut Default::default(), @@ -57,12 +59,12 @@ fn main() -> Result<()> { let file_path = &args[1]; - let (schema, batches) = read_batches(file_path)?; + let (schema, chunks) = read_chunks(file_path)?; let names = schema.fields.iter().map(|f| &f.name).collect::>(); - println!("{}", print::write(&batches, &names)); + println!("{}", print::write(&chunks, &names)); - let (schema, batch) = read_batch(file_path)?; + let (schema, chunk) = read_batch(file_path)?; let names = schema.fields.iter().map(|f| &f.name).collect::>(); - println!("{}", print::write(&[batch], &names)); + println!("{}", print::write(&[chunk], &names)); Ok(()) } diff --git a/examples/ipc_file_write.rs b/examples/ipc_file_write.rs index 74c7ffeedff..51f570b228c 100644 --- a/examples/ipc_file_write.rs +++ b/examples/ipc_file_write.rs @@ -6,15 +6,15 @@ use arrow2::datatypes::{DataType, Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::write; -fn write_batches(path: &str, schema: Schema, columns: &[Chunk>]) -> Result<()> { +fn write_batches(path: &str, schema: Schema, chunks: &[Chunk>]) -> Result<()> { let file = File::create(path)?; let options = write::WriteOptions { compression: None }; let mut writer = write::FileWriter::new(file, schema, None, options); writer.start()?; - for columns in columns { - writer.write(columns, None)? + for chunk in chunks { + writer.write(chunk, None)? 
} writer.finish() } @@ -34,9 +34,9 @@ fn main() -> Result<()> { let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]); let b = Utf8Array::::from_slice(&["a", "b", "c", "d", "e"]); - let batch = Chunk::try_new(vec![a.boxed(), b.boxed()])?; + let chunk = Chunk::try_new(vec![a.boxed(), b.boxed()])?; // write it - write_batches(file_path, schema, &[batch])?; + write_batches(file_path, schema, &[chunk])?; Ok(()) } diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index 3d6a0fe0d4b..28cf299310c 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -58,7 +58,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> { // declare encodings let encodings = (&schema.fields) - .par_iter() + .iter() .map(|f| transverse(&f.data_type, encoding_map)) .collect::>(); diff --git a/integration-testing/src/bin/arrow-file-to-stream.rs b/integration-testing/src/bin/arrow-file-to-stream.rs index ae8ed094fdc..c63606e28c3 100644 --- a/integration-testing/src/bin/arrow-file-to-stream.rs +++ b/integration-testing/src/bin/arrow-file-to-stream.rs @@ -32,7 +32,7 @@ fn main() -> Result<()> { let filename = args.file_name; let mut f = File::open(filename)?; let metadata = read::read_file_metadata(&mut f)?; - let mut reader = read::FileReader::new(f, metadata.clone(), None); + let mut reader = read::FileReader::new(f, metadata.clone(), None, None); let options = write::WriteOptions { compression: None }; let mut writer = write::StreamWriter::new(std::io::stdout(), options); diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs index 7152fb40a70..31f58a4ff2f 100644 --- a/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/integration-testing/src/bin/arrow-json-integration-test.rs @@ -80,7 +80,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> let mut arrow_file = File::open(arrow_name)?; let metadata = read::read_file_metadata(&mut arrow_file)?; - let reader = read::FileReader::new(arrow_file, metadata.clone(), None); + let reader = read::FileReader::new(arrow_file, metadata.clone(), None, None); let names = metadata .schema @@ -118,7 +118,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { // open Arrow file let mut arrow_file = File::open(arrow_name)?; let metadata = read::read_file_metadata(&mut arrow_file)?; - let reader = read::FileReader::new(arrow_file, metadata, None); + let reader = read::FileReader::new(arrow_file, metadata, None, None); let arrow_schema = reader.schema(); // compare schemas diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index c224967bfa2..9d21cf0c975 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -285,6 +285,7 @@ fn chunk_from_message( fields, ipc_schema, None, + None, dictionaries, arrow_format::ipc::MetadataVersion::V5, &mut reader, diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 9ed0835bb84..59ba0a83d88 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -137,6 +137,7 @@ pub fn deserialize_batch( fields, ipc_schema, None, + None, dictionaries, message.version()?, &mut reader, diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index 68124c8bfa0..3d8a8ccb994 100644 --- 
a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -58,10 +58,10 @@ //! // Fetch some of the data and get the reader back //! let mut reader = File::open(&path)?; //! let metadata = read_file_metadata(&mut reader)?; -//! let mut filereader = FileReader::new(reader, metadata, None); -//! let row1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]] -//! let row2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]] -//! let mut reader = filereader.into_inner(); +//! let mut reader = FileReader::new(reader, metadata, None, None); +//! let row1 = reader.next().unwrap(); // [[-1, 1], [1, -1]] +//! let row2 = reader.next().unwrap(); // [[-1, 1], [1, -1]] +//! let mut reader = reader.into_inner(); //! // Do more stuff with the reader, like seeking ahead. //! # Ok::<(), Error>(()) //! ``` diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index f32ca3e8713..3ee6c4ba288 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -18,6 +18,7 @@ pub fn read_binary( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, scratch: &mut Vec, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { @@ -34,6 +35,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + limit, scratch, )?; @@ -41,6 +43,7 @@ pub fn read_binary( .length() .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); let offsets: Buffer = read_buffer( buffers, @@ -54,7 +57,7 @@ pub fn read_binary( // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; - let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize(); + let last_offset = offsets.last().unwrap().to_usize(); let values = read_buffer( buffers, last_offset, diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs index 4440062a299..f53c0afd99b 100644 --- a/src/io/ipc/read/array/boolean.rs +++ b/src/io/ipc/read/array/boolean.rs @@ -17,6 +17,7 @@ pub fn read_boolean( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { @@ -26,11 +27,6 @@ pub fn read_boolean( )) })?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let validity = read_validity( buffers, field_node, @@ -38,9 +34,16 @@ pub fn read_boolean( block_offset, is_little_endian, compression, + limit, scratch, )?; + let length: usize = field_node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let values = read_bitmap( buffers, length, diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs index 63a0cb0b80f..69c617852f6 100644 --- a/src/io/ipc/read/array/dictionary.rs +++ b/src/io/ipc/read/array/dictionary.rs @@ -18,6 +18,7 @@ pub fn read_dictionary( dictionaries: &Dictionaries, block_offset: u64, compression: Option, + limit: Option, is_little_endian: bool, scratch: &mut Vec, ) -> Result> @@ -48,6 +49,7 @@ where block_offset, is_little_endian, compression, + limit, scratch, )?; diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index ffceb00bc94..72e0bd8f5b9 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ 
b/src/io/ipc/read/array/fixed_size_binary.rs @@ -17,6 +17,7 @@ pub fn read_fixed_size_binary( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { @@ -33,6 +34,7 @@ pub fn read_fixed_size_binary( block_offset, is_little_endian, compression, + limit, scratch, )?; @@ -40,6 +42,7 @@ pub fn read_fixed_size_binary( .length() .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); let length = length.saturating_mul(FixedSizeBinaryArray::maybe_get_size(&data_type)?); let values = read_buffer( diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index 3ea4ef42b30..94c1c7662c4 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -21,6 +21,7 @@ pub fn read_fixed_size_list( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, version: Version, scratch: &mut Vec, ) -> Result { @@ -38,10 +39,13 @@ pub fn read_fixed_size_list( block_offset, is_little_endian, compression, + limit, scratch, )?; - let (field, _) = FixedSizeListArray::get_child_and_size(&data_type); + let (field, size) = FixedSizeListArray::get_child_and_size(&data_type); + + let limit = limit.map(|x| x.saturating_mul(size)); let values = read( field_nodes, @@ -53,6 +57,7 @@ pub fn read_fixed_size_list( block_offset, is_little_endian, compression, + limit, version, scratch, )?; diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index e12dc5e02ec..7870b85a7b1 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -23,6 +23,7 @@ pub fn read_list( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, version: Version, scratch: &mut Vec, ) -> Result> @@ -43,6 +44,7 @@ where block_offset, is_little_endian, compression, + limit, scratch, )?; @@ -50,6 +52,7 @@ where .length() .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); let offsets = read_buffer::( buffers, @@ -63,6 +66,8 @@ where // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; + let last_offset = offsets.last().unwrap().to_usize(); + let field = ListArray::::get_child_field(&data_type); let values = read( @@ -75,6 +80,7 @@ where block_offset, is_little_endian, compression, + Some(last_offset), version, scratch, )?; diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 8a2a5e4a124..b0803ca5f2f 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -22,6 +22,7 @@ pub fn read_map( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, version: Version, scratch: &mut Vec, ) -> Result { @@ -39,6 +40,7 @@ pub fn read_map( block_offset, is_little_endian, compression, + limit, scratch, )?; @@ -46,6 +48,7 @@ pub fn read_map( .length() .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); let offsets = read_buffer::( buffers, @@ -61,6 +64,8 @@ pub fn read_map( let field = MapArray::get_field(&data_type); + let last_offset: usize = offsets.last().copied().unwrap() as usize; + let field = read( field_nodes, field, @@ -71,6 +76,7 @@ pub fn read_map( 
block_offset, is_little_endian, compression, + Some(last_offset), version, scratch, )?; diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index 434aab0bd9c..bba155f78ea 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -17,6 +17,7 @@ pub fn read_primitive( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, scratch: &mut Vec, ) -> Result> where @@ -36,6 +37,7 @@ where block_offset, is_little_endian, compression, + limit, scratch, )?; @@ -43,6 +45,7 @@ where .length() .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); let values = read_buffer( buffers, diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index 3bbd777a019..77f5d74bedf 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -21,6 +21,7 @@ pub fn read_struct( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, version: Version, scratch: &mut Vec, ) -> Result { @@ -38,6 +39,7 @@ pub fn read_struct( block_offset, is_little_endian, compression, + limit, scratch, )?; @@ -57,6 +59,7 @@ pub fn read_struct( block_offset, is_little_endian, compression, + limit, version, scratch, ) diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index 678d65ded9d..627d912b443 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -22,6 +22,7 @@ pub fn read_union( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, version: Version, scratch: &mut Vec, ) -> Result { @@ -42,6 +43,7 @@ pub fn read_union( .length() .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); let types = read_buffer( buffers, @@ -87,6 +89,7 @@ pub fn read_union( block_offset, is_little_endian, compression, + None, version, scratch, ) diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index 342da0c4de8..f57e51ecc5a 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -18,6 +18,7 @@ pub fn read_utf8( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, scratch: &mut Vec, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { @@ -34,6 +35,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + limit, scratch, )?; @@ -42,6 +44,8 @@ pub fn read_utf8( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let offsets: Buffer = read_buffer( buffers, 1 + length, @@ -54,7 +58,7 @@ pub fn read_utf8( // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; - let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize(); + let last_offset = offsets.last().unwrap().to_usize(); let values = read_buffer( buffers, last_offset, diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 26252dd1831..fe445f20791 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -71,7 +71,7 @@ impl<'a, A, I: Iterator> Iterator for ProjectionIter<'a, A, I> { } } -/// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema` +/// Returns a [`Chunk`] from a reader. 
/// # Panic /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid) #[allow(clippy::too_many_arguments)] @@ -80,6 +80,7 @@ pub fn read_record_batch( fields: &[Field], ipc_schema: &IpcSchema, projection: Option<&[usize]>, + limit: Option, dictionaries: &Dictionaries, version: arrow_format::ipc::MetadataVersion, reader: &mut R, @@ -136,6 +137,7 @@ pub fn read_record_batch( batch.compression().map_err(|err| { Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)) })?, + limit, version, scratch, )?)), @@ -163,6 +165,7 @@ pub fn read_record_batch( batch.compression().map_err(|err| { Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)) })?, + limit, version, scratch, ) @@ -270,6 +273,7 @@ pub fn read_dictionary( &fields, &ipc_schema, None, + None, // we must read the whole dictionary dictionaries, arrow_format::ipc::MetadataVersion::V5, reader, diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 780f33e0d59..fec5676def5 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -23,6 +23,7 @@ pub fn read( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, version: MetadataVersion, scratch: &mut Vec, ) -> Result> { @@ -39,6 +40,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, scratch, ) .map(|x| x.boxed()), @@ -51,6 +53,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, scratch, ) .map(|x| x.boxed()) @@ -64,6 +67,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, scratch, )?; Ok(Box::new(array)) @@ -77,6 +81,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, scratch, )?; Ok(Box::new(array)) @@ -90,6 +95,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, scratch, )?; Ok(Box::new(array)) @@ -103,6 +109,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, scratch, )?; Ok(Box::new(array)) @@ -116,6 +123,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, scratch, )?; Ok(Box::new(array)) @@ -130,6 +138,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, version, scratch, ) @@ -144,6 +153,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, version, scratch, ) @@ -158,6 +168,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, version, scratch, ) @@ -172,6 +183,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, version, scratch, ) @@ -186,8 +198,9 @@ pub fn read( dictionaries, block_offset, compression, + limit, is_little_endian, - scratch + scratch, ) .map(|x| x.boxed()) }) @@ -202,6 +215,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, version, scratch, ) @@ -216,6 +230,7 @@ pub fn read( block_offset, is_little_endian, compression, + limit, version, scratch, ) diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 100990a6509..0d80b80ac67 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -31,7 +31,12 @@ impl<'a> FileStream<'a> { /// /// # Examples /// See [`FileSink`](crate::io::ipc::write::file_async::FileSink). 
- pub fn new(reader: R, metadata: FileMetadata, projection: Option>) -> Self + pub fn new( + reader: R, + metadata: FileMetadata, + projection: Option>, + limit: Option, + ) -> Self where R: AsyncRead + AsyncSeek + Unpin + Send + 'a, { @@ -46,7 +51,7 @@ impl<'a> FileStream<'a> { (None, None) }; - let stream = Self::stream(reader, None, metadata.clone(), projection); + let stream = Self::stream(reader, None, metadata.clone(), projection, limit); Self { stream, metadata, @@ -69,6 +74,7 @@ impl<'a> FileStream<'a> { mut dictionaries: Option, metadata: FileMetadata, projection: Option<(Vec, HashMap)>, + limit: Option, ) -> BoxStream<'a, Result>>> where R: AsyncRead + AsyncSeek + Unpin + Send + 'a, @@ -80,17 +86,20 @@ impl<'a> FileStream<'a> { let mut meta_buffer = Default::default(); let mut block_buffer = Default::default(); let mut scratch = Default::default(); + let mut remaining = limit.unwrap_or(usize::MAX); for block in 0..metadata.blocks.len() { let chunk = read_batch( &mut reader, dictionaries.as_mut().unwrap(), &metadata, projection.as_ref().map(|x| x.0.as_ref()), + Some(remaining), block, &mut meta_buffer, &mut block_buffer, &mut scratch ).await?; + remaining -= chunk.len(); let chunk = if let Some((_, map)) = &projection { // re-order according to projection @@ -159,6 +168,7 @@ async fn read_batch( dictionaries: &mut Dictionaries, metadata: &FileMetadata, projection: Option<&[usize]>, + limit: Option, block: usize, meta_buffer: &mut Vec, block_buffer: &mut Vec, @@ -217,6 +227,7 @@ where &metadata.schema.fields, &metadata.ipc_schema, projection, + limit, dictionaries, message .version() diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index e58b01b58ed..0a93a63a217 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -254,6 +254,7 @@ pub fn read_bitmap( Bitmap::try_new(buffer, length) } +#[allow(clippy::too_many_arguments)] pub fn read_validity( buffers: &mut VecDeque, field_node: Node, @@ -261,12 +262,14 @@ pub fn read_validity( block_offset: u64, is_little_endian: bool, compression: Option, + limit: Option, scratch: &mut Vec, ) -> Result> { let length: usize = field_node .length() .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length = limit.map(|limit| limit.min(length)).unwrap_or(length); Ok(if field_node.null_count() > 0 { Some(read_bitmap( diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 801f5041aed..b18ca4ceade 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -36,18 +36,6 @@ pub struct FileMetadata { pub(crate) size: u64, } -/// Arrow File reader -pub struct FileReader { - reader: R, - metadata: FileMetadata, - // the dictionaries are going to be read - dictionaries: Option, - current_block: usize, - projection: Option<(Vec, HashMap, Schema)>, - data_scratch: Vec, - message_scratch: Vec, -} - fn read_dictionary_message( reader: &mut R, offset: u64, @@ -262,11 +250,13 @@ pub(super) fn get_serialized_batch<'a>( /// certain parts of the file. 
/// # Panics /// This function panics iff `index >= metadata.blocks.len()` +#[allow(clippy::too_many_arguments)] pub fn read_batch( reader: &mut R, dictionaries: &Dictionaries, metadata: &FileMetadata, projection: Option<&[usize]>, + limit: Option, index: usize, message_scratch: &mut Vec, data_scratch: &mut Vec, @@ -317,6 +307,7 @@ pub fn read_batch( &metadata.schema.fields, &metadata.ipc_schema, projection, + limit, dictionaries, message .version() @@ -328,11 +319,29 @@ pub fn read_batch( ) } +/// An iterator of [`Chunk`]s from an Arrow IPC file. +pub struct FileReader { + reader: R, + metadata: FileMetadata, + // the dictionaries are going to be read + dictionaries: Option, + current_block: usize, + projection: Option<(Vec, HashMap, Schema)>, + remaining: usize, + data_scratch: Vec, + message_scratch: Vec, +} + impl FileReader { /// Creates a new [`FileReader`]. Use `projection` to only take certain columns. /// # Panic /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid) - pub fn new(reader: R, metadata: FileMetadata, projection: Option>) -> Self { + pub fn new( + reader: R, + metadata: FileMetadata, + projection: Option>, + limit: Option, + ) -> Self { let projection = projection.map(|projection| { let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection); let schema = Schema { @@ -346,6 +355,7 @@ impl FileReader { metadata, dictionaries: Default::default(), projection, + remaining: limit.unwrap_or(usize::MAX), current_block: 0, data_scratch: Default::default(), message_scratch: Default::default(), @@ -404,10 +414,12 @@ impl Iterator for FileReader { self.dictionaries.as_ref().unwrap(), &self.metadata, self.projection.as_ref().map(|x| x.0.as_ref()), + Some(self.remaining), block, &mut self.message_scratch, &mut self.data_scratch, ); + self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default(); let chunk = if let Some((_, map, _)) = &self.projection { // re-order according to projection diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index e7d3d590aa4..b3f3a986d68 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -173,6 +173,7 @@ fn read_next( &metadata.schema.fields, &metadata.ipc_schema, projection.as_ref().map(|x| x.0.as_ref()), + None, dictionaries, metadata.version, &mut reader, diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index b637dc0a2d8..4c8556f0ea1 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -143,6 +143,7 @@ async fn maybe_next( &state.metadata.schema.fields, &state.metadata.ipc_schema, None, + None, &state.dictionaries, state.metadata.version, &mut std::io::Cursor::new(&state.data_buffer), diff --git a/src/io/ipc/write/file_async.rs b/src/io/ipc/write/file_async.rs index dfe0c0b7fc4..3ee7e7ee893 100644 --- a/src/io/ipc/write/file_async.rs +++ b/src/io/ipc/write/file_async.rs @@ -54,7 +54,7 @@ type WriteOutput = (usize, Option, Vec, Option); /// // Read chunks from file /// buffer.set_position(0); /// let metadata = read_file_metadata_async(&mut buffer).await?; -/// let mut stream = FileStream::new(buffer, metadata, None); +/// let mut stream = FileStream::new(buffer, metadata, None, None); /// let chunks = stream.try_collect::>().await?; /// # arrow2::error::Result::Ok(()) /// # }).unwrap(); diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs index 022cf4de274..9d21d051f2a 100644 --- a/tests/it/io/ipc/read/file.rs +++ b/tests/it/io/ipc/read/file.rs @@ -17,7 +17,7 
@@ fn test_file(version: &str, file_name: &str) -> Result<()> { let (schema, _, batches) = read_gzip_json(version, file_name)?; let metadata = read_file_metadata(&mut file)?; - let reader = FileReader::new(file, metadata, None); + let reader = FileReader::new(file, metadata, None, None); assert_eq!(&schema, reader.schema()); @@ -165,14 +165,12 @@ fn test_projection(version: &str, file_name: &str, columns: Vec) -> Resul testdata, version, file_name ))?; - let metadata = read_file_metadata(&mut file)?; - - let (_, _, chunks) = read_gzip_json(version, file_name)?; + let (schema, _, chunks) = read_gzip_json(version, file_name)?; let expected_fields = columns .iter() .copied() - .map(|x| metadata.schema.fields[x].clone()) + .map(|x| schema.fields[x].clone()) .collect::>(); let expected_chunks = chunks.into_iter().map(|chunk| { @@ -184,7 +182,8 @@ fn test_projection(version: &str, file_name: &str, columns: Vec) -> Resul Chunk::new(columns) }); - let reader = FileReader::new(&mut file, metadata, Some(columns.clone())); + let metadata = read_file_metadata(&mut file)?; + let reader = FileReader::new(&mut file, metadata, Some(columns.clone()), None); assert_eq!(reader.schema().fields, expected_fields); @@ -210,7 +209,7 @@ fn read_corrupted_ipc(data: Vec) -> Result<()> { let mut file = std::io::Cursor::new(data); let metadata = read_file_metadata(&mut file)?; - let mut reader = FileReader::new(file, metadata, None); + let mut reader = FileReader::new(file, metadata, None, None); reader.try_for_each(|rhs| { rhs?; @@ -241,3 +240,37 @@ fn test_does_not_panic() { let _ = read_corrupted_ipc(data); } } + +fn test_limit(version: &str, file_name: &str, limit: usize) -> Result<()> { + let testdata = crate::test_util::arrow_test_data(); + let mut file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, file_name + ))?; + + let (schema, _, _) = read_gzip_json(version, file_name)?; + + let metadata = read_file_metadata(&mut file)?; + let unlimited_chunk = FileReader::new(&mut file, metadata.clone(), None, None) + .next() + .unwrap()?; + let mut reader = FileReader::new(&mut file, metadata, None, Some(limit)); + + assert_eq!(reader.schema(), &schema); + + let chunk = reader.next().unwrap()?; + assert_eq!(chunk.len(), unlimited_chunk.len().min(limit)); + + Ok(()) +} + +#[test] +fn read_limited() -> Result<()> { + test_limit("1.0.0-littleendian", "generated_primitive", 2)?; + test_limit("1.0.0-littleendian", "generated_dictionary", 2)?; + test_limit("1.0.0-littleendian", "generated_union", 2)?; + test_limit("1.0.0-littleendian", "generated_map", 2)?; + test_limit("1.0.0-littleendian", "generated_nested_dictionary", 2)?; + test_limit("1.0.0-littleendian", "generated_nested", 2)?; + Ok(()) +} diff --git a/tests/it/io/ipc/read_file_async.rs b/tests/it/io/ipc/read_file_async.rs index af619604608..b8b61853a3d 100644 --- a/tests/it/io/ipc/read_file_async.rs +++ b/tests/it/io/ipc/read_file_async.rs @@ -17,7 +17,7 @@ async fn test_file(version: &str, file_name: &str) -> Result<()> { .compat(); let metadata = read_file_metadata_async(&mut file).await?; - let mut reader = FileStream::new(file, metadata, None); + let mut reader = FileStream::new(file, metadata, None, None); // read expected JSON output let (schema, ipc_fields, batches) = read_gzip_json(version, file_name)?; diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index 4b261d97731..a899aa459d0 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -39,7 +39,7 @@ fn 
round_trip( let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema.clone(); - let reader = FileReader::new(reader, metadata, None); + let reader = FileReader::new(reader, metadata, None, None); assert_eq!(schema, expected_schema); @@ -64,7 +64,7 @@ fn test_file(version: &str, file_name: &str, compressed: bool) -> Result<()> { let schema = metadata.schema.clone(); let ipc_fields = metadata.ipc_schema.fields.clone(); - let reader = FileReader::new(reader, metadata, None); + let reader = FileReader::new(reader, metadata, None, None); // read expected JSON output let (expected_schema, expected_ipc_fields, expected_batches) = diff --git a/tests/it/io/ipc/write/file_append.rs b/tests/it/io/ipc/write/file_append.rs index 6abec8eabc9..52edde06d24 100644 --- a/tests/it/io/ipc/write/file_append.rs +++ b/tests/it/io/ipc/write/file_append.rs @@ -34,7 +34,7 @@ fn basic() -> Result<()> { // read the file again and confirm that it contains both messages let metadata = read::read_file_metadata(&mut reader)?; assert_eq!(schema, expected_schema); - let reader = read::FileReader::new(reader, metadata, None); + let reader = read::FileReader::new(reader, metadata, None, None); let chunks = reader.collect::>>()?; diff --git a/tests/it/io/ipc/write_file_async.rs b/tests/it/io/ipc/write_file_async.rs index 25b8164f058..d1963184da3 100644 --- a/tests/it/io/ipc/write_file_async.rs +++ b/tests/it/io/ipc/write_file_async.rs @@ -38,7 +38,7 @@ async fn test_file(version: &str, file_name: &str) -> Result<()> { let mut reader = Cursor::new(result); let metadata = read::read_file_metadata(&mut reader)?; - let reader = read::FileReader::new(reader, metadata, None); + let reader = read::FileReader::new(reader, metadata, None, None); let schema = &reader.metadata().schema; let ipc_fields = reader.metadata().ipc_schema.fields.clone();
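
Taken together, these changes thread an optional `limit: Option<usize>` through the IPC read path: `FileReader::new`, `FileStream::new`, `read_batch` and `read_record_batch` each gain an extra argument, and passing `None` keeps the previous, unlimited behaviour. The sketch below shows how a caller might use the new parameter; it mirrors the updated `examples/ipc_file_read.rs` and the `test_limit` test above. The helper name `read_limited` is ours, not part of the crate.

```rust
use std::fs::File;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::ipc::read;

/// Hypothetical helper: read at most `limit` rows from an Arrow IPC file.
fn read_limited(path: &str, limit: usize) -> Result<Vec<Chunk<Box<dyn Array>>>> {
    let mut file = File::open(path)?;

    // read the file's metadata, exactly as before
    let metadata = read::read_file_metadata(&mut file)?;

    // the new fourth argument caps the total number of rows the reader yields;
    // `None` reproduces the old behaviour
    let reader = read::FileReader::new(file, metadata, None, Some(limit));

    // chunks are truncated so that the rows read across all chunks do not exceed `limit`
    reader.collect::<Result<Vec<_>>>()
}
```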
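
Internally, every per-array reader applies the same clamp to the length declared by its field node, and the variable-size containers (binary, utf8, list, map) additionally bound the child read by the last offset they actually kept (`Some(last_offset)`); dictionaries are still read in full and union children are not limited, as the diff shows. A minimal illustration of the clamp repeated throughout the patch — the `clamp` function exists only for illustration:

```rust
/// Illustration of the length clamp used in the per-array readers above.
fn clamp(length: usize, limit: Option<usize>) -> usize {
    // `None` means "no limit": keep the length declared by the field node
    limit.map(|limit| limit.min(length)).unwrap_or(length)
}

fn main() {
    assert_eq!(clamp(10, Some(3)), 3); // limited read keeps only 3 values
    assert_eq!(clamp(10, None), 10); // unlimited read keeps everything
}
```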