Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to append to existing IPC Arrow file #972

Merged
merged 1 commit into from
Apr 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ pub fn serialize_batch(
fields: &[IpcField],
options: &WriteOptions,
) -> (Vec<FlightData>, FlightData) {
let mut dictionary_tracker = DictionaryTracker::new(false);
let mut dictionary_tracker = DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: false,
};

let (encoded_dictionaries, encoded_batch) =
encode_chunk(columns, fields, &mut dictionary_tracker, options)
Expand Down
80 changes: 80 additions & 0 deletions src/io/ipc/append/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//! A struct adapter of Read+Seek+Write to append to IPC files
// read header and convert to writer information
// seek to first byte of header - 1
// write new batch
// write new footer
use std::io::{Read, Seek, SeekFrom, Write};

use crate::error::{ArrowError, Result};

use super::endianess::is_native_little_endian;
use super::read::{self, FileMetadata};
use super::write::common::DictionaryTracker;
use super::write::writer::*;
use super::write::*;

impl<R: Read + Seek + Write> FileWriter<R> {
/// Creates a new [`FileWriter`] from an existing file, seeking to the last message
/// and appending new messages afterwards. Users call `finish` to write the footer (with both)
/// the existing and appended messages on it.
/// # Error
/// This function errors iff:
/// * the file's endianess is not the native endianess (not yet supported)
/// * the file is not a valid Arrow IPC file
pub fn try_from_file(
mut writer: R,
metadata: FileMetadata,
options: WriteOptions,
) -> Result<FileWriter<R>> {
if metadata.ipc_schema.is_little_endian != is_native_little_endian() {
return Err(ArrowError::nyi(
"Appending to a file of a non-native endianess is still not supported",
));
}

let dictionaries = if let Some(blocks) = &metadata.dictionaries {
read::reader::read_dictionaries(
&mut writer,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)?
} else {
Default::default()
};

let last_block = metadata.blocks.last().ok_or_else(|| {
ArrowError::oos("An Arrow IPC file must have at least 1 message (the schema message)")
})?;
let offset: u64 = last_block
.offset
.try_into()
.map_err(|_| ArrowError::oos("The block's offset must be a positive number"))?;
let meta_data_length: u64 = last_block
.meta_data_length
.try_into()
.map_err(|_| ArrowError::oos("The block's meta length must be a positive number"))?;
let body_length: u64 = last_block
.body_length
.try_into()
.map_err(|_| ArrowError::oos("The block's body length must be a positive number"))?;
let offset: u64 = offset + meta_data_length + body_length;

writer.seek(SeekFrom::Start(offset))?;

Ok(FileWriter {
writer,
options,
schema: metadata.schema,
ipc_fields: metadata.ipc_schema.fields,
block_offsets: offset as usize,
dictionary_blocks: metadata.dictionaries.unwrap_or_default(),
record_blocks: metadata.blocks,
state: State::Started, // file already exists, so we are ready
dictionary_tracker: DictionaryTracker {
dictionaries,
cannot_replace: true,
},
})
}
}
1 change: 1 addition & 0 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use crate::error::ArrowError;
mod compression;
mod endianess;

pub mod append;
pub mod read;
pub mod write;

Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod array;
mod common;
mod deserialize;
mod read_basic;
mod reader;
pub(crate) mod reader;
mod schema;
mod stream;
#[cfg(feature = "io_ipc_read_async")]
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ pub struct FileMetadata {
/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
pub(super) blocks: Vec<arrow_format::ipc::Block>,
pub(crate) blocks: Vec<arrow_format::ipc::Block>,

/// Dictionaries associated to each dict_id
pub(super) dictionaries: Option<Vec<arrow_format::ipc::Block>>,
pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,
}

/// Arrow File reader
Expand Down Expand Up @@ -65,7 +65,7 @@ fn read_dictionary_message<R: Read + Seek>(
Ok(())
}

fn read_dictionaries<R: Read + Seek>(
pub(crate) fn read_dictionaries<R: Read + Seek>(
reader: &mut R,
fields: &[Field],
ipc_schema: &IpcSchema,
Expand Down
37 changes: 15 additions & 22 deletions src/io/ipc/write/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct WriteOptions {

fn encode_dictionary(
field: &IpcField,
array: &Arc<dyn Array>,
array: &dyn Array,
options: &WriteOptions,
dictionary_tracker: &mut DictionaryTracker,
encoded_dictionaries: &mut Vec<EncodedData>,
Expand All @@ -50,7 +50,7 @@ fn encode_dictionary(
let array = array.as_any().downcast_ref::<DictionaryArray<$T>>().unwrap();
let values = array.values();
encode_dictionary(field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries
Expand Down Expand Up @@ -80,7 +80,7 @@ fn encode_dictionary(
.try_for_each(|(field, values)| {
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -96,7 +96,7 @@ fn encode_dictionary(
let field = &field.fields[0]; // todo: error instead
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -111,7 +111,7 @@ fn encode_dictionary(
let field = &field.fields[0]; // todo: error instead
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -126,7 +126,7 @@ fn encode_dictionary(
let field = &field.fields[0]; // todo: error instead
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -151,7 +151,7 @@ fn encode_dictionary(
.try_for_each(|(field, values)| {
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -163,7 +163,7 @@ fn encode_dictionary(
let field = &field.fields[0]; // todo: error instead
encode_dictionary(
field,
values,
values.as_ref(),
options,
dictionary_tracker,
encoded_dictionaries,
Expand All @@ -183,7 +183,7 @@ pub fn encode_chunk(
for (field, array) in fields.iter().zip(columns.as_ref()) {
encode_dictionary(
field,
array,
array.as_ref(),
options,
dictionary_tracker,
&mut encoded_dictionaries,
Expand Down Expand Up @@ -312,18 +312,11 @@ fn dictionary_batch_to_bytes<K: DictionaryKey>(
/// multiple times. Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
pub struct DictionaryTracker {
written: Dictionaries,
error_on_replacement: bool,
pub dictionaries: Dictionaries,
pub cannot_replace: bool,
}

impl DictionaryTracker {
pub fn new(error_on_replacement: bool) -> Self {
Self {
written: Dictionaries::new(),
error_on_replacement,
}
}

/// Keep track of the dictionary with the given ID and values. Behavior:
///
/// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
Expand All @@ -333,7 +326,7 @@ impl DictionaryTracker {
/// * If the tracker has not been configured to error on replacement or this dictionary
/// has never been seen before, return `Ok(true)` to indicate that the dictionary was just
/// inserted.
pub fn insert(&mut self, dict_id: i64, array: &Arc<dyn Array>) -> Result<bool> {
pub fn insert(&mut self, dict_id: i64, array: &dyn Array) -> Result<bool> {
let values = match array.data_type() {
DataType::Dictionary(key_type, _, _) => {
match_integer_type!(key_type, |$T| {
Expand All @@ -348,11 +341,11 @@ impl DictionaryTracker {
};

// If a dictionary with this id was already emitted, check if it was the same.
if let Some(last) = self.written.get(&dict_id) {
if let Some(last) = self.dictionaries.get(&dict_id) {
if last.as_ref() == values.as_ref() {
// Same dictionary values => no need to emit it again
return Ok(false);
} else if self.error_on_replacement {
} else if self.cannot_replace {
return Err(ArrowError::InvalidArgumentError(
"Dictionary replacement detected when writing IPC file format. \
Arrow IPC files only support a single dictionary for a given field \
Expand All @@ -362,7 +355,7 @@ impl DictionaryTracker {
}
};

self.written.insert(dict_id, values.clone());
self.dictionaries.insert(dict_id, values.clone());
Ok(true)
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/io/ipc/write/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ where
fields,
offset: 0,
schema: schema.clone(),
dictionary_tracker: DictionaryTracker::new(true),
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: true,
},
record_blocks: vec![],
dictionary_blocks: vec![],
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub(crate) mod common;
mod schema;
mod serialize;
mod stream;
mod writer;
pub(crate) mod writer;

pub use common::{Compression, Record, WriteOptions};
pub use schema::schema_to_bytes;
Expand Down
5 changes: 4 additions & 1 deletion src/io/ipc/write/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ impl<W: Write> StreamWriter<W> {
writer,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker::new(false),
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: false,
},
ipc_fields: None,
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/io/ipc/write/stream_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ where
writer: None,
task,
fields,
dictionary_tracker: DictionaryTracker::new(false),
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: false,
},
options: write_options,
}
}
Expand Down
25 changes: 14 additions & 11 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};

#[derive(Clone, Copy, PartialEq, Eq)]
enum State {
pub(crate) enum State {
None,
Started,
Finished,
Expand All @@ -25,22 +25,22 @@ enum State {
/// Arrow file writer
pub struct FileWriter<W: Write> {
/// The object to write to
writer: W,
pub(crate) writer: W,
/// IPC write options
options: WriteOptions,
pub(crate) options: WriteOptions,
/// A reference to the schema, used in validating record batches
schema: Schema,
ipc_fields: Vec<IpcField>,
pub(crate) schema: Schema,
pub(crate) ipc_fields: Vec<IpcField>,
/// The number of bytes between each block of bytes, as an offset for random access
block_offsets: usize,
pub(crate) block_offsets: usize,
/// Dictionary blocks that will be written as part of the IPC footer
dictionary_blocks: Vec<arrow_format::ipc::Block>,
pub(crate) dictionary_blocks: Vec<arrow_format::ipc::Block>,
/// Record blocks that will be written as part of the IPC footer
record_blocks: Vec<arrow_format::ipc::Block>,
pub(crate) record_blocks: Vec<arrow_format::ipc::Block>,
/// Whether the writer footer has been written, and the writer is finished
state: State,
pub(crate) state: State,
/// Keeps track of dictionaries that have been written
dictionary_tracker: DictionaryTracker,
pub(crate) dictionary_tracker: DictionaryTracker,
}

impl<W: Write> FileWriter<W> {
Expand Down Expand Up @@ -79,7 +79,10 @@ impl<W: Write> FileWriter<W> {
dictionary_blocks: vec![],
record_blocks: vec![],
state: State::None,
dictionary_tracker: DictionaryTracker::new(true),
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: true,
},
}
}

Expand Down
6 changes: 3 additions & 3 deletions tests/it/io/ipc/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow2::io::ipc::{write::*, IpcField};

use crate::io::ipc::common::read_gzip_json;

fn write_(
pub(crate) fn write(
batches: &[Chunk<Arc<dyn Array>>],
schema: &Schema,
ipc_fields: Option<Vec<IpcField>>,
Expand All @@ -34,7 +34,7 @@ fn round_trip(
) -> Result<()> {
let (expected_schema, expected_batches) = (schema.clone(), vec![columns]);

let result = write_(&expected_batches, &schema, ipc_fields, compression)?;
let result = write(&expected_batches, &schema, ipc_fields, compression)?;
let mut reader = Cursor::new(result);
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema.clone();
Expand All @@ -58,7 +58,7 @@ fn test_file(version: &str, file_name: &str, compressed: bool) -> Result<()> {
None
};

let result = write_(&batches, &schema, Some(ipc_fields), compression)?;
let result = write(&batches, &schema, Some(ipc_fields), compression)?;
let mut reader = Cursor::new(result);
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema.clone();
Expand Down
Loading