diff --git a/benches/write_ipc.rs b/benches/write_ipc.rs
index a0dd9081c4e..ab60d3b856a 100644
--- a/benches/write_ipc.rs
+++ b/benches/write_ipc.rs
@@ -15,8 +15,8 @@ fn write(array: &dyn Array) -> Result<()> {
     let schema = Schema::new(vec![field]);
     let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![clone(array).into()])?;
 
-    let mut writer = Cursor::new(vec![]);
-    let mut writer = FileWriter::try_new(&mut writer, &schema)?;
+    let writer = Cursor::new(vec![]);
+    let mut writer = FileWriter::try_new(writer, &schema)?;
 
     writer.write(&batch)
 }
diff --git a/examples/extension.rs b/examples/extension.rs
index 25210214aa9..e6efd061853 100644
--- a/examples/extension.rs
+++ b/examples/extension.rs
@@ -17,13 +17,13 @@ fn main() -> Result<()> {
     let array = UInt16Array::from_slice([1, 2]).to(extension_type.clone());
 
     // from here on, it works as usual
-    let mut buffer = Cursor::new(vec![]);
+    let buffer = Cursor::new(vec![]);
 
     // write to IPC
-    write_ipc(&mut buffer, array)?;
+    let result_buffer = write_ipc(buffer, array)?;
 
     // read it back
-    let batch = read_ipc(&buffer.into_inner())?;
+    let batch = read_ipc(&result_buffer.into_inner())?;
 
     // and verify that the datatype is preserved.
     let array = &batch.columns()[0];
@@ -34,14 +34,16 @@ fn main() -> Result<()> {
     Ok(())
 }
 
-fn write_ipc<W: Write>(writer: &mut W, array: impl Array + 'static) -> Result<()> {
+fn write_ipc<W: Write>(writer: W, array: impl Array + 'static) -> Result<W> {
     let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);
 
     let mut writer = write::FileWriter::try_new(writer, &schema)?;
 
     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
 
-    writer.write(&batch)
+    writer.write(&batch)?;
+
+    Ok(writer.into_inner())
 }
 
 fn read_ipc(reader: &[u8]) -> Result<RecordBatch> {
diff --git a/examples/ipc_file_write.rs b/examples/ipc_file_write.rs
index 474bea999cf..232258f2ab5 100644
--- a/examples/ipc_file_write.rs
+++ b/examples/ipc_file_write.rs
@@ -8,9 +8,9 @@ use arrow2::io::ipc::write;
 use arrow2::record_batch::RecordBatch;
 
 fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result<()> {
-    let mut file = File::create(path)?;
+    let file = File::create(path)?;
 
-    let mut writer = write::FileWriter::try_new(&mut file, schema)?;
+    let mut writer = write::FileWriter::try_new(file, schema)?;
 
     for batch in batches {
         writer.write(batch)?
diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs
index 2c378cb938a..dc28a7ea004 100644
--- a/integration-testing/src/bin/arrow-json-integration-test.rs
+++ b/integration-testing/src/bin/arrow-json-integration-test.rs
@@ -80,8 +80,8 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>
 
     let json_file = read_json_file(json_name)?;
 
-    let mut arrow_file = File::create(arrow_name)?;
-    let mut writer = FileWriter::try_new(&mut arrow_file, &json_file.schema)?;
+    let arrow_file = File::create(arrow_name)?;
+    let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;
 
     for b in json_file.batches {
         writer.write(&b)?;
diff --git a/integration-testing/src/bin/arrow-stream-to-file.rs b/integration-testing/src/bin/arrow-stream-to-file.rs
index c17ff7d9e91..f40d8add7eb 100644
--- a/integration-testing/src/bin/arrow-stream-to-file.rs
+++ b/integration-testing/src/bin/arrow-stream-to-file.rs
@@ -27,9 +27,9 @@ fn main() -> Result<()> {
     let mut arrow_stream_reader = read::StreamReader::new(reader, metadata);
     let schema = arrow_stream_reader.schema();
 
-    let mut writer = io::stdout();
+    let writer = io::stdout();
 
-    let mut writer = FileWriter::try_new(&mut writer, schema)?;
+    let mut writer = FileWriter::try_new(writer, schema)?;
 
     arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?;
     writer.finish()?;
diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs
index df477757e02..76811dc12a1 100644
--- a/src/io/ipc/write/writer.rs
+++ b/src/io/ipc/write/writer.rs
@@ -37,9 +37,9 @@ use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
 
-pub struct FileWriter<'a, W: Write> {
+pub struct FileWriter<W: Write> {
     /// The object to write to
-    writer: &'a mut W,
+    writer: W,
     /// IPC write options
     write_options: IpcWriteOptions,
     /// A reference to the schema, used in validating record batches
@@ -56,16 +56,16 @@ pub struct FileWriter<'a, W: Write> {
     dictionary_tracker: DictionaryTracker,
 }
 
-impl<'a, W: Write> FileWriter<'a, W> {
+impl<W: Write> FileWriter<W> {
     /// Try create a new writer, with the schema written as part of the header
-    pub fn try_new(writer: &'a mut W, schema: &Schema) -> Result<Self> {
+    pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
         let write_options = IpcWriteOptions::default();
         Self::try_new_with_options(writer, schema, write_options)
     }
 
     /// Try create a new writer with IpcWriteOptions
    pub fn try_new_with_options(
-        writer: &'a mut W,
+        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self> {
@@ -78,7 +78,7 @@ impl<'a, W: Write> FileWriter<'a, W> {
            ipc_message: schema_to_bytes(schema, *write_options.metadata_version()),
            arrow_data: vec![],
        };
-        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
+        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
@@ -91,6 +91,10 @@ impl<'a, W: Write> FileWriter<'a, W> {
         })
     }
 
+    pub fn into_inner(self) -> W {
+        self.writer
+    }
+
     /// Write a record batch to the file
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         if self.finished {
@@ -153,12 +157,3 @@ impl<'a, W: Write> FileWriter<'a, W> {
         Ok(())
     }
 }
-
-/// Finish the file if it is not 'finished' when it goes out of scope
-impl<'a, W: Write> Drop for FileWriter<'a, W> {
-    fn drop(&mut self) {
-        if !self.finished {
-            self.finish().unwrap();
-        }
-    }
-}
diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs
index 449a057eeca..005b51403ae 100644
--- a/tests/it/io/ipc/write/file.rs
+++ b/tests/it/io/ipc/write/file.rs
@@ -9,16 +9,17 @@ use arrow2::record_batch::RecordBatch;
 use crate::io::ipc::common::read_gzip_json;
 
 fn round_trip(batch: RecordBatch) -> Result<()> {
-    let mut result = Vec::<u8>::new();
+    let result = Vec::<u8>::new();
 
     // write IPC version 5
-    {
+    let written_result = {
         let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
-        let mut writer = FileWriter::try_new_with_options(&mut result, batch.schema(), options)?;
+        let mut writer = FileWriter::try_new_with_options(result, batch.schema(), options)?;
         writer.write(&batch)?;
         writer.finish()?;
-    }
-    let mut reader = Cursor::new(result);
+        writer.into_inner()
+    };
+    let mut reader = Cursor::new(written_result);
     let metadata = read_file_metadata(&mut reader)?;
     let schema = metadata.schema().clone();
 
@@ -38,18 +39,19 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
     let (schema, batches) = read_gzip_json(version, file_name)?;
 
-    let mut result = Vec::<u8>::new();
+    let result = Vec::<u8>::new();
 
     // write IPC version 5
-    {
+    let written_result = {
         let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
-        let mut writer = FileWriter::try_new_with_options(&mut result, &schema, options)?;
+        let mut writer = FileWriter::try_new_with_options(result, &schema, options)?;
 
         for batch in batches {
             writer.write(&batch)?;
         }
         writer.finish()?;
-    }
-    let mut reader = Cursor::new(result);
+        writer.into_inner()
+    };
+    let mut reader = Cursor::new(written_result);
     let metadata = read_file_metadata(&mut reader)?;
     let schema = metadata.schema().clone();
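
Taken together, the change makes FileWriter own its writer, removes the finish-on-Drop impl (which used to call finish().unwrap() when the writer went out of scope), and adds into_inner() to hand the writer back. Below is a minimal sketch of the resulting usage pattern, assembled only from calls that appear in this patch; the UInt16 column, the field name "a", and the Vec<u8> buffer are illustrative placeholders, not part of the change.

use std::sync::Arc;

use arrow2::array::{Array, UInt16Array};
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::write::FileWriter;
use arrow2::record_batch::RecordBatch;

fn main() -> Result<()> {
    // Placeholder data: a single UInt16 column named "a".
    let array = UInt16Array::from_slice([1, 2]);
    let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])?;

    // The writer now takes ownership of the in-memory buffer instead of borrowing it.
    let mut writer = FileWriter::try_new(Vec::<u8>::new(), &schema)?;
    writer.write(&batch)?;
    // With the Drop impl removed, finish() must be called explicitly.
    writer.finish()?;

    // into_inner() returns the owned buffer once writing is done.
    let ipc_bytes: Vec<u8> = writer.into_inner();
    assert!(!ipc_bytes.is_empty());
    Ok(())
}

Owning the writer drops the 'a lifetime from FileWriter, so callers no longer hold a mutable borrow for the writer's whole lifetime and can move the finished buffer or file out via into_inner(), as the updated tests and examples above do.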