From 39f730a2f9f627a5b1e685691bc4d7eac6142260 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Wed, 19 Jan 2022 22:07:12 +0100 Subject: [PATCH] Allowed None IPC field (#780) --- .../integration_test.rs | 2 +- .../integration_test.rs | 4 +-- src/io/flight/mod.rs | 30 ++++++++++++++----- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index ce015f3ae7a..fe56eff4d96 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -96,7 +96,7 @@ async fn upload_data( let options = write::WriteOptions { compression: None }; - let mut schema = flight::serialize_schema(schema, fields); + let mut schema = flight::serialize_schema(schema, Some(fields)); schema.flight_descriptor = Some(descriptor.clone()); upload_tx.send(schema).await?; diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 98c26a56c4f..f971036e0ef 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -113,7 +113,7 @@ impl FlightService for FlightServiceImpl { let schema = std::iter::once(Ok(serialize_schema( &flight.schema, - &flight.ipc_schema.fields, + Some(&flight.ipc_schema.fields), ))); let batches = flight @@ -175,7 +175,7 @@ impl FlightService for FlightServiceImpl { let total_records: usize = flight.chunks.iter().map(|chunk| chunk.len()).sum(); - let schema = serialize_schema_to_info(&flight.schema, &flight.ipc_schema.fields) + let schema = serialize_schema_to_info(&flight.schema, Some(&flight.ipc_schema.fields)) .expect( "Could not generate schema bytes from schema stored by a DoPut; \ this should be impossible", diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index a9a02b87792..5db4c70770e 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -14,6 +14,7 @@ use crate::{ io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions}, }; +use super::ipc::write::default_ipc_fields; use super::ipc::{IpcField, IpcSchema}; /// Serializes [`Chunk`] to a vector of [`FlightData`] representing the serialized dictionaries @@ -46,14 +47,17 @@ impl From for FlightData { } /// Serializes a [`Schema`] to [`SchemaResult`]. -pub fn serialize_schema_to_result(schema: &Schema, ipc_fields: &[IpcField]) -> SchemaResult { +pub fn serialize_schema_to_result( + schema: &Schema, + ipc_fields: Option<&[IpcField]>, +) -> SchemaResult { SchemaResult { schema: schema_as_flatbuffer(schema, ipc_fields), } } /// Serializes a [`Schema`] to [`FlightData`]. -pub fn serialize_schema(schema: &Schema, ipc_fields: &[IpcField]) -> FlightData { +pub fn serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> FlightData { let data_header = schema_as_flatbuffer(schema, ipc_fields); FlightData { data_header, @@ -62,17 +66,29 @@ pub fn serialize_schema(schema: &Schema, ipc_fields: &[IpcField]) -> FlightData } /// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::data::FlightInfo`]. -pub fn serialize_schema_to_info(schema: &Schema, ipc_fields: &[IpcField]) -> Result> { - let encoded_data = schema_as_encoded_data(schema, ipc_fields); +pub fn serialize_schema_to_info( + schema: &Schema, + ipc_fields: Option<&[IpcField]>, +) -> Result> { + let encoded_data = if let Some(ipc_fields) = ipc_fields { + schema_as_encoded_data(schema, ipc_fields) + } else { + let ipc_fields = default_ipc_fields(&schema.fields); + schema_as_encoded_data(schema, &ipc_fields) + }; let mut schema = vec![]; write::common_sync::write_message(&mut schema, encoded_data)?; Ok(schema) } -fn schema_as_flatbuffer(schema: &Schema, ipc_fields: &[IpcField]) -> Vec { - let encoded_data = schema_as_encoded_data(schema, ipc_fields); - encoded_data.ipc_message +fn schema_as_flatbuffer(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec { + if let Some(ipc_fields) = ipc_fields { + write::schema_to_bytes(schema, ipc_fields) + } else { + let ipc_fields = default_ipc_fields(&schema.fields); + write::schema_to_bytes(schema, &ipc_fields) + } } fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedData {