Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Reduced allocations of reading bitmaps from IPC #1126

Merged
merged 3 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ impl From<simdutf8::basic::Utf8Error> for Error {
}
}

impl From<std::collections::TryReserveError> for Error {
fn from(_: std::collections::TryReserveError) -> Error {
Error::Overflow
}
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_binary<O: Offset, R: Read + Seek>(
Expand All @@ -18,7 +18,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -34,6 +34,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
4 changes: 4 additions & 0 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::error::{Error, Result};
use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_boolean<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -16,6 +17,7 @@ pub fn read_boolean<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut Vec<u8>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -36,6 +38,7 @@ pub fn read_boolean<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let values = read_bitmap(
Expand All @@ -45,6 +48,7 @@ pub fn read_boolean<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;
BooleanArray::try_new(data_type, values, validity)
}
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::array::{DictionaryArray, DictionaryKey};
use crate::error::{Error, Result};

use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node};
use super::{read_primitive, skip_primitive};

#[allow(clippy::too_many_arguments)]
Expand All @@ -19,7 +19,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
block_offset: u64,
compression: Option<Compression>,
is_little_endian: bool,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<DictionaryArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_binary<R: Read + Seek>(
Expand All @@ -17,7 +17,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -33,6 +33,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_list<R: Read + Seek>(
Expand All @@ -22,7 +22,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeListArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -38,6 +38,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let (field, _) = FixedSizeListArray::get_child_and_size(&data_type);
Expand Down
7 changes: 3 additions & 4 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
Expand All @@ -26,7 +24,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<ListArray<O>>
where
Vec<u8>: TryInto<O::Bytes>,
Expand All @@ -45,6 +43,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
7 changes: 3 additions & 4 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_map<R: Read + Seek>(
Expand All @@ -25,7 +23,7 @@ pub fn read_map<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<MapArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -41,6 +39,7 @@ pub fn read_map<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::{Error, Result};
use crate::{array::PrimitiveArray, types::NativeType};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_primitive<T: NativeType, R: Read + Seek>(
Expand All @@ -17,7 +17,7 @@ pub fn read_primitive<T: NativeType, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<PrimitiveArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand All @@ -36,6 +36,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_struct<R: Read + Seek>(
Expand All @@ -22,7 +22,7 @@ pub fn read_struct<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<StructArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -38,6 +38,7 @@ pub fn read_struct<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let fields = StructArray::get_fields(&data_type);
Expand Down
6 changes: 2 additions & 4 deletions src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_union<R: Read + Seek>(
Expand All @@ -25,7 +23,7 @@ pub fn read_union<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<UnionArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_utf8<O: Offset, R: Read + Seek>(
Expand All @@ -18,7 +18,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Utf8Array<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -34,6 +34,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
5 changes: 2 additions & 3 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::datatypes::{DataType, Field};
use crate::error::{Error, Result};
use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::{IpcField, IpcSchema};
use crate::io::ReadBuffer;

use super::deserialize::{read, skip};
use super::Dictionaries;
Expand Down Expand Up @@ -86,7 +85,7 @@ pub fn read_record_batch<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
file_size: u64,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
let buffers = batch
Expand Down Expand Up @@ -234,7 +233,7 @@ pub fn read_dictionary<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
file_size: u64,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<()> {
if batch
.is_delta()
Expand Down
7 changes: 4 additions & 3 deletions src/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::error::Result;
use crate::io::ipc::IpcField;

use super::{array::*, Dictionaries};
use super::{IpcBuffer, Node, ReadBuffer};
use super::{IpcBuffer, Node};

#[allow(clippy::too_many_arguments)]
pub fn read<R: Read + Seek>(
Expand All @@ -24,7 +24,7 @@ pub fn read<R: Read + Seek>(
is_little_endian: bool,
compression: Option<BodyCompressionRef>,
version: MetadataVersion,
scratch: &mut ReadBuffer,
scratch: &mut Vec<u8>,
) -> Result<Box<dyn Array>> {
use PhysicalType::*;
let data_type = field.data_type.clone();
Expand All @@ -39,6 +39,7 @@ pub fn read<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)
.map(|x| x.boxed()),
Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Expand All @@ -50,7 +51,7 @@ pub fn read<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch
scratch,
)
.map(|x| x.boxed())
}),
Expand Down
Loading