From 5ee92eb49a286ae564938f901d098bf703ef31aa Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sun, 26 Dec 2021 22:37:31 +0000
Subject: [PATCH] chore

---
 arrow-parquet-integration-testing/src/main.rs | 16 +++++++++-------
 tests/it/io/ipc/write_async.rs                |  4 ++--
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs
index 28f161095ce..ba60e7b8462 100644
--- a/arrow-parquet-integration-testing/src/main.rs
+++ b/arrow-parquet-integration-testing/src/main.rs
@@ -1,12 +1,14 @@
 use std::fs::File;
 use std::sync::Arc;
-use std::{collections::HashMap, convert::TryFrom, io::Read};
+use std::{collections::HashMap, io::Read};
 
+use arrow2::io::ipc::IpcField;
 use arrow2::{
     datatypes::{DataType, Schema},
     error::Result,
     io::{
-        json_integration::{to_record_batch, ArrowJson},
+        json_integration::read,
+        json_integration::ArrowJson,
         parquet::write::{
             write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
         },
@@ -19,7 +21,7 @@ use clap::{App, Arg};
 use flate2::read::GzDecoder;
 
 /// Read gzipped JSON file
-fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>) {
+fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec<RecordBatch>) {
     let path = format!(
         "../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz",
         version, file_name
@@ -32,7 +34,7 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>)
     let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
 
     let schema = serde_json::to_value(arrow_json.schema).unwrap();
-    let schema = Schema::try_from(&schema).unwrap();
+    let (schema, ipc_fields) = read::deserialize_schema(&schema).unwrap();
 
     // read dictionaries
     let mut dictionaries = HashMap::new();
@@ -46,11 +48,11 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>)
     let batches = arrow_json
         .batches
         .iter()
-        .map(|batch| to_record_batch(&schema, batch, &dictionaries))
+        .map(|batch| read::to_record_batch(&schema, &ipc_fields, batch, &dictionaries))
        .collect::<Result<Vec<_>>>()
        .unwrap();
 
-    (schema, batches)
+    (schema, ipc_fields, batches)
 }
 
 fn main() -> Result<()> {
@@ -106,7 +108,7 @@ fn main() -> Result<()> {
             .collect::<Vec<_>>()
     });
 
-    let (schema, batches) = read_gzip_json("1.0.0-littleendian", json_file);
+    let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file);
 
     let schema = if let Some(projection) = &projection {
         let fields = schema
diff --git a/tests/it/io/ipc/write_async.rs b/tests/it/io/ipc/write_async.rs
index 18110bbe3bd..e77558ce510 100644
--- a/tests/it/io/ipc/write_async.rs
+++ b/tests/it/io/ipc/write_async.rs
@@ -20,9 +20,9 @@ async fn write_(
 
     let options = stream_async::WriteOptions { compression: None };
     let mut writer = stream_async::StreamWriter::new(&mut result, options);
-    writer.start(&schema, Some(&ipc_fields)).await?;
+    writer.start(schema, Some(ipc_fields)).await?;
     for batch in batches {
-        writer.write(batch, Some(&ipc_fields)).await?;
+        writer.write(batch, Some(ipc_fields)).await?;
     }
     writer.finish().await?;
     Ok(result.into_inner())
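
For reference, a minimal sketch of the call sequence this patch migrates to: json_integration::read::deserialize_schema in place of Schema::try_from, with the returned IpcField metadata threaded into read::to_record_batch. It mirrors read_gzip_json above minus the gzip handling; batches_from_json is a hypothetical helper (not in this repository), and the dictionary field names are assumed to match arrow2's json_integration types at this commit.

    use std::collections::HashMap;

    use arrow2::error::Result;
    use arrow2::io::json_integration::{read, ArrowJson};
    use arrow2::record_batch::RecordBatch;

    /// Parse an Arrow integration-JSON document into record batches.
    fn batches_from_json(s: &str) -> Result<Vec<RecordBatch>> {
        let arrow_json: ArrowJson = serde_json::from_str(s).unwrap();

        // deserialize_schema replaces Schema::try_from and additionally
        // returns the IPC-level field metadata (e.g. dictionary ids)
        // that to_record_batch consumes below.
        let value = serde_json::to_value(arrow_json.schema).unwrap();
        let (schema, ipc_fields) = read::deserialize_schema(&value).unwrap();

        // index dictionary batches by id, as read_gzip_json does
        let mut dictionaries = HashMap::new();
        if let Some(dicts) = arrow_json.dictionaries {
            for dict in dicts {
                dictionaries.insert(dict.id, dict);
            }
        }

        arrow_json
            .batches
            .iter()
            .map(|batch| read::to_record_batch(&schema, &ipc_fields, batch, &dictionaries))
            .collect()
    }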