Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Audited all IPC read code
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 16, 2022
1 parent 5aeee0e commit 775aab0
Show file tree
Hide file tree
Showing 20 changed files with 491 additions and 207 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "5.0", optional = true, default-features = false }

arrow-format = { version = "0.6", optional = true, features = ["ipc"] }
arrow-format = { version = "0.7", optional = true, features = ["ipc"] }

hex = { version = "^0.4", optional = true }

Expand Down
8 changes: 0 additions & 8 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@
//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs),
//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)).
use crate::error::Error;

mod compression;
mod endianess;

Expand Down Expand Up @@ -103,9 +101,3 @@ pub struct IpcSchema {
/// Endianness of the file
pub is_little_endian: bool,
}

/// Converts a flatbuffer (planus) deserialization error into this crate's
/// [`Error`], classifying it as out-of-spec IPC data.
impl From<arrow_format::ipc::planus::Error> for Error {
fn from(error: arrow_format::ipc::planus::Error) -> Self {
// The planus error message is preserved verbatim inside the
// `OutOfSpec` variant so callers keep the underlying detail.
Error::OutOfSpec(error.to_string())
}
}
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_binary<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand All @@ -34,9 +34,14 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets: Buffer<O> = read_buffer(
buffers,
1 + field_node.length() as usize,
1 + length,
reader,
block_offset,
is_little_endian,
Expand Down
8 changes: 6 additions & 2 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_boolean<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand All @@ -24,7 +24,11 @@ pub fn read_boolean<R: Read + Seek>(
))
})?;

let length = field_node.length() as usize;
let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let validity = read_validity(
buffers,
field_node,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_fixed_size_binary<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand All @@ -33,7 +33,12 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
compression,
)?;

let length = field_node.length() as usize * FixedSizeBinaryArray::get_size(&data_type);
let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let length = length * FixedSizeBinaryArray::get_size(&data_type);
let values = read_buffer(
buffers,
length,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, Version};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
Expand Down Expand Up @@ -45,9 +45,14 @@ where
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets = read_buffer::<O, _>(
buffers,
1 + field_node.length() as usize,
1 + length,
reader,
block_offset,
is_little_endian,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, Version};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_map<R: Read + Seek>(
Expand Down Expand Up @@ -41,9 +41,14 @@ pub fn read_map<R: Read + Seek>(
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets = read_buffer::<i32, _>(
buffers,
1 + field_node.length() as usize,
1 + length,
reader,
block_offset,
is_little_endian,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
error::{Error, Result},
};

use super::super::Node;
use super::super::{Node, OutOfSpecKind};

pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Result<NullArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Expand All @@ -16,7 +16,12 @@ pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Resul
))
})?;

NullArray::try_new(data_type, field_node.length() as usize)
let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

NullArray::try_new(data_type, length)
}

pub fn skip_null(field_nodes: &mut VecDeque<Node>) -> Result<()> {
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::{Error, Result};
use crate::{array::PrimitiveArray, types::NativeType};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_primitive<T: NativeType, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand Down Expand Up @@ -36,9 +36,14 @@ where
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let values = read_buffer(
buffers,
field_node.length() as usize,
length,
reader,
block_offset,
is_little_endian,
Expand Down
11 changes: 8 additions & 3 deletions src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, Version};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_union<R: Read + Seek>(
Expand Down Expand Up @@ -38,9 +38,14 @@ pub fn read_union<R: Read + Seek>(
.ok_or_else(|| Error::oos("IPC: missing validity buffer."))?;
};

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let types = read_buffer(
buffers,
field_node.length() as usize,
length,
reader,
block_offset,
is_little_endian,
Expand All @@ -51,7 +56,7 @@ pub fn read_union<R: Read + Seek>(
if !mode.is_sparse() {
Some(read_buffer(
buffers,
field_node.length() as usize,
length,
reader,
block_offset,
is_little_endian,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_utf8<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand All @@ -34,9 +34,14 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets: Buffer<O> = read_buffer(
buffers,
1 + field_node.length() as usize,
1 + length,
reader,
block_offset,
is_little_endian,
Expand Down
73 changes: 48 additions & 25 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{DataType, Field};
use crate::error::{Error, Result};
use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::{IpcField, IpcSchema};

use super::deserialize::{read, skip};
Expand Down Expand Up @@ -87,21 +88,33 @@ pub fn read_record_batch<R: Read + Seek>(
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
let buffers = batch
.buffers()?
.ok_or_else(|| Error::oos("IPC RecordBatch must contain buffers"))?;
.buffers()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?;
let mut buffers: VecDeque<arrow_format::ipc::BufferRef> = buffers.iter().collect();

for buffer in buffers.iter() {
if buffer.length() as u64 > file_size {
return Err(Error::oos(
"Any buffer's length must be smaller than the size of the file",
));
}
// check that the sum of the sizes of all buffers does not exceed the size of the file
let buffers_size = buffers
.iter()
.map(|buffer| {
let buffer_size: u64 = buffer
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
Ok(buffer_size)
})
.sum::<Result<u64>>()?;
if buffers_size > file_size {
return Err(Error::from(OutOfSpecKind::InvalidBuffersLength {
buffers_size,
file_size,
}));
}

let field_nodes = batch
.nodes()?
.ok_or_else(|| Error::oos("IPC RecordBatch must contain field nodes"))?;
.nodes()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?;
let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();

let columns = if let Some(projection) = projection {
Expand All @@ -119,7 +132,9 @@ pub fn read_record_batch<R: Read + Seek>(
dictionaries,
block_offset,
ipc_schema.is_little_endian,
batch.compression()?,
batch.compression().map_err(|err| {
Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err))
})?,
version,
)?)),
ProjectionResult::NotSelected((field, _)) => {
Expand All @@ -143,7 +158,9 @@ pub fn read_record_batch<R: Read + Seek>(
dictionaries,
block_offset,
ipc_schema.is_little_endian,
batch.compression()?,
batch.compression().map_err(|err| {
Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err))
})?,
version,
)
})
Expand Down Expand Up @@ -199,10 +216,7 @@ fn first_dict_field<'a>(
return Ok(field);
}
}
Err(Error::OutOfSpec(format!(
"dictionary id {} not found in schema",
id
)))
Err(Error::from(OutOfSpecKind::InvalidId { requested_id: id }))
}

/// Read the dictionary from the buffer and provided metadata,
Expand All @@ -216,23 +230,29 @@ pub fn read_dictionary<R: Read + Seek>(
block_offset: u64,
file_size: u64,
) -> Result<()> {
if batch.is_delta()? {
if batch
.is_delta()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferIsDelta(err)))?
{
return Err(Error::NotYetImplemented(
"delta dictionary batches not supported".to_string(),
));
}

let id = batch.id()?;
let id = batch
.id()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferId(err)))?;
let (first_field, first_ipc_field) = first_dict_field(id, fields, &ipc_schema.fields)?;

// As the dictionary batch does not contain the type of the
// values array, we need to retrieve this from the schema.
// Get an array representing this dictionary's values.
let dictionary_values: Box<dyn Array> = match &first_field.data_type {
let dictionary_values: Box<dyn Array> = match first_field.data_type.to_logical_type() {
DataType::Dictionary(_, ref value_type, _) => {
let batch = batch
.data()?
.ok_or_else(|| Error::oos("The dictionary batch must have data."))?;
.data()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?;

// Make a fake schema for the dictionary batch.
let fields = vec![Field::new("", value_type.as_ref().clone(), false)];
Expand All @@ -252,11 +272,14 @@ pub fn read_dictionary<R: Read + Seek>(
file_size,
)?;
let mut arrays = columns.into_arrays();
Some(arrays.pop().unwrap())
arrays.pop().unwrap()
}
_ => None,
}
.ok_or_else(|| Error::InvalidArgumentError("dictionary id not found in schema".to_string()))?;
_ => {
return Err(Error::from(OutOfSpecKind::InvalidIdDataType {
requested_id: id,
}))
}
};

dictionaries.insert(id, dictionary_values);

Expand Down
Loading

0 comments on commit 775aab0

Please sign in to comment.