Skip to content

Commit

Permalink
Raise error if page is too large
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 14, 2022
1 parent 9bc4fe4 commit 05e4def
Showing 1 changed file with 72 additions and 15 deletions.
87 changes: 72 additions & 15 deletions src/write/page.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::convert::TryInto;
use std::io::Write;
use std::sync::Arc;

Expand All @@ -7,12 +8,30 @@ use parquet_format_async_temp::thrift::protocol::{
};
use parquet_format_async_temp::{DictionaryPageHeader, Encoding, PageType};

use crate::error::Result;
use crate::error::{ParquetError, Result};
use crate::page::{
CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,
};
use crate::statistics::Statistics;

fn maybe_bytes(uncompressed: usize, compressed: usize) -> Result<(i32, i32)> {
let uncompressed_page_size: i32 = uncompressed.try_into().map_err(|_| {
ParquetError::OutOfSpec(format!(
"A page can only contain i32::MAX uncompressed bytes. This one contains {}",
uncompressed
))
})?;

let compressed_page_size: i32 = compressed.try_into().map_err(|_| {
ParquetError::OutOfSpec(format!(
"A page can only contain i32::MAX compressed bytes. This one contains {}",
compressed
))
})?;

Ok((uncompressed_page_size, compressed_page_size))
}

/// Contains page write metrics.
pub struct PageWriteSpec {
pub header: ParquetPageHeader,
Expand All @@ -30,7 +49,7 @@ pub fn write_page<W: Write>(
let header = match &compressed_page {
CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
};
}?;

let header_size = write_page_header(writer, &header)?;
let mut bytes_written = header_size as u64;
Expand Down Expand Up @@ -68,7 +87,7 @@ pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
let header = match &compressed_page {
CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
};
}?;

let header_size = write_page_header_async(writer, &header).await?;
let mut bytes_written = header_size as u64;
Expand Down Expand Up @@ -98,47 +117,60 @@ pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
})
}

fn assemble_data_page_header(compressed_page: &CompressedDataPage) -> ParquetPageHeader {
fn assemble_data_page_header(page: &CompressedDataPage) -> Result<ParquetPageHeader> {
let (uncompressed_page_size, compressed_page_size) =
maybe_bytes(page.uncompressed_size(), page.compressed_size())?;

let mut page_header = ParquetPageHeader {
type_: match compressed_page.header() {
type_: match page.header() {
DataPageHeader::V1(_) => PageType::DATA_PAGE,
DataPageHeader::V2(_) => PageType::DATA_PAGE_V2,
},
uncompressed_page_size: compressed_page.uncompressed_size() as i32,
compressed_page_size: compressed_page.compressed_size() as i32,
uncompressed_page_size,
compressed_page_size,
crc: None,
data_page_header: None,
index_page_header: None,
dictionary_page_header: None,
data_page_header_v2: None,
};

match compressed_page.header() {
match page.header() {
DataPageHeader::V1(header) => {
page_header.data_page_header = Some(header.clone());
}
DataPageHeader::V2(header) => {
page_header.data_page_header_v2 = Some(header.clone());
}
}
page_header
Ok(page_header)
}

fn assemble_dict_page_header(page: &CompressedDictPage) -> ParquetPageHeader {
ParquetPageHeader {
fn assemble_dict_page_header(page: &CompressedDictPage) -> Result<ParquetPageHeader> {
let (uncompressed_page_size, compressed_page_size) =
maybe_bytes(page.uncompressed_page_size, page.buffer.len())?;

let num_values: i32 = page.num_values.try_into().map_err(|_| {
ParquetError::OutOfSpec(format!(
"A dictionary page can only contain i32::MAX items. This one contains {}",
page.num_values
))
})?;

Ok(ParquetPageHeader {
type_: PageType::DICTIONARY_PAGE,
uncompressed_page_size: page.uncompressed_page_size as i32,
compressed_page_size: page.buffer.len() as i32,
uncompressed_page_size,
compressed_page_size,
crc: None,
data_page_header: None,
index_page_header: None,
dictionary_page_header: Some(DictionaryPageHeader {
num_values: page.num_values as i32,
num_values,
encoding: Encoding::PLAIN,
is_sorted: None,
}),
data_page_header_v2: None,
}
})
}

/// writes the page header into `writer`, returning the number of bytes used in the process.
Expand All @@ -155,3 +187,28 @@ async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
Ok(header.write_to_out_stream_protocol(&mut protocol).await? as u64)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn dict_too_large() {
let page = CompressedDictPage {
buffer: vec![],
uncompressed_page_size: i32::MAX as usize + 1,
num_values: 100,
};
assert!(assemble_dict_page_header(&page).is_err());
}

#[test]
fn dict_too_many_values() {
let page = CompressedDictPage {
buffer: vec![],
uncompressed_page_size: 0,
num_values: i32::MAX as usize + 1,
};
assert!(assemble_dict_page_header(&page).is_err());
}
}

0 comments on commit 05e4def

Please sign in to comment.