Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Changed IPC FileWriter to own the writer. #420

Merged
merged 3 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/write_ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ fn write(array: &dyn Array) -> Result<()> {
let schema = Schema::new(vec![field]);
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![clone(array).into()])?;

let mut writer = Cursor::new(vec![]);
let mut writer = FileWriter::try_new(&mut writer, &schema)?;
let writer = Cursor::new(vec![]);
let mut writer = FileWriter::try_new(writer, &schema)?;

writer.write(&batch)
}
Expand Down
12 changes: 7 additions & 5 deletions examples/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ fn main() -> Result<()> {
let array = UInt16Array::from_slice([1, 2]).to(extension_type.clone());

// from here on, it works as usual
let mut buffer = Cursor::new(vec![]);
let buffer = Cursor::new(vec![]);

// write to IPC
write_ipc(&mut buffer, array)?;
let result_buffer = write_ipc(buffer, array)?;

// read it back
let batch = read_ipc(&buffer.into_inner())?;
let batch = read_ipc(&result_buffer.into_inner())?;

// and verify that the datatype is preserved.
let array = &batch.columns()[0];
Expand All @@ -34,14 +34,16 @@ fn main() -> Result<()> {
Ok(())
}

fn write_ipc<W: Write + Seek>(writer: &mut W, array: impl Array + 'static) -> Result<()> {
fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<W> {
let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);

let mut writer = write::FileWriter::try_new(writer, &schema)?;

let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;

writer.write(&batch)
writer.write(&batch);

writer.into_inner()
}

fn read_ipc(reader: &[u8]) -> Result<RecordBatch> {
Expand Down
4 changes: 2 additions & 2 deletions examples/ipc_file_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use arrow2::io::ipc::write;
use arrow2::record_batch::RecordBatch;

fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result<()> {
let mut file = File::create(path)?;
let file = File::create(path)?;

let mut writer = write::FileWriter::try_new(&mut file, schema)?;
let mut writer = write::FileWriter::try_new(file, schema)?;

for batch in batches {
writer.write(batch)?
Expand Down
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>

let json_file = read_json_file(json_name)?;

let mut arrow_file = File::create(arrow_name)?;
let mut writer = FileWriter::try_new(&mut arrow_file, &json_file.schema)?;
let arrow_file = File::create(arrow_name)?;
let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;

for b in json_file.batches {
writer.write(&b)?;
Expand Down
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-stream-to-file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ fn main() -> Result<()> {
let mut arrow_stream_reader = read::StreamReader::new(reader, metadata);
let schema = arrow_stream_reader.schema();

let mut writer = io::stdout();
let writer = io::stdout();

let mut writer = FileWriter::try_new(&mut writer, schema)?;
let mut writer = FileWriter::try_new(writer, schema)?;

arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?;
writer.finish()?;
Expand Down
28 changes: 12 additions & 16 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! The `FileWriter` and `StreamWriter` have similar interfaces,
//! however the `FileWriter` expects a writer that supports `Seek`ing

use std::io::Write;
use std::io::{Error, Write, BufWriter};

use super::super::ARROW_MAGIC;
use super::{
Expand All @@ -37,9 +37,9 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

pub struct FileWriter<'a, W: Write> {
pub struct FileWriter<W: Write> {
/// The object to write to
writer: &'a mut W,
writer: BufWriter<W>,
/// IPC write options
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
Expand All @@ -56,19 +56,20 @@ pub struct FileWriter<'a, W: Write> {
dictionary_tracker: DictionaryTracker,
}

impl<'a, W: Write> FileWriter<'a, W> {
impl<W: Write> FileWriter<W> {
/// Try to create a new writer, with the schema written as part of the header
pub fn try_new(writer: &'a mut W, schema: &Schema) -> Result<Self> {
pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
let write_options = IpcWriteOptions::default();
Self::try_new_with_options(writer, schema, write_options)
}

/// Try to create a new writer with the given `IpcWriteOptions`
pub fn try_new_with_options(
writer: &'a mut W,
writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
) -> Result<Self> {
let mut writer = BufWriter::new(writer);
yjshen marked this conversation as resolved.
Show resolved Hide resolved
// write magic to header
writer.write_all(&ARROW_MAGIC[..])?;
// create an 8-byte boundary after the header
Expand All @@ -78,7 +79,7 @@ impl<'a, W: Write> FileWriter<'a, W> {
ipc_message: schema_to_bytes(schema, *write_options.metadata_version()),
arrow_data: vec![],
};
let (meta, data) = write_message(writer, encoded_message, &write_options)?;
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
Ok(Self {
writer,
write_options,
Expand All @@ -91,6 +92,10 @@ impl<'a, W: Write> FileWriter<'a, W> {
})
}

/// Consumes this `FileWriter` and returns the writer it owns.
///
/// The internal `BufWriter` is unwrapped, which writes out any bytes still
/// held in its buffer. This method does not call `finish`.
///
/// # Errors
/// Returns an error if unwrapping the `BufWriter` fails; the failure is
/// converted into an I/O error and then into the crate's error type.
pub fn into_inner(self) -> Result<W> {
    match self.writer.into_inner() {
        Ok(inner) => Ok(inner),
        Err(unwrap_failure) => Err(Error::from(unwrap_failure).into()),
    }
}

yjshen marked this conversation as resolved.
Show resolved Hide resolved
/// Write a record batch to the file
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
if self.finished {
Expand Down Expand Up @@ -153,12 +158,3 @@ impl<'a, W: Write> FileWriter<'a, W> {
Ok(())
}
}

/// Finish the file if it is not 'finished' when it goes out of scope
impl<'a, W: Write> Drop for FileWriter<'a, W> {
jorgecarleitao marked this conversation as resolved.
Show resolved Hide resolved
fn drop(&mut self) {
if !self.finished {
self.finish().unwrap();
}
}
}
22 changes: 12 additions & 10 deletions tests/it/io/ipc/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ use arrow2::record_batch::RecordBatch;
use crate::io::ipc::common::read_gzip_json;

fn round_trip(batch: RecordBatch) -> Result<()> {
let mut result = Vec::<u8>::new();
let result = Vec::<u8>::new();

// write IPC version 5
{
let written_result = {
let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
let mut writer = FileWriter::try_new_with_options(&mut result, batch.schema(), options)?;
let mut writer = FileWriter::try_new_with_options(result, batch.schema(), options)?;
writer.write(&batch)?;
writer.finish()?;
}
let mut reader = Cursor::new(result);
writer.into_inner()?
};
let mut reader = Cursor::new(written_result);
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema().clone();

Expand All @@ -38,18 +39,19 @@ fn round_trip(batch: RecordBatch) -> Result<()> {
fn test_file(version: &str, file_name: &str) -> Result<()> {
let (schema, batches) = read_gzip_json(version, file_name)?;

let mut result = Vec::<u8>::new();
let result = Vec::<u8>::new();

// write IPC version 5
{
let written_result = {
let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
let mut writer = FileWriter::try_new_with_options(&mut result, &schema, options)?;
let mut writer = FileWriter::try_new_with_options(result, &schema, options)?;
for batch in batches {
writer.write(&batch)?;
}
writer.finish()?;
}
let mut reader = Cursor::new(result);
writer.into_inner()?
};
let mut reader = Cursor::new(written_result);
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema().clone();

Expand Down