Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Made all panics in IPC reads into errors #722

Merged
merged 1 commit into from
Jan 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ impl ArrowError {
pub fn from_external_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
Self::External("".to_string(), Box::new(error))
}

/// Shorthand to build an [`ArrowError::OutOfSpec`] ("out of spec") error
/// from anything convertible into a `String`.
pub(crate) fn oos<S: Into<String>>(msg: S) -> Self {
    Self::OutOfSpec(msg.into())
}

/// Shorthand to build an [`ArrowError::NotYetImplemented`] ("not yet
/// implemented") error from anything convertible into a `String`.
pub(crate) fn nyi<S: Into<String>>(msg: S) -> Self {
    Self::NotYetImplemented(msg.into())
}
}

impl From<::std::io::Error> for ArrowError {
Expand Down
2 changes: 1 addition & 1 deletion src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedDa
pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
if let Ok(ipc) = ipc::Message::root_as_message(bytes) {
if let Some(schemas) = ipc.header_as_schema().map(read::fb_to_schema) {
Ok(schemas)
schemas
} else {
Err(ArrowError::OutOfSpec(
"Unable to get head as schema".to_string(),
Expand Down
6 changes: 5 additions & 1 deletion src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> Result<()>
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
use std::io::Write;
let mut encoder = lz4::EncoderBuilder::new().build(output_buf).unwrap();

use crate::error::ArrowError;
let mut encoder = lz4::EncoderBuilder::new()
.build(output_buf)
.map_err(ArrowError::from)?;
encoder.write_all(input_buf)?;
encoder.finish().1.map_err(|e| e.into())
}
Expand Down
33 changes: 26 additions & 7 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_format::ipc;
use crate::array::{BinaryArray, Offset};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::deserialize::Node;
use super::super::read_basic::*;
Expand All @@ -20,7 +20,12 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand Down Expand Up @@ -57,10 +62,24 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
))
}

pub fn skip_binary(field_nodes: &mut VecDeque<Node>, buffers: &mut VecDeque<&ipc::Schema::Buffer>) {
let _ = field_nodes.pop_front().unwrap();
/// Advances `field_nodes` and `buffers` past one binary array without
/// deserializing it: one field node, then the validity, offsets and values
/// buffers, in that order.
///
/// # Errors
/// Returns an out-of-spec [`ArrowError`] when the field node or any of the
/// three buffers is missing, i.e. the file or stream is corrupted.
pub fn skip_binary(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) -> Result<()> {
    let _ = field_nodes.pop_front().ok_or_else(|| {
        ArrowError::oos(
            "IPC: unable to fetch the field for binary. The file or stream is corrupted.",
        )
    })?;

    // A binary array is serialized as exactly three buffers, in this order.
    for missing in [
        "IPC: missing validity buffer.",
        "IPC: missing offsets buffer.",
        "IPC: missing values buffer.",
    ]
    .iter()
    {
        let _ = buffers
            .pop_front()
            .ok_or_else(|| ArrowError::oos(*missing))?;
    }
    Ok(())
}
26 changes: 20 additions & 6 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow_format::ipc;

use crate::array::BooleanArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::deserialize::Node;
use super::super::read_basic::*;
Expand All @@ -19,7 +19,12 @@ pub fn read_boolean<R: Read + Seek>(
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let length = field_node.length() as usize;
let validity = read_validity(
Expand All @@ -45,9 +50,18 @@ pub fn read_boolean<R: Read + Seek>(
pub fn skip_boolean(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
"IPC: unable to fetch the field for boolean. The file or stream is corrupted.",
)
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
Ok(())
}
2 changes: 1 addition & 1 deletion src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ where
pub fn skip_dictionary(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
) -> Result<()> {
skip_primitive(field_nodes, buffers)
}
26 changes: 20 additions & 6 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow_format::ipc;

use crate::array::FixedSizeBinaryArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::deserialize::Node;
use super::super::read_basic::*;
Expand All @@ -19,7 +19,12 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand All @@ -46,9 +51,18 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
pub fn skip_fixed_size_binary(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
"IPC: unable to fetch the field for fixed-size binary. The file or stream is corrupted.",
)
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
Ok(())
}
21 changes: 16 additions & 5 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow_format::ipc;

use crate::array::FixedSizeListArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::super::IpcField;
use super::super::deserialize::{read, skip, Node};
Expand All @@ -25,7 +25,12 @@ pub fn read_fixed_size_list<R: Read + Seek>(
compression: Option<ipc::Message::BodyCompression>,
version: ipc::Schema::MetadataVersion,
) -> Result<FixedSizeListArray> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand Down Expand Up @@ -57,10 +62,16 @@ pub fn skip_fixed_size_list(
field_nodes: &mut VecDeque<Node>,
data_type: &DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
"IPC: unable to fetch the field for fixed-size list. The file or stream is corrupted.",
)
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;

let (field, _) = FixedSizeListArray::get_child_and_size(data_type);

Expand Down
23 changes: 17 additions & 6 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use arrow_format::ipc;
use crate::array::{ListArray, Offset};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::super::IpcField;
use super::super::deserialize::{read, skip, Node};
Expand All @@ -30,7 +30,12 @@ pub fn read_list<O: Offset, R: Read + Seek>(
where
Vec<u8>: TryInto<O::Bytes>,
{
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand Down Expand Up @@ -73,11 +78,17 @@ pub fn skip_list<O: Offset>(
field_nodes: &mut VecDeque<Node>,
data_type: &DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos("IPC: unable to fetch the field for list. The file or stream is corrupted.")
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;

let data_type = ListArray::<O>::get_child_type(data_type);

Expand Down
23 changes: 17 additions & 6 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_format::ipc;
use crate::array::MapArray;
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::super::super::IpcField;
use super::super::deserialize::{read, skip, Node};
Expand All @@ -26,7 +26,12 @@ pub fn read_map<R: Read + Seek>(
compression: Option<ipc::Message::BodyCompression>,
version: ipc::Schema::MetadataVersion,
) -> Result<MapArray> {
let field_node = field_nodes.pop_front().unwrap();
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

let validity = read_validity(
buffers,
Expand Down Expand Up @@ -69,11 +74,17 @@ pub fn skip_map(
field_nodes: &mut VecDeque<Node>,
data_type: &DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
) {
let _ = field_nodes.pop_front().unwrap();
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos("IPC: unable to fetch the field for map. The file or stream is corrupted.")
})?;

let _ = buffers.pop_front().unwrap();
let _ = buffers.pop_front().unwrap();
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
let _ = buffers
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;

let data_type = MapArray::get_field(data_type).data_type();

Expand Down
28 changes: 21 additions & 7 deletions src/io/ipc/read/array/null.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
use std::collections::VecDeque;

use crate::{array::NullArray, datatypes::DataType};
use crate::{
array::NullArray,
datatypes::DataType,
error::{ArrowError, Result},
};

use super::super::deserialize::Node;

pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> NullArray {
NullArray::from_data(
pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Result<NullArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
"IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
data_type
))
})?;

Ok(NullArray::from_data(
data_type,
field_nodes.pop_front().unwrap().length() as usize,
)
field_node.length() as usize,
))
}

pub fn skip_null(field_nodes: &mut VecDeque<Node>) {
let _ = field_nodes.pop_front();
/// Advances `field_nodes` past one null array (null arrays carry no buffers,
/// so only the field node is consumed).
///
/// # Errors
/// Returns an out-of-spec [`ArrowError`] when no field node is left, i.e. the
/// file or stream is corrupted.
pub fn skip_null(field_nodes: &mut VecDeque<Node>) -> Result<()> {
    match field_nodes.pop_front() {
        Some(_) => Ok(()),
        None => Err(ArrowError::oos(
            "IPC: unable to fetch the field for null. The file or stream is corrupted.",
        )),
    }
}
Loading