Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Allowed None IPC field
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 19, 2022
1 parent 84a4911 commit 74e6eb6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl FlightService for FlightServiceImpl {

let schema = std::iter::once(Ok(serialize_schema(
&flight.schema,
&flight.ipc_schema.fields,
Some(&flight.ipc_schema.fields),
)));

let batches = flight
Expand Down Expand Up @@ -175,7 +175,7 @@ impl FlightService for FlightServiceImpl {

let total_records: usize = flight.chunks.iter().map(|chunk| chunk.len()).sum();

let schema = serialize_schema_to_info(&flight.schema, &flight.ipc_schema.fields)
let schema = serialize_schema_to_info(&flight.schema, Some(&flight.ipc_schema.fields))
.expect(
"Could not generate schema bytes from schema stored by a DoPut; \
this should be impossible",
Expand Down
30 changes: 23 additions & 7 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions},
};

use super::ipc::write::default_ipc_fields;
use super::ipc::{IpcField, IpcSchema};

/// Serializes [`Chunk`] to a vector of [`FlightData`] representing the serialized dictionaries
Expand Down Expand Up @@ -46,14 +47,17 @@ impl From<EncodedData> for FlightData {
}

/// Serializes a [`Schema`] to [`SchemaResult`].
pub fn serialize_schema_to_result(schema: &Schema, ipc_fields: &[IpcField]) -> SchemaResult {
pub fn serialize_schema_to_result(
schema: &Schema,
ipc_fields: Option<&[IpcField]>,
) -> SchemaResult {
SchemaResult {
schema: schema_as_flatbuffer(schema, ipc_fields),
}
}

/// Serializes a [`Schema`] to [`FlightData`].
pub fn serialize_schema(schema: &Schema, ipc_fields: &[IpcField]) -> FlightData {
pub fn serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> FlightData {
let data_header = schema_as_flatbuffer(schema, ipc_fields);
FlightData {
data_header,
Expand All @@ -62,17 +66,29 @@ pub fn serialize_schema(schema: &Schema, ipc_fields: &[IpcField]) -> FlightData
}

/// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::data::FlightInfo`].
pub fn serialize_schema_to_info(schema: &Schema, ipc_fields: &[IpcField]) -> Result<Vec<u8>> {
let encoded_data = schema_as_encoded_data(schema, ipc_fields);
pub fn serialize_schema_to_info(
schema: &Schema,
ipc_fields: Option<&[IpcField]>,
) -> Result<Vec<u8>> {
let encoded_data = if let Some(ipc_fields) = ipc_fields {
schema_as_encoded_data(schema, ipc_fields)
} else {
let ipc_fields = default_ipc_fields(&schema.fields);
schema_as_encoded_data(schema, &ipc_fields)
};

let mut schema = vec![];
write::common_sync::write_message(&mut schema, encoded_data)?;
Ok(schema)
}

fn schema_as_flatbuffer(schema: &Schema, ipc_fields: &[IpcField]) -> Vec<u8> {
let encoded_data = schema_as_encoded_data(schema, ipc_fields);
encoded_data.ipc_message
fn schema_as_flatbuffer(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec<u8> {
if let Some(ipc_fields) = ipc_fields {
write::schema_to_bytes(schema, ipc_fields)
} else {
let ipc_fields = default_ipc_fields(&schema.fields);
write::schema_to_bytes(schema, &ipc_fields)
}
}

fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedData {
Expand Down

0 comments on commit 74e6eb6

Please sign in to comment.