From 1cf922f31aa910e40cbf6b5d319cb0d4f90216be Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 18 Sep 2021 01:58:56 +0800 Subject: [PATCH 1/3] Own writer in IPC FileWriter --- benches/write_ipc.rs | 4 +-- examples/extension.rs | 6 ++-- examples/ipc_file_write.rs | 4 +-- .../src/bin/arrow-json-integration-test.rs | 4 +-- .../src/bin/arrow-stream-to-file.rs | 4 +-- src/io/ipc/write/writer.rs | 28 ++++++++----------- tests/it/io/ipc/write/file.rs | 20 +++++++------ 7 files changed, 34 insertions(+), 36 deletions(-) diff --git a/benches/write_ipc.rs b/benches/write_ipc.rs index a0dd9081c4e..ab60d3b856a 100644 --- a/benches/write_ipc.rs +++ b/benches/write_ipc.rs @@ -15,8 +15,8 @@ fn write(array: &dyn Array) -> Result<()> { let schema = Schema::new(vec![field]); let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![clone(array).into()])?; - let mut writer = Cursor::new(vec![]); - let mut writer = FileWriter::try_new(&mut writer, &schema)?; + let writer = Cursor::new(vec![]); + let mut writer = FileWriter::try_new(writer, &schema)?; writer.write(&batch) } diff --git a/examples/extension.rs b/examples/extension.rs index 25210214aa9..5f27bda30c8 100644 --- a/examples/extension.rs +++ b/examples/extension.rs @@ -17,10 +17,10 @@ fn main() -> Result<()> { let array = UInt16Array::from_slice([1, 2]).to(extension_type.clone()); // from here on, it works as usual - let mut buffer = Cursor::new(vec![]); + let buffer = Cursor::new(vec![]); // write to IPC - write_ipc(&mut buffer, array)?; + write_ipc(buffer, array)?; // read it back let batch = read_ipc(&buffer.into_inner())?; @@ -34,7 +34,7 @@ fn main() -> Result<()> { Ok(()) } -fn write_ipc(writer: &mut W, array: impl Array + 'static) -> Result<()> { +fn write_ipc(writer: W, array: impl Array + 'static) -> Result<()> { let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]); let mut writer = write::FileWriter::try_new(writer, &schema)?; diff --git a/examples/ipc_file_write.rs b/examples/ipc_file_write.rs index 474bea999cf..232258f2ab5 100644 --- a/examples/ipc_file_write.rs +++ b/examples/ipc_file_write.rs @@ -8,9 +8,9 @@ use arrow2::io::ipc::write; use arrow2::record_batch::RecordBatch; fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result<()> { - let mut file = File::create(path)?; + let file = File::create(path)?; - let mut writer = write::FileWriter::try_new(&mut file, schema)?; + let mut writer = write::FileWriter::try_new(file, schema)?; for batch in batches { writer.write(batch)? diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs index 2c378cb938a..dc28a7ea004 100644 --- a/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/integration-testing/src/bin/arrow-json-integration-test.rs @@ -80,8 +80,8 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> let json_file = read_json_file(json_name)?; - let mut arrow_file = File::create(arrow_name)?; - let mut writer = FileWriter::try_new(&mut arrow_file, &json_file.schema)?; + let arrow_file = File::create(arrow_name)?; + let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?; for b in json_file.batches { writer.write(&b)?; diff --git a/integration-testing/src/bin/arrow-stream-to-file.rs b/integration-testing/src/bin/arrow-stream-to-file.rs index c17ff7d9e91..f40d8add7eb 100644 --- a/integration-testing/src/bin/arrow-stream-to-file.rs +++ b/integration-testing/src/bin/arrow-stream-to-file.rs @@ -27,9 +27,9 @@ fn main() -> Result<()> { let mut arrow_stream_reader = read::StreamReader::new(reader, metadata); let schema = arrow_stream_reader.schema(); - let mut writer = io::stdout(); + let writer = io::stdout(); - let mut writer = FileWriter::try_new(&mut writer, schema)?; + let mut writer = FileWriter::try_new(writer, schema)?; arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?; writer.finish()?; diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index df477757e02..6946cb02d75 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -20,7 +20,7 @@ //! The `FileWriter` and `StreamWriter` have similar interfaces, //! however the `FileWriter` expects a reader that supports `Seek`ing -use std::io::Write; +use std::io::{Error, Write, BufWriter}; use super::super::ARROW_MAGIC; use super::{ @@ -37,9 +37,9 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; -pub struct FileWriter<'a, W: Write> { +pub struct FileWriter { /// The object to write to - writer: &'a mut W, + writer: BufWriter, /// IPC write options write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches @@ -56,19 +56,20 @@ pub struct FileWriter<'a, W: Write> { dictionary_tracker: DictionaryTracker, } -impl<'a, W: Write> FileWriter<'a, W> { +impl FileWriter { /// Try create a new writer, with the schema written as part of the header - pub fn try_new(writer: &'a mut W, schema: &Schema) -> Result { + pub fn try_new(writer: W, schema: &Schema) -> Result { let write_options = IpcWriteOptions::default(); Self::try_new_with_options(writer, schema, write_options) } /// Try create a new writer with IpcWriteOptions pub fn try_new_with_options( - writer: &'a mut W, + writer: W, schema: &Schema, write_options: IpcWriteOptions, ) -> Result { + let mut writer = BufWriter::new(writer); // write magic to header writer.write_all(&ARROW_MAGIC[..])?; // create an 8-byte boundary after the header @@ -78,7 +79,7 @@ impl<'a, W: Write> FileWriter<'a, W> { ipc_message: schema_to_bytes(schema, *write_options.metadata_version()), arrow_data: vec![], }; - let (meta, data) = write_message(writer, encoded_message, &write_options)?; + let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; Ok(Self { writer, write_options, @@ -91,6 +92,10 @@ impl<'a, W: Write> FileWriter<'a, W> { }) } + pub fn into_inner(self) -> Result { + self.writer.into_inner().map_err(|e| Error::from(e).into()) + } + /// Write a record batch to the file pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { if self.finished { @@ -153,12 +158,3 @@ impl<'a, W: Write> FileWriter<'a, W> { Ok(()) } } - -/// Finish the file if it is not 'finished' when it goes out of scope -impl<'a, W: Write> Drop for FileWriter<'a, W> { - fn drop(&mut self) { - if !self.finished { - self.finish().unwrap(); - } - } -} diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index 449a057eeca..628cccd3d01 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -9,16 +9,17 @@ use arrow2::record_batch::RecordBatch; use crate::io::ipc::common::read_gzip_json; fn round_trip(batch: RecordBatch) -> Result<()> { - let mut result = Vec::::new(); + let result = Vec::::new(); // write IPC version 5 - { + let written_result = { let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?; - let mut writer = FileWriter::try_new_with_options(&mut result, batch.schema(), options)?; + let mut writer = FileWriter::try_new_with_options(result, batch.schema(), options)?; writer.write(&batch)?; writer.finish()?; - } - let mut reader = Cursor::new(result); + writer.into_inner()? + }; + let mut reader = Cursor::new(written_result); let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema().clone(); @@ -41,15 +42,16 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { let mut result = Vec::::new(); // write IPC version 5 - { + let written_result = { let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?; - let mut writer = FileWriter::try_new_with_options(&mut result, &schema, options)?; + let mut writer = FileWriter::try_new_with_options(result, &schema, options)?; for batch in batches { writer.write(&batch)?; } writer.finish()?; - } - let mut reader = Cursor::new(result); + writer.into_inner()? + }; + let mut reader = Cursor::new(written_result); let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema().clone(); From c317a8205cb9d87d6d78963199ddb9d23936f302 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 18 Sep 2021 03:44:30 +0800 Subject: [PATCH 2/3] fix compile --- examples/extension.rs | 10 ++++++---- tests/it/io/ipc/write/file.rs | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/examples/extension.rs b/examples/extension.rs index 5f27bda30c8..8462dd6c79f 100644 --- a/examples/extension.rs +++ b/examples/extension.rs @@ -20,10 +20,10 @@ fn main() -> Result<()> { let buffer = Cursor::new(vec![]); // write to IPC - write_ipc(buffer, array)?; + let result_buffer = write_ipc(buffer, array)?; // read it back - let batch = read_ipc(&buffer.into_inner())?; + let batch = read_ipc(&result_buffer.into_inner())?; // and verify that the datatype is preserved. let array = &batch.columns()[0]; @@ -34,14 +34,16 @@ fn main() -> Result<()> { Ok(()) } -fn write_ipc(writer: W, array: impl Array + 'static) -> Result<()> { +fn write_ipc(writer: W, array: impl Array + 'static) -> Result { let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]); let mut writer = write::FileWriter::try_new(writer, &schema)?; let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; - writer.write(&batch) + writer.write(&batch); + + writer.into_inner() } fn read_ipc(reader: &[u8]) -> Result { diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index 628cccd3d01..f821d31fc35 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -39,7 +39,7 @@ fn round_trip(batch: RecordBatch) -> Result<()> { fn test_file(version: &str, file_name: &str) -> Result<()> { let (schema, batches) = read_gzip_json(version, file_name)?; - let mut result = Vec::::new(); + let result = Vec::::new(); // write IPC version 5 let written_result = { From b5a9b0991ef91ea8d692441f0dea32099089ed0c Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 21 Sep 2021 15:32:43 +0800 Subject: [PATCH 3/3] remove buffer --- examples/extension.rs | 4 ++-- src/io/ipc/write/writer.rs | 11 +++++------ tests/it/io/ipc/write/file.rs | 4 ++-- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/examples/extension.rs b/examples/extension.rs index 8462dd6c79f..e6efd061853 100644 --- a/examples/extension.rs +++ b/examples/extension.rs @@ -41,9 +41,9 @@ fn write_ipc(writer: W, array: impl Array + 'static) -> Result< let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; - writer.write(&batch); + writer.write(&batch)?; - writer.into_inner() + Ok(writer.into_inner()) } fn read_ipc(reader: &[u8]) -> Result { diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 6946cb02d75..76811dc12a1 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -20,7 +20,7 @@ //! The `FileWriter` and `StreamWriter` have similar interfaces, //! however the `FileWriter` expects a reader that supports `Seek`ing -use std::io::{Error, Write, BufWriter}; +use std::io::Write; use super::super::ARROW_MAGIC; use super::{ @@ -39,7 +39,7 @@ use crate::record_batch::RecordBatch; pub struct FileWriter { /// The object to write to - writer: BufWriter, + writer: W, /// IPC write options write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches @@ -65,11 +65,10 @@ impl FileWriter { /// Try create a new writer with IpcWriteOptions pub fn try_new_with_options( - writer: W, + mut writer: W, schema: &Schema, write_options: IpcWriteOptions, ) -> Result { - let mut writer = BufWriter::new(writer); // write magic to header writer.write_all(&ARROW_MAGIC[..])?; // create an 8-byte boundary after the header @@ -92,8 +91,8 @@ impl FileWriter { }) } - pub fn into_inner(self) -> Result { - self.writer.into_inner().map_err(|e| Error::from(e).into()) + pub fn into_inner(self) -> W { + self.writer } /// Write a record batch to the file diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index f821d31fc35..005b51403ae 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -17,7 +17,7 @@ fn round_trip(batch: RecordBatch) -> Result<()> { let mut writer = FileWriter::try_new_with_options(result, batch.schema(), options)?; writer.write(&batch)?; writer.finish()?; - writer.into_inner()? + writer.into_inner() }; let mut reader = Cursor::new(written_result); let metadata = read_file_metadata(&mut reader)?; @@ -49,7 +49,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { writer.write(&batch)?; } writer.finish()?; - writer.into_inner()? + writer.into_inner() }; let mut reader = Cursor::new(written_result); let metadata = read_file_metadata(&mut reader)?;