From 293fe436c77b616318507ee0552eeebfa4bee05b Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 30 Apr 2022 09:34:32 +0000 Subject: [PATCH] Added support to append to existing Arrow file --- src/io/flight/mod.rs | 5 +- src/io/ipc/append/mod.rs | 80 ++++++++++++++++++++++++++++ src/io/ipc/mod.rs | 1 + src/io/ipc/read/mod.rs | 2 +- src/io/ipc/read/reader.rs | 6 +-- src/io/ipc/write/common.rs | 37 ++++++------- src/io/ipc/write/file_async.rs | 5 +- src/io/ipc/write/mod.rs | 2 +- src/io/ipc/write/stream.rs | 5 +- src/io/ipc/write/stream_async.rs | 5 +- src/io/ipc/write/writer.rs | 25 +++++---- tests/it/io/ipc/write/file.rs | 6 +-- tests/it/io/ipc/write/file_append.rs | 51 ++++++++++++++++++ tests/it/io/ipc/write/mod.rs | 1 + 14 files changed, 186 insertions(+), 45 deletions(-) create mode 100644 src/io/ipc/append/mod.rs create mode 100644 tests/it/io/ipc/write/file_append.rs diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 5202dc89c13..330093136f6 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -25,7 +25,10 @@ pub fn serialize_batch( fields: &[IpcField], options: &WriteOptions, ) -> (Vec, FlightData) { - let mut dictionary_tracker = DictionaryTracker::new(false); + let mut dictionary_tracker = DictionaryTracker { + dictionaries: Default::default(), + cannot_replace: false, + }; let (encoded_dictionaries, encoded_batch) = encode_chunk(columns, fields, &mut dictionary_tracker, options) diff --git a/src/io/ipc/append/mod.rs b/src/io/ipc/append/mod.rs new file mode 100644 index 00000000000..b86df14b71e --- /dev/null +++ b/src/io/ipc/append/mod.rs @@ -0,0 +1,80 @@ +//! 
A struct adapter of Read+Seek+Write to append to IPC files +// read header and convert to writer information +// seek to first byte of header - 1 +// write new batch +// write new footer +use std::io::{Read, Seek, SeekFrom, Write}; + +use crate::error::{ArrowError, Result}; + +use super::endianess::is_native_little_endian; +use super::read::{self, FileMetadata}; +use super::write::common::DictionaryTracker; +use super::write::writer::*; +use super::write::*; + +impl FileWriter { + /// Creates a new [`FileWriter`] from an existing file, seeking to the last message + /// and appending new messages afterwards. Users call `finish` to write the footer (with both + /// the existing and appended messages in it). + /// # Errors + /// This function errors iff: + /// * the file's endianness is not the native endianness (not yet supported) + /// * the file is not a valid Arrow IPC file + pub fn try_from_file( + mut writer: R, + metadata: FileMetadata, + options: WriteOptions, + ) -> Result> { + if metadata.ipc_schema.is_little_endian != is_native_little_endian() { + return Err(ArrowError::nyi( + "Appending to a file of a non-native endianess is still not supported", + )); + } + + let dictionaries = if let Some(blocks) = &metadata.dictionaries { + read::reader::read_dictionaries( + &mut writer, + &metadata.schema.fields, + &metadata.ipc_schema, + blocks, + )?
+ } else { + Default::default() + }; + + let last_block = metadata.blocks.last().ok_or_else(|| { + ArrowError::oos("An Arrow IPC file must have at least 1 message (the schema message)") + })?; + let offset: u64 = last_block + .offset + .try_into() + .map_err(|_| ArrowError::oos("The block's offset must be a positive number"))?; + let meta_data_length: u64 = last_block + .meta_data_length + .try_into() + .map_err(|_| ArrowError::oos("The block's meta length must be a positive number"))?; + let body_length: u64 = last_block + .body_length + .try_into() + .map_err(|_| ArrowError::oos("The block's body length must be a positive number"))?; + let offset: u64 = offset + meta_data_length + body_length; + + writer.seek(SeekFrom::Start(offset))?; + + Ok(FileWriter { + writer, + options, + schema: metadata.schema, + ipc_fields: metadata.ipc_schema.fields, + block_offsets: offset as usize, + dictionary_blocks: metadata.dictionaries.unwrap_or_default(), + record_blocks: metadata.blocks, + state: State::Started, // file already exists, so we are ready + dictionary_tracker: DictionaryTracker { + dictionaries, + cannot_replace: true, + }, + }) + } +} diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index e99d1a87ac8..1d1ff9fab53 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -81,6 +81,7 @@ use crate::error::ArrowError; mod compression; mod endianess; +pub mod append; pub mod read; pub mod write; diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index 207c33329fc..83e41a79187 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -13,7 +13,7 @@ mod array; mod common; mod deserialize; mod read_basic; -mod reader; +pub(crate) mod reader; mod schema; mod stream; #[cfg(feature = "io_ipc_read_async")] diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index a951105bd35..52c71275690 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -27,10 +27,10 @@ pub struct FileMetadata { /// The blocks in the file /// /// A 
block indicates the regions in the file to read to get data - pub(super) blocks: Vec, + pub(crate) blocks: Vec, /// Dictionaries associated to each dict_id - pub(super) dictionaries: Option>, + pub(crate) dictionaries: Option>, } /// Arrow File reader @@ -65,7 +65,7 @@ fn read_dictionary_message( Ok(()) } -fn read_dictionaries( +pub(crate) fn read_dictionaries( reader: &mut R, fields: &[Field], ipc_schema: &IpcSchema, diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index bd905948714..edc32d9c7b3 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -32,7 +32,7 @@ pub struct WriteOptions { fn encode_dictionary( field: &IpcField, - array: &Arc, + array: &dyn Array, options: &WriteOptions, dictionary_tracker: &mut DictionaryTracker, encoded_dictionaries: &mut Vec, @@ -50,7 +50,7 @@ fn encode_dictionary( let array = array.as_any().downcast_ref::>().unwrap(); let values = array.values(); encode_dictionary(field, - values, + values.as_ref(), options, dictionary_tracker, encoded_dictionaries @@ -80,7 +80,7 @@ fn encode_dictionary( .try_for_each(|(field, values)| { encode_dictionary( field, - values, + values.as_ref(), options, dictionary_tracker, encoded_dictionaries, @@ -96,7 +96,7 @@ fn encode_dictionary( let field = &field.fields[0]; // todo: error instead encode_dictionary( field, - values, + values.as_ref(), options, dictionary_tracker, encoded_dictionaries, @@ -111,7 +111,7 @@ fn encode_dictionary( let field = &field.fields[0]; // todo: error instead encode_dictionary( field, - values, + values.as_ref(), options, dictionary_tracker, encoded_dictionaries, @@ -126,7 +126,7 @@ fn encode_dictionary( let field = &field.fields[0]; // todo: error instead encode_dictionary( field, - values, + values.as_ref(), options, dictionary_tracker, encoded_dictionaries, @@ -151,7 +151,7 @@ fn encode_dictionary( .try_for_each(|(field, values)| { encode_dictionary( field, - values, + values.as_ref(), options, dictionary_tracker, 
encoded_dictionaries, @@ -163,7 +163,7 @@ fn encode_dictionary( let field = &field.fields[0]; // todo: error instead encode_dictionary( field, - values, + values.as_ref(), options, dictionary_tracker, encoded_dictionaries, @@ -183,7 +183,7 @@ pub fn encode_chunk( for (field, array) in fields.iter().zip(columns.as_ref()) { encode_dictionary( field, - array, + array.as_ref(), options, dictionary_tracker, &mut encoded_dictionaries, @@ -312,18 +312,11 @@ fn dictionary_batch_to_bytes( /// multiple times. Can optionally error if an update to an existing dictionary is attempted, which /// isn't allowed in the `FileWriter`. pub struct DictionaryTracker { - written: Dictionaries, - error_on_replacement: bool, + pub dictionaries: Dictionaries, + pub cannot_replace: bool, } impl DictionaryTracker { - pub fn new(error_on_replacement: bool) -> Self { - Self { - written: Dictionaries::new(), - error_on_replacement, - } - } - /// Keep track of the dictionary with the given ID and values. Behavior: /// /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate @@ -333,7 +326,7 @@ impl DictionaryTracker { /// * If the tracker has not been configured to error on replacement or this dictionary /// has never been seen before, return `Ok(true)` to indicate that the dictionary was just /// inserted. - pub fn insert(&mut self, dict_id: i64, array: &Arc) -> Result { + pub fn insert(&mut self, dict_id: i64, array: &dyn Array) -> Result { let values = match array.data_type() { DataType::Dictionary(key_type, _, _) => { match_integer_type!(key_type, |$T| { @@ -348,11 +341,11 @@ impl DictionaryTracker { }; // If a dictionary with this id was already emitted, check if it was the same. 
- if let Some(last) = self.written.get(&dict_id) { + if let Some(last) = self.dictionaries.get(&dict_id) { if last.as_ref() == values.as_ref() { // Same dictionary values => no need to emit it again return Ok(false); - } else if self.error_on_replacement { + } else if self.cannot_replace { return Err(ArrowError::InvalidArgumentError( "Dictionary replacement detected when writing IPC file format. \ Arrow IPC files only support a single dictionary for a given field \ @@ -362,7 +355,7 @@ impl DictionaryTracker { } }; - self.written.insert(dict_id, values.clone()); + self.dictionaries.insert(dict_id, values.clone()); Ok(true) } } diff --git a/src/io/ipc/write/file_async.rs b/src/io/ipc/write/file_async.rs index 930bc4d5c5b..6568866073e 100644 --- a/src/io/ipc/write/file_async.rs +++ b/src/io/ipc/write/file_async.rs @@ -96,7 +96,10 @@ where fields, offset: 0, schema: schema.clone(), - dictionary_tracker: DictionaryTracker::new(true), + dictionary_tracker: DictionaryTracker { + dictionaries: Default::default(), + cannot_replace: true, + }, record_blocks: vec![], dictionary_blocks: vec![], } diff --git a/src/io/ipc/write/mod.rs b/src/io/ipc/write/mod.rs index 47167ef491a..6fc355c85ca 100644 --- a/src/io/ipc/write/mod.rs +++ b/src/io/ipc/write/mod.rs @@ -3,7 +3,7 @@ pub(crate) mod common; mod schema; mod serialize; mod stream; -mod writer; +pub(crate) mod writer; pub use common::{Compression, Record, WriteOptions}; pub use schema::schema_to_bytes; diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index 00ea287528e..c58ab20707b 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -42,7 +42,10 @@ impl StreamWriter { writer, write_options, finished: false, - dictionary_tracker: DictionaryTracker::new(false), + dictionary_tracker: DictionaryTracker { + dictionaries: Default::default(), + cannot_replace: false, + }, ipc_fields: None, } } diff --git a/src/io/ipc/write/stream_async.rs b/src/io/ipc/write/stream_async.rs index 
033fa06038b..f43b1371a8e 100644 --- a/src/io/ipc/write/stream_async.rs +++ b/src/io/ipc/write/stream_async.rs @@ -73,7 +73,10 @@ where writer: None, task, fields, - dictionary_tracker: DictionaryTracker::new(false), + dictionary_tracker: DictionaryTracker { + dictionaries: Default::default(), + cannot_replace: false, + }, options: write_options, } } diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 467e9fefd8f..375a4c8f1d5 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -16,7 +16,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; #[derive(Clone, Copy, PartialEq, Eq)] -enum State { +pub(crate) enum State { None, Started, Finished, @@ -25,22 +25,22 @@ enum State { /// Arrow file writer pub struct FileWriter { /// The object to write to - writer: W, + pub(crate) writer: W, /// IPC write options - options: WriteOptions, + pub(crate) options: WriteOptions, /// A reference to the schema, used in validating record batches - schema: Schema, - ipc_fields: Vec, + pub(crate) schema: Schema, + pub(crate) ipc_fields: Vec, /// The number of bytes between each block of bytes, as an offset for random access - block_offsets: usize, + pub(crate) block_offsets: usize, /// Dictionary blocks that will be written as part of the IPC footer - dictionary_blocks: Vec, + pub(crate) dictionary_blocks: Vec, /// Record blocks that will be written as part of the IPC footer - record_blocks: Vec, + pub(crate) record_blocks: Vec, /// Whether the writer footer has been written, and the writer is finished - state: State, + pub(crate) state: State, /// Keeps track of dictionaries that have been written - dictionary_tracker: DictionaryTracker, + pub(crate) dictionary_tracker: DictionaryTracker, } impl FileWriter { @@ -79,7 +79,10 @@ impl FileWriter { dictionary_blocks: vec![], record_blocks: vec![], state: State::None, - dictionary_tracker: DictionaryTracker::new(true), + dictionary_tracker: DictionaryTracker { + dictionaries: 
Default::default(), + cannot_replace: true, + }, } } diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index d466c8291bc..d76855fdd93 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -10,7 +10,7 @@ use arrow2::io::ipc::{write::*, IpcField}; use crate::io::ipc::common::read_gzip_json; -fn write_( +pub(crate) fn write( batches: &[Chunk>], schema: &Schema, ipc_fields: Option>, @@ -34,7 +34,7 @@ fn round_trip( ) -> Result<()> { let (expected_schema, expected_batches) = (schema.clone(), vec![columns]); - let result = write_(&expected_batches, &schema, ipc_fields, compression)?; + let result = write(&expected_batches, &schema, ipc_fields, compression)?; let mut reader = Cursor::new(result); let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema.clone(); @@ -58,7 +58,7 @@ fn test_file(version: &str, file_name: &str, compressed: bool) -> Result<()> { None }; - let result = write_(&batches, &schema, Some(ipc_fields), compression)?; + let result = write(&batches, &schema, Some(ipc_fields), compression)?; let mut reader = Cursor::new(result); let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema.clone(); diff --git a/tests/it/io/ipc/write/file_append.rs b/tests/it/io/ipc/write/file_append.rs new file mode 100644 index 00000000000..02573ebb634 --- /dev/null +++ b/tests/it/io/ipc/write/file_append.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; + +use arrow2::array::*; +use arrow2::chunk::Chunk; +use arrow2::datatypes::*; +use arrow2::error::Result; +use arrow2::io::ipc::read; +use arrow2::io::ipc::write::{FileWriter, WriteOptions}; + +use super::file::write; + +#[test] +fn basic() -> Result<()> { + // prepare some data + let array = Arc::new(BooleanArray::from([ + Some(true), + Some(false), + None, + Some(true), + ])) as Arc; + let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); + let columns = Chunk::try_new(vec![array])?; + + let (expected_schema, 
expected_batches) = (schema.clone(), vec![columns.clone()]); + + // write to a file + let result = write(&expected_batches, &schema, None, None)?; + + // read the file to append + let mut file = std::io::Cursor::new(result); + let metadata = read::read_file_metadata(&mut file)?; + let mut writer = FileWriter::try_from_file(file, metadata, WriteOptions { compression: None })?; + + // write a new column + writer.write(&columns, None)?; + writer.finish()?; + + let data = writer.into_inner(); + let mut reader = std::io::Cursor::new(data.into_inner()); + + // read the file again and confirm that it contains both messages + let metadata = read::read_file_metadata(&mut reader)?; + assert_eq!(schema, expected_schema); + let reader = read::FileReader::new(reader, metadata, None); + + let chunks = reader.collect::>>()?; + + assert_eq!(chunks, vec![columns.clone(), columns]); + + Ok(()) +} diff --git a/tests/it/io/ipc/write/mod.rs b/tests/it/io/ipc/write/mod.rs index 6d1510b36c4..f3a4fbfe57c 100644 --- a/tests/it/io/ipc/write/mod.rs +++ b/tests/it/io/ipc/write/mod.rs @@ -1,2 +1,3 @@ mod file; +mod file_append; mod stream;