This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Writing multiple columns using arrow2::io::parquet::write::FileSink? #1056

Closed
nielsmeima opened this issue Jun 7, 2022 · 3 comments
Labels
bug (Something isn't working), no-changelog (Issues whose changes are covered by a PR and thus should not be shown in the changelog)

Comments

@nielsmeima

I am trying to write multiple columns to a Parquet file using the async FileSink. However, after adapting the example from the docs, I get the following error when reading the .parquet file back: "The number of columns in the row group (1) must be equal to the number of columns in the schema (2)".

Any ideas? Am I using the API incorrectly?

I adapted the code from the docs as follows:

```rust
use std::sync::Arc;
use async_compat::{Compat, CompatExt};
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
use futures::SinkExt;
use arrow2::array::{Array, Int32Array, Float32Array};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::chunk::Chunk;
use arrow2::io::parquet::write::{Encoding, WriteOptions, CompressionOptions, Version, FileSink};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema::from(vec![
        Field::new("values", DataType::Int32, true),
        Field::new("values_two", DataType::Int32, true)
    ]);
    let encoding = vec![vec![Encoding::Plain]];
    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
    };

    let buffer = File::create("./output.parquet").await.unwrap();
    let mut sink = FileSink::try_new(
        buffer.compat(),
        schema,
        encoding,
        options,
    )?;

    for i in 0..3 {
        let values = Int32Array::from(&[Some(i), None]);
        let values_two = Int32Array::from(&[Some(i), None]);
        let chunk = Chunk::new(vec![values.arced(), values_two.arced()]);
        sink.feed(chunk).await?;
    }
    sink.metadata.insert(String::from("key"), Some(String::from("value")));
    sink.close().await?;

    Ok(())
}
```
```toml
[package]
name = "write-parquet"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow2 = { version = "0.12.0", features = ["io_parquet", "io_parquet_compression"] }
async-compat = "0.2.1"
futures = "0.3.21"
tokio = { version = "1.19.2", features = ["full"] }
```
@jorgecarleitao
Owner

This is a bug in the validation of the input parameters; it is addressed in #1057.
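A minimal sketch of the kind of up-front check meant here, written against hypothetical stand-in types rather than arrow2's actual code. It also shows one plausible way, assumed here and not confirmed in this thread, that a single encoding list could produce a one-column row group: `Iterator::zip` stops at the shorter input, so pairing two columns with one encoding list silently drops the second column.

```rust
// Hypothetical stand-in for arrow2's `Encoding`; not the real type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Encoding {
    Plain,
}

// Assumption: this mirrors how a writer *might* pair columns with encodings.
// `zip` stops at the shorter iterator, so extra columns are silently dropped.
fn pair_unchecked<'a>(
    columns: &'a [&'a str],
    encodings: &'a [Vec<Encoding>],
) -> Vec<(&'a str, &'a [Encoding])> {
    columns
        .iter()
        .copied()
        .zip(encodings.iter().map(|e| e.as_slice()))
        .collect()
}

// The same pairing, but rejecting mismatched lengths before doing any work.
fn pair_checked<'a>(
    columns: &'a [&'a str],
    encodings: &'a [Vec<Encoding>],
) -> Result<Vec<(&'a str, &'a [Encoding])>, String> {
    if columns.len() != encodings.len() {
        return Err(format!(
            "expected one encoding list per field: {} fields but {} encoding lists",
            columns.len(),
            encodings.len()
        ));
    }
    Ok(pair_unchecked(columns, encodings))
}

fn main() {
    let columns = ["values", "values_two"];
    let one = vec![vec![Encoding::Plain]];
    let two = vec![vec![Encoding::Plain], vec![Encoding::Plain]];

    // Only one (column, encodings) pair survives: "values_two" is dropped.
    println!("{:?}", pair_unchecked(&columns, &one));
    // The checked version reports the mismatch instead.
    println!("{:?}", pair_checked(&columns, &one));
    println!("{:?}", pair_checked(&columns, &two));
}
```

Checking the lengths before pairing turns a silent truncation into an error when the sink is constructed, instead of a file that only fails when it is read back.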

Could you try `let encoding = vec![vec![Encoding::Plain], vec![Encoding::Plain]];` (i.e. one encoding per field)?
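Rather than hard-coding the list, the outer `Vec` can be derived from the schema so its length always matches the number of fields. The sketch below uses minimal stand-in `Encoding` and `Field` types (assumptions, not arrow2's definitions) so it compiles on its own; with arrow2 the same `map` would presumably run over `schema.fields`. It only covers flat fields like the two `Int32` columns in this issue, where each inner `Vec` holds a single encoding; nested fields are not covered here.

```rust
// Hypothetical stand-ins for arrow2's `Encoding` and `Field`, kept minimal so
// the sketch compiles without the arrow2 crate.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Encoding {
    Plain,
}

struct Field {
    name: String,
}

// Builds one `Vec<Encoding>` per field, so the outer length always matches
// the number of fields in the schema.
fn plain_encodings(fields: &[Field]) -> Vec<Vec<Encoding>> {
    fields.iter().map(|_| vec![Encoding::Plain]).collect()
}

fn main() {
    let fields = vec![
        Field { name: "values".to_string() },
        Field { name: "values_two".to_string() },
    ];
    let encoding = plain_encodings(&fields);
    // Each field is paired with exactly one encoding list.
    for (field, enc) in fields.iter().zip(&encoding) {
        println!("{} -> {:?}", field.name, enc);
    }
}
```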

@jorgecarleitao added the bug and no-changelog labels on Jun 7, 2022
@nielsmeima
Author

That works! Thanks so much!

@jorgecarleitao
Owner

Closed by #1057
