Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved read of metadata #143

Merged
merged 1 commit into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 30 additions & 88 deletions src/read/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use std::{
use parquet_format_async_temp::thrift::protocol::TCompactInputProtocol;
use parquet_format_async_temp::FileMetaData as TFileMetaData;

use super::super::{metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC};
use super::super::{
metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC,
};

use crate::error::{Error, Result};
use crate::HEADER_SIZE;

pub(super) fn metadata_len(buffer: &[u8], len: usize) -> i32 {
i32::from_le_bytes(buffer[len - 8..len - 4].try_into().unwrap())
Expand Down Expand Up @@ -51,109 +52,50 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
reader.seek(SeekFrom::End(-(default_end_len as i64)))?;
let mut default_len_end_buf = vec![0; default_end_len];
reader.read_exact(&mut default_len_end_buf)?;
let mut buffer = vec![0; default_end_len];
reader.read_exact(&mut buffer)?;

// check this is indeed a parquet file
if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC {
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}

let metadata_len = metadata_len(&default_len_end_buf, default_end_len);
let metadata = metadata_len(&buffer, default_end_len);

if metadata_len < 0 {
return Err(general_err!(
let metadata_len: u64 = metadata.try_into().map_err(|_| {
general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
));
}
let footer_metadata_len = FOOTER_SIZE + metadata_len as u64;
metadata
)
})?;

let metadata = if footer_metadata_len > file_size {
let footer_len = FOOTER_SIZE + metadata_len;
if footer_len > file_size {
return Err(general_err!(
"Invalid Parquet file. Metadata start is less than zero ({})",
file_size as i64 - footer_metadata_len as i64
file_size as i64 - footer_len as i64
));
} else if footer_metadata_len < DEFAULT_FOOTER_READ_SIZE {
}

let reader = if (footer_len as usize) < buffer.len() {
// the whole metadata is in the bytes we already read
// build up the reader covering the entire metadata
let mut reader = Cursor::new(default_len_end_buf);
reader.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
let mut reader = Cursor::new(buffer);
reader.seek(SeekFrom::End(-(footer_len as i64)))?;

let mut prot = TCompactInputProtocol::new(reader);
TFileMetaData::read_from_in_protocol(&mut prot)
reader
} else {
// the end of file read by default is not long enough, read again including all metadata.
reader.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
reader.seek(SeekFrom::End(-(footer_len as i64)))?;
let mut buffer = vec![0; footer_len as usize];
reader.read_exact(&mut buffer)?;

let mut prot = TCompactInputProtocol::new(reader);
TFileMetaData::read_from_in_protocol(&mut prot)
}
.map_err(|e| Error::General(format!("Could not parse metadata: {}", e)))?;
Cursor::new(buffer)
};

FileMetaData::try_from_thrift(metadata)
}
let mut prot = TCompactInputProtocol::new(reader);
let metadata = TFileMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| Error::General(format!("Could not parse metadata: {}", e)))?;

#[cfg(test)]
mod tests {
use std::fs::File;

use super::*;

use crate::schema::{types::PhysicalType, Repetition};
use crate::tests::get_path;

#[test]
fn test_basics() {
let mut testdata = get_path();
testdata.push("alltypes_plain.parquet");
let mut file = File::open(testdata).unwrap();

let metadata = read_metadata(&mut file).unwrap();

let columns = metadata.schema_descr.columns();

/*
from pyarrow:
required group field_id=0 schema {
optional int32 field_id=1 id;
optional boolean field_id=2 bool_col;
optional int32 field_id=3 tinyint_col;
optional int32 field_id=4 smallint_col;
optional int32 field_id=5 int_col;
optional int64 field_id=6 bigint_col;
optional float field_id=7 float_col;
optional double field_id=8 double_col;
optional binary field_id=9 date_string_col;
optional binary field_id=10 string_col;
optional int96 field_id=11 timestamp_col;
}
*/
let expected = vec![
PhysicalType::Int32,
PhysicalType::Boolean,
PhysicalType::Int32,
PhysicalType::Int32,
PhysicalType::Int32,
PhysicalType::Int64,
PhysicalType::Float,
PhysicalType::Double,
PhysicalType::ByteArray,
PhysicalType::ByteArray,
PhysicalType::Int96,
];

let result = columns
.iter()
.map(|column| {
assert_eq!(
column.descriptor.primitive_type.field_info.repetition,
Repetition::Optional
);
column.descriptor.primitive_type.physical_type
})
.collect::<Vec<_>>();

assert_eq!(expected, result);
}
FileMetaData::try_from_thrift(metadata)
}
30 changes: 15 additions & 15 deletions src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,41 +52,41 @@ pub async fn read_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>
));
}

let metadata_len = metadata_len(&buffer, default_end_len);
let metadata = metadata_len(&buffer, default_end_len);

if metadata_len < 0 {
return Err(general_err!(
"Invalid file. Metadata length is less than zero ({})",
metadata_len
));
}
let footer_len = FOOTER_SIZE + metadata_len as u64;
let metadata_len: u64 = metadata.try_into().map_err(|_| {
general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata
)
})?;

let footer_len = FOOTER_SIZE + metadata_len;
if footer_len > file_size {
return Err(general_err!(
"Invalid Parquet file. Metadata start is less than zero ({})",
file_size as i64 - footer_len as i64
));
}

let metadata = if footer_len < DEFAULT_FOOTER_READ_SIZE {
let reader = if (footer_len as usize) < buffer.len() {
// the whole metadata is in the bytes we already read
// build up the reader covering the entire metadata
let mut reader = Cursor::new(buffer);
reader.seek(SeekFrom::End(-(footer_len as i64)))?;

let mut prot = TCompactInputProtocol::new(reader);
TFileMetaData::read_from_in_protocol(&mut prot)?
reader
} else {
// the end of file read by default is not long enough, read again including all metadata.
reader.seek(SeekFrom::End(-(footer_len as i64))).await?;
let mut buffer = vec![0; footer_len as usize];
reader.read_exact(&mut buffer).await?;

let reader = Cursor::new(buffer);

let mut prot = TCompactInputProtocol::new(reader);
TFileMetaData::read_from_in_protocol(&mut prot)?
Cursor::new(buffer)
};

let mut prot = TCompactInputProtocol::new(reader);
let metadata = TFileMetaData::read_from_in_protocol(&mut prot)?;

FileMetaData::try_from_thrift(metadata)
}
56 changes: 56 additions & 0 deletions tests/it/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use parquet2::read::{MutStreamingIterator, State};
use parquet2::schema::types::GroupConvertedType;
use parquet2::schema::types::ParquetType;
use parquet2::schema::types::PhysicalType;
use parquet2::schema::Repetition;
use parquet2::statistics::{BinaryStatistics, BooleanStatistics, PrimitiveStatistics, Statistics};
use parquet2::types::int96_to_i64_ns;
use parquet2::FallibleStreamingIterator;
Expand Down Expand Up @@ -517,3 +518,58 @@ fn pyarrow_v1_struct_required() -> Result<()> {
fn pyarrow_v2_struct_required() -> Result<()> {
test_pyarrow_integration("struct", "struct_required", 2, false, false, "")
}

#[test]
fn test_metadata() -> Result<()> {
let mut testdata = get_path();
testdata.push("alltypes_plain.parquet");
let mut file = File::open(testdata).unwrap();

let metadata = read_metadata(&mut file)?;

let columns = metadata.schema_descr.columns();

/*
from pyarrow:
required group field_id=0 schema {
optional int32 field_id=1 id;
optional boolean field_id=2 bool_col;
optional int32 field_id=3 tinyint_col;
optional int32 field_id=4 smallint_col;
optional int32 field_id=5 int_col;
optional int64 field_id=6 bigint_col;
optional float field_id=7 float_col;
optional double field_id=8 double_col;
optional binary field_id=9 date_string_col;
optional binary field_id=10 string_col;
optional int96 field_id=11 timestamp_col;
}
*/
let expected = vec![
PhysicalType::Int32,
PhysicalType::Boolean,
PhysicalType::Int32,
PhysicalType::Int32,
PhysicalType::Int32,
PhysicalType::Int64,
PhysicalType::Float,
PhysicalType::Double,
PhysicalType::ByteArray,
PhysicalType::ByteArray,
PhysicalType::Int96,
];

let result = columns
.iter()
.map(|column| {
assert_eq!(
column.descriptor.primitive_type.field_info.repetition,
Repetition::Optional
);
column.descriptor.primitive_type.physical_type
})
.collect::<Vec<_>>();

assert_eq!(expected, result);
Ok(())
}