Commit

chore

jorgecarleitao committed Dec 28, 2021
1 parent e73f94d commit 8ebc2a8

Showing 2 changed files with 11 additions and 9 deletions.
arrow-parquet-integration-testing/src/main.rs (16 changes: 9 additions & 7 deletions)
@@ -1,12 +1,14 @@
 use std::fs::File;
 use std::sync::Arc;
-use std::{collections::HashMap, convert::TryFrom, io::Read};
+use std::{collections::HashMap, io::Read};
 
+use arrow2::io::ipc::IpcField;
 use arrow2::{
     datatypes::{DataType, Schema},
     error::Result,
     io::{
-        json_integration::{to_record_batch, ArrowJson},
+        json_integration::read,
+        json_integration::ArrowJson,
         parquet::write::{
             write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
         },
@@ -19,7 +21,7 @@ use clap::{App, Arg};
 use flate2::read::GzDecoder;
 
 /// Read gzipped JSON file
-fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>) {
+fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<IpcField>, Vec<RecordBatch>) {
     let path = format!(
         "../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz",
         version, file_name
@@ -32,7 +34,7 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>)
     let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
 
     let schema = serde_json::to_value(arrow_json.schema).unwrap();
-    let schema = Schema::try_from(&schema).unwrap();
+    let (schema, ipc_fields) = read::deserialize_schema(&schema).unwrap();
 
     // read dictionaries
     let mut dictionaries = HashMap::new();
@@ -46,11 +48,11 @@ fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec<RecordBatch>)
     let batches = arrow_json
         .batches
         .iter()
-        .map(|batch| to_record_batch(&schema, batch, &dictionaries))
+        .map(|batch| read::to_record_batch(&schema, &ipc_fields, batch, &dictionaries))
         .collect::<Result<Vec<_>>>()
         .unwrap();
 
-    (schema, batches)
+    (schema, ipc_fields, batches)
 }
 
 fn main() -> Result<()> {
@@ -106,7 +108,7 @@ fn main() -> Result<()> {
            .collect::<Vec<_>>()
    });
 
-    let (schema, batches) = read_gzip_json("1.0.0-littleendian", json_file);
+    let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file);
 
     let schema = if let Some(projection) = &projection {
         let fields = schema
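Taken together, the changes to main.rs replace the `Schema::try_from` + `to_record_batch` pair with the `read` module's two-step API, which also yields the `Vec<IpcField>` needed for IPC round-trips. Below is a minimal sketch of the new read path, assembled only from the calls visible in this diff; the helper name `read_json`, the empty dictionary map, and the `RecordBatch` import path are illustrative assumptions, not part of the commit.

    use std::collections::HashMap;

    use arrow2::datatypes::Schema;
    use arrow2::error::Result;
    use arrow2::io::ipc::IpcField;
    use arrow2::io::json_integration::{read, ArrowJson};
    use arrow2::record_batch::RecordBatch; // assumed path, not shown in the diff

    // Parse an integration-format JSON document into a schema, its IPC fields,
    // and the record batches it describes (hypothetical helper for illustration).
    fn read_json(json: &str) -> Result<(Schema, Vec<IpcField>, Vec<RecordBatch>)> {
        let arrow_json: ArrowJson = serde_json::from_str(json).unwrap();

        // `deserialize_schema` now returns the IPC fields alongside the schema.
        let schema = serde_json::to_value(arrow_json.schema).unwrap();
        let (schema, ipc_fields) = read::deserialize_schema(&schema).unwrap();

        // Dictionary reading is elided here; main.rs fills this map first.
        let dictionaries = HashMap::new();

        // `to_record_batch` takes the IPC fields as an extra argument.
        let batches = arrow_json
            .batches
            .iter()
            .map(|batch| read::to_record_batch(&schema, &ipc_fields, batch, &dictionaries))
            .collect::<Result<Vec<_>>>()?;

        Ok((schema, ipc_fields, batches))
    }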
tests/it/io/ipc/write_async.rs (4 changes: 2 additions & 2 deletions)

@@ -20,9 +20,9 @@ async fn write_(

     let options = stream_async::WriteOptions { compression: None };
     let mut writer = stream_async::StreamWriter::new(&mut result, options);
-    writer.start(&schema, Some(&ipc_fields)).await?;
+    writer.start(schema, Some(ipc_fields)).await?;
     for batch in batches {
-        writer.write(batch, Some(&ipc_fields)).await?;
+        writer.write(batch, Some(ipc_fields)).await?;
     }
     writer.finish().await?;
     Ok(result.into_inner())
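In the async IPC test helper, `start` and `write` drop the extra borrows, which suggests the helper already holds `schema` and `ipc_fields` by reference. A hedged sketch of the helper's apparent shape after this change follows; the parameter types, the `stream_async` module path, and the `futures::io::Cursor` buffer are inferred from this hunk, not shown in the diff.

    use arrow2::datatypes::Schema;
    use arrow2::error::Result;
    use arrow2::io::ipc::write::stream_async; // assumed module path
    use arrow2::io::ipc::IpcField;
    use arrow2::record_batch::RecordBatch; // assumed path, not shown in the diff
    use futures::io::Cursor;

    // Write `batches` as an async IPC stream and return the raw bytes
    // (sketch of the test helper; signature inferred, not confirmed).
    async fn write_(
        batches: &[RecordBatch],
        schema: &Schema,
        ipc_fields: &[IpcField],
    ) -> Result<Vec<u8>> {
        let mut result = Cursor::new(vec![]);

        let options = stream_async::WriteOptions { compression: None };
        let mut writer = stream_async::StreamWriter::new(&mut result, options);
        // No `&` needed now: the helper's bindings are already references.
        writer.start(schema, Some(ipc_fields)).await?;
        for batch in batches {
            writer.write(batch, Some(ipc_fields)).await?;
        }
        writer.finish().await?;
        Ok(result.into_inner())
    }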