diff --git a/src/error.rs b/src/error.rs
index 48c51e45ba6..aca64d3d659 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -29,6 +29,14 @@ impl ArrowError {
     pub fn from_external_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
         Self::External("".to_string(), Box::new(error))
     }
+
+    pub(crate) fn oos<A: Into<String>>(msg: A) -> Self {
+        Self::OutOfSpec(msg.into())
+    }
+
+    pub(crate) fn nyi<A: Into<String>>(msg: A) -> Self {
+        Self::NotYetImplemented(msg.into())
+    }
 }
 
 impl From<::std::io::Error> for ArrowError {
diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs
index 21ef52dc572..31d794d7196 100644
--- a/src/io/flight/mod.rs
+++ b/src/io/flight/mod.rs
@@ -86,7 +86,7 @@ fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedData {
 pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
     if let Ok(ipc) = ipc::Message::root_as_message(bytes) {
         if let Some(schemas) = ipc.header_as_schema().map(read::fb_to_schema) {
-            Ok(schemas)
+            schemas
         } else {
             Err(ArrowError::OutOfSpec(
                 "Unable to get head as schema".to_string(),
diff --git a/src/io/ipc/compression.rs b/src/io/ipc/compression.rs
index 9cc0216a329..11311a5a85b 100644
--- a/src/io/ipc/compression.rs
+++ b/src/io/ipc/compression.rs
@@ -32,7 +32,11 @@ pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> Result<()> {
 #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
 pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
     use std::io::Write;
-    let mut encoder = lz4::EncoderBuilder::new().build(output_buf).unwrap();
+
+    use crate::error::ArrowError;
+    let mut encoder = lz4::EncoderBuilder::new()
+        .build(output_buf)
+        .map_err(ArrowError::from)?;
     encoder.write_all(input_buf)?;
     encoder.finish().1.map_err(|e| e.into())
 }
diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs
index 89809db8c0b..b758f43550a 100644
--- a/src/io/ipc/read/array/binary.rs
+++ b/src/io/ipc/read/array/binary.rs
@@ -6,7 +6,7 @@ use arrow_format::ipc;
 use crate::array::{BinaryArray, Offset};
 use crate::buffer::Buffer;
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::deserialize::Node;
 use super::super::read_basic::*;
@@ -20,7 +20,12 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<BinaryArray<O>> {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let validity = read_validity(
         buffers,
@@ -57,10 +62,24 @@
     ))
 }
 
-pub fn skip_binary(field_nodes: &mut VecDeque<Node>, buffers: &mut VecDeque<&ipc::Schema::Buffer>) {
-    let _ = field_nodes.pop_front().unwrap();
+pub fn skip_binary(
+    field_nodes: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<&ipc::Schema::Buffer>,
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(
+            "IPC: unable to fetch the field for binary. The file or stream is corrupted.",
+        )
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
+    Ok(())
 }
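Note: the `ArrowError::oos` and `ArrowError::nyi` constructors added in src/error.rs exist to shorten the many `OutOfSpec`/`NotYetImplemented` construction sites in the hunks below. A minimal sketch of the recurring pattern, using a hypothetical `pop_or_oos` helper that is not part of this patch (it assumes arrow2's `Result<T>` alias and `ArrowError` are in scope):

    use std::collections::VecDeque;

    // The `ok_or_else` + `?` idiom this PR uses to replace `unwrap()` on
    // queue pops: an unexpected `None` becomes a recoverable OutOfSpec error
    // instead of a panic.
    fn pop_or_oos<T>(queue: &mut VecDeque<T>, what: &str) -> Result<T> {
        queue
            .pop_front()
            .ok_or_else(|| ArrowError::oos(format!("IPC: missing {}", what)))
    }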
diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs
index bab7b4250ee..2c9db797647 100644
--- a/src/io/ipc/read/array/boolean.rs
+++ b/src/io/ipc/read/array/boolean.rs
@@ -5,7 +5,7 @@ use arrow_format::ipc;
 
 use crate::array::BooleanArray;
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::deserialize::Node;
 use super::super::read_basic::*;
@@ -19,7 +19,12 @@ pub fn read_boolean<R: Read + Seek>(
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<BooleanArray> {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let length = field_node.length() as usize;
     let validity = read_validity(
@@ -45,9 +50,18 @@ pub fn skip_boolean(
     field_nodes: &mut VecDeque<Node>,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
-    let _ = field_nodes.pop_front().unwrap();
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(
+            "IPC: unable to fetch the field for boolean. The file or stream is corrupted.",
+        )
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
+    Ok(())
 }
diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs
index c7d1a04170d..f083741a8f8 100644
--- a/src/io/ipc/read/array/dictionary.rs
+++ b/src/io/ipc/read/array/dictionary.rs
@@ -57,6 +57,6 @@ where
 pub fn skip_dictionary(
     field_nodes: &mut VecDeque<Node>,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
+) -> Result<()> {
     skip_primitive(field_nodes, buffers)
 }
diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs
index b2547016e4b..ce7e1adf026 100644
--- a/src/io/ipc/read/array/fixed_size_binary.rs
+++ b/src/io/ipc/read/array/fixed_size_binary.rs
@@ -5,7 +5,7 @@ use arrow_format::ipc;
 
 use crate::array::FixedSizeBinaryArray;
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::deserialize::Node;
 use super::super::read_basic::*;
@@ -19,7 +19,12 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<FixedSizeBinaryArray> {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let validity = read_validity(
         buffers,
@@ -46,9 +51,18 @@ pub fn skip_fixed_size_binary(
     field_nodes: &mut VecDeque<Node>,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
-    let _ = field_nodes.pop_front().unwrap();
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(
+            "IPC: unable to fetch the field for fixed-size binary. The file or stream is corrupted.",
+        )
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
+    Ok(())
 }
diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs
index b5db90fd752..02e0d986b07 100644
--- a/src/io/ipc/read/array/fixed_size_list.rs
+++ b/src/io/ipc/read/array/fixed_size_list.rs
@@ -5,7 +5,7 @@ use arrow_format::ipc;
 
 use crate::array::FixedSizeListArray;
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::super::IpcField;
 use super::super::deserialize::{read, skip, Node};
@@ -25,7 +25,12 @@ pub fn read_fixed_size_list<R: Read + Seek>(
     compression: Option<ipc::Message::BodyCompression>,
     version: ipc::Schema::MetadataVersion,
 ) -> Result<FixedSizeListArray> {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let validity = read_validity(
         buffers,
@@ -57,10 +62,16 @@ pub fn skip_fixed_size_list(
     field_nodes: &mut VecDeque<Node>,
     data_type: &DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
-    let _ = field_nodes.pop_front().unwrap();
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(
+            "IPC: unable to fetch the field for fixed-size list. The file or stream is corrupted.",
+        )
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
 
     let (field, _) = FixedSizeListArray::get_child_and_size(data_type);
 
diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs
index cf8ddd41bdf..666417b1c4a 100644
--- a/src/io/ipc/read/array/list.rs
+++ b/src/io/ipc/read/array/list.rs
@@ -7,7 +7,7 @@ use arrow_format::ipc;
 use crate::array::{ListArray, Offset};
 use crate::buffer::Buffer;
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::super::IpcField;
 use super::super::deserialize::{read, skip, Node};
@@ -30,7 +30,12 @@ pub fn read_list<O: Offset, R: Read + Seek>(
 where
     Vec<u8>: TryInto<O::Bytes>,
 {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let validity = read_validity(
         buffers,
@@ -73,11 +78,17 @@ pub fn skip_list<O: Offset>(
     field_nodes: &mut VecDeque<Node>,
     data_type: &DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
-    let _ = field_nodes.pop_front().unwrap();
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos("IPC: unable to fetch the field for list. The file or stream is corrupted.")
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;
 
     let data_type = ListArray::<O>::get_child_type(data_type);
 
diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs
index e61887cc5ba..0163f61bb75 100644
--- a/src/io/ipc/read/array/map.rs
+++ b/src/io/ipc/read/array/map.rs
@@ -6,7 +6,7 @@ use arrow_format::ipc;
 use crate::array::MapArray;
 use crate::buffer::Buffer;
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::super::IpcField;
 use super::super::deserialize::{read, skip, Node};
@@ -26,7 +26,12 @@ pub fn read_map<R: Read + Seek>(
     compression: Option<ipc::Message::BodyCompression>,
     version: ipc::Schema::MetadataVersion,
 ) -> Result<MapArray> {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let validity = read_validity(
         buffers,
@@ -69,11 +74,17 @@ pub fn skip_map(
     field_nodes: &mut VecDeque<Node>,
     data_type: &DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
-    let _ = field_nodes.pop_front().unwrap();
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos("IPC: unable to fetch the field for map. The file or stream is corrupted.")
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;
 
     let data_type = MapArray::get_field(data_type).data_type();
 
diff --git a/src/io/ipc/read/array/null.rs b/src/io/ipc/read/array/null.rs
index 2885a620ffd..06bdc5f9725 100644
--- a/src/io/ipc/read/array/null.rs
+++ b/src/io/ipc/read/array/null.rs
@@ -1,16 +1,30 @@
 use std::collections::VecDeque;
 
-use crate::{array::NullArray, datatypes::DataType};
+use crate::{
+    array::NullArray,
+    datatypes::DataType,
+    error::{ArrowError, Result},
+};
 
 use super::super::deserialize::Node;
 
-pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> NullArray {
-    NullArray::from_data(
+pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Result<NullArray> {
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
+
+    Ok(NullArray::from_data(
         data_type,
-        field_nodes.pop_front().unwrap().length() as usize,
-    )
+        field_node.length() as usize,
+    ))
 }
 
-pub fn skip_null(field_nodes: &mut VecDeque<Node>) {
-    let _ = field_nodes.pop_front();
+pub fn skip_null(field_nodes: &mut VecDeque<Node>) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos("IPC: unable to fetch the field for null. The file or stream is corrupted.")
+    })?;
+    Ok(())
 }
diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs
index f43eb8f2703..039c853496b 100644
--- a/src/io/ipc/read/array/primitive.rs
+++ b/src/io/ipc/read/array/primitive.rs
@@ -4,7 +4,7 @@ use std::{collections::VecDeque, convert::TryInto};
 use arrow_format::ipc;
 
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 use crate::{array::PrimitiveArray, types::NativeType};
 
 use super::super::deserialize::Node;
@@ -22,7 +22,12 @@ pub fn read_primitive<T: NativeType, R: Read + Seek>(
 where
     Vec<u8>: TryInto<T::Bytes>,
 {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let validity = read_validity(
         buffers,
@@ -47,9 +52,18 @@ pub fn skip_primitive(
     field_nodes: &mut VecDeque<Node>,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
-    let _ = field_nodes.pop_front().unwrap();
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(
+            "IPC: unable to fetch the field for primitive. The file or stream is corrupted.",
+        )
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
+    Ok(())
 }
diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs
index 30c47f654f2..435534a38d7 100644
--- a/src/io/ipc/read/array/struct_.rs
+++ b/src/io/ipc/read/array/struct_.rs
@@ -5,7 +5,7 @@ use arrow_format::ipc;
 
 use crate::array::StructArray;
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::super::IpcField;
 use super::super::deserialize::{read, skip, Node};
@@ -25,7 +25,12 @@ pub fn read_struct<R: Read + Seek>(
     compression: Option<ipc::Message::BodyCompression>,
     version: ipc::Schema::MetadataVersion,
 ) -> Result<StructArray> {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let validity = read_validity(
         buffers,
@@ -64,14 +69,20 @@ pub fn skip_struct(
     field_nodes: &mut VecDeque<Node>,
     data_type: &DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
-    let _ = field_nodes.pop_front().unwrap();
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(
+            "IPC: unable to fetch the field for struct. The file or stream is corrupted.",
+        )
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
 
     let fields = StructArray::get_fields(data_type);
 
     fields
         .iter()
-        .for_each(|field| skip(field_nodes, field.data_type(), buffers))
+        .try_for_each(|field| skip(field_nodes, field.data_type(), buffers))
 }
diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs
index ce3fb0fb79b..bb270f053fe 100644
--- a/src/io/ipc/read/array/union.rs
+++ b/src/io/ipc/read/array/union.rs
@@ -6,7 +6,7 @@ use arrow_format::ipc;
 use crate::array::UnionArray;
 use crate::datatypes::DataType;
 use crate::datatypes::UnionMode::Dense;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::super::IpcField;
 use super::super::deserialize::{read, skip, Node};
@@ -26,10 +26,17 @@ pub fn read_union<R: Read + Seek>(
     compression: Option<ipc::Message::BodyCompression>,
     version: ipc::Schema::MetadataVersion,
 ) -> Result<UnionArray> {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     if version != ipc::Schema::MetadataVersion::V5 {
-        let _ = buffers.pop_front().unwrap();
+        let _ = buffers
+            .pop_front()
+            .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
     };
 
     let types = read_buffer(
@@ -55,7 +62,7 @@
             None
         }
     } else {
-        panic!()
+        unreachable!()
     };
 
     let fields = UnionArray::get_fields(&data_type);
@@ -86,19 +93,27 @@ pub fn skip_union(
     field_nodes: &mut VecDeque<Node>,
     data_type: &DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
-    let _ = field_nodes.pop_front().unwrap();
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(
+            "IPC: unable to fetch the field for union. The file or stream is corrupted.",
+        )
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
 
     if let DataType::Union(_, _, Dense) = data_type {
-        let _ = buffers.pop_front().unwrap();
+        let _ = buffers
+            .pop_front()
+            .ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;
    } else {
-        panic!()
+        unreachable!()
    };
 
     let fields = UnionArray::get_fields(data_type);
 
     fields
         .iter()
-        .for_each(|field| skip(field_nodes, field.data_type(), buffers))
+        .try_for_each(|field| skip(field_nodes, field.data_type(), buffers))
 }
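Note: because every `skip_*` function now returns `Result<()>`, the nested traversals in `skip_struct`/`skip_union` switch from `for_each` to `try_for_each`, which short-circuits on the first `Err` instead of panicking partway through the walk. A self-contained sketch of that control flow (illustrative only, not part of the patch):

    // `try_for_each` stops at the first Err and returns it to the caller;
    // `Ok(())` means every element was skipped successfully.
    fn skip_all(nodes: &[Option<usize>]) -> Result<(), String> {
        nodes.iter().try_for_each(|node| {
            node.ok_or_else(|| "missing field node".to_string()).map(|_| ())
        })
    }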
diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs
index 5e7797ff678..8c5cee4fea7 100644
--- a/src/io/ipc/read/array/utf8.rs
+++ b/src/io/ipc/read/array/utf8.rs
@@ -6,7 +6,7 @@ use arrow_format::ipc;
 use crate::array::{Offset, Utf8Array};
 use crate::buffer::Buffer;
 use crate::datatypes::DataType;
-use crate::error::Result;
+use crate::error::{ArrowError, Result};
 
 use super::super::deserialize::Node;
 use super::super::read_basic::*;
@@ -20,7 +20,12 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<Utf8Array<O>> {
-    let field_node = field_nodes.pop_front().unwrap();
+    let field_node = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos(format!(
+            "IPC: unable to fetch the field for {:?}. The file or stream is corrupted.",
+            data_type
+        ))
+    })?;
 
     let validity = read_validity(
         buffers,
@@ -57,10 +62,22 @@
     ))
 }
 
-pub fn skip_utf8(field_nodes: &mut VecDeque<Node>, buffers: &mut VecDeque<&ipc::Schema::Buffer>) {
-    let _ = field_nodes.pop_front().unwrap();
+pub fn skip_utf8(
+    field_nodes: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<&ipc::Schema::Buffer>,
+) -> Result<()> {
+    let _ = field_nodes.pop_front().ok_or_else(|| {
+        ArrowError::oos("IPC: unable to fetch the field for utf8. The file or stream is corrupted.")
+    })?;
 
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
-    let _ = buffers.pop_front().unwrap();
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing validity buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing offsets buffer."))?;
+    let _ = buffers
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: missing values buffer."))?;
+    Ok(())
 }
diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs
index 1bc9b87ac31..623b4a9219a 100644
--- a/src/io/ipc/read/common.rs
+++ b/src/io/ipc/read/common.rs
@@ -89,14 +89,14 @@ pub fn read_record_batch<R: Read + Seek>(
     block_offset: u64,
 ) -> Result<RecordBatch> {
     assert_eq!(schema.fields().len(), ipc_schema.fields.len());
-    let buffers = batch.buffers().ok_or_else(|| {
-        ArrowError::OutOfSpec("Unable to get buffers from IPC RecordBatch".to_string())
-    })?;
+    let buffers = batch
+        .buffers()
+        .ok_or_else(|| ArrowError::oos("IPC RecordBatch must contain buffers"))?;
     let mut buffers: VecDeque<&ipc::Schema::Buffer> = buffers.iter().collect();
 
-    let field_nodes = batch.nodes().ok_or_else(|| {
-        ArrowError::OutOfSpec("Unable to get field nodes from IPC RecordBatch".to_string())
-    })?;
+    let field_nodes = batch
+        .nodes()
+        .ok_or_else(|| ArrowError::oos("IPC RecordBatch must contain field nodes"))?;
     let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
 
     let (schema, columns) = if let Some(projection) = projection {
@@ -109,7 +109,7 @@
         let arrays = projection
             .map(|maybe_field| match maybe_field {
-                ProjectionResult::Selected((field, ipc_field)) => Some(read(
+                ProjectionResult::Selected((field, ipc_field)) => Ok(Some(read(
                     &mut field_nodes,
                     field,
                     ipc_field,
@@ -120,12 +120,13 @@
                     ipc_schema.is_little_endian,
                     batch.compression(),
                     version,
-                )),
+                )?)),
                 ProjectionResult::NotSelected((field, _)) => {
-                    skip(&mut field_nodes, field.data_type(), &mut buffers);
-                    None
+                    skip(&mut field_nodes, field.data_type(), &mut buffers)?;
+                    Ok(None)
                 }
             })
+            .map(|x| x.transpose())
             .flatten()
             .collect::<Result<Vec<_>>>()?;
         (projected_schema, arrays)
@@ -243,7 +244,9 @@ pub fn read_dictionary<R: Read + Seek>(
     assert_eq!(ipc_schema.fields.len(), schema.fields().len());
     // Read a single column
     let record_batch = read_record_batch(
-        batch.data().unwrap(),
+        batch
+            .data()
+            .ok_or_else(|| ArrowError::oos("The dictionary batch must have data."))?,
         schema,
         &ipc_schema,
         None,
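Note: in `read_record_batch` above, the projection closure now yields `Result<Option<Arc<dyn Array>>>`: `Ok(Some(_))` for selected columns, `Ok(None)` for skipped ones, `Err` for corruption. The added `.map(|x| x.transpose())` flips each item to `Option<Result<_>>` so that `.flatten()` can drop the `None`s while `.collect::<Result<Vec<_>>>()` still short-circuits on the first error. A standalone sketch of that iterator chain with plain types:

    // Result<Option<T>> -> Option<Result<T>> via `transpose`; `flatten`
    // removes the Nones and `collect` propagates the first Err, if any.
    fn keep_selected(items: Vec<Result<Option<i32>, String>>) -> Result<Vec<i32>, String> {
        items.into_iter().map(|x| x.transpose()).flatten().collect()
    }

    // keep_selected(vec![Ok(Some(1)), Ok(None), Ok(Some(3))]) == Ok(vec![1, 3])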
diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs
index 824f1ff4218..e6a2400dc1e 100644
--- a/src/io/ipc/read/deserialize.rs
+++ b/src/io/ipc/read/deserialize.rs
@@ -33,10 +33,7 @@ pub fn read<R: Read + Seek>(
     let data_type = field.data_type().clone();
 
     match data_type.to_physical_type() {
-        Null => {
-            let array = read_null(field_nodes, data_type);
-            Ok(Arc::new(array))
-        }
+        Null => read_null(field_nodes, data_type).map(|x| Arc::new(x) as Arc<dyn Array>),
         Boolean => read_boolean(
             field_nodes,
             data_type,
@@ -219,7 +216,7 @@ pub fn skip(
     field_nodes: &mut VecDeque<Node>,
     data_type: &DataType,
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
-) {
+) -> Result<()> {
     use PhysicalType::*;
     match data_type.to_physical_type() {
         Null => skip_null(field_nodes),
diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs
index 840bab44bee..03abaa92de9 100644
--- a/src/io/ipc/read/read_basic.rs
+++ b/src/io/ipc/read/read_basic.rs
@@ -35,11 +35,12 @@ fn read_swapped<T: NativeType, R: Read + Seek>(
             };
             *slot = T::from_be_bytes(a);
             Result::Ok(())
-        })
-        .unwrap();
+        })?;
     } else {
         // machine is big endian, file is little endian
-        todo!("reading little endian files from big endian machines not yet implemented.")
+        return Err(ArrowError::NotYetImplemented(
+            "Reading little endian files from big endian machines".to_string(),
+        ));
     }
     Ok(())
 }
@@ -139,7 +140,9 @@ pub fn read_buffer<T: NativeType, R: Read + Seek>(
     is_little_endian: bool,
     compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<Buffer<T>> {
-    let buf = buf.pop_front().unwrap();
+    let buf = buf
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: unable to fetch a buffer. The file is corrupted."))?;
 
     reader.seek(SeekFrom::Start(block_offset + buf.offset() as u64))?;
 
@@ -206,7 +209,9 @@ pub fn read_bitmap<R: Read + Seek>(
     _: bool,
     compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<Bitmap> {
-    let buf = buf.pop_front().unwrap();
+    let buf = buf
+        .pop_front()
+        .ok_or_else(|| ArrowError::oos("IPC: unable to fetch a buffer. The file is corrupted."))?;
 
     reader.seek(SeekFrom::Start(block_offset + buf.offset() as u64))?;
 
@@ -239,7 +244,9 @@ pub fn read_validity<R: Read + Seek>(
             compression,
         )?)
     } else {
-        let _ = buffers.pop_front().unwrap();
+        let _ = buffers.pop_front().ok_or_else(|| {
+            ArrowError::oos("IPC: unable to fetch a buffer. The file is corrupted.")
+        })?;
         None
     })
 }
diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs
index af980b7ef75..59592a646e9 100644
--- a/src/io/ipc/read/reader.rs
+++ b/src/io/ipc/read/reader.rs
@@ -93,7 +93,11 @@ fn read_dictionaries<R: Read + Seek>(
     match message.header_type() {
         ipc::Message::MessageHeader::DictionaryBatch => {
             let block_offset = offset + length;
-            let batch = message.header_as_dictionary_batch().unwrap();
+            let batch = message.header_as_dictionary_batch().ok_or_else(|| {
+                ArrowError::OutOfSpec(
+                    "The dictionary message does not have a dictionary header. The file is corrupted.".to_string(),
+                )
+            })?;
             read_dictionary(
                 batch,
                 schema.fields(),
@@ -162,7 +166,7 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata>
-    let (schema, ipc_schema) = fb_to_schema(ipc_schema);
+    let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;
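Note: `read_swapped` (in read_basic.rs above) byte-swaps values when the file's endianness differs from the machine's; the unsupported big-endian-machine branch now surfaces as `ArrowError::NotYetImplemented` rather than a `todo!()` panic. For reference, the supported direction boils down to a `from_be_bytes`-style conversion per fixed-size chunk; a minimal standalone sketch (not the crate's code):

    use std::convert::TryInto;

    // Decode big-endian u32s on a little-endian machine by swapping each
    // fixed-size chunk. `chunks_exact(4)` guarantees 4-byte slices, so the
    // `try_into` cannot fail here.
    fn read_be_u32s(bytes: &[u8]) -> Vec<u32> {
        bytes
            .chunks_exact(4)
            .map(|chunk| u32::from_be_bytes(chunk.try_into().unwrap()))
            .collect()
    }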
diff --git a/src/io/ipc/read/schema.rs b/src/io/ipc/read/schema.rs
--- a/src/io/ipc/read/schema.rs
+++ b/src/io/ipc/read/schema.rs
@@ -17,18 +17,33 @@
+fn try_unzip_vec<A, B, I: Iterator<Item = Result<(A, B)>>>(iter: I) -> Result<(Vec<A>, Vec<B>)> {
+    let mut a = vec![];
+    let mut b = vec![];
+    for maybe_item in iter {
+        let (a_i, b_i) = maybe_item?;
+        a.push(a_i);
+        b.push(b_i);
+    }
+
+    Ok((a, b))
+}
+
-fn deserialize_field(ipc_field: ipc::Field) -> (Field, IpcField) {
+fn deserialize_field(ipc_field: ipc::Field) -> Result<(Field, IpcField)> {
     let metadata = read_metadata(&ipc_field);
 
     let extension = get_extension(&metadata);
 
-    let (data_type, ipc_field_) = get_data_type(ipc_field, extension, true);
+    let (data_type, ipc_field_) = get_data_type(ipc_field, extension, true)?;
 
     let field = Field {
-        name: ipc_field.name().unwrap().to_string(),
+        name: ipc_field
+            .name()
+            .ok_or_else(|| ArrowError::oos("Every field in IPC must have a name"))?
+            .to_string(),
         data_type,
         nullable: ipc_field.nullable(),
         metadata,
     };
 
-    (field, ipc_field_)
+    Ok((field, ipc_field_))
 }
 
 fn read_metadata(field: &ipc::Field) -> Metadata {
@@ -44,63 +62,62 @@ fn read_metadata(field: &ipc::Field) -> Metadata {
     }
 }
 
+fn deserialize_integer(int: ipc::Int) -> Result<IntegerType> {
+    Ok(match (int.bitWidth(), int.is_signed()) {
+        (8, true) => IntegerType::Int8,
+        (8, false) => IntegerType::UInt8,
+        (16, true) => IntegerType::Int16,
+        (16, false) => IntegerType::UInt16,
+        (32, true) => IntegerType::Int32,
+        (32, false) => IntegerType::UInt32,
+        (64, true) => IntegerType::Int64,
+        (64, false) => IntegerType::UInt64,
+        _ => {
+            return Err(ArrowError::oos(
+                "IPC: indexType can only be 8, 16, 32 or 64.",
+            ))
+        }
+    })
+}
+
 /// Get the Arrow data type from the flatbuffer Field table
 fn get_data_type(
     field: ipc::Field,
     extension: Extension,
     may_be_dictionary: bool,
-) -> (DataType, IpcField) {
+) -> Result<(DataType, IpcField)> {
     if let Some(dictionary) = field.dictionary() {
         if may_be_dictionary {
-            let int = dictionary.indexType().unwrap();
-            let index_type = match (int.bitWidth(), int.is_signed()) {
-                (8, true) => IntegerType::Int8,
-                (8, false) => IntegerType::UInt8,
-                (16, true) => IntegerType::Int16,
-                (16, false) => IntegerType::UInt16,
-                (32, true) => IntegerType::Int32,
-                (32, false) => IntegerType::UInt32,
-                (64, true) => IntegerType::Int64,
-                (64, false) => IntegerType::UInt64,
-                _ => panic!("Unexpected bitwidth and signed"),
-            };
-            let (inner, mut ipc_field) = get_data_type(field, extension, false);
+            let int = dictionary
+                .indexType()
+                .ok_or_else(|| ArrowError::oos("indexType is mandatory in Dictionary."))?;
+            let index_type = deserialize_integer(int)?;
+            let (inner, mut ipc_field) = get_data_type(field, extension, false)?;
             ipc_field.dictionary_id = Some(dictionary.id());
-            return (
+            return Ok((
                 DataType::Dictionary(index_type, Box::new(inner), dictionary.isOrdered()),
                 ipc_field,
-            );
+            ));
         }
     }
 
     if let Some(extension) = extension {
         let (name, metadata) = extension;
-        let (data_type, fields) = get_data_type(field, None, false);
-        return (
+        let (data_type, fields) = get_data_type(field, None, false)?;
+        return Ok((
             DataType::Extension(name, Box::new(data_type), metadata),
             fields,
-        );
+        ));
     }
 
-    match field.type_type() {
+    Ok(match field.type_type() {
         ipc::Type::Null => (DataType::Null, IpcField::default()),
         ipc::Type::Bool => (DataType::Boolean, IpcField::default()),
         ipc::Type::Int => {
-            let int = field.type_as_int().unwrap();
-            let data_type = match (int.bitWidth(), int.is_signed()) {
-                (8, true) => DataType::Int8,
-                (8, false) => DataType::UInt8,
-                (16, true) => DataType::Int16,
-                (16, false) => DataType::UInt16,
-                (32, true) => DataType::Int32,
-                (32, false) => DataType::UInt32,
-                (64, true) => DataType::Int64,
-                (64, false) => DataType::UInt64,
-                z => panic!(
-                    "Int type with bit width of {} and signed of {} not supported",
-                    z.0, z.1
-                ),
-            };
+            let int = field
+                .type_as_int()
+                .ok_or_else(|| ArrowError::oos("IPC: Integer type must be an integer"))?;
+            let data_type = deserialize_integer(int)?.into();
             (data_type, IpcField::default())
         }
         ipc::Type::Binary => (DataType::Binary, IpcField::default()),
@@ -108,89 +125,129 @@
         ipc::Type::Utf8 => (DataType::Utf8, IpcField::default()),
         ipc::Type::LargeUtf8 => (DataType::LargeUtf8, IpcField::default()),
         ipc::Type::FixedSizeBinary => {
-            let fsb = field.type_as_fixed_size_binary().unwrap();
+            let fsb = field.type_as_fixed_size_binary().ok_or_else(|| {
+                ArrowError::oos("IPC: FixedSizeBinary type must be a FixedSizeBinary")
+            })?;
             (
                 DataType::FixedSizeBinary(fsb.byteWidth() as usize),
                 IpcField::default(),
             )
         }
         ipc::Type::FloatingPoint => {
-            let float = field.type_as_floating_point().unwrap();
+            let float = field.type_as_floating_point().ok_or_else(|| {
+                ArrowError::oos("IPC: FloatingPoint type must be a FloatingPoint")
+            })?;
             let data_type = match float.precision() {
                 ipc::Precision::HALF => DataType::Float16,
                 ipc::Precision::SINGLE => DataType::Float32,
                 ipc::Precision::DOUBLE => DataType::Float64,
-                z => panic!("FloatingPoint type with precision of {:?} not supported", z),
+                z => return Err(ArrowError::nyi(format!("IPC: float of precision {:?}", z))),
             };
             (data_type, IpcField::default())
         }
         ipc::Type::Date => {
-            let date = field.type_as_date().unwrap();
+            let date = field
+                .type_as_date()
+                .ok_or_else(|| ArrowError::oos("IPC: Date type must be a Date"))?;
             let data_type = match date.unit() {
                 ipc::DateUnit::DAY => DataType::Date32,
                 ipc::DateUnit::MILLISECOND => DataType::Date64,
-                z => panic!("Date type with unit of {:?} not supported", z),
+                z => {
+                    return Err(ArrowError::nyi(format!(
+                        "IPC: date unit of precision {:?}",
+                        z
+                    )))
+                }
             };
             (data_type, IpcField::default())
         }
         ipc::Type::Time => {
-            let time = field.type_as_time().unwrap();
+            let time = field
+                .type_as_time()
+                .ok_or_else(|| ArrowError::oos("IPC: Time type must be a Time"))?;
             let data_type = match (time.bitWidth(), time.unit()) {
                 (32, ipc::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
                 (32, ipc::TimeUnit::MILLISECOND) => DataType::Time32(TimeUnit::Millisecond),
                 (64, ipc::TimeUnit::MICROSECOND) => DataType::Time64(TimeUnit::Microsecond),
                 (64, ipc::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
-                z => panic!(
-                    "Time type with bit width of {} and unit of {:?} not supported",
-                    z.0, z.1
-                ),
+                (bits, precision) => {
+                    return Err(ArrowError::nyi(format!(
+                        "Time type with bit width of {} and unit of {:?}",
+                        bits, precision
+                    )))
+                }
             };
             (data_type, IpcField::default())
         }
         ipc::Type::Timestamp => {
-            let timestamp = field.type_as_timestamp().unwrap();
+            let timestamp = field
+                .type_as_timestamp()
+                .ok_or_else(|| ArrowError::oos("IPC: Timestamp type must be a Timestamp"))?;
             let timezone: Option<String> = timestamp.timezone().map(|tz| tz.to_string());
             let data_type = match timestamp.unit() {
                 ipc::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
                 ipc::TimeUnit::MILLISECOND => DataType::Timestamp(TimeUnit::Millisecond, timezone),
                 ipc::TimeUnit::MICROSECOND => DataType::Timestamp(TimeUnit::Microsecond, timezone),
                 ipc::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond, timezone),
-                z => panic!("Timestamp type with unit of {:?} not supported", z),
+                z => {
+                    return Err(ArrowError::nyi(format!(
+                        "Timestamp type with unit of {:?}",
+                        z
+                    )))
+                }
            };
             (data_type, IpcField::default())
         }
         ipc::Type::Interval => {
-            let interval = field.type_as_interval().unwrap();
+            let interval = field
+                .type_as_interval()
+                .ok_or_else(|| ArrowError::oos("IPC: Interval type must be a Interval"))?;
             let data_type = match interval.unit() {
                 ipc::IntervalUnit::YEAR_MONTH => DataType::Interval(IntervalUnit::YearMonth),
                 ipc::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
                 ipc::IntervalUnit::MONTH_DAY_NANO => DataType::Interval(IntervalUnit::MonthDayNano),
-                z => panic!("Interval type with unit of {:?} unsupported", z),
+                z => {
+                    return Err(ArrowError::nyi(format!(
+                        "Interval type with unit of {:?}",
+                        z
+                    )))
+                }
             };
             (data_type, IpcField::default())
         }
         ipc::Type::Duration => {
-            let duration = field.type_as_duration().unwrap();
+            let duration = field
+                .type_as_duration()
+                .ok_or_else(|| ArrowError::oos("IPC: Duration type must be a Duration"))?;
             let data_type = match duration.unit() {
                 ipc::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
                 ipc::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
                 ipc::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
                 ipc::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
-                z => panic!("Duration type with unit of {:?} unsupported", z),
+                z => {
+                    return Err(ArrowError::nyi(format!(
+                        "Duration type with unit of {:?}",
+                        z
+                    )))
+                }
             };
             (data_type, IpcField::default())
         }
         ipc::Type::Decimal => {
-            let fsb = field.type_as_decimal().unwrap();
+            let fsb = field
+                .type_as_decimal()
+                .ok_or_else(|| ArrowError::oos("IPC: Decimal type must be a Decimal"))?;
             let data_type = DataType::Decimal(fsb.precision() as usize, fsb.scale() as usize);
             (data_type, IpcField::default())
         }
         ipc::Type::List => {
-            let children = field.children().unwrap();
+            let children = field
+                .children()
+                .ok_or_else(|| ArrowError::oos("IPC: List must contain children"))?;
             if children.len() != 1 {
-                panic!("expect a list to have one child")
+                return Err(ArrowError::oos("IPC: List must contain one child"));
             }
-            let (field, ipc_field) = deserialize_field(children.get(0));
+            let (field, ipc_field) = deserialize_field(children.get(0))?;
 
             (
                 DataType::List(Box::new(field)),
@@ -201,11 +258,13 @@
             )
         }
         ipc::Type::LargeList => {
-            let children = field.children().unwrap();
+            let children = field
+                .children()
+                .ok_or_else(|| ArrowError::oos("IPC: LargeList must contain children"))?;
             if children.len() != 1 {
-                panic!("expect a large list to have one child")
+                return Err(ArrowError::oos("IPC: LargeList must contain one child"));
             }
-            let (field, ipc_field) = deserialize_field(children.get(0));
+            let (field, ipc_field) = deserialize_field(children.get(0))?;
 
             (
                 DataType::LargeList(Box::new(field)),
@@ -216,13 +275,17 @@
             )
         }
         ipc::Type::FixedSizeList => {
-            let fsl = field.type_as_fixed_size_list().unwrap();
+            let fsl = field.type_as_fixed_size_list().ok_or_else(|| {
+                ArrowError::oos("IPC: FixedSizeList type must be a FixedSizeList")
+            })?;
             let size = fsl.listSize() as usize;
-            let children = field.children().unwrap();
+            let children = field
+                .children()
+                .ok_or_else(|| ArrowError::oos("IPC: FixedSizeList must contain children"))?;
             if children.len() != 1 {
-                panic!("expect a list to have one child")
+                return Err(ArrowError::oos("IPC: FixedSizeList must contain one child"));
             }
-            let (field, ipc_field) = deserialize_field(children.get(0));
+            let (field, ipc_field) = deserialize_field(children.get(0))?;
 
             (
                 DataType::FixedSizeList(Box::new(field), size),
@@ -233,17 +296,18 @@
             )
         }
         ipc::Type::Struct_ => {
-            let fields = field.children().unwrap();
+            let fields = field
+                .children()
+                .ok_or_else(|| ArrowError::oos("IPC: Struct must contain children"))?;
             if fields.is_empty() {
-                panic!("expect a struct to have at least one child")
+                return Err(ArrowError::oos(
+                    "IPC: Struct must contain at least one child",
+                ));
             }
-            let (fields, ipc_fields): (Vec<_>, Vec<_>) = (0..fields.len())
-                .map(|field| {
-                    let field = fields.get(field);
-                    let (field, fields) = deserialize_field(field);
-                    (field, fields)
-                })
-                .unzip();
+            let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| {
+                let (field, fields) = deserialize_field(field)?;
+                Ok((field, fields))
+            }))?;
             let ipc_field = IpcField {
                 fields: ipc_fields,
                 dictionary_id: None,
@@ -251,22 +315,25 @@
             (DataType::Struct(fields), ipc_field)
         }
         ipc::Type::Union => {
-            let type_ = field.type_as_union().unwrap();
+            let type_ = field
+                .type_as_union()
+                .ok_or_else(|| ArrowError::oos("IPC: Union type must be a Union"))?;
             let mode = UnionMode::sparse(type_.mode() == ipc::UnionMode::Sparse);
             let ids = type_.typeIds().map(|x| x.iter().collect());
-            let fields = field.children().unwrap();
+            let fields = field
+                .children()
+                .ok_or_else(|| ArrowError::oos("IPC: Union must contain children"))?;
             if fields.is_empty() {
-                panic!("expect a struct to have at least one child")
+                return Err(ArrowError::oos(
+                    "IPC: Union must contain at least one child",
+                ));
             }
-            let (fields, ipc_fields): (Vec<_>, Vec<_>) = (0..fields.len())
-                .map(|field| {
-                    let field = fields.get(field);
-                    let (field, fields) = deserialize_field(field);
-                    (field, fields)
-                })
-                .unzip();
+            let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| {
+                let (field, fields) = deserialize_field(field)?;
+                Ok((field, fields))
+            }))?;
             let ipc_field = IpcField {
                 fields: ipc_fields,
                 dictionary_id: None,
@@ -274,14 +341,18 @@
             (DataType::Union(fields, ids, mode), ipc_field)
         }
         ipc::Type::Map => {
-            let map = field.type_as_map().unwrap();
+            let map = field
+                .type_as_map()
+                .ok_or_else(|| ArrowError::oos("IPC: Map type must be a Map"))?;
             let is_sorted = map.keysSorted();
-            let children = field.children().unwrap();
+            let children = field
+                .children()
+                .ok_or_else(|| ArrowError::oos("IPC: Map must contain children"))?;
             if children.len() != 1 {
-                panic!("expect a list to have one child")
+                return Err(ArrowError::oos("IPC: Map must contain one child"));
             }
-            let (field, ipc_field) = deserialize_field(children.get(0));
+            let (field, ipc_field) = deserialize_field(children.get(0))?;
 
             let data_type = DataType::Map(Box::new(field), is_sorted);
             (
@@ -292,20 +363,24 @@
                 },
             )
         }
-        t => unimplemented!("Type {:?} not supported", t),
-    }
+        t => {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "Reading {:?} from IPC",
+                t
+            )))
+        }
+    })
 }
 
 /// Deserialize the raw Schema table from IPC format to Schema data type
-pub fn fb_to_schema(fb: ipc::Schema) -> (Schema, IpcSchema) {
-    let fields = fb.fields().unwrap();
-    let (fields, ipc_fields): (Vec<_>, Vec<_>) = (0..fields.len())
-        .map(|field| {
-            let field = fields.get(field);
-            let (field, fields) = deserialize_field(field);
-            (field, fields)
-        })
-        .unzip();
+pub fn fb_to_schema(fb: ipc::Schema) -> Result<(Schema, IpcSchema)> {
+    let fields = fb
+        .fields()
+        .ok_or_else(|| ArrowError::oos("IPC: Schema must contain fields"))?;
+    let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| {
+        let (field, fields) = deserialize_field(field)?;
+        Ok((field, fields))
+    }))?;
 
     let is_little_endian = fb.endianness().variant_name().unwrap_or("Little") == "Little";
 
@@ -324,11 +399,11 @@ pub fn fb_to_schema(fb: ipc::Schema) -> (Schema, IpcSchema) {
         }
     }
 
-    (
+    Ok((
         Schema { fields, metadata },
         IpcSchema {
             fields: ipc_fields,
             is_little_endian,
         },
-    )
+    ))
 }
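Note: `try_unzip_vec` above is a fallible counterpart to `Iterator::unzip`: `deserialize_field` now returns `Result<(Field, IpcField)>`, and plain `unzip` cannot short-circuit on an `Err`. A small usage sketch with plain types, mirroring the call shape in `fb_to_schema` (illustrative, not part of the patch; assumes arrow2's `Result<T>` alias):

    // Collects the Ok pairs into two Vecs, or returns the first Err.
    fn demo() -> Result<()> {
        let pairs = (0..3).map(|i| Ok((i, i * 2)));
        let (left, right): (Vec<i32>, Vec<i32>) = try_unzip_vec(pairs)?;
        assert_eq!(left, vec![0, 1, 2]);
        assert_eq!(right, vec![0, 2, 4]);
        Ok(())
    }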
diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs
index ebc6a83c4f1..a975eb9d3aa 100644
--- a/src/io/ipc/read/stream.rs
+++ b/src/io/ipc/read/stream.rs
@@ -49,7 +49,7 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
     let ipc_schema: ipc::Schema::Schema = message
         .header_as_schema()
         .ok_or_else(|| ArrowError::OutOfSpec("Unable to read IPC message as schema".to_string()))?;
-    let (schema, ipc_schema) = fb_to_schema(ipc_schema);
+    let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;
     let schema = Arc::new(schema);
 
     Ok(StreamMetadata {
diff --git a/src/io/parquet/read/schema/metadata.rs b/src/io/parquet/read/schema/metadata.rs
index 2b5825bb8df..868bbf4c77d 100644
--- a/src/io/parquet/read/schema/metadata.rs
+++ b/src/io/parquet/read/schema/metadata.rs
@@ -36,11 +36,11 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
     match ipc::Message::root_as_message(slice) {
         Ok(message) => message
             .header_as_schema()
-            .map(fb_to_schema)
-            .map(|x| x.0)
             .ok_or_else(|| {
                 ArrowError::OutOfSpec("the message is not Arrow Schema".to_string())
-            }),
+            })
+            .and_then(fb_to_schema)
+            .map(|x| x.0),
         Err(err) => {
             // The flatbuffers implementation returns an error on verification error.
             Err(ArrowError::OutOfSpec(format!(