
Commit

Reverted some changes.
jorgecarleitao committed Nov 8, 2021
1 parent b37bde6 · commit d699e0f
Showing 3 changed files with 72 additions and 22 deletions.
File 1 of 3

@@ -33,7 +33,7 @@ use arrow_format::ipc::Message::MessageHeader;
 use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
 use tonic::{Request, Streaming};
 
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 type ArrayRef = Arc<dyn Array>;
 type SchemaRef = Arc<Schema>;
@@ -199,10 +199,10 @@ async fn consume_flight_location(
     // first FlightData. Ignore this one.
     let _schema_again = resp.next().await.unwrap();
 
-    let mut dictionaries_by_field = vec![None; schema.fields().len()];
+    let mut dictionaries = Default::default();
 
     for (counter, expected_batch) in expected_data.iter().enumerate() {
-        let data = receive_batch_flight_data(&mut resp, schema.clone(), &mut dictionaries_by_field)
+        let data = receive_batch_flight_data(&mut resp, schema.clone(), &mut dictionaries)
             .await
             .unwrap_or_else(|| {
                 panic!(
@@ -215,7 +215,7 @@
         let metadata = counter.to_string().into_bytes();
         assert_eq!(metadata, data.app_metadata);
 
-        let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field)
+        let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries)
             .expect("Unable to convert flight data to Arrow batch");
 
         assert_eq!(expected_batch.schema(), actual_batch.schema());
@@ -245,7 +245,7 @@ async fn consume_flight_location(
 async fn receive_batch_flight_data(
     resp: &mut Streaming<FlightData>,
     schema: SchemaRef,
-    dictionaries_by_field: &mut [Option<ArrayRef>],
+    dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
 ) -> Option<FlightData> {
     let mut data = resp.next().await?.ok()?;
     let mut message =
@@ -259,7 +259,7 @@ async fn receive_batch_flight_data(
                 .expect("Error parsing dictionary"),
             &schema,
             true,
-            dictionaries_by_field,
+            dictionaries,
             &mut reader,
             0,
         )
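Note on the first file: dictionary state is now keyed by dictionary id (`HashMap<usize, Arc<dyn Array>>`) rather than indexed by field position (`Vec<Option<ArrayRef>>`). A minimal, self-contained sketch of what the id-keyed map buys; `DictArray` and the ids are illustrative stand-ins, not arrow2 API:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Illustrative stand-in for arrow2's `Arc<dyn Array>` dictionary values;
// only the shape of the map matters for this sketch.
#[derive(Debug)]
struct DictArray(Vec<String>);

// One entry per dictionary id, shared across fields, instead of one
// optional slot per schema field.
type Dictionaries = HashMap<usize, Arc<DictArray>>;

fn main() {
    let mut dictionaries: Dictionaries = HashMap::new();

    // A single IPC DictionaryBatch with id 1 arrives once...
    dictionaries.insert(1, Arc::new(DictArray(vec!["a".into(), "b".into()])));

    // ...and any field that declares dict_id = 1 resolves to the same
    // values, which a field-indexed `Vec<Option<_>>` could only express by
    // duplicating the entry.
    let for_field_0 = dictionaries.get(&1).cloned().unwrap();
    let for_field_3 = dictionaries.get(&1).cloned().unwrap();
    assert!(Arc::ptr_eq(&for_field_0, &for_field_3));
}
```

This also matches the Arrow IPC model, where a DictionaryBatch message carries its own id and several schema fields may reference the same id.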
File 2 of 3

@@ -275,7 +275,7 @@ async fn record_batch_from_message(
     message: Message<'_>,
     data_body: &[u8],
     schema_ref: Arc<Schema>,
-    dictionaries_by_field: &[Option<Arc<dyn Array>>],
+    dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
 ) -> Result<RecordBatch, Status> {
     let ipc_batch = message
         .header_as_record_batch()
@@ -288,7 +288,7 @@
         schema_ref,
         None,
         true,
-        dictionaries_by_field,
+        dictionaries,
         ArrowSchema::MetadataVersion::V5,
         &mut reader,
         0,
@@ -302,22 +302,16 @@ async fn dictionary_from_message(
     message: Message<'_>,
     data_body: &[u8],
     schema_ref: Arc<Schema>,
-    dictionaries_by_field: &mut [Option<Arc<dyn Array>>],
+    dictionaries: &mut HashMap<usize, Arc<dyn Array>>,
 ) -> Result<(), Status> {
     let ipc_batch = message
         .header_as_dictionary_batch()
         .ok_or_else(|| Status::internal("Could not parse message header as dictionary batch"))?;
 
     let mut reader = std::io::Cursor::new(data_body);
 
-    let dictionary_batch_result = ipc::read::read_dictionary(
-        ipc_batch,
-        &schema_ref,
-        true,
-        dictionaries_by_field,
-        &mut reader,
-        0,
-    );
+    let dictionary_batch_result =
+        ipc::read::read_dictionary(ipc_batch, &schema_ref, true, dictionaries, &mut reader, 0);
     dictionary_batch_result
         .map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e)))
 }
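The collapsed `read_dictionary` call above feeds a plain `Result` into the same `map_err` as before. A self-contained sketch of that error-mapping idiom; `Status` here is a local stub standing in for `tonic::Status`, whose `internal` constructor the real code uses:

```rust
use std::fmt::Debug;

// Local stub for `tonic::Status`; only the `internal` constructor used in
// this diff is modeled.
#[derive(Debug)]
struct Status(String);

impl Status {
    fn internal(msg: impl Into<String>) -> Self {
        Status(msg.into())
    }
}

// Turn any debuggable error into an "internal" status, mirroring the
// `map_err` in `dictionary_from_message`.
fn to_status<T, E: Debug>(result: Result<T, E>) -> Result<T, Status> {
    result.map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e)))
}

fn main() {
    let bad: Result<(), &str> = Err("truncated IPC body");
    assert!(to_status(bad).is_err());
}
```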
@@ -333,7 +327,7 @@ async fn save_uploaded_chunks(
     let mut chunks = vec![];
     let mut uploaded_chunks = uploaded_chunks.lock().await;
 
-    let mut dictionaries_by_field = vec![None; schema_ref.fields().len()];
+    let mut dictionaries = Default::default();
 
     while let Some(Ok(data)) = input_stream.next().await {
         let message = root_as_message(&data.data_header[..])
@@ -352,7 +346,7 @@
                     message,
                     &data.data_body,
                     schema_ref.clone(),
-                    &dictionaries_by_field,
+                    &mut dictionaries,
                 )
                 .await?;
 
@@ -363,7 +357,7 @@
                     message,
                     &data.data_body,
                     schema_ref.clone(),
-                    &mut dictionaries_by_field,
+                    &mut dictionaries,
                 )
                 .await?;
             }
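Both `consume_flight_location` (first file) and `save_uploaded_chunks` (above) now build the dictionary map with `let mut dictionaries = Default::default();`. A small sketch of why that compiles without a type annotation; the `consume` helper and the `Arc<str>` value type are hypothetical, for illustration only:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical consumer playing the role of `receive_batch_flight_data`
// or `dictionary_from_message`, which pin down the map's concrete type.
fn consume(dictionaries: &mut HashMap<usize, Arc<str>>) {
    dictionaries.insert(0, Arc::from("a"));
}

fn main() {
    // Inference flows backwards from `consume`'s signature, so
    // `Default::default()` resolves to `HashMap::new()`.
    let mut dictionaries = Default::default();
    consume(&mut dictionaries);
    assert_eq!(dictionaries.len(), 1);
}
```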
File 3 of 3: src/io/json_integration/mod.rs (58 additions, 2 deletions)
@@ -20,10 +20,13 @@
 //! These utilities define structs that read the integration JSON format for integration testing purposes.
 use serde_derive::{Deserialize, Serialize};
-use serde_json::Value;
+use serde_json::{Map, Value};
 
 use crate::datatypes::*;
 
-mod read;
 mod schema;
+use schema::ToJson;
+mod read;
 mod write;
 pub use read::to_record_batch;
 pub use write::from_record_batch;
@@ -61,6 +64,59 @@ pub struct ArrowJsonField {
     pub metadata: Option<Value>,
 }
 
+impl From<&Field> for ArrowJsonField {
+    fn from(field: &Field) -> Self {
+        let metadata_value = match field.metadata() {
+            Some(kv_list) => {
+                let mut array = Vec::new();
+                for (k, v) in kv_list {
+                    let mut kv_map = Map::new();
+                    kv_map.insert(k.clone(), Value::String(v.clone()));
+                    array.push(Value::Object(kv_map));
+                }
+                if !array.is_empty() {
+                    Some(Value::Array(array))
+                } else {
+                    None
+                }
+            }
+            _ => None,
+        };
+
+        let dictionary = if let DataType::Dictionary(key_type, _) = &field.data_type {
+            use crate::datatypes::IntegerType::*;
+            Some(ArrowJsonFieldDictionary {
+                id: field.dict_id,
+                index_type: IntegerType {
+                    name: "".to_string(),
+                    bit_width: match key_type {
+                        Int8 | UInt8 => 8,
+                        Int16 | UInt16 => 16,
+                        Int32 | UInt32 => 32,
+                        Int64 | UInt64 => 64,
+                    },
+                    is_signed: match key_type {
+                        Int8 | Int16 | Int32 | Int64 => true,
+                        UInt8 | UInt16 | UInt32 | UInt64 => false,
+                    },
+                },
+                is_ordered: field.dict_is_ordered,
+            })
+        } else {
+            None
+        };
+
+        Self {
+            name: field.name().to_string(),
+            field_type: field.data_type().to_json(),
+            nullable: field.is_nullable(),
+            children: vec![],
+            dictionary,
+            metadata: metadata_value,
+        }
+    }
+}
+
 #[derive(Deserialize, Serialize, Debug)]
 pub struct ArrowJsonFieldDictionary {
     pub id: i64,
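The new `From<&Field>` impl converts each metadata key/value pair into a single-entry JSON object (accumulated into a JSON array), and flattens the dictionary key type into the integration format's bit width and signedness. A standalone restatement of that key-type mapping; the `IntegerType` enum below is a local stand-in for `crate::datatypes::IntegerType`:

```rust
// Local stand-in for `crate::datatypes::IntegerType`.
#[derive(Clone, Copy)]
enum IntegerType {
    Int8,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt16,
    UInt32,
    UInt64,
}

// Same mapping as the `bit_width` match in the impl above.
fn bit_width(t: IntegerType) -> usize {
    use IntegerType::*;
    match t {
        Int8 | UInt8 => 8,
        Int16 | UInt16 => 16,
        Int32 | UInt32 => 32,
        Int64 | UInt64 => 64,
    }
}

// Same mapping as the `is_signed` match: the four Int* variants are signed.
fn is_signed(t: IntegerType) -> bool {
    use IntegerType::*;
    matches!(t, Int8 | Int16 | Int32 | Int64)
}

fn main() {
    assert_eq!(bit_width(IntegerType::UInt32), 32);
    assert!(!is_signed(IntegerType::UInt32));
    assert!(is_signed(IntegerType::Int64));
}
```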
