Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Removed most of allocations in IPC reading #611

Merged
merged 3 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 81 additions & 59 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ pub struct FileMetadata {
/// A block indicates the regions in the file to read to get data
blocks: Vec<ipc::File::Block>,

/// The total number of blocks, which may contain record batches and other types
total_blocks: usize,

/// Dictionaries associated to each dict_id
dictionaries: HashMap<usize, Arc<dyn Array>>,

Expand All @@ -66,6 +63,28 @@ pub struct FileReader<R: Read + Seek> {
metadata: FileMetadata,
current_block: usize,
projection: Option<(Vec<usize>, Arc<Schema>)>,
buffer: Vec<u8>,
}

/// Reads the length-prefixed IPC message located at `offset` into `data`.
///
/// Seeks `reader` to `offset`, reads the 4-byte little-endian `i32` length
/// prefix (first skipping over a `CONTINUATION_MARKER` when one is present)
/// and then reads exactly that many bytes into `data`.
///
/// `data` is cleared and resized in place, so a caller that passes the same
/// vector on every call reuses its allocation instead of creating a new one.
///
/// # Errors
///
/// Returns an error when seeking or reading from `reader` fails, or when the
/// length prefix is negative.
fn read_dictionary_message<R: Read + Seek>(
    reader: &mut R,
    offset: u64,
    data: &mut Vec<u8>,
) -> Result<()> {
    let mut length_prefix: [u8; 4] = [0; 4];
    reader.seek(SeekFrom::Start(offset))?;
    reader.read_exact(&mut length_prefix)?;
    if length_prefix == CONTINUATION_MARKER {
        // the marker is followed by the actual length prefix
        reader.read_exact(&mut length_prefix)?;
    }
    let message_length = i32::from_le_bytes(length_prefix);

    // The prefix comes straight from the file. Casting a negative `i32` with
    // `as usize` sign-extends it to a value close to `usize::MAX`, which would
    // make `resize` below panic or abort on allocation; report it instead.
    if message_length < 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!(
                "IPC message length must be non-negative, got {}",
                message_length
            ),
        )
        .into());
    }
    let message_length = message_length as usize;

    // prepare `data` to read the message
    data.clear();
    data.resize(message_length, 0);

    reader.read_exact(data)?;
    Ok(())
}

/// Read the IPC file's metadata
Expand Down Expand Up @@ -109,38 +128,34 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
let footer = ipc::File::root_as_footer_with_opts(&verifier_options, &footer_data[..])
.map_err(|err| ArrowError::Ipc(format!("Unable to get root as footer: {:?}", err)))?;

let blocks = footer.recordBatches().ok_or_else(|| {
ArrowError::Ipc("Unable to get record batches from IPC Footer".to_string())
})?;
let blocks = footer
.recordBatches()
.ok_or_else(|| ArrowError::Ipc("Unable to get record batches from footer".to_string()))?;

let total_blocks = blocks.len();

let ipc_schema = footer.schema().unwrap();
let ipc_schema = footer
.schema()
.ok_or_else(|| ArrowError::Ipc("Unable to get the schema from footer".to_string()))?;
let (schema, is_little_endian) = convert::fb_to_schema(ipc_schema);
let schema = Arc::new(schema);

let mut dictionaries = Default::default();

for block in footer.dictionaries().unwrap() {
// read length from end of offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
};
let footer_len = i32::from_le_bytes(message_size);

let mut block_data = vec![0; footer_len as usize];
let dictionary_blocks = footer
.dictionaries()
.ok_or_else(|| ArrowError::Ipc("Unable to get dictionaries from footer".to_string()))?;

reader.read_exact(&mut block_data)?;
let mut data = vec![];
for block in dictionary_blocks {
let offset = block.offset() as u64;
let length = block.metaDataLength() as u64;
read_dictionary_message(reader, offset, &mut data)?;

let message = ipc::Message::root_as_message(&block_data[..])
let message = ipc::Message::root_as_message(&data)
.map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?;

match message.header_type() {
ipc::Message::MessageHeader::DictionaryBatch => {
let block_offset = block.offset() as u64 + block.metaDataLength() as u64;
let block_offset = offset + length;
let batch = message.header_as_dictionary_batch().unwrap();
read_dictionary(
batch,
Expand All @@ -163,19 +178,38 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
schema,
is_little_endian,
blocks: blocks.to_vec(),
total_blocks,
dictionaries,
version: footer.version(),
})
}

/// Returns the record batch header carried by `message`.
///
/// # Errors
///
/// Returns an [`ArrowError::Ipc`] when the message header is a schema, when
/// it is any other non-record-batch header type, or when the header is
/// tagged as a record batch but cannot be read as one.
fn get_serialized_batch<'a>(
    message: &'a ipc::Message::Message,
) -> Result<ipc::Message::RecordBatch<'a>> {
    let header = message.header_type();
    match header {
        ipc::Message::MessageHeader::RecordBatch => {
            let batch = message.header_as_record_batch();
            batch.ok_or_else(|| {
                ArrowError::Ipc("Unable to read IPC message as record batch".to_string())
            })
        }
        ipc::Message::MessageHeader::Schema => {
            let reason = "Not expecting a schema when messages are read".to_string();
            Err(ArrowError::Ipc(reason))
        }
        other => {
            let reason = format!(
                "Reading types other than record batches not yet supported, unable to read {:?}",
                other
            );
            Err(ArrowError::Ipc(reason))
        }
    }
}

/// Read a batch from the reader.
pub fn read_batch<R: Read + Seek>(
reader: &mut R,
metadata: &FileMetadata,
projection: Option<(&[usize], Arc<Schema>)>,
block: usize,
) -> Result<Option<RecordBatch>> {
block_data: &mut Vec<u8>,
) -> Result<RecordBatch> {
let block = metadata.blocks[block];

// read length
Expand All @@ -186,10 +220,11 @@ pub fn read_batch<R: Read + Seek>(
// continuation marker encountered, read message next
reader.read_exact(&mut meta_buf)?;
}
let meta_len = i32::from_le_bytes(meta_buf);
let meta_len = i32::from_le_bytes(meta_buf) as usize;

let mut block_data = vec![0; meta_len as usize];
reader.read_exact(&mut block_data)?;
block_data.clear();
block_data.resize(meta_len, 0);
reader.read_exact(block_data)?;

let message = ipc::Message::root_as_message(&block_data[..])
.map_err(|err| ArrowError::Ipc(format!("Unable to get root as footer: {:?}", err)))?;
Expand All @@ -202,32 +237,18 @@ pub fn read_batch<R: Read + Seek>(
));
}

match message.header_type() {
ipc::Message::MessageHeader::Schema => Err(ArrowError::Ipc(
"Not expecting a schema when messages are read".to_string(),
)),
ipc::Message::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::Ipc("Unable to read IPC message as record batch".to_string())
})?;
read_record_batch(
batch,
metadata.schema.clone(),
projection,
metadata.is_little_endian,
&metadata.dictionaries,
metadata.version,
reader,
block.offset() as u64 + block.metaDataLength() as u64,
)
.map(Some)
}
ipc::Message::MessageHeader::NONE => Ok(None),
t => Err(ArrowError::Ipc(format!(
"Reading types other than record batches not yet supported, unable to read {:?}",
t
))),
}
let batch = get_serialized_batch(&message)?;

read_record_batch(
batch,
metadata.schema.clone(),
projection,
metadata.is_little_endian,
&metadata.dictionaries,
metadata.version,
reader,
block.offset() as u64 + block.metaDataLength() as u64,
)
}

impl<R: Read + Seek> FileReader<R> {
Expand Down Expand Up @@ -257,6 +278,7 @@ impl<R: Read + Seek> FileReader<R> {
metadata,
projection,
current_block: 0,
buffer: vec![],
}
}

Expand All @@ -279,18 +301,18 @@ impl<R: Read + Seek> Iterator for FileReader<R> {

fn next(&mut self) -> Option<Self::Item> {
// get current block
if self.current_block < self.metadata.total_blocks {
if self.current_block < self.metadata.blocks.len() {
let block = self.current_block;
self.current_block += 1;
read_batch(
Some(read_batch(
&mut self.reader,
&self.metadata,
self.projection
.as_ref()
.map(|x| (x.0.as_ref(), x.1.clone())),
block,
)
.transpose()
&mut self.buffer,
))
} else {
None
}
Expand Down
47 changes: 30 additions & 17 deletions src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,17 @@ impl StreamState {

/// Reads the next item, yielding `None` if the stream is done,
/// and a [`StreamState`] otherwise.
pub fn read_next<R: Read>(
fn read_next<R: Read>(
reader: &mut R,
metadata: &StreamMetadata,
dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
message_buffer: &mut Vec<u8>,
data_buffer: &mut Vec<u8>,
) -> Result<Option<StreamState>> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
let mut meta_length: [u8; 4] = [0; 4];

match reader.read_exact(&mut meta_size) {
match reader.read_exact(&mut meta_length) {
Ok(()) => (),
Err(e) => {
return if e.kind() == std::io::ErrorKind::UnexpectedEof {
Expand All @@ -131,25 +133,25 @@ pub fn read_next<R: Read>(
}
}

let meta_len = {
let meta_length = {
// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_size == CONTINUATION_MARKER {
reader.read_exact(&mut meta_size)?;
if meta_length == CONTINUATION_MARKER {
reader.read_exact(&mut meta_length)?;
}
i32::from_le_bytes(meta_size)
i32::from_le_bytes(meta_length) as usize
};

if meta_len == 0 {
if meta_length == 0 {
// the stream has ended, mark the reader as finished
return Ok(None);
}

let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;
message_buffer.clear();
message_buffer.resize(meta_length, 0);
reader.read_exact(message_buffer)?;

let vecs = &meta_buffer.to_vec();
let message = ipc::Message::root_as_message(vecs)
let message = ipc::Message::root_as_message(message_buffer)
.map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?;

match message.header_type() {
Expand All @@ -161,10 +163,11 @@ pub fn read_next<R: Read>(
ArrowError::Ipc("Unable to read IPC message as record batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
let mut buf = vec![0; message.bodyLength() as usize];
reader.read_exact(&mut buf)?;
data_buffer.clear();
data_buffer.resize(message.bodyLength() as usize, 0);
reader.read_exact(data_buffer)?;

let mut reader = std::io::Cursor::new(buf);
let mut reader = std::io::Cursor::new(data_buffer);

read_record_batch(
batch,
Expand Down Expand Up @@ -198,7 +201,7 @@ pub fn read_next<R: Read>(
)?;

// read the next message until we encounter a RecordBatch
read_next(reader, metadata, dictionaries)
read_next(reader, metadata, dictionaries, message_buffer, data_buffer)
}
ipc::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)),
t => Err(ArrowError::Ipc(format!(
Expand All @@ -219,6 +222,8 @@ pub struct StreamReader<R: Read> {
metadata: StreamMetadata,
dictionaries: HashMap<usize, Arc<dyn Array>>,
finished: bool,
data_buffer: Vec<u8>,
message_buffer: Vec<u8>,
}

impl<R: Read> StreamReader<R> {
Expand All @@ -233,6 +238,8 @@ impl<R: Read> StreamReader<R> {
metadata,
dictionaries: Default::default(),
finished: false,
data_buffer: vec![],
message_buffer: vec![],
}
}

Expand All @@ -250,7 +257,13 @@ impl<R: Read> StreamReader<R> {
if self.finished {
return Ok(None);
}
let batch = read_next(&mut self.reader, &self.metadata, &mut self.dictionaries)?;
let batch = read_next(
&mut self.reader,
&self.metadata,
&mut self.dictionaries,
&mut self.message_buffer,
&mut self.data_buffer,
)?;
if batch.is_none() {
self.finished = true;
}
Expand Down