Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to append to existing Arrow file
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 30, 2022
1 parent 34f4b07 commit 293fe43
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 45 deletions.
5 changes: 4 additions & 1 deletion src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ pub fn serialize_batch(
fields: &[IpcField],
options: &WriteOptions,
) -> (Vec<FlightData>, FlightData) {
let mut dictionary_tracker = DictionaryTracker::new(false);
let mut dictionary_tracker = DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: false,
};

let (encoded_dictionaries, encoded_batch) =
encode_chunk(columns, fields, &mut dictionary_tracker, options)
Expand Down
80 changes: 80 additions & 0 deletions src/io/ipc/append/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//! A struct adapter of Read+Seek+Write to append to IPC files
// read header and convert to writer information
// seek to first byte of header - 1
// write new batch
// write new footer
use std::io::{Read, Seek, SeekFrom, Write};

use crate::error::{ArrowError, Result};

use super::endianess::is_native_little_endian;
use super::read::{self, FileMetadata};
use super::write::common::DictionaryTracker;
use super::write::writer::*;
use super::write::*;

impl<R: Read + Seek + Write> FileWriter<R> {
    /// Creates a new [`FileWriter`] from an existing file, seeking past the last message
    /// and appending new messages afterwards. Users call `finish` to write the footer
    /// (with both the existing and the appended messages on it).
    /// # Error
    /// This function errors iff:
    /// * the file's endianness is not the native endianness (not yet supported)
    /// * the file is not a valid Arrow IPC file
    pub fn try_from_file(
        mut writer: R,
        metadata: FileMetadata,
        options: WriteOptions,
    ) -> Result<FileWriter<R>> {
        if metadata.ipc_schema.is_little_endian != is_native_little_endian() {
            return Err(ArrowError::nyi(
                "Appending to a file of a non-native endianess is still not supported",
            ));
        }

        // Re-read the dictionaries already in the file so the tracker can detect
        // (and reject) replacements, which the IPC *file* format does not allow.
        let dictionaries = if let Some(blocks) = &metadata.dictionaries {
            read::reader::read_dictionaries(
                &mut writer,
                &metadata.schema.fields,
                &metadata.ipc_schema,
                blocks,
            )?
        } else {
            Default::default()
        };

        // New messages are appended immediately after the last existing block:
        // its offset plus its metadata and body lengths.
        let last_block = metadata.blocks.last().ok_or_else(|| {
            ArrowError::oos("An Arrow IPC file must have at least 1 message (the schema message)")
        })?;
        let offset: u64 = last_block
            .offset
            .try_into()
            .map_err(|_| ArrowError::oos("The block's offset must be a positive number"))?;
        let meta_data_length: u64 = last_block
            .meta_data_length
            .try_into()
            .map_err(|_| ArrowError::oos("The block's meta length must be a positive number"))?;
        let body_length: u64 = last_block
            .body_length
            .try_into()
            .map_err(|_| ArrowError::oos("The block's body length must be a positive number"))?;
        // Checked arithmetic: a corrupted footer could otherwise overflow `u64`
        // and make us seek to (and overwrite) a bogus position.
        let offset: u64 = offset
            .checked_add(meta_data_length)
            .and_then(|offset| offset.checked_add(body_length))
            .ok_or_else(|| ArrowError::oos("The block's offset and lengths must not overflow"))?;

        writer.seek(SeekFrom::Start(offset))?;

        // Checked narrowing instead of `as usize`, which would silently truncate
        // on 32-bit targets for files larger than 4 GiB.
        let block_offsets: usize = offset
            .try_into()
            .map_err(|_| ArrowError::oos("The file's length does not fit in `usize`"))?;

        Ok(FileWriter {
            writer,
            options,
            schema: metadata.schema,
            ipc_fields: metadata.ipc_schema.fields,
            block_offsets,
            dictionary_blocks: metadata.dictionaries.unwrap_or_default(),
            record_blocks: metadata.blocks,
            state: State::Started, // file already exists, so we are ready to append
            dictionary_tracker: DictionaryTracker {
                dictionaries,
                cannot_replace: true,
            },
        })
    }
}
1 change: 1 addition & 0 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use crate::error::ArrowError;
mod compression;
mod endianess;

pub mod append;
pub mod read;
pub mod write;

Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod array;
mod common;
mod deserialize;
mod read_basic;
mod reader;
pub(crate) mod reader;
mod schema;
mod stream;
#[cfg(feature = "io_ipc_read_async")]
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ pub struct FileMetadata {
/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
pub(super) blocks: Vec<arrow_format::ipc::Block>,
pub(crate) blocks: Vec<arrow_format::ipc::Block>,

/// Dictionaries associated to each dict_id
pub(super) dictionaries: Option<Vec<arrow_format::ipc::Block>>,
pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,
}

/// Arrow File reader
Expand Down Expand Up @@ -65,7 +65,7 @@ fn read_dictionary_message<R: Read + Seek>(
Ok(())
}

fn read_dictionaries<R: Read + Seek>(
pub(crate) fn read_dictionaries<R: Read + Seek>(
reader: &mut R,
fields: &[Field],
ipc_schema: &IpcSchema,
Expand Down
37 changes: 15 additions & 22 deletions src/io/ipc/write/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct WriteOptions {

fn encode_dictionary(
field: &IpcField,
array: &Arc<dyn Array>,
array: &dyn Array,
options: &WriteOptions,
dictionary_tracker: &mut DictionaryTracker,
encoded_dictionaries: &mut Vec<EncodedData>,
Expand All @@ -50,7 +50,7 @@ fn encode_dictionary(
let array = array.as_any().downcast_ref::<DictionaryArray<$T>>().unwrap();
let values = array.values();
encode_dictionary(field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries
Expand Down Expand Up @@ -80,7 +80,7 @@ fn encode_dictionary(
.try_for_each(|(field, values)| {
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -96,7 +96,7 @@ fn encode_dictionary(
let field = &field.fields[0]; // todo: error instead
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -111,7 +111,7 @@ fn encode_dictionary(
let field = &field.fields[0]; // todo: error instead
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -126,7 +126,7 @@ fn encode_dictionary(
let field = &field.fields[0]; // todo: error instead
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -151,7 +151,7 @@ fn encode_dictionary(
.try_for_each(|(field, values)| {
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -163,7 +163,7 @@ fn encode_dictionary(
let field = &field.fields[0]; // todo: error instead
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -183,7 +183,7 @@ pub fn encode_chunk(
for (field, array) in fields.iter().zip(columns.as_ref()) {
encode_dictionary(
field,
array,
array.as_ref(),
options,
dictionary_tracker,
&mut encoded_dictionaries,
Expand Down Expand Up @@ -312,18 +312,11 @@ fn dictionary_batch_to_bytes<K: DictionaryKey>(
/// multiple times. Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
pub struct DictionaryTracker {
written: Dictionaries,
error_on_replacement: bool,
pub dictionaries: Dictionaries,
pub cannot_replace: bool,
}

impl DictionaryTracker {
pub fn new(error_on_replacement: bool) -> Self {
Self {
written: Dictionaries::new(),
error_on_replacement,
}
}

/// Keep track of the dictionary with the given ID and values. Behavior:
///
/// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
Expand All @@ -333,7 +326,7 @@ impl DictionaryTracker {
/// * If the tracker has not been configured to error on replacement or this dictionary
/// has never been seen before, return `Ok(true)` to indicate that the dictionary was just
/// inserted.
pub fn insert(&mut self, dict_id: i64, array: &Arc<dyn Array>) -> Result<bool> {
pub fn insert(&mut self, dict_id: i64, array: &dyn Array) -> Result<bool> {
let values = match array.data_type() {
DataType::Dictionary(key_type, _, _) => {
match_integer_type!(key_type, |$T| {
Expand All @@ -348,11 +341,11 @@ impl DictionaryTracker {
};

// If a dictionary with this id was already emitted, check if it was the same.
if let Some(last) = self.written.get(&dict_id) {
if let Some(last) = self.dictionaries.get(&dict_id) {
if last.as_ref() == values.as_ref() {
// Same dictionary values => no need to emit it again
return Ok(false);
} else if self.error_on_replacement {
} else if self.cannot_replace {
return Err(ArrowError::InvalidArgumentError(
"Dictionary replacement detected when writing IPC file format. \
Arrow IPC files only support a single dictionary for a given field \
Expand All @@ -362,7 +355,7 @@ impl DictionaryTracker {
}
};

self.written.insert(dict_id, values.clone());
self.dictionaries.insert(dict_id, values.clone());
Ok(true)
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/io/ipc/write/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ where
fields,
offset: 0,
schema: schema.clone(),
dictionary_tracker: DictionaryTracker::new(true),
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: true,
},
record_blocks: vec![],
dictionary_blocks: vec![],
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub(crate) mod common;
mod schema;
mod serialize;
mod stream;
mod writer;
pub(crate) mod writer;

pub use common::{Compression, Record, WriteOptions};
pub use schema::schema_to_bytes;
Expand Down
5 changes: 4 additions & 1 deletion src/io/ipc/write/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ impl<W: Write> StreamWriter<W> {
writer,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker::new(false),
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: false,
},
ipc_fields: None,
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/io/ipc/write/stream_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ where
writer: None,
task,
fields,
dictionary_tracker: DictionaryTracker::new(false),
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: false,
},
options: write_options,
}
}
Expand Down
25 changes: 14 additions & 11 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};

#[derive(Clone, Copy, PartialEq, Eq)]
enum State {
pub(crate) enum State {
None,
Started,
Finished,
Expand All @@ -25,22 +25,22 @@ enum State {
/// Arrow file writer
pub struct FileWriter<W: Write> {
/// The object to write to
writer: W,
pub(crate) writer: W,
/// IPC write options
options: WriteOptions,
pub(crate) options: WriteOptions,
/// A reference to the schema, used in validating record batches
schema: Schema,
ipc_fields: Vec<IpcField>,
pub(crate) schema: Schema,
pub(crate) ipc_fields: Vec<IpcField>,
/// The number of bytes between each block of bytes, as an offset for random access
block_offsets: usize,
pub(crate) block_offsets: usize,
/// Dictionary blocks that will be written as part of the IPC footer
dictionary_blocks: Vec<arrow_format::ipc::Block>,
pub(crate) dictionary_blocks: Vec<arrow_format::ipc::Block>,
/// Record blocks that will be written as part of the IPC footer
record_blocks: Vec<arrow_format::ipc::Block>,
pub(crate) record_blocks: Vec<arrow_format::ipc::Block>,
/// Whether the writer footer has been written, and the writer is finished
state: State,
pub(crate) state: State,
/// Keeps track of dictionaries that have been written
dictionary_tracker: DictionaryTracker,
pub(crate) dictionary_tracker: DictionaryTracker,
}

impl<W: Write> FileWriter<W> {
Expand Down Expand Up @@ -79,7 +79,10 @@ impl<W: Write> FileWriter<W> {
dictionary_blocks: vec![],
record_blocks: vec![],
state: State::None,
dictionary_tracker: DictionaryTracker::new(true),
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: true,
},
}
}

Expand Down
6 changes: 3 additions & 3 deletions tests/it/io/ipc/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow2::io::ipc::{write::*, IpcField};

use crate::io::ipc::common::read_gzip_json;

fn write_(
pub(crate) fn write(
batches: &[Chunk<Arc<dyn Array>>],
schema: &Schema,
ipc_fields: Option<Vec<IpcField>>,
Expand All @@ -34,7 +34,7 @@ fn round_trip(
) -> Result<()> {
let (expected_schema, expected_batches) = (schema.clone(), vec![columns]);

let result = write_(&expected_batches, &schema, ipc_fields, compression)?;
let result = write(&expected_batches, &schema, ipc_fields, compression)?;
let mut reader = Cursor::new(result);
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema.clone();
Expand All @@ -58,7 +58,7 @@ fn test_file(version: &str, file_name: &str, compressed: bool) -> Result<()> {
None
};

let result = write_(&batches, &schema, Some(ipc_fields), compression)?;
let result = write(&batches, &schema, Some(ipc_fields), compression)?;
let mut reader = Cursor::new(result);
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema.clone();
Expand Down
Loading

0 comments on commit 293fe43

Please sign in to comment.