This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Simplified inferring an Arrow schema from a Parquet schema #819

Merged · 2 commits · Feb 6, 2022
2 changes: 1 addition & 1 deletion examples/parquet_read_async.rs
@@ -25,7 +25,7 @@ async fn main() -> Result<()> {
// this operation is usually done before reading the data, during planning.
// This is a mix of IO and CPU-bounded tasks but both of them are O(1)
let metadata = read::read_metadata_async(&mut reader).await?;
- let schema = read::get_schema(&metadata)?;
+ let schema = read::infer_schema(&metadata)?;

// This factory yields one file descriptor per column and is used to read columns concurrently.
// They do not need to be buffered since we execute exactly 1 seek and 1 read on them.
2 changes: 1 addition & 1 deletion examples/parquet_read_parallel/src/main.rs
@@ -11,7 +11,7 @@ use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read};
fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>> {
let mut file = BufReader::new(File::open(path)?);
let metadata = read::read_metadata(&mut file)?;
- let schema = read::get_schema(&metadata)?;
+ let schema = read::infer_schema(&metadata)?;

// read (IO-bounded) all columns into memory (use a subset of the fields to project)
let columns = read::read_columns(
48 changes: 48 additions & 0 deletions guide/src/io/parquet_read.md
@@ -44,3 +44,51 @@ by delegating all CPU-intensive tasks to separate threads.
This can of course be reversed; in configurations where IO is bounded (e.g. when a
network is involved), we can use multiple producers of pages, potentially divided
across file readers, and a single consumer that performs all CPU-intensive work, as
sketched below.
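A minimal sketch of that reversed configuration, using plain `std` threads and a
channel. This shows only the threading shape, not arrow2's reader API; the `Page`
struct is a hypothetical stand-in for a fetched page:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a compressed page fetched over the network.
struct Page(Vec<u8>);

fn main() {
    let (tx, rx) = mpsc::channel::<Page>();

    // Several IO-bounded producers, e.g. one per file reader.
    let producers: Vec<_> = (0..4)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                // A real producer would fetch pages over the network here.
                tx.send(Page(vec![i as u8; 1024])).unwrap();
            })
        })
        .collect();
    // Drop the original sender so the channel closes once all producers finish.
    drop(tx);

    // A single CPU-bounded consumer that would decompress and deserialize pages.
    let consumer = thread::spawn(move || {
        for page in rx {
            println!("consumed a page of {} bytes", page.0.len());
        }
    });

    for producer in producers {
        producer.join().unwrap();
    }
    consumer.join().unwrap();
}
```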

## Apache Arrow <-> Apache Parquet

Arrow and Parquet are two different formats that declare different physical and logical types.
When reading Parquet, we must _infer_ which types to read the data into.
This inference is based on Parquet's physical, logical and converted types.

When a logical type is defined, we use it as follows:

| `Parquet` | `Parquet logical` | `DataType` |
| ----------------- | ----------------- | ------------- |
| Int32 | Int8 | Int8 |
| Int32 | Int16 | Int16 |
| Int32 | Int32 | Int32 |
| Int32 | UInt8 | UInt8 |
| Int32 | UInt16 | UInt16 |
| Int32 | UInt32 | UInt32 |
| Int32 | Decimal | Decimal |
| Int32 | Date | Date32 |
| Int32 | Time(ms) | Time32(ms) |
| Int64 | Int64 | Int64 |
| Int64 | UInt64 | UInt64 |
| Int64 | Time(us) | Time64(us) |
| Int64 | Time(ns) | Time64(ns) |
| Int64 | Timestamp(\_) | Timestamp(\_) |
| Int64 | Decimal | Decimal |
| ByteArray | Utf8 | Utf8 |
| ByteArray | JSON | Binary |
| ByteArray | BSON | Binary |
| ByteArray | ENUM | Binary |
| ByteArray | Decimal | Decimal |
| FixedLenByteArray | Decimal | Decimal |

When a logical type is not defined but a converted type is, we use the equivalent
conversion from the table above, mutatis mutandis (e.g. the converted type UTF8 maps
to Utf8, just as the logical type Utf8 does).

When neither is defined, we fall back to the physical representation:

| `Parquet` | `DataType` |
| ----------------- | --------------- |
| Boolean | Boolean |
| Int32 | Int32 |
| Int64 | Int64 |
| Int96 | Timestamp(ns) |
| Float | Float32 |
| Double | Float64 |
| ByteArray | Binary |
| FixedLenByteArray | FixedSizeBinary |
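
To make the renamed entry point concrete, here is a minimal sketch that prints the
inferred Arrow schema. The file name is an assumption; `read_metadata` and
`infer_schema` are the calls shown in this diff:

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    // "example.parquet" is an assumed path; point this at any parquet file.
    let mut reader = File::open("example.parquet")?;

    // Parse the thrift-encoded metadata in the file's footer.
    let metadata = read::read_metadata(&mut reader)?;

    // Map parquet physical/logical/converted types to arrow `DataType`s,
    // following the tables above.
    let schema = read::infer_schema(&metadata)?;

    for field in &schema.fields {
        println!("{:?}", field);
    }
    Ok(())
}
```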
4 changes: 2 additions & 2 deletions src/io/parquet/read/file.rs
@@ -10,7 +10,7 @@ use crate::{
error::{ArrowError, Result},
};

-use super::{get_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
+use super::{infer_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};

type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;

@@ -47,7 +47,7 @@ impl<R: Read + Seek> FileReader<R> {
) -> Result<Self> {
let metadata = read_metadata(&mut reader)?;

- let schema = get_schema(&metadata)?;
+ let schema = infer_schema(&metadata)?;

let schema_metadata = schema.metadata;
let fields: Vec<Field> = if let Some(projection) = &projection {
2 changes: 1 addition & 1 deletion src/io/parquet/read/mod.rs
@@ -54,7 +54,7 @@ mod utils;
pub use file::{FileReader, RowGroupReader};
pub use row_group::*;
pub(crate) use schema::is_type_nullable;
-pub use schema::{get_schema, FileMetaData};
+pub use schema::{infer_schema, FileMetaData};

use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
use deserialize::page_iter_to_arrays;