Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise error when writing a page that is too large #84

Merged
merged 1 commit into from
Feb 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 72 additions & 15 deletions src/write/page.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::convert::TryInto;
use std::io::Write;
use std::sync::Arc;

Expand All @@ -7,12 +8,30 @@ use parquet_format_async_temp::thrift::protocol::{
};
use parquet_format_async_temp::{DictionaryPageHeader, Encoding, PageType};

use crate::error::Result;
use crate::error::{ParquetError, Result};
use crate::page::{
CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,
};
use crate::statistics::Statistics;

/// Narrows a page's uncompressed and compressed byte counts to the `i32`
/// values used for the page header's size fields.
///
/// Returns the pair as `(uncompressed, compressed)`.
///
/// # Errors
///
/// Returns [`ParquetError::OutOfSpec`] when either count does not fit in an
/// `i32`. The uncompressed count is checked first, so it is the one reported
/// when both counts are too large.
fn maybe_bytes(uncompressed: usize, compressed: usize) -> Result<(i32, i32)> {
    let uncompressed_page_size = match TryInto::<i32>::try_into(uncompressed) {
        Ok(size) => size,
        Err(_) => {
            return Err(ParquetError::OutOfSpec(format!(
                "A page can only contain i32::MAX uncompressed bytes. This one contains {}",
                uncompressed
            )));
        }
    };

    let compressed_page_size = match TryInto::<i32>::try_into(compressed) {
        Ok(size) => size,
        Err(_) => {
            return Err(ParquetError::OutOfSpec(format!(
                "A page can only contain i32::MAX compressed bytes. This one contains {}",
                compressed
            )));
        }
    };

    Ok((uncompressed_page_size, compressed_page_size))
}

/// Contains page write metrics.
pub struct PageWriteSpec {
pub header: ParquetPageHeader,
Expand All @@ -30,7 +49,7 @@ pub fn write_page<W: Write>(
let header = match &compressed_page {
CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
};
}?;

let header_size = write_page_header(writer, &header)?;
let mut bytes_written = header_size as u64;
Expand Down Expand Up @@ -68,7 +87,7 @@ pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
let header = match &compressed_page {
CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page),
CompressedPage::Dict(compressed_page) => assemble_dict_page_header(compressed_page),
};
}?;

let header_size = write_page_header_async(writer, &header).await?;
let mut bytes_written = header_size as u64;
Expand Down Expand Up @@ -98,47 +117,60 @@ pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
})
}

fn assemble_data_page_header(compressed_page: &CompressedDataPage) -> ParquetPageHeader {
fn assemble_data_page_header(page: &CompressedDataPage) -> Result<ParquetPageHeader> {
let (uncompressed_page_size, compressed_page_size) =
maybe_bytes(page.uncompressed_size(), page.compressed_size())?;

let mut page_header = ParquetPageHeader {
type_: match compressed_page.header() {
type_: match page.header() {
DataPageHeader::V1(_) => PageType::DATA_PAGE,
DataPageHeader::V2(_) => PageType::DATA_PAGE_V2,
},
uncompressed_page_size: compressed_page.uncompressed_size() as i32,
compressed_page_size: compressed_page.compressed_size() as i32,
uncompressed_page_size,
compressed_page_size,
crc: None,
data_page_header: None,
index_page_header: None,
dictionary_page_header: None,
data_page_header_v2: None,
};

match compressed_page.header() {
match page.header() {
DataPageHeader::V1(header) => {
page_header.data_page_header = Some(header.clone());
}
DataPageHeader::V2(header) => {
page_header.data_page_header_v2 = Some(header.clone());
}
}
page_header
Ok(page_header)
}

fn assemble_dict_page_header(page: &CompressedDictPage) -> ParquetPageHeader {
ParquetPageHeader {
fn assemble_dict_page_header(page: &CompressedDictPage) -> Result<ParquetPageHeader> {
let (uncompressed_page_size, compressed_page_size) =
maybe_bytes(page.uncompressed_page_size, page.buffer.len())?;

let num_values: i32 = page.num_values.try_into().map_err(|_| {
ParquetError::OutOfSpec(format!(
"A dictionary page can only contain i32::MAX items. This one contains {}",
page.num_values
))
})?;

Ok(ParquetPageHeader {
type_: PageType::DICTIONARY_PAGE,
uncompressed_page_size: page.uncompressed_page_size as i32,
compressed_page_size: page.buffer.len() as i32,
uncompressed_page_size,
compressed_page_size,
crc: None,
data_page_header: None,
index_page_header: None,
dictionary_page_header: Some(DictionaryPageHeader {
num_values: page.num_values as i32,
num_values,
encoding: Encoding::PLAIN,
is_sorted: None,
}),
data_page_header_v2: None,
}
})
}

/// Writes the page header into `writer`, returning the number of bytes used in the process.
Expand All @@ -155,3 +187,28 @@ async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
Ok(header.write_to_out_stream_protocol(&mut protocol).await? as u64)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A dictionary page whose uncompressed size exceeds `i32::MAX` must be
    /// rejected when its header is assembled.
    #[test]
    fn dict_too_large() {
        let oversized_bytes = i32::MAX as usize + 1;
        let dict_page = CompressedDictPage {
            buffer: vec![],
            uncompressed_page_size: oversized_bytes,
            num_values: 100,
        };

        let header = assemble_dict_page_header(&dict_page);

        assert!(header.is_err());
    }

    /// A dictionary page holding more than `i32::MAX` values must be rejected
    /// when its header is assembled, even if its byte sizes are valid.
    #[test]
    fn dict_too_many_values() {
        let oversized_count = i32::MAX as usize + 1;
        let dict_page = CompressedDictPage {
            buffer: vec![],
            uncompressed_page_size: 0,
            num_values: oversized_count,
        };

        let header = assemble_dict_page_header(&dict_page);

        assert!(header.is_err());
    }
}