From cfae03e031e08dfd6261ee73b40f9b941e060833 Mon Sep 17 00:00:00 2001 From: Blake Smith Date: Sun, 10 Oct 2021 00:54:31 -0500 Subject: [PATCH] Change IPC `FileReader` to own the underlying reader Closes: https://github.com/jorgecarleitao/arrow2/issues/514 --- examples/extension.rs | 8 ++++---- examples/ipc_file_read.rs | 2 +- .../src/bin/arrow-file-to-stream.rs | 2 +- .../src/bin/arrow-json-integration-test.rs | 4 ++-- src/io/ipc/read/reader.rs | 17 +++++++++++------ tests/it/io/ipc/read/file.rs | 2 +- tests/it/io/ipc/write/file.rs | 4 ++-- 7 files changed, 22 insertions(+), 17 deletions(-) diff --git a/examples/extension.rs b/examples/extension.rs index e6efd061853..25807924bc3 100644 --- a/examples/extension.rs +++ b/examples/extension.rs @@ -46,9 +46,9 @@ fn write_ipc(writer: W, array: impl Array + 'static) -> Result< Ok(writer.into_inner()) } -fn read_ipc(reader: &[u8]) -> Result { - let mut reader = Cursor::new(reader); - let metadata = read::read_file_metadata(&mut reader)?; - let mut reader = read::FileReader::new(&mut reader, metadata, None); +fn read_ipc(buf: &[u8]) -> Result { + let mut cursor = Cursor::new(buf); + let metadata = read::read_file_metadata(&mut cursor)?; + let mut reader = read::FileReader::new(cursor, metadata, None); reader.next().unwrap() } diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 311ed7618fe..a1792f10399 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -12,7 +12,7 @@ fn read_batches(path: &str) -> Result> { let metadata = read_file_metadata(&mut file)?; // Simplest way: use the reader, an iterator over batches. - let reader = FileReader::new(&mut file, metadata, None); + let reader = FileReader::new(file, metadata, None); reader.collect() } diff --git a/integration-testing/src/bin/arrow-file-to-stream.rs b/integration-testing/src/bin/arrow-file-to-stream.rs index 34da2bdf075..f060bd36820 100644 --- a/integration-testing/src/bin/arrow-file-to-stream.rs +++ b/integration-testing/src/bin/arrow-file-to-stream.rs @@ -27,7 +27,7 @@ fn main() -> Result<()> { let filename = &args[1]; let mut f = File::open(filename)?; let metadata = read::read_file_metadata(&mut f)?; - let mut reader = read::FileReader::new(&mut f, metadata, None); + let mut reader = read::FileReader::new(f, metadata, None); let schema = reader.schema(); let mut writer = StreamWriter::try_new(std::io::stdout(), schema)?; diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs index dc28a7ea004..3dee11da68c 100644 --- a/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/integration-testing/src/bin/arrow-json-integration-test.rs @@ -99,7 +99,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> let mut arrow_file = File::open(arrow_name)?; let metadata = read::read_file_metadata(&mut arrow_file)?; - let reader = read::FileReader::new(&mut arrow_file, metadata, None); + let reader = read::FileReader::new(arrow_file, metadata, None); let mut fields: Vec = vec![]; for f in reader.schema().fields() { @@ -137,7 +137,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { // open Arrow file let mut arrow_file = File::open(arrow_name)?; let metadata = read::read_file_metadata(&mut arrow_file)?; - let reader = read::FileReader::new(&mut arrow_file, metadata, None); + let reader = read::FileReader::new(arrow_file, metadata, None); let arrow_schema = reader.schema().as_ref().to_owned(); // compare schemas diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 61c5f3ca6d9..f82feba507e 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -62,8 +62,8 @@ impl FileMetadata { } /// Arrow File reader -pub struct FileReader<'a, R: Read + Seek> { - reader: &'a mut R, +pub struct FileReader { + reader: R, metadata: FileMetadata, current_block: usize, projection: Option<(Vec, Arc)>, @@ -231,11 +231,11 @@ pub fn read_batch( } } -impl<'a, R: Read + Seek> FileReader<'a, R> { +impl FileReader { /// Creates a new [`FileReader`]. Use `projection` to only take certain columns. /// # Panic /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid) - pub fn new(reader: &'a mut R, metadata: FileMetadata, projection: Option>) -> Self { + pub fn new(reader: R, metadata: FileMetadata, projection: Option>) -> Self { if let Some(projection) = projection.as_ref() { let _ = projection.iter().fold(0, |mut acc, v| { assert!( @@ -270,9 +270,14 @@ impl<'a, R: Read + Seek> FileReader<'a, R> { .map(|x| &x.1) .unwrap_or(&self.metadata.schema) } + + /// Consumes this FileReader, returning the underlying reader + pub fn into_inner(self) -> R { + self.reader + } } -impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> { +impl Iterator for FileReader { type Item = Result; fn next(&mut self) -> Option { @@ -295,7 +300,7 @@ impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> { } } -impl<'a, R: Read + Seek> RecordBatchReader for FileReader<'a, R> { +impl RecordBatchReader for FileReader { fn schema(&self) -> &Schema { self.schema().as_ref() } diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs index 4f5d584e33c..c869b385857 100644 --- a/tests/it/io/ipc/read/file.rs +++ b/tests/it/io/ipc/read/file.rs @@ -16,7 +16,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { let (schema, batches) = read_gzip_json(version, file_name)?; let metadata = read_file_metadata(&mut file)?; - let reader = FileReader::new(&mut file, metadata, None); + let reader = FileReader::new(file, metadata, None); assert_eq!(&schema, reader.schema().as_ref()); diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index e9ebc7da3c8..b9061361ce4 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -23,7 +23,7 @@ fn round_trip(batch: RecordBatch) -> Result<()> { let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema().clone(); - let reader = FileReader::new(&mut reader, metadata, None); + let reader = FileReader::new(reader, metadata, None); // read expected JSON output let (expected_schema, expected_batches) = (batch.schema().clone(), vec![batch]); @@ -55,7 +55,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema().clone(); - let reader = FileReader::new(&mut reader, metadata, None); + let reader = FileReader::new(reader, metadata, None); // read expected JSON output let (expected_schema, expected_batches) = read_gzip_json(version, file_name)?;