This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Read Decimal from Parquet File #444

Closed
potter420 opened this issue Sep 24, 2021 · 15 comments
Labels
feature A new feature good first issue Good for newcomers

Comments

@potter420
Contributor

Hi,

So far we have the capability to write decimal to parquet. I wonder if we can implement reading decimal value from parquet file as well.

Thank you very much.

@jorgecarleitao jorgecarleitao added feature A new feature good first issue Good for newcomers labels Sep 24, 2021
@potter420
Contributor Author

Currently, here is some makeshift code I wrote to handle reading decimals. It seems to work in my case, across many non-nested parquet data files containing decimals:

use io::parquet::read::{page_iter_to_array, ParquetType, PhysicalType};
use itertools::Itertools; // for pad_using / collect_vec below
...
match metadata.descriptor().type_() {
  ParquetType::PrimitiveType { physical_type, .. } => {
      match physical_type {
          PhysicalType::Int32 => page_iter_to_array(
              &mut pages,
              metadata,
              DataType::Int32,
          )
          .map(|ar| {
              Box::new(
                  PrimitiveArray::<i128>::from_trusted_len_iter(
                      ar.as_any()
                          .downcast_ref::<PrimitiveArray<i32>>()
                          .unwrap()
                          .iter()
                          .map(|e| e.map(|e| *e as i128)),
                  )
                  .to(data_type),
              )
                  as Box<dyn Array>
          }),
          PhysicalType::Int64 => page_iter_to_array(
              &mut pages,
              metadata,
              DataType::Int64,
          )
          .map(|ar| {
              Box::new(
                  PrimitiveArray::<i128>::from_trusted_len_iter(
                      ar.as_any()
                          .downcast_ref::<PrimitiveArray<i64>>()
                          .unwrap()
                          .iter()
                          .map(|e| e.map(|e| *e as i128)),
                  )
                  .to(data_type),
              )
                  as Box<dyn Array>
          }),
          PhysicalType::FixedLenByteArray(n) => {
              page_iter_to_array(
                  &mut pages,
                  metadata,
                  DataType::FixedSizeBinary(*n),
              )
              .map(
                  |ar| {
                      let v = ar
                          .as_any()
                          .downcast_ref::<FixedSizeBinaryArray>()
                          .unwrap()
                          .iter()
                          .map(|e| {
                              e.and_then(|e| {
                                  match e
                                      .into_iter()
                                      .rev()
                                      .map(|e| *e)
                                      .pad_using(16, |_| 0u8)
                                      .rev()
                                      .collect_vec()
                                      .try_into()
                                  {
                                      Ok(v) => {
                                          Some(i128::from_be_bytes(v))
                                      }
                                      Err(_) => None,
                                  }
                              })
                          }).collect_vec();
                      Box::new(
                          PrimitiveArray::<i128>::from_trusted_len_iter(v.into_iter()).to(data_type)
                      ) as Box<dyn Array>
                  },
              )
          }
          _ => unreachable!(),
      }
  }
  _ => unreachable!(),
},
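In isolation, the Int32/Int64 branches above just widen each optional value to i128 before re-tagging the array as decimal. A minimal standalone sketch of that widening (the `widen` helper is hypothetical, not part of arrow2):

```rust
// Widen optional i32 values to i128, as the Int32 branch above does per element.
fn widen(values: &[Option<i32>]) -> Vec<Option<i128>> {
    values.iter().map(|e| e.map(|v| v as i128)).collect()
}

fn main() {
    // sign is preserved by the `as i128` cast
    assert_eq!(widen(&[Some(7), None, Some(-1)]), vec![Some(7i128), None, Some(-1)]);
}
```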

@jorgecarleitao
Owner

That snippet seems correct!

I would add one roundtrip test on tests/it/io/parquet/write per physical type, and a test to read parquet written by pyarrow, so that we verify that the assumptions about the format hold on our reader.

Let me know if you would like to work on this or if you would like me to take it =)

@potter420
Contributor Author

Hello, sorry for the late reply, been a busy week. I would take this one, including the tests.
Just to be sure, we would write tests which include the 3 types of physical decimal; according to the write code, that should be:

  • A decimal with precision not greater than 9 (int32), one not greater than 18 (int64), and one of 26 (fixed-length bytes; I just picked a number greater than 18 here)
  • And a parquet file written by pyarrow with similar columns to the above.

How should we generate the pyarrow files for the test, though?
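The precision thresholds discussed above can be sketched as a standalone mapping (the `decimal_physical_type` helper is hypothetical, for illustration only):

```rust
/// Which parquet physical type a decimal of a given precision is written to,
/// following the thresholds discussed in the thread.
fn decimal_physical_type(precision: usize) -> &'static str {
    if precision <= 9 {
        "INT32" // an i32 holds up to 9 decimal digits losslessly
    } else if precision <= 18 {
        "INT64" // an i64 holds up to 18 decimal digits
    } else {
        "FIXED_LEN_BYTE_ARRAY" // larger precisions need fixed-length bytes
    }
}

fn main() {
    assert_eq!(decimal_physical_type(9), "INT32");
    assert_eq!(decimal_physical_type(18), "INT64");
    assert_eq!(decimal_physical_type(26), "FIXED_LEN_BYTE_ARRAY");
}
```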

@jorgecarleitao
Owner

Hey @potter420 , no worries. Exactly, so that we demonstrate interoperability with the 3 physical types.

We have a script here where we generate parquet files as part of the tests. The data placed there is then replicated around here, which is compared in tests such as this one.

So, something like

  1. add new fields to nullable and required in the .py linked above
  2. add the corresponding data to the .rs file linked above
  3. add the test targeting the new columns to the .rs file linked above

Note that we also test statistics, so those are also needed.

@jorgecarleitao
Owner

Ah, for testing out locally, I usually create a venv as we do it in the CI here

@potter420
Contributor Author

Hmm, my test failed miserably. According to pyarrow output:

<pyarrow._parquet.ColumnChunkMetaData object at 0x7f4159b5adb0>
  file_offset: 2400
  file_path: 
  physical_type: FIXED_LEN_BYTE_ARRAY
  num_values: 10
  path_in_schema: decimal_9
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7f4159b5aea0>
      has_min_max: True
      min: b'\x00\x00\x00\x00'
      max: b'\x00\x00\x00\t'
      null_count: 3
      distinct_count: 0
      num_values: 7
      physical_type: FIXED_LEN_BYTE_ARRAY
      logical_type: Decimal(precision=9, scale=0)
      converted_type (legacy): DECIMAL
  compression: UNCOMPRESSED
  encodings: ('PLAIN', 'RLE')
  has_dictionary_page: False
  dictionary_page_offset: None
  data_page_offset: 2320
  total_compressed_size: 80
  total_uncompressed_size: 80

Seems like we have to implement FIXED_LEN_BYTE_ARRAY statistics as well; or perhaps I can set statistics to null for the moment?

@potter420
Contributor Author

Hi, upon further investigation, it seems to me that FixedLenStatistics from the parquet2 crate is missing the descriptor found in other statistics.

So if decimals are not the only type that comes from FIXED_LEN_BYTE_ARRAY, I also have to create a pull request for parquet2.

@jorgecarleitao
Owner

Well, that is a great summary: there is indeed a missing descriptor on the statistics to differentiate between logical types. I totally forgot about FixedLen being used in parquet for multiple logical types :(

@potter420
Contributor Author

Hello, I've been looking at statistics; should we implement a separate statistics struct for decimals?
Currently I'm writing something like this in a separate decimal.rs file:

pub struct DecimalStatistics {
    pub null_count: Option<i64>,
    pub distinct_count: Option<i64>,
    pub min_value: Option<i128>,
    pub max_value: Option<i128>,
    pub data_type: DataType
}

As far as I can see from the statistics module, statistics for Timestamp and many other logical types currently reside neatly in PrimitiveStatistics.
But decimal can come from 3 separate physical types (and according to the parquet spec, 4 if you count byte arrays as well, as stated here).

@jorgecarleitao
Owner

Imo the design of the stats in arrow2 is currently incorrect: we should offer 1 struct per arrow physical type, not per parquet physical type; the deserialization is incorrect, I think.

This way, deserialization maps a parquet statistics (FixedLen or other, depending on the scale/precision) into a single arrow statistics (the primitive with i128):

I.e. the route to deserialize should be something like:

  • parquet stats (Vec<u8>)
  • deserialized parquet stats (the Parquet Statistics trait object)
  • deserialized arrow stats (the Arrow Statistics trait object)

The first two items are done by the parquet crate; the last is done by us (and this is where we map the different physical types from parquet into a single arrow physical type, i128).
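As a concrete sketch of that last step: the raw big-endian stat bytes from the pyarrow dump above (min: b'\x00\x00\x00\x00', max: b'\x00\x00\x00\t') widen into i128 roughly like this. The `stat_to_i128` helper is hypothetical, and zero-padding suffices here only because these values are non-negative:

```rust
// Step 3 of the route above: widen raw big-endian stat bytes into i128
// by left-padding to 16 bytes (non-negative values only).
fn stat_to_i128(raw: &[u8]) -> i128 {
    let mut buf = [0u8; 16];
    buf[16 - raw.len()..].copy_from_slice(raw);
    i128::from_be_bytes(buf)
}

fn main() {
    // the min/max statistics from the pyarrow dump earlier in the thread
    assert_eq!(stat_to_i128(b"\x00\x00\x00\x00"), 0);
    assert_eq!(stat_to_i128(b"\x00\x00\x00\t"), 9);
}
```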

@potter420
Contributor Author

Thanks, I will implement something like
impl TryFrom<&ParquetFixedLenStatistics> for PrimitiveStatistics<T>
TryFrom here, since there is a chance that we receive a fixed binary with length greater than 16, which does not fit into a decimal128. Could it be decimal256?

And put the Decimal(p, s) match case in primitive::statistics_from_i32 and primitive::statistics_from_i64.

Is that correct, and will it help you with the rewrite later?

@potter420
Contributor Author

potter420 commented Oct 2, 2021

Seems to me the scope is getting bigger and bigger, and I'm not complaining 😄.
All tests seem to pass until the roundtrip with the Decimal(26,0) test. It seems to me that the statistics module of parquet2 hasn't implemented the reduced statistics for the FixedLenByteArray physical type. I can reuse the ByteArray one for that, right?

Is there anywhere else I have to watch out for as well? Thanks

@jorgecarleitao
Owner

That is awesome!

I agree that the deserialization is equivalent (it also works for FixedSizeBinary(_)). The parquet implementation already asserted that they have the correct number of bytes, so we do not need to check that and can just deserialize to arrow, like you wrote.

We do not support decimal256 yet, so we will have to panic, truncate or simply use None (your call).

Let me know if you would like me to implement something and I will follow your lead. :)

@potter420
Contributor Author

potter420 commented Oct 2, 2021

I went ahead and created a pull request to add writing reduced FixedLenByteArray statistics to parquet2. All tests seem to pass when testing against a local copy of parquet2, including the round trip for Decimal(26,0).

As for decimal256, I've decided to return an error to signal that we can't deserialize Decimal128 from a FixedLenByteArray of size > 16, so as to provide other devs some more clarity:

Err(ArrowError::NotYetImplemented(format!(
    "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
    n
)))

I'll just put everything in the NotYetImplemented category :D

And the deserialization bit goes like this, since I don't want to introduce Itertools to the list of dependencies for io_parquet:

let paddings = (0..(16 - *n)).map(|_| 0u8).collect::<Vec<_>>();
fixed_size_binary::iter_to_array(iter, DataType::FixedSizeBinary(*n), metadata).map(|e| {
    let a = e
        .into_iter()
        .map(|v| {
            v.and_then(|v1| {
                // left-pad the big-endian bytes to 16 before decoding
                [&paddings[..], v1]
                    .concat()
                    .try_into()
                    .map(i128::from_be_bytes)
                    .ok()
            })
        })
        .collect::<Vec<_>>();
    Box::new(PrimitiveArray::<i128>::from(a).to(data_type)) as Box<dyn Array>
})

The paddings are allocated only once; subsequent allocations do happen in concat, since we "hopefully" know the length of our byte arrays beforehand. Please let me know if there are improvements to be made.
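One caveat worth noting, assuming parquet stores these decimals as two's-complement big-endian: zero-padding as above only round-trips non-negative values, since negative ones need sign extension. A standalone sketch with a hypothetical `pad_be_signed` helper:

```rust
/// Sign-extend big-endian two's-complement bytes to i128
/// (fill with 0xFF when the sign bit of the first byte is set).
fn pad_be_signed(bytes: &[u8]) -> i128 {
    let fill = if bytes.first().map_or(false, |b| b & 0x80 != 0) { 0xFF } else { 0x00 };
    let mut buf = [fill; 16];
    buf[16 - bytes.len()..].copy_from_slice(bytes);
    i128::from_be_bytes(buf)
}

fn main() {
    // zero padding would decode [0xFF, 0xFF] as 65535; sign extension keeps -1
    assert_eq!(pad_be_signed(&[0xFF, 0xFF]), -1);
    assert_eq!(pad_be_signed(&[0x00, 0x09]), 9);
}
```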

@potter420
Contributor Author

Resolved at #489
