Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add public API for decoding parquet footer #1804

Merged
merged 2 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions parquet/src/arrow/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,12 @@

use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::{Cursor, SeekFrom};
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
Expand All @@ -99,10 +98,10 @@ use crate::arrow::ProjectionMask;
use crate::basic::Compression;
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::parse_metadata_buffer;
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::SerializedPageReader;
use crate::file::PARQUET_MAGIC;
use crate::file::FOOTER_SIZE;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};

/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
Expand Down Expand Up @@ -134,30 +133,21 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
self.seek(SeekFrom::End(-8)).await?;
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

let mut buf = [0_u8; 8];
let mut buf = [0_u8; FOOTER_SIZE];
self.read_exact(&mut buf).await?;

if buf[4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}

let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
if metadata_len < 0 {
return Err(general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
));
}

self.seek(SeekFrom::End(-8 - metadata_len)).await?;
let metadata_len = decode_footer(&buf)?;
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;

let mut buf = Vec::with_capacity(metadata_len as usize + 8);
let mut buf = Vec::with_capacity(metadata_len);
self.read_to_end(&mut buf).await?;

Ok(Arc::new(parse_metadata_buffer(&mut Cursor::new(buf))?))
Ok(Arc::new(decode_metadata(&buf)?))
}
.boxed()
}
Expand Down
97 changes: 48 additions & 49 deletions parquet/src/file/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{
cmp::min,
io::{Cursor, Read, Seek, SeekFrom},
sync::Arc,
};
use std::{io::Read, sync::Arc};

use byteorder::{ByteOrder, LittleEndian};
use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
Expand All @@ -28,10 +24,7 @@ use thrift::protocol::TCompactInputProtocol;
use crate::basic::ColumnOrder;

use crate::errors::{ParquetError, Result};
use crate::file::{
metadata::*, reader::ChunkReader, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE,
PARQUET_MAGIC,
};
use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE, PARQUET_MAGIC};

use crate::schema::types::{self, SchemaDescriptor};

Expand All @@ -52,55 +45,42 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
));
}

// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, chunk_reader.len() as usize);
let mut default_end_reader = chunk_reader
.get_read(chunk_reader.len() - default_end_len as u64, default_end_len)?;
let mut default_len_end_buf = vec![0; default_end_len];
default_end_reader.read_exact(&mut default_len_end_buf)?;
let mut footer = [0_u8; 8];
chunk_reader
.get_read(file_size - 8, 8)?
.read_exact(&mut footer)?;

// check this is indeed a parquet file
if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}
let metadata_len = decode_footer(&footer)?;
let footer_metadata_len = FOOTER_SIZE + metadata_len;

// get the metadata length from the footer
let metadata_len = LittleEndian::read_i32(
&default_len_end_buf[default_end_len - 8..default_end_len - 4],
) as i64;
if metadata_len < 0 {
if footer_metadata_len > file_size as usize {
return Err(general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
"Invalid Parquet file. Reported metadata length of {} + {} byte footer, but file is only {} bytes",
metadata_len,
FOOTER_SIZE,
file_size
));
}
let footer_metadata_len = FOOTER_SIZE + metadata_len as usize;

// build up the reader covering the entire metadata
let mut default_end_cursor = Cursor::new(default_len_end_buf);
if footer_metadata_len > file_size as usize {
return Err(general_err!(
"Invalid Parquet file. Metadata start is less than zero ({})",
file_size as i64 - footer_metadata_len as i64
let mut metadata = Vec::with_capacity(metadata_len);

let read = chunk_reader
.get_read(file_size - footer_metadata_len as u64, metadata_len)?
.read_to_end(&mut metadata)?;

if read != metadata_len {
return Err(eof_err!(
"Expected to read {} bytes of metadata, got {}",
metadata_len,
read
));
} else if footer_metadata_len < DEFAULT_FOOTER_READ_SIZE {
// the whole metadata is in the bytes we already read
default_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
parse_metadata_buffer(&mut default_end_cursor)
} else {
// the end of file read by default is not long enough, read missing bytes
let complementary_end_read = chunk_reader.get_read(
file_size - footer_metadata_len as u64,
FOOTER_SIZE + metadata_len as usize - default_end_len,
)?;
parse_metadata_buffer(&mut complementary_end_read.chain(default_end_cursor))
}

decode_metadata(&metadata)
}

/// Reads [`ParquetMetaData`] from the provided [`Read`] starting at the readers current position
pub(crate) fn parse_metadata_buffer<T: Read + ?Sized>(
metadata_read: &mut T,
) -> Result<ParquetMetaData> {
/// Decodes [`ParquetMetaData`] from the provided bytes
pub fn decode_metadata(metadata_read: &[u8]) -> Result<ParquetMetaData> {
// TODO: row group filtering
let mut prot = TCompactInputProtocol::new(metadata_read);
let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
Expand All @@ -124,6 +104,23 @@ pub(crate) fn parse_metadata_buffer<T: Read + ?Sized>(
Ok(ParquetMetaData::new(file_metadata, row_groups))
}

/// Decodes the footer returning the metadata length in bytes
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
// check this is indeed a parquet file
if slice[4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}

// get the metadata length from the footer
let metadata_len = LittleEndian::read_i32(&slice[..4]);
metadata_len.try_into().map_err(|_| {
general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
)
})
}

/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
Expand Down Expand Up @@ -209,7 +206,9 @@ mod tests {
assert!(reader_result.is_err());
assert_eq!(
reader_result.err().unwrap(),
general_err!("Invalid Parquet file. Metadata start is less than zero (-255)")
general_err!(
"Invalid Parquet file. Reported metadata length of 255 + 8 byte footer, but file is only 8 bytes"
)
);
}

Expand Down
8 changes: 3 additions & 5 deletions parquet/src/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ pub mod serialized_reader;
pub mod statistics;
pub mod writer;

const FOOTER_SIZE: usize = 8;
pub(crate) const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];

/// The number of bytes read at the end of the parquet file on first read
const DEFAULT_FOOTER_READ_SIZE: usize = 64 * 1024;
/// The length of the parquet footer in bytes
pub const FOOTER_SIZE: usize = 8;
const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];