diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs
index 27a06b307c7..dd65352df5b 100644
--- a/integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -33,7 +33,7 @@ use arrow_format::ipc::Message::MessageHeader;
 use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
 use tonic::{Request, Streaming};
 
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 type ArrayRef = Arc<dyn Array>;
 type SchemaRef = Arc<Schema>;
@@ -199,10 +199,10 @@ async fn consume_flight_location(
     // first FlightData. Ignore this one.
     let _schema_again = resp.next().await.unwrap();
 
-    let mut dictionaries_by_field = vec![None; schema.fields().len()];
+    let mut dictionaries = Default::default();
 
     for (counter, expected_batch) in expected_data.iter().enumerate() {
-        let data = receive_batch_flight_data(&mut resp, schema.clone(), &mut dictionaries_by_field)
+        let data = receive_batch_flight_data(&mut resp, schema.clone(), &mut dictionaries)
             .await
             .unwrap_or_else(|| {
                 panic!(
@@ -215,7 +215,7 @@ async fn consume_flight_location(
         let metadata = counter.to_string().into_bytes();
         assert_eq!(metadata, data.app_metadata);
 
-        let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field)
+        let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries)
             .expect("Unable to convert flight data to Arrow batch");
 
         assert_eq!(expected_batch.schema(), actual_batch.schema());
@@ -245,7 +245,7 @@ async fn consume_flight_location(
 async fn receive_batch_flight_data(
     resp: &mut Streaming<FlightData>,
     schema: SchemaRef,
-    dictionaries_by_field: &mut [Option<ArrayRef>],
+    dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
 ) -> Option<FlightData> {
     let mut data = resp.next().await?.ok()?;
     let mut message =
@@ -259,7 +259,7 @@ async fn receive_batch_flight_data(
                 .expect("Error parsing dictionary"),
             &schema,
             true,
-            dictionaries_by_field,
+            dictionaries,
             &mut reader,
             0,
         )
diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs
index c2f9d967be0..a91e2b7348e 100644
--- a/integration-testing/src/flight_server_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_server_scenarios/integration_test.rs
@@ -275,7 +275,7 @@ async fn record_batch_from_message(
     message: Message<'_>,
     data_body: &[u8],
     schema_ref: Arc<Schema>,
-    dictionaries_by_field: &[Option<Arc<dyn Array>>],
+    dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
 ) -> Result<RecordBatch, Status> {
     let ipc_batch = message
         .header_as_record_batch()
@@ -288,7 +288,7 @@ async fn record_batch_from_message(
         schema_ref,
         None,
         true,
-        dictionaries_by_field,
+        dictionaries,
         ArrowSchema::MetadataVersion::V5,
         &mut reader,
         0,
@@ -302,7 +302,7 @@ async fn dictionary_from_message(
     message: Message<'_>,
     data_body: &[u8],
     schema_ref: Arc<Schema>,
-    dictionaries_by_field: &mut [Option<Arc<dyn Array>>],
+    dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
 ) -> Result<(), Status> {
     let ipc_batch = message
         .header_as_dictionary_batch()
@@ -310,14 +310,8 @@ async fn dictionary_from_message(
 
     let mut reader = std::io::Cursor::new(data_body);
 
-    let dictionary_batch_result = ipc::read::read_dictionary(
-        ipc_batch,
-        &schema_ref,
-        true,
-        dictionaries_by_field,
-        &mut reader,
-        0,
-    );
+    let dictionary_batch_result =
+        ipc::read::read_dictionary(ipc_batch, &schema_ref, true, dictionaries, &mut reader, 0);
     dictionary_batch_result
         .map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e)))
 }
@@ -333,7 +327,7 @@ async fn save_uploaded_chunks(
     let mut chunks = vec![];
     let mut uploaded_chunks = uploaded_chunks.lock().await;
 
-    let mut dictionaries_by_field = vec![None; schema_ref.fields().len()];
+    let mut dictionaries = Default::default();
 
     while let Some(Ok(data)) = input_stream.next().await {
         let message = root_as_message(&data.data_header[..])
@@ -352,7 +346,7 @@ async fn save_uploaded_chunks(
                     message,
                     &data.data_body,
                     schema_ref.clone(),
-                    &dictionaries_by_field,
+                    &mut dictionaries,
                 )
                 .await?;
 
@@ -363,7 +357,7 @@ async fn save_uploaded_chunks(
                     message,
                     &data.data_body,
                     schema_ref.clone(),
-                    &mut dictionaries_by_field,
+                    &mut dictionaries,
                 )
                 .await?;
             }
diff --git a/src/io/json_integration/mod.rs b/src/io/json_integration/mod.rs
index 7f425c7b710..175c5068c78 100644
--- a/src/io/json_integration/mod.rs
+++ b/src/io/json_integration/mod.rs
@@ -20,10 +20,13 @@
 //! These utilities define structs that read the integration JSON format for integration testing purposes.
 
 use serde_derive::{Deserialize, Serialize};
-use serde_json::Value;
+use serde_json::{Map, Value};
+
+use crate::datatypes::*;
 
-mod read;
 mod schema;
+use schema::ToJson;
+mod read;
 mod write;
 pub use read::to_record_batch;
 pub use write::from_record_batch;
@@ -61,6 +64,59 @@ pub struct ArrowJsonField {
     pub metadata: Option<Value>,
 }
 
+impl From<&Field> for ArrowJsonField {
+    fn from(field: &Field) -> Self {
+        let metadata_value = match field.metadata() {
+            Some(kv_list) => {
+                let mut array = Vec::new();
+                for (k, v) in kv_list {
+                    let mut kv_map = Map::new();
+                    kv_map.insert(k.clone(), Value::String(v.clone()));
+                    array.push(Value::Object(kv_map));
+                }
+                if !array.is_empty() {
+                    Some(Value::Array(array))
+                } else {
+                    None
+                }
+            }
+            _ => None,
+        };
+
+        let dictionary = if let DataType::Dictionary(key_type, _) = &field.data_type {
+            use crate::datatypes::IntegerType::*;
+            Some(ArrowJsonFieldDictionary {
+                id: field.dict_id,
+                index_type: IntegerType {
+                    name: "".to_string(),
+                    bit_width: match key_type {
+                        Int8 | UInt8 => 8,
+                        Int16 | UInt16 => 16,
+                        Int32 | UInt32 => 32,
+                        Int64 | UInt64 => 64,
+                    },
+                    is_signed: match key_type {
+                        Int8 | Int16 | Int32 | Int64 => true,
+                        UInt8 | UInt16 | UInt32 | UInt64 => false,
+                    },
+                },
+                is_ordered: field.dict_is_ordered,
+            })
+        } else {
+            None
+        };
+
+        Self {
+            name: field.name().to_string(),
+            field_type: field.data_type().to_json(),
+            nullable: field.is_nullable(),
+            children: vec![],
+            dictionary,
+            metadata: metadata_value,
+        }
+    }
+}
+
 #[derive(Deserialize, Serialize, Debug)]
 pub struct ArrowJsonFieldDictionary {
     pub id: i64,