
IPC panic free
jorgecarleitao committed Jun 15, 2022
1 parent 76d2a39 commit 676ca65
Showing 9 changed files with 98 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -101,6 +101,8 @@ tokio-util = { version = "0.6", features = ["compat"] }
# used to run formal property testing
proptest = { version = "1", default_features = false, features = ["std"] }
avro-rs = { version = "0.13", features = ["snappy"] }
# used for flaky testing
rand = "0.8"

[package.metadata.docs.rs]
features = ["full"]
10 changes: 8 additions & 2 deletions README.md
@@ -69,8 +69,14 @@ Most uses of `unsafe` fall into 3 categories:
We actively monitor for vulnerabilities in Rust's advisory and either patch or mitigate
them (see e.g. `.cargo/audit.yaml` and `.github/workflows/security.yaml`).

Reading parquet and IPC currently `panic!` when they receive invalid data. We are
actively addressing this.
Reading from untrusted data currently _may_ `panic!` on the following formats:

* parquet
* avro
* Arrow IPC streams
* compressed Arrow IPC files and streams

We are actively addressing this.

## Integration tests

2 changes: 2 additions & 0 deletions src/io/flight/mod.rs
@@ -118,6 +118,7 @@ pub fn deserialize_batch(
let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;

let length = data.data_body.len();
let mut reader = std::io::Cursor::new(&data.data_body);

match message.header()?.ok_or_else(|| {
@@ -132,6 +133,7 @@ pub fn deserialize_batch(
message.version()?,
&mut reader,
0,
length as u64,
),
_ => Err(Error::nyi(
"flight currently only supports reading RecordBatch messages",
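In the Flight path the whole record-batch body is already an in-memory buffer, so its length is the natural bound for any buffer the message metadata may declare; that is what the new `length` argument carries. A minimal sketch of that reasoning (a hypothetical helper, not arrow2's API):

```rust
/// Sketch only: a declared (offset, length) pair is acceptable for a Flight
/// message only if it fits inside the in-memory body, the role the file size
/// plays for IPC files.
fn buffer_fits(offset: usize, length: usize, data_body: &[u8]) -> bool {
    offset
        .checked_add(length)
        .map(|end| end <= data_body.len())
        .unwrap_or(false)
}
```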
19 changes: 16 additions & 3 deletions src/io/ipc/read/common.rs
@@ -83,13 +83,22 @@ pub fn read_record_batch<R: Read + Seek>(
version: arrow_format::ipc::MetadataVersion,
reader: &mut R,
block_offset: u64,
file_size: u64,
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
let buffers = batch
.buffers()?
.ok_or_else(|| Error::oos("IPC RecordBatch must contain buffers"))?;
let mut buffers: VecDeque<arrow_format::ipc::BufferRef> = buffers.iter().collect();

for buffer in buffers.iter() {
if buffer.length() > file_size as i64 {
return Err(Error::oos(
"Any buffer's length must be smaller than the size of the file",
));
}
}

let field_nodes = batch
.nodes()?
.ok_or_else(|| Error::oos("IPC RecordBatch must contain field nodes"))?;
@@ -205,6 +214,7 @@ pub fn read_dictionary<R: Read + Seek>(
dictionaries: &mut Dictionaries,
reader: &mut R,
block_offset: u64,
file_size: u64,
) -> Result<()> {
if batch.is_delta()? {
return Err(Error::NotYetImplemented(
Expand All @@ -220,23 +230,26 @@ pub fn read_dictionary<R: Read + Seek>(
// Get an array representing this dictionary's values.
let dictionary_values: Box<dyn Array> = match &first_field.data_type {
DataType::Dictionary(_, ref value_type, _) => {
let batch = batch
.data()?
.ok_or_else(|| Error::oos("The dictionary batch must have data."))?;

// Make a fake schema for the dictionary batch.
let fields = vec![Field::new("", value_type.as_ref().clone(), false)];
let ipc_schema = IpcSchema {
fields: vec![first_ipc_field.clone()],
is_little_endian: ipc_schema.is_little_endian,
};
let columns = read_record_batch(
batch
.data()?
.ok_or_else(|| Error::oos("The dictionary batch must have data."))?,
batch,
&fields,
&ipc_schema,
None,
dictionaries,
arrow_format::ipc::MetadataVersion::V5,
reader,
block_offset,
file_size,
)?;
let mut arrays = columns.into_arrays();
Some(arrays.pop().unwrap())
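The heart of the change is in `read_record_batch`: every declared buffer length is now checked against the number of bytes that could possibly back it, so corrupted metadata yields an `Error` instead of an oversized allocation or a later panic. A standalone sketch of the same idea, with a hypothetical `BufferMeta` standing in for the flatbuffers-generated descriptor:

```rust
/// Hypothetical stand-in for the flatbuffers-generated IPC buffer descriptor.
struct BufferMeta {
    length: i64,
}

/// Minimal sketch of the new bound check: reject metadata that promises more
/// bytes than the source can contain, before reading or allocating anything.
fn check_buffers(buffers: &[BufferMeta], file_size: u64) -> Result<(), String> {
    for buffer in buffers {
        // a negative length is invalid; a length larger than the source is impossible
        if buffer.length < 0 || buffer.length as u64 > file_size {
            return Err(format!(
                "buffer length {} cannot be backed by a source of {} bytes",
                buffer.length, file_size
            ));
        }
    }
    Ok(())
}
```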
13 changes: 11 additions & 2 deletions src/io/ipc/read/file_async.rs
@@ -144,7 +144,7 @@ where
reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?;
reader.read_exact(&mut footer).await?;

deserialize_footer(&footer)
deserialize_footer(&footer, u64::MAX)
}

async fn read_batch<R>(
@@ -188,6 +188,7 @@ where
message.version()?,
&mut cursor,
0,
metadata.size,
)
}

@@ -220,7 +221,15 @@ where
buffer.resize(length, 0);
reader.read_exact(&mut buffer).await?;
let mut cursor = std::io::Cursor::new(&mut buffer);
read_dictionary(batch, fields, ipc_schema, &mut dictionaries, &mut cursor, 0)?;
read_dictionary(
batch,
fields,
ipc_schema,
&mut dictionaries,
&mut cursor,
0,
u64::MAX,
)?;
}
other => {
return Err(Error::OutOfSpec(format!(
24 changes: 17 additions & 7 deletions src/io/ipc/read/reader.rs
@@ -30,6 +30,9 @@ pub struct FileMetadata {

/// Dictionaries associated to each dict_id
pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,

/// The total size of the file in bytes
pub(crate) size: u64,
}

/// Arrow File reader
@@ -92,6 +95,7 @@ fn read_dictionary_block<R: Read + Seek>(
dictionaries,
reader,
block_offset,
metadata.size,
)?;
}
t => {
@@ -126,9 +130,10 @@ pub fn read_file_dictionaries<R: Read + Seek>(
}

/// Reads the footer's length and magic number in footer
fn read_footer_len<R: Read + Seek>(reader: &mut R) -> Result<usize> {
fn read_footer_len<R: Read + Seek>(reader: &mut R) -> Result<(u64, usize)> {
// read footer length and magic number in footer
reader.seek(SeekFrom::End(-10))?;
let end = reader.seek(SeekFrom::End(-10))? + 10;

let mut footer: [u8; 10] = [0; 10];

reader.read_exact(&mut footer)?;
@@ -139,12 +144,14 @@ fn read_footer_len<R: Read + Seek>(reader: &mut R) -> Result<usize> {
"Arrow file does not contain correct footer".to_string(),
));
}
footer_len
let footer_len = footer_len
.try_into()
.map_err(|_| Error::oos("The footer's lenght must be a positive number"))
.map_err(|_| Error::oos("The footer's lenght must be a positive number"))?;

Ok((end, footer_len))
}

pub(super) fn deserialize_footer(footer_data: &[u8]) -> Result<FileMetadata> {
pub(super) fn deserialize_footer(footer_data: &[u8], size: u64) -> Result<FileMetadata> {
let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?;

@@ -177,28 +184,30 @@ pub(super) fn deserialize_footer(footer_data: &[u8]) -> Result<FileMetadata> {
ipc_schema,
blocks,
dictionaries,
size,
})
}

/// Read the IPC file's metadata
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
// check if header contain the correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
let start = reader.seek(SeekFrom::Current(0))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
return Err(Error::OutOfSpec(
"Arrow file does not contain correct header".to_string(),
));
}

let footer_len = read_footer_len(reader)?;
let (end, footer_len) = read_footer_len(reader)?;

// read footer
let mut footer_data = vec![0; footer_len as usize];
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;

deserialize_footer(&footer_data)
deserialize_footer(&footer_data, end - start)
}

pub(super) fn get_serialized_batch<'a>(
@@ -264,6 +273,7 @@ pub fn read_batch<R: Read + Seek>(
message.version()?,
reader,
block.offset as u64 + block.meta_data_length as u64,
metadata.size,
)
}

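`read_file_metadata` now derives the size it passes down from two seek positions: where it started reading the magic bytes and where `read_footer_len` reports the footer ends. A standalone illustration of the same bookkeeping (not arrow2's exact code):

```rust
use std::io::{Result, Seek, SeekFrom};

/// Illustration only: count the bytes reachable from the current position by
/// seeking to the end and back, mirroring the start/end bookkeeping added to
/// `read_file_metadata` and `read_footer_len`.
fn available_bytes<R: Seek>(reader: &mut R) -> Result<u64> {
    let start = reader.seek(SeekFrom::Current(0))?; // where reading begins
    let end = reader.seek(SeekFrom::End(0))?; // one past the last byte
    reader.seek(SeekFrom::Start(start))?; // restore the original position
    Ok(end - start)
}
```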
2 changes: 2 additions & 0 deletions src/io/ipc/read/stream.rs
@@ -147,6 +147,7 @@ fn read_next<R: Read>(
metadata.version,
&mut reader,
0,
u64::MAX,
)
.map(|x| Some(StreamState::Some(x)))
}
@@ -164,6 +165,7 @@ fn read_next<R: Read>(
dictionaries,
&mut dict_reader,
0,
u64::MAX,
)?;

// read the next message until we encounter a RecordBatch message
2 changes: 2 additions & 0 deletions src/io/ipc/read/stream_async.rs
@@ -120,6 +120,7 @@ async fn maybe_next<R: AsyncRead + Unpin + Send>(
state.metadata.version,
&mut std::io::Cursor::new(&state.data_buffer),
0,
u64::MAX,
)
.map(|chunk| Some(StreamState::Some((state, chunk))))
}
@@ -137,6 +138,7 @@ async fn maybe_next<R: AsyncRead + Unpin + Send>(
&mut state.dictionaries,
&mut dict_reader,
0,
u64::MAX,
)?;

// read the next message until we encounter a Chunk<Box<dyn Array>> message
38 changes: 38 additions & 0 deletions tests/it/io/ipc/read/file.rs
@@ -191,3 +191,41 @@ fn read_projected() -> Result<()> {

test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])
}

fn read_corrupted_ipc(data: Vec<u8>) -> Result<()> {
let mut file = std::io::Cursor::new(data);

let metadata = read_file_metadata(&mut file)?;
let mut reader = FileReader::new(file, metadata, None);

reader.try_for_each(|rhs| {
rhs?;
Result::Ok(())
})?;

Ok(())
}

#[test]
fn test_does_not_panic() {
use rand::Rng; // 0.8.0

let version = "1.0.0-littleendian";
let file_name = "generated_primitive";
let testdata = crate::test_util::arrow_test_data();
let path = format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, file_name
);
let original = std::fs::read(path).unwrap();

for errors in 0..1000 {
let mut data = original.clone();
for _ in 0..errors {
let position: usize = rand::thread_rng().gen_range(0..data.len());
let new_byte: u8 = rand::thread_rng().gen_range(0..u8::MAX);
data[position] = new_byte
}
let _ = read_corrupted_ipc(data);
}
}
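The test above draws fresh randomness on every run, which makes a failure hard to replay. A possible refinement (not part of this commit) is to seed the RNG so that any corrupting byte pattern that does uncover a panic is reproducible:

```rust
use rand::{rngs::StdRng, Rng, SeedableRng};

/// Sketch of a reproducible variant: the same seed always produces the same
/// corrupted bytes, so an input that panics can be replayed under a debugger.
fn corrupt(original: &[u8], flips: usize, seed: u64) -> Vec<u8> {
    let mut rng = StdRng::seed_from_u64(seed);
    let mut data = original.to_vec();
    for _ in 0..flips {
        let position = rng.gen_range(0..data.len());
        data[position] = rng.gen(); // overwrite with a random byte
    }
    data
}
```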
