Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating parquet tools #182

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions parquet-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ name = "parquet_tools"
path = "src/main.rs"

[dependencies]
parquet2 = { version = "0.14", path = "../" }
clap = {version = "2.33", features = ["yaml"]}
parquet2 = { version = "0.15", path = "../" }
clap = {version = "3.2.17", features = ["yaml"]}
186 changes: 90 additions & 96 deletions parquet-tools/src/lib/dump.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
//! Subcommand `dump`. This subcommand shows the parquet metadata information
use parquet2::{
page::{
BinaryPageDict, DataPageHeader, DictPage, FixedLenByteArrayPageDict, PrimitivePageDict,
},
read::{get_page_iterator, read_metadata},
error::{Error, Result},
page::Page,
read::{decompress, get_page_iterator, read_metadata},
schema::types::PhysicalType,
types::{decode, NativeType},
};

use std::{fs::File, io::Write, path::Path, sync::Arc};
use std::{fs::File, io::Write, path::Path};

use crate::{Result, SEPARATOR};
use crate::SEPARATOR;

pub struct PrimitivePageDict<T: NativeType> {
values: Vec<T>,
}

impl<T: NativeType> PrimitivePageDict<T> {
pub fn new(values: Vec<T>) -> Self {
Self { values }
}

pub fn values(&self) -> &[T] {
&self.values
}

#[inline]
pub fn value(&self, index: usize) -> Result<&T> {
let a = self.values.get(index).ok_or_else(|| {
Error::OutOfSpec(
"The data page has an index larger than the dictionary page values".to_string(),
)
});

a
}
}

// Dumps data from the file.
// The function prints a sample of the data from each of the RowGroups.
Expand Down Expand Up @@ -40,7 +65,7 @@ where
for (i, group) in metadata.row_groups.iter().enumerate() {
writeln!(
writer,
"Group: {:<10}Rows: {:<15}Bytes: {:}",
"Group: {:<10}Rows: {:<15} Bytes: {:}",
i,
group.num_rows(),
group.total_byte_size()
Expand All @@ -49,109 +74,78 @@ where

for column in &columns {
let column_meta = &group.columns()[*column];
let iter =
get_page_iterator(column_meta, &mut file, None, Vec::with_capacity(4 * 1024))?;
let iter = get_page_iterator(
column_meta,
&mut file,
None,
Vec::with_capacity(4 * 1024),
1024 * 1024,
)?;

let mut decompress_buffer = vec![];
for (page_ind, page) in iter.enumerate() {
let page = page?;
writeln!(
writer,
"\nPage: {:<10}Column: {:<15} Bytes: {:}",
page_ind,
column,
page.uncompressed_size()
)?;
let (dict, msg_type) = match page.header() {
DataPageHeader::V1(_) => {
if let Some(dict) = page.dictionary_page {
(dict, "PageV1")
} else {
continue;
}
}
DataPageHeader::V2(_) => {
if let Some(dict) = page.dictionary_page {
(dict, "PageV2")
} else {
continue;
}
}
};

writeln!(
writer,
"Compressed page: {:<15} Physical type: {:?}",
msg_type,
dict.physical_type()
)?;
writeln!(writer, "\nPage: {:<10}Column: {:<15}", page_ind, column,)?;

print_dictionary(dict, sample_size, writer)?;
let page = decompress(page, &mut decompress_buffer)?;
match page {
Page::Dict(_) => {
todo!()
}
Page::Data(page) => {
match page.descriptor.primitive_type.physical_type {
PhysicalType::Int32 => {
print_page::<i32, W>(page.buffer(), sample_size, true, writer)?
}
PhysicalType::Int64 => {
print_page::<i64, W>(page.buffer(), sample_size, true, writer)?
}
PhysicalType::Float => {
print_page::<f32, W>(page.buffer(), sample_size, true, writer)?
}
PhysicalType::Double => {
print_page::<f64, W>(page.buffer(), sample_size, true, writer)?
}
_ => continue,
};
}
}
}
}
}

Ok(())
}

fn print_dictionary<W>(dict: Arc<dyn DictPage>, sample_size: usize, writer: &mut W) -> Result<()>
pub fn read<T: NativeType>(
buf: &[u8],
num_values: usize,
_is_sorted: bool,
) -> Result<PrimitivePageDict<T>> {
let size_of = std::mem::size_of::<T>();

let typed_size = num_values.wrapping_mul(size_of);

let values = buf.get(..typed_size).ok_or_else(|| {
Error::OutOfSpec(
"The number of values declared in the dict page does not match the length of the page"
.to_string(),
)
})?;

let values = values.chunks_exact(size_of).map(decode::<T>).collect();

Ok(PrimitivePageDict::new(values))
}

fn print_page<T, W>(buffer: &[u8], sample_size: usize, sorted: bool, writer: &mut W) -> Result<()>
where
T: NativeType,
W: Write,
{
match dict.physical_type() {
PhysicalType::Boolean => {
writeln!(writer, "Boolean physical type cannot be dictionary-encoded")?;
}
PhysicalType::Int32 => {
if let Some(res) = dict.as_any().downcast_ref::<PrimitivePageDict<i32>>() {
print_iterator(res.values().iter(), sample_size, writer)?;
}
}
PhysicalType::Int64 => {
if let Some(res) = dict.as_any().downcast_ref::<PrimitivePageDict<i64>>() {
print_iterator(res.values().iter(), sample_size, writer)?;
}
}
PhysicalType::Int96 => {
if let Some(res) = dict.as_any().downcast_ref::<PrimitivePageDict<[u32; 3]>>() {
print_iterator(res.values().iter(), sample_size, writer)?;
}
}
PhysicalType::Float => {
if let Some(res) = dict.as_any().downcast_ref::<PrimitivePageDict<f32>>() {
print_iterator(res.values().iter(), sample_size, writer)?;
}
}
PhysicalType::Double => {
if let Some(res) = dict.as_any().downcast_ref::<PrimitivePageDict<f64>>() {
print_iterator(res.values().iter(), sample_size, writer)?;
}
}
PhysicalType::ByteArray => {
if let Some(res) = dict.as_any().downcast_ref::<BinaryPageDict>() {
for (i, pair) in res.offsets().windows(2).enumerate().take(sample_size) {
let bytes = &res.values()[pair[0] as usize..pair[1] as usize];
let msg = String::from_utf8_lossy(bytes);

writeln!(writer, "Value: {:<10}\t{:?}", i, msg)?;
}
}
}
PhysicalType::FixedLenByteArray(size) => {
if let Some(res) = dict.as_any().downcast_ref::<FixedLenByteArrayPageDict>() {
for (i, bytes) in res
.values()
.chunks(*size as usize)
.enumerate()
.take(sample_size)
{
let msg = String::from_utf8_lossy(bytes);

writeln!(writer, "Value: {:<10}\t{:?}", i, msg)?;
}
}
}
}

Ok(())
let dict = read::<T>(buffer, sample_size, sorted)?;
print_iterator(dict.values().iter(), sample_size, writer)
}

fn print_iterator<I, T, W>(iter: I, sample_size: usize, writer: &mut W) -> Result<()>
Expand Down