Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Made IPC reader less restrictive #678

Merged
merged 2 commits into from
Dec 13, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 49 additions & 43 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use arrow_format::ipc;
use arrow_format::ipc::flatbuffers::VerifierOptions;
use arrow_format::ipc::File::Block;

use crate::array::*;
use crate::datatypes::Schema;
Expand Down Expand Up @@ -87,6 +88,48 @@ fn read_dictionary_message<R: Read + Seek>(
Ok(())
}

/// Reads every dictionary referenced by `blocks` and returns them in a map.
///
/// For each block, the message stored at the block's offset is loaded with
/// `read_dictionary_message`, parsed as an IPC message and, when its header is
/// a `DictionaryBatch`, handed to `read_dictionary` together with the position
/// that follows the message metadata (`offset + metaDataLength`).
///
/// # Arguments
///
/// * `reader` - source the messages and dictionary bodies are read from.
/// * `schema` - schema forwarded to `read_dictionary`.
/// * `is_little_endian` - endianness flag forwarded to `read_dictionary`.
/// * `blocks` - dictionary blocks, as listed in the file footer.
///
/// # Errors
///
/// Returns [`ArrowError::OutOfSpec`] when
/// * a block declares a negative offset or a negative metadata length,
/// * the loaded bytes cannot be parsed as an IPC message,
/// * the message header is not a `DictionaryBatch`, or
/// * the `DictionaryBatch` header cannot be accessed.
///
/// Errors returned by `read_dictionary_message` and `read_dictionary` are
/// propagated unchanged.
fn read_dictionaries<R: Read + Seek>(
    reader: &mut R,
    schema: &Schema,
    is_little_endian: bool,
    blocks: &[Block],
) -> Result<HashMap<usize, Arc<dyn Array>>> {
    let mut dictionaries = Default::default();
    // A single buffer is handed to `read_dictionary_message` for every block.
    let mut data = vec![];

    for block in blocks {
        // The footer comes from an external file: its signed fields must not be
        // reinterpreted as unsigned with `as`, which would silently wrap
        // negative values into huge offsets.
        let offset = <u64 as std::convert::TryFrom<_>>::try_from(block.offset()).map_err(|_| {
            ArrowError::OutOfSpec("Dictionary block has a negative offset".to_string())
        })?;
        let length =
            <u64 as std::convert::TryFrom<_>>::try_from(block.metaDataLength()).map_err(|_| {
                ArrowError::OutOfSpec("Dictionary block has a negative metadata length".to_string())
            })?;
        read_dictionary_message(reader, offset, &mut data)?;

        let message = ipc::Message::root_as_message(&data).map_err(|err| {
            ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
        })?;

        match message.header_type() {
            ipc::Message::MessageHeader::DictionaryBatch => {
                // Both operands were checked to be non-negative, so overflow is
                // only reported defensively instead of panicking or wrapping.
                let block_offset = offset.checked_add(length).ok_or_else(|| {
                    ArrowError::OutOfSpec(
                        "Dictionary block offset and metadata length overflow".to_string(),
                    )
                })?;
                // Report a missing header as an error rather than panicking.
                let batch = message.header_as_dictionary_batch().ok_or_else(|| {
                    ArrowError::OutOfSpec(
                        "Unable to read the DictionaryBatch header of the message".to_string(),
                    )
                })?;
                read_dictionary(
                    batch,
                    schema,
                    is_little_endian,
                    &mut dictionaries,
                    reader,
                    block_offset,
                )?;
            }
            t => {
                return Err(ArrowError::OutOfSpec(format!(
                    "Expecting DictionaryBatch in dictionary blocks, found {:?}.",
                    t
                )));
            }
        };
    }
    Ok(dictionaries)
}

/// Read the IPC file's metadata
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
// check if header and footer contain correct magic bytes
Expand Down Expand Up @@ -138,43 +181,14 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
let (schema, is_little_endian) = convert::fb_to_schema(ipc_schema);
let schema = Arc::new(schema);

let mut dictionaries = Default::default();
let dictionary_blocks = footer.dictionaries();

let dictionary_blocks = footer.dictionaries().ok_or_else(|| {
ArrowError::OutOfSpec("Unable to get dictionaries from footer".to_string())
})?;

let mut data = vec![];
for block in dictionary_blocks {
let offset = block.offset() as u64;
let length = block.metaDataLength() as u64;
read_dictionary_message(reader, offset, &mut data)?;

let message = ipc::Message::root_as_message(&data).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(reader, &schema, is_little_endian, blocks)?
} else {
Default::default()
};

match message.header_type() {
ipc::Message::MessageHeader::DictionaryBatch => {
let block_offset = offset + length;
let batch = message.header_as_dictionary_batch().unwrap();
read_dictionary(
batch,
&schema,
is_little_endian,
&mut dictionaries,
reader,
block_offset,
)?;
}
t => {
return Err(ArrowError::OutOfSpec(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
};
}
Ok(FileMetadata {
schema,
is_little_endian,
Expand Down Expand Up @@ -230,14 +244,6 @@ pub fn read_batch<R: Read + Seek>(
let message = ipc::Message::root_as_message(&block_data[..])
.map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?;

// some old test data's footer metadata is not set, so we account for that
if metadata.version != ipc::Schema::MetadataVersion::V1 && message.version() != metadata.version
{
return Err(ArrowError::OutOfSpec(
"Could not read IPC message as metadata versions mismatch".to_string(),
));
}

let batch = get_serialized_batch(&message)?;

read_record_batch(
Expand Down