Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Allowed creating IPC FileWriter without writing to the file #970

Merged
merged 2 commits into from
Apr 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ jobs:
submodules: true
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
toolchain: nightly-2022-03-16
target: ${{ matrix.target }}
override: true
- uses: Swatinem/rust-cache@v1
Expand Down
4 changes: 3 additions & 1 deletion examples/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
let schema = vec![Field::new("a", array.data_type().clone(), false)].into();

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?;
let mut writer = write::FileWriter::new(writer, schema, None, options);

let batch = Chunk::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;

writer.start()?;
writer.write(&batch, None)?;
writer.finish()?;

Ok(writer.into_inner())
}
Expand Down
7 changes: 4 additions & 3 deletions examples/ipc_file_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::write;

fn write_batches(path: &str, schema: &Schema, columns: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
fn write_batches(path: &str, schema: Schema, columns: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
let file = File::create(path)?;

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(file, schema, None, options)?;
let mut writer = write::FileWriter::new(file, schema, None, options);

writer.start()?;
for columns in columns {
writer.write(columns, None)?
}
Expand All @@ -37,6 +38,6 @@ fn main() -> Result<()> {
let batch = Chunk::try_new(vec![Arc::new(a) as Arc<dyn Array>, Arc::new(b)])?;

// write it
write_batches(file_path, &schema, &[batch])?;
write_batches(file_path, schema, &[batch])?;
Ok(())
}
86 changes: 62 additions & 24 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ use crate::chunk::Chunk;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};

#[derive(Clone, Copy, PartialEq, Eq)]
enum State {
None,
Started,
Finished,
}

/// Arrow file writer
pub struct FileWriter<W: Write> {
/// The object to write to
Expand All @@ -31,65 +38,90 @@ pub struct FileWriter<W: Write> {
/// Record blocks that will be written as part of the IPC footer
record_blocks: Vec<arrow_format::ipc::Block>,
/// Whether the writer footer has been written, and the writer is finished
finished: bool,
state: State,
/// Keeps track of dictionaries that have been written
dictionary_tracker: DictionaryTracker,
}

impl<W: Write> FileWriter<W> {
/// Try create a new writer, with the schema written as part of the header
/// Creates a new [`FileWriter`] and writes the header to `writer`
pub fn try_new(
mut writer: W,
writer: W,
schema: &Schema,
ipc_fields: Option<Vec<IpcField>>,
options: WriteOptions,
) -> Result<Self> {
// write magic to header
writer.write_all(&ARROW_MAGIC[..])?;
// create an 8-byte boundary after the header
writer.write_all(&[0, 0])?;
// write the schema, set the written bytes to the schema
let mut slf = Self::new(writer, schema.clone(), ipc_fields, options);
slf.start()?;

Ok(slf)
}

/// Creates a new [`FileWriter`].
pub fn new(
writer: W,
schema: Schema,
ipc_fields: Option<Vec<IpcField>>,
options: WriteOptions,
) -> Self {
let ipc_fields = if let Some(ipc_fields) = ipc_fields {
ipc_fields
} else {
default_ipc_fields(&schema.fields)
};
let encoded_message = EncodedData {
ipc_message: schema_to_bytes(schema, &ipc_fields),
arrow_data: vec![],
};

let (meta, data) = write_message(&mut writer, encoded_message)?;
Ok(Self {
Self {
writer,
options,
schema: schema.clone(),
schema,
ipc_fields,
block_offsets: meta + data + 8,
block_offsets: 0,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
state: State::None,
dictionary_tracker: DictionaryTracker::new(true),
})
}
}

/// Consumes itself into the inner writer
pub fn into_inner(self) -> W {
self.writer
}

/// Writes the header and first (schema) message to the file.
/// # Errors
/// Errors if the file has been started or has finished.
pub fn start(&mut self) -> Result<()> {
if self.state != State::None {
return Err(ArrowError::oos("The IPC file can only be started once"));
}
// write magic to header
self.writer.write_all(&ARROW_MAGIC[..])?;
// create an 8-byte boundary after the header
self.writer.write_all(&[0, 0])?;
// write the schema, set the written bytes to the schema

let encoded_message = EncodedData {
ipc_message: schema_to_bytes(&self.schema, &self.ipc_fields),
arrow_data: vec![],
};

let (meta, data) = write_message(&mut self.writer, encoded_message)?;
self.block_offsets += meta + data + 8; // 8 <=> arrow magic + 2 bytes for alignment
self.state = State::Started;
Ok(())
}

/// Writes [`Chunk`] to the file
pub fn write(
&mut self,
columns: &Chunk<Arc<dyn Array>>,
ipc_fields: Option<&[IpcField]>,
) -> Result<()> {
if self.finished {
return Err(ArrowError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Cannot write to a finished file".to_string(),
)));
if self.state != State::Started {
return Err(ArrowError::oos(
"The IPC file must be started before it can be written to. Call `start` before `write`",
));
}

let ipc_fields = if let Some(ipc_fields) = ipc_fields {
Expand Down Expand Up @@ -132,6 +164,12 @@ impl<W: Write> FileWriter<W> {

/// Write footer and closing tag, then mark the writer as done
pub fn finish(&mut self) -> Result<()> {
if self.state != State::Started {
return Err(ArrowError::oos(
"The IPC file must be started before it can be finished. Call `start` before `finish`",
));
}

// write EOS
write_continuation(&mut self.writer, 0)?;

Expand All @@ -151,7 +189,7 @@ impl<W: Write> FileWriter<W> {
.write_all(&(footer_data.len() as i32).to_le_bytes())?;
self.writer.write_all(&ARROW_MAGIC)?;
self.writer.flush()?;
self.finished = true;
self.state = State::Finished;

Ok(())
}
Expand Down