This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Round Trip [Rust -> arrow2_convert -> Arrow -> Parquet -> Arrow -> Rust] #1376

Closed
DayOfThePenguin opened this issue Feb 1, 2023 · 0 comments · Fixed by #1390
Labels
bug Something isn't working


Hi! I've been cobbling together an end-to-end example of serializing deeply nested Rust structs through arrow2_convert to Parquet and back (using their complex example with the fixed-size buffers removed, since fixed-size types are not currently implemented in arrow2).

I've reproduced the relevant code from the arrow2_convert complex_example.rs example, without fixed-size types. I'm able to create a Vec, convert it to Arrow, and serialize the result to a buffer of Parquet bytes. Reading the metadata and statistics back out of the buffer appears fine, but when I iterate through the chunks, I get an OutOfSpec error:

OutOfSpec("The children must have an equal number of values.\n However, the values at index 1 have a length of 9, which is different from values at index 0, 6.")
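
To make the invariant behind this message concrete: a struct array needs every child to have the same number of values. The sketch below is a hypothetical stand-in for that check, not arrow2's actual code; the function name `check_children_lengths` and the error wording are my own assumptions.

```rust
/// Hypothetical stand-in for the length check a struct array performs on its
/// children. Returns an error naming the first child whose length differs
/// from child 0.
fn check_children_lengths(lengths: &[usize]) -> Result<(), String> {
    let Some((&first, rest)) = lengths.split_first() else {
        return Ok(()); // no children, nothing to compare
    };
    for (offset, &len) in rest.iter().enumerate() {
        if len != first {
            return Err(format!(
                "child {} has length {}, but child 0 has length {}",
                offset + 1,
                len,
                first
            ));
        }
    }
    Ok(())
}

fn main() {
    // Mirrors the numbers in the error above: child 0 has 6 values, child 1 has 9.
    println!("{:?}", check_children_lengths(&[6, 9, 6]));
    // Equal lengths pass the check.
    println!("{:?}", check_children_lengths(&[6, 6, 6]));
}
```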

I have a feeling this is just a serialization misuse on my end that's getting propagated downstream. Attempting to read the intermediate Parquet file with PyArrow results in an OSError: Malformed levels. min: 0 max: 3 out of range. Max Level: 2, which again points to a serialization error. My suspicion is that I'm not defining the row groups (RowGroupIterator) correctly, specifically the encodings, which I have as a vec![vec![Encoding::Plain; 25]].
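
For context on the "Malformed levels" message: in Parquet, a leaf column's maximum definition level is the number of optional or repeated nodes on its schema path, and its maximum repetition level is the number of repeated nodes. A reader can reject any encoded level above that maximum. The sketch below is a minimal illustration of that rule; the `Repetition` enum and both helper functions are hypothetical names and do not come from arrow2 or parquet2.

```rust
/// Repetition of one node on the path from the schema root to a leaf column.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Returns (max definition level, max repetition level) for a leaf whose
/// schema path has the given repetitions.
fn max_levels(path: &[Repetition]) -> (u32, u32) {
    let mut def = 0;
    let mut rep = 0;
    for node in path {
        match node {
            Repetition::Required => {}
            Repetition::Optional => def += 1,
            Repetition::Repeated => {
                def += 1;
                rep += 1;
            }
        }
    }
    (def, rep)
}

/// True when every encoded level is within the allowed maximum.
fn levels_in_range(levels: &[u32], max: u32) -> bool {
    levels.iter().all(|&level| level <= max)
}

fn main() {
    use Repetition::*;
    // Optional list of optional items: optional group, repeated group, optional leaf.
    println!("{:?}", max_levels(&[Optional, Repeated, Optional]));
    // Required list of required items: required group, repeated group, required leaf.
    println!("{:?}", max_levels(&[Required, Repeated, Required]));
    // A level of 3 against a maximum of 2 is out of range, as in the PyArrow error.
    println!("{}", levels_in_range(&[0, 3], 2));
}
```

If the writer computes levels against a different nesting than the one recorded in the file's schema, the encoded levels can exceed the maximum the reader derives, which would be consistent with the message above.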

I'm happy to contribute this back as a test case after we get it working; I think it is a common use case that'd be valuable to document.

Thanks for the help!

Deeply Nested Structs & arrow2_convert implementation

/// Complex example that uses the following features:
///
/// - Deeply Nested structs and lists
/// - Custom types

use std::borrow::Borrow;
use std::sync::Arc;

use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Schema, Field};
use arrow2::io::parquet::read;
use arrow2::io::parquet::write::{FileWriter, Encoding, RowGroupIterator, Version, ZstdLevel, CompressionOptions, WriteOptions};
use arrow2_convert::deserialize::{arrow_array_deserialize_iterator, TryIntoCollection};
use arrow2_convert::serialize::{TryIntoArrow, ArrowSerialize};
use arrow2_convert::ArrowField;


#[derive(Debug, Clone, PartialEq, ArrowField)]
pub struct Root {
    name: Option<String>,
    is_deleted: bool,
    a1: Option<f64>,
    a2: i64,
    // binary
    a3: Option<Vec<u8>>,
    // date32
    a4: chrono::NaiveDate,
    // timestamp(ns, None)
    a5: chrono::NaiveDateTime,
    // timestamp(ns, None)
    a6: Option<chrono::NaiveDateTime>,
    // array of date times
    date_time_list: Vec<chrono::NaiveDateTime>,
    // optional list array of optional strings
    nullable_list: Option<Vec<Option<String>>>,
    // required list array of optional strings
    required_list: Vec<Option<String>>,
    // custom type
    custom: CustomType,
    // custom optional type
    nullable_custom: Option<CustomType>,
    // vec custom type
    custom_list: Vec<CustomType>,
    // nested struct
    child: Child,
    // int 32 array
    int32_array: Vec<i32>,
    // large binary
    #[arrow_field(type = "arrow2_convert::field::LargeBinary")]
    large_binary: Vec<u8>,
    // fixed size binary
    // #[arrow_field(type = "arrow2_convert::field::FixedSizeBinary<3>")]
    fixed_size_binary: Vec<u8>,
    // large string
    #[arrow_field(type = "arrow2_convert::field::LargeString")]
    large_string: String,
    // large vec
    #[arrow_field(type = "arrow2_convert::field::LargeVec<i64>")]
    large_vec: Vec<i64>,
    // fixed size vec
    // #[arrow_field(type = "arrow2_convert::field::FixedSizeVec<i64, 3>")]
    fixed_size_vec: Vec<i64>,
}

#[derive(Debug, Clone, PartialEq, Eq, ArrowField)]
pub struct Child {
    a1: i64,
    a2: String,
    // nested struct array
    child_array: Vec<ChildChild>,
}

#[derive(Debug, Clone, PartialEq, Eq, ArrowField)]
pub struct ChildChild {
    a1: i32,
    bool_array: Vec<bool>,
    int64_array: Vec<i64>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
/// A newtype around a u64
pub struct CustomType(u64);

/// To use with Arrow three traits need to be implemented:
/// - ArrowField
/// - ArrowSerialize
/// - ArrowDeserialize
impl arrow2_convert::field::ArrowField for CustomType {
    type Type = Self;

    #[inline]
    fn data_type() -> arrow2::datatypes::DataType {
        arrow2::datatypes::DataType::Extension(
            "custom".to_string(),
            Box::new(arrow2::datatypes::DataType::UInt64),
            None,
        )
    }
}

impl arrow2_convert::serialize::ArrowSerialize for CustomType {
    type MutableArrayType = arrow2::array::MutablePrimitiveArray<u64>;

    #[inline]
    fn new_array() -> Self::MutableArrayType {
        Self::MutableArrayType::from(<Self as arrow2_convert::field::ArrowField>::data_type())
    }

    #[inline]
    fn arrow_serialize(v: &Self, array: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
        array.try_push(Some(v.0))
    }
}

impl arrow2_convert::deserialize::ArrowDeserialize for CustomType {
    type ArrayType = arrow2::array::PrimitiveArray<u64>;

    #[inline]
    fn arrow_deserialize(v: Option<&u64>) -> Option<Self> {
        v.map(|t| CustomType(*t))
    }
}

// enable Vec<CustomType>
arrow2_convert::arrow_enable_vec_for_type!(CustomType);

fn item() -> Root {
    use chrono::{NaiveDate, NaiveDateTime};

    Root {
        name: Some("a".to_string()),
        is_deleted: false,
        a1: Some(0.1),
        a2: 1,
        a3: Some(b"aa".to_vec()),
        a4: NaiveDate::from_ymd_opt(1970, 1, 2).unwrap(),
        a5: NaiveDateTime::from_timestamp_opt(10000, 0).unwrap(),
        a6: Some(NaiveDateTime::from_timestamp_opt(10001, 0).unwrap()),
        date_time_list: vec![
            NaiveDateTime::from_timestamp_opt(10000, 10).unwrap(),
            NaiveDateTime::from_timestamp_opt(10000, 11).unwrap(),
        ],
        nullable_list: Some(vec![Some("cc".to_string()), Some("dd".to_string())]),
        required_list: vec![Some("aa".to_string()), Some("bb".to_string())],
        custom: CustomType(10),
        nullable_custom: Some(CustomType(11)),
        custom_list: vec![CustomType(12), CustomType(13)],
        child: Child {
            a1: 10,
            a2: "hello".to_string(),
            child_array: vec![
                ChildChild {
                    a1: 100,
                    bool_array: vec![false],
                    int64_array: vec![45555, 2124214, 224, 24214, 2424],
                },
                ChildChild {
                    a1: 101,
                    bool_array: vec![true, true, true],
                    int64_array: vec![4533, 22222, 2323, 333, 33322],
                },
            ],
        },
        int32_array: vec![0, 1, 3],
        large_binary: b"aa".to_vec(),
        fixed_size_binary: b"aaa".to_vec(),
        large_string: "abcdefg".to_string(),
        large_vec: vec![1, 2, 3, 4],
        fixed_size_vec: vec![10, 20, 30],
    }
}

Failing Test Case

#[test]
fn round_trip_parquet() -> arrow2::error::Result<()> {
    // serialize to an arrow array
    let original_array = [item(), item(), item()];

    // declare a schema with fields
    let schema = Schema::from(vec![
        Field::new("root_custom_struct", <Root as arrow2_convert::field::ArrowField>::data_type(), true),
    ]);

    let chunk: Chunk<Arc<dyn Array>> = original_array.try_into_arrow()?;

    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Zstd(Some(ZstdLevel::default())),
        version: Version::V1,
    };
    
    // encodings has to be the length of the number of elements in the struct
    // Maybe dynamically do this the same way that io/parquet/write/pages.rs is checking?
    let row_groups = RowGroupIterator::try_new(
        vec![Ok(chunk)].into_iter(),
        &schema,
        options,
        vec![vec![Encoding::Plain; 25]],
    )?;

    // anything implementing `std::io::Write` works
    let mut buffer = vec![];
    let mut writer = FileWriter::try_new(&mut buffer, schema, options)?;

    // Write to buffer
    for group in row_groups {
        writer.write(group?)?;
    }
    let _file_size = writer.end(None)?;

    // Wrap buffer in a Cursor...this makes the buffer impl Read & Seek (needed for read::read_metadata)
    let mut reader = std::io::Cursor::new(buffer);

    // we can read its metadata:
    let metadata = read::read_metadata(&mut reader)?;

    // and infer a [`Schema`] from the `metadata`.
    let schema = read::infer_schema(&metadata)?;

    println!("Schema: {:?}", &schema);

    // we can filter the columns we need (here we select all)
    let schema = schema.filter(|_index, _field| true);

    // we can read the statistics of all parquet's row groups (here for each field)
    for field in &schema.fields {
        let statistics = read::statistics::deserialize(field, &metadata.row_groups)?;
        println!("{statistics:#?}");
    }

    // Get all the row groups
    let row_groups = metadata.row_groups;

    // We can then read the row groups into chunks
    let chunks = read::FileReader::new(reader, row_groups, schema, None, None, None);

    // iterate over chunks and validate each is not empty
    for maybe_chunk in chunks {
        println!("{:?}", maybe_chunk);
        let chunk = maybe_chunk?;
        assert!(!chunk.is_empty());
    }

    Ok(())
}
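
On the `25` passed as the encodings length in the test above: my understanding is that the inner vector needs one entry per leaf (primitive) column of the field, counted through nested structs and lists. The sketch below checks that count with a toy type model. `Ty`, `leaf_count`, and `root` are hypothetical helpers and not part of arrow2; the shape of `root()` is transcribed by hand from the `Root` struct.

```rust
/// Toy model of a nested type (hypothetical, not arrow2's DataType).
enum Ty {
    Leaf,
    List(Box<Ty>),
    Struct(Vec<Ty>),
}

/// Counts primitive leaves, descending through lists and structs.
fn leaf_count(ty: &Ty) -> usize {
    match ty {
        Ty::Leaf => 1,
        Ty::List(inner) => leaf_count(inner),
        Ty::Struct(fields) => fields.iter().map(leaf_count).sum(),
    }
}

/// Shape of `Root` from this issue, with every primitive collapsed to `Leaf`.
fn root() -> Ty {
    use Ty::*;
    let list = || List(Box::new(Leaf));
    // ChildChild: a1, bool_array, int64_array
    let child_child = Struct(vec![Leaf, list(), list()]);
    // Child: a1, a2, child_array
    let child = Struct(vec![Leaf, Leaf, List(Box::new(child_child))]);
    Struct(vec![
        Leaf, Leaf, Leaf, Leaf, // name, is_deleted, a1, a2
        Leaf, Leaf, Leaf, Leaf, // a3, a4, a5, a6
        list(), list(), list(), // date_time_list, nullable_list, required_list
        Leaf, Leaf, list(),     // custom, nullable_custom, custom_list
        child,                  // child
        list(),                 // int32_array
        Leaf, Leaf, Leaf,       // large_binary, fixed_size_binary, large_string
        list(), list(),         // large_vec, fixed_size_vec
    ])
}

fn main() {
    // Prints 25, matching the hard-coded length in the test.
    println!("{}", leaf_count(&root()));
}
```

Deriving the count from the schema rather than hard-coding it would keep the encodings in step with the struct if fields are added or removed.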

Rust Backtrace

thread 'arrow::round_trip_parquet' panicked at 'called `Result::unwrap()` on an `Err` value: OutOfSpec("The children must have an equal number of values.\n                         However, the values at index 1 have a length of 9, which is different from values at index 0, 6.")', /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/array/struct_/mod.rs:120:52
stack backtrace:
   0: rust_begin_unwind
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/panicking.rs:64:14
   2: core::result::unwrap_failed
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/result.rs:1791:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/result.rs:1113:23
   4: arrow2::array::struct_::StructArray::new
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/array/struct_/mod.rs:120:9
   5: arrow2::array::struct_::StructArray::from_data
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/array/struct_/mod.rs:129:9
   6: <arrow2::io::parquet::read::deserialize::struct_::StructIterator as core::iter::traits::iterator::Iterator>::next
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/deserialize/struct_.rs:50:22
   7: <alloc::boxed::Box<I,A> as core::iter::traits::iterator::Iterator>::next
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/boxed.rs:1923:9
   8: <core::iter::adapters::map::Map<I,F> as core::iter::traits::iterator::Iterator>::next
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/map.rs:103:9
   9: <alloc::boxed::Box<I,A> as core::iter::traits::iterator::Iterator>::next
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/boxed.rs:1923:9
  10: <arrow2::io::parquet::read::deserialize::struct_::StructIterator as core::iter::traits::iterator::Iterator>::next::{{closure}}
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/deserialize/struct_.rs:26:25
  11: core::iter::adapters::map::map_fold::{{closure}}
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/map.rs:84:28
  12: core::iter::traits::iterator::Iterator::fold
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:2414:21
  13: <core::iter::adapters::map::Map<I,F> as core::iter::traits::iterator::Iterator>::fold
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/map.rs:124:9
  14: core::iter::traits::iterator::Iterator::for_each
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:831:9
  15: alloc::vec::Vec<T,A>::extend_trusted
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/mod.rs:2880:17
  16: <alloc::vec::Vec<T,A> as alloc::vec::spec_extend::SpecExtend<T,I>>::spec_extend
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/spec_extend.rs:26:9
  17: <alloc::vec::Vec<T> as alloc::vec::spec_from_iter_nested::SpecFromIterNested<T,I>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/spec_from_iter_nested.rs:62:9
  18: <alloc::vec::Vec<T> as alloc::vec::spec_from_iter::SpecFromIter<T,I>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/spec_from_iter.rs:33:9
  19: <alloc::vec::Vec<T> as core::iter::traits::collect::FromIterator<T>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/mod.rs:2748:9
  20: core::iter::traits::iterator::Iterator::collect
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:1836:9
  21: <arrow2::io::parquet::read::deserialize::struct_::StructIterator as core::iter::traits::iterator::Iterator>::next
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/deserialize/struct_.rs:23:22
  22: <alloc::boxed::Box<I,A> as core::iter::traits::iterator::Iterator>::next
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/boxed.rs:1923:9
  23: <arrow2::io::parquet::read::deserialize::struct_::StructIterator as core::iter::traits::iterator::Iterator>::next::{{closure}}
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/deserialize/struct_.rs:26:25
  24: core::iter::adapters::map::map_fold::{{closure}}
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/map.rs:84:28
  25: core::iter::traits::iterator::Iterator::fold
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:2414:21
  26: <core::iter::adapters::map::Map<I,F> as core::iter::traits::iterator::Iterator>::fold
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/map.rs:124:9
  27: core::iter::traits::iterator::Iterator::for_each
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:831:9
  28: alloc::vec::Vec<T,A>::extend_trusted
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/mod.rs:2880:17
  29: <alloc::vec::Vec<T,A> as alloc::vec::spec_extend::SpecExtend<T,I>>::spec_extend
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/spec_extend.rs:26:9
  30: <alloc::vec::Vec<T> as alloc::vec::spec_from_iter_nested::SpecFromIterNested<T,I>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/spec_from_iter_nested.rs:62:9
  31: <alloc::vec::Vec<T> as alloc::vec::spec_from_iter::SpecFromIter<T,I>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/spec_from_iter.rs:33:9
  32: <alloc::vec::Vec<T> as core::iter::traits::collect::FromIterator<T>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/mod.rs:2748:9
  33: core::iter::traits::iterator::Iterator::collect
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:1836:9
  34: <arrow2::io::parquet::read::deserialize::struct_::StructIterator as core::iter::traits::iterator::Iterator>::next
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/deserialize/struct_.rs:23:22
  35: <alloc::boxed::Box<I,A> as core::iter::traits::iterator::Iterator>::next
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/boxed.rs:1923:9
  36: <core::iter::adapters::map::Map<I,F> as core::iter::traits::iterator::Iterator>::next
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/map.rs:103:9
  37: <alloc::boxed::Box<I,A> as core::iter::traits::iterator::Iterator>::next
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/boxed.rs:1923:9
  38: <arrow2::io::parquet::read::row_group::RowGroupDeserializer as core::iter::traits::iterator::Iterator>::next::{{closure}}
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/row_group.rs:69:25
  39: core::iter::adapters::map::map_try_fold::{{closure}}
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/map.rs:91:28
  40: core::iter::traits::iterator::Iterator::try_fold
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:2238:21
  41: <core::iter::adapters::map::Map<I,F> as core::iter::traits::iterator::Iterator>::try_fold
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/map.rs:117:9
  42: <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::try_fold
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/mod.rs:195:9
  43: core::iter::traits::iterator::Iterator::try_for_each
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:2299:9
  44: <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/mod.rs:178:9
  45: <alloc::vec::Vec<T> as alloc::vec::spec_from_iter_nested::SpecFromIterNested<T,I>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/spec_from_iter_nested.rs:26:32
  46: <alloc::vec::Vec<T> as alloc::vec::spec_from_iter::SpecFromIter<T,I>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/spec_from_iter.rs:33:9
  47: <alloc::vec::Vec<T> as core::iter::traits::collect::FromIterator<T>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/alloc/src/vec/mod.rs:2748:9
  48: core::iter::traits::iterator::Iterator::collect
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:1836:9
  49: <core::result::Result<V,E> as core::iter::traits::collect::FromIterator<core::result::Result<A,E>>>::from_iter::{{closure}}
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/result.rs:2075:49
  50: core::iter::adapters::try_process
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/adapters/mod.rs:164:17
  51: <core::result::Result<V,E> as core::iter::traits::collect::FromIterator<core::result::Result<A,E>>>::from_iter
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/result.rs:2075:9
  52: core::iter::traits::iterator::Iterator::collect
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/iter/traits/iterator.rs:1836:9
  53: <arrow2::io::parquet::read::row_group::RowGroupDeserializer as core::iter::traits::iterator::Iterator>::next
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/row_group.rs:66:21
  54: <arrow2::io::parquet::read::file::FileReader<R> as core::iter::traits::iterator::Iterator>::next
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/file.rs:77:19
  55: <arrow2::io::parquet::read::file::FileReader<R> as core::iter::traits::iterator::Iterator>::next
             at /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow2-0.14.2/src/io/parquet/read/file.rs:97:21
  56: parquet_round_trip::arrow::round_trip_parquet
             at ./src/arrow.rs:324:24
  57: parquet_round_trip::arrow::round_trip_parquet::{{closure}}
             at ./src/arrow.rs:260:28
  58: core::ops::function::FnOnce::call_once
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/ops/function.rs:507:5
  59: core::ops::function::FnOnce::call_once
             at /rustc/fc594f15669680fa70d255faec3ca3fb507c3405/library/core/src/ops/function.rs:507:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
test arrow::round_trip_parquet ... FAILED

failures:

failures:
    arrow::round_trip_parquet