Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved performance of reading from IPC #1125

Merged
merged 2 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ impl From<simdutf8::basic::Utf8Error> for Error {
}
}

impl From<std::collections::TryReserveError> for Error {
fn from(_: std::collections::TryReserveError) -> Error {
Error::Overflow
}
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_binary<O: Offset, R: Read + Seek>(
Expand All @@ -18,7 +18,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::array::{DictionaryArray, DictionaryKey};
use crate::error::{Error, Result};

use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node};
use super::{read_primitive, skip_primitive};

#[allow(clippy::too_many_arguments)]
Expand All @@ -19,7 +19,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
block_offset: u64,
compression: Option<Compression>,
is_little_endian: bool,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<DictionaryArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_binary<R: Read + Seek>(
Expand All @@ -17,7 +17,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_list<R: Read + Seek>(
Expand All @@ -22,7 +22,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeListArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
6 changes: 2 additions & 4 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
Expand All @@ -26,7 +24,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<ListArray<O>>
where
Vec<u8>: TryInto<O::Bytes>,
Expand Down
6 changes: 2 additions & 4 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_map<R: Read + Seek>(
Expand All @@ -25,7 +23,7 @@ pub fn read_map<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<MapArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::{Error, Result};
use crate::{array::PrimitiveArray, types::NativeType};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_primitive<T: NativeType, R: Read + Seek>(
Expand All @@ -17,7 +17,7 @@ pub fn read_primitive<T: NativeType, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<PrimitiveArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_struct<R: Read + Seek>(
Expand All @@ -22,7 +22,7 @@ pub fn read_struct<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<StructArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
6 changes: 2 additions & 4 deletions src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_union<R: Read + Seek>(
Expand All @@ -25,7 +23,7 @@ pub fn read_union<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<UnionArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_utf8<O: Offset, R: Read + Seek>(
Expand All @@ -18,7 +18,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Utf8Array<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
5 changes: 2 additions & 3 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::datatypes::{DataType, Field};
use crate::error::{Error, Result};
use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::{IpcField, IpcSchema};
use crate::io::ReadBuffer;

use super::deserialize::{read, skip};
use super::Dictionaries;
Expand Down Expand Up @@ -86,7 +85,7 @@ pub fn read_record_batch<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
file_size: u64,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
let buffers = batch
Expand Down Expand Up @@ -234,7 +233,7 @@ pub fn read_dictionary<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
file_size: u64,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<()> {
if batch
.is_delta()
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::error::Result;
use crate::io::ipc::IpcField;

use super::{array::*, Dictionaries};
use super::{IpcBuffer, Node, ReadBuffer};
use super::{IpcBuffer, Node};

#[allow(clippy::too_many_arguments)]
pub fn read<R: Read + Seek>(
Expand All @@ -24,7 +24,7 @@ pub fn read<R: Read + Seek>(
is_little_endian: bool,
compression: Option<BodyCompressionRef>,
version: MetadataVersion,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Box<dyn Array>> {
use PhysicalType::*;
let data_type = field.data_type.clone();
Expand Down
63 changes: 42 additions & 21 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use super::reader::{deserialize_footer, get_serialized_batch};
use super::Dictionaries;
use super::FileMetadata;
use super::OutOfSpecKind;
use super::ReadBuffer;

/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
Expand Down Expand Up @@ -142,9 +141,14 @@ where
{
let footer_size = read_footer_len(reader).await?;
// Read footer
let mut footer = vec![0; footer_size];
reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?;
reader.read_exact(&mut footer).await?;

let mut footer = vec![];
footer.try_reserve(footer_size)?;
reader
.take(footer_size as u64)
.read_to_end(&mut footer)
.await?;

deserialize_footer(&footer, u64::MAX)
}
Expand All @@ -156,9 +160,9 @@ async fn read_batch<R>(
metadata: &FileMetadata,
projection: Option<&[usize]>,
block: usize,
meta_buffer: &mut ReadBuffer,
block_buffer: &mut ReadBuffer,
scratch: &mut ReadBuffer,
meta_buffer: &mut Vec<u8>,
block_buffer: &mut Vec<u8>,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>>
where
R: AsyncRead + AsyncSeek + Unpin,
Expand All @@ -181,10 +185,14 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

meta_buffer.set_len(meta_len);
reader.read_exact(meta_buffer.as_mut()).await?;
meta_buffer.clear();
meta_buffer.try_reserve(meta_len)?;
(&mut reader)
.take(meta_len as u64)
.read_to_end(meta_buffer)
.await?;

let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_ref())
let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer)
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let batch = get_serialized_batch(&message)?;
Expand All @@ -195,9 +203,14 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?;

block_buffer.set_len(block_length);
reader.read_exact(block_buffer.as_mut()).await?;
let mut cursor = std::io::Cursor::new(block_buffer.as_ref());
block_buffer.clear();
block_buffer.try_reserve(block_length)?;
reader
.take(block_length as u64)
.read_to_end(block_buffer)
.await?;

let mut cursor = std::io::Cursor::new(&block_buffer);

read_record_batch(
batch,
Expand All @@ -220,14 +233,14 @@ async fn read_dictionaries<R>(
fields: &[Field],
ipc_schema: &IpcSchema,
blocks: &[Block],
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Dictionaries>
where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut dictionaries = Default::default();
let mut data: ReadBuffer = vec![].into();
let mut buffer: ReadBuffer = vec![].into();
let mut data: Vec<u8> = vec![];
let mut buffer: Vec<u8> = vec![];

for block in blocks {
let offset: u64 = block
Expand All @@ -250,11 +263,15 @@ where
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?;

buffer.set_len(length);
match header {
MessageHeaderRef::DictionaryBatch(batch) => {
reader.read_exact(buffer.as_mut()).await?;
let mut cursor = std::io::Cursor::new(buffer.as_ref());
buffer.clear();
buffer.try_reserve(length)?;
(&mut reader)
.take(length as u64)
.read_to_end(&mut buffer)
.await?;
let mut cursor = std::io::Cursor::new(&buffer);
read_dictionary(
batch,
fields,
Expand All @@ -272,7 +289,7 @@ where
Ok(dictionaries)
}

async fn read_dictionary_message<R>(mut reader: R, offset: u64, data: &mut ReadBuffer) -> Result<()>
async fn read_dictionary_message<R>(mut reader: R, offset: u64, data: &mut Vec<u8>) -> Result<()>
where
R: AsyncRead + AsyncSeek + Unpin,
{
Expand All @@ -288,8 +305,12 @@ where
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

data.set_len(footer_size);
reader.read_exact(data.as_mut()).await?;
data.clear();
data.try_reserve(footer_size)?;
(&mut reader)
.take(footer_size as u64)
.read_to_end(data)
.await?;

Ok(())
}
Expand Down
2 changes: 0 additions & 2 deletions src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub mod stream_async;
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
pub mod file_async;

use super::super::ReadBuffer;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{
read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader,
Expand Down
Loading