diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 59ba0a83d88..4072c0d0d09 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -24,11 +24,11 @@ pub use crate::io::ipc::write::common::WriteOptions; /// # Errors /// This function errors iff `fields` is not consistent with `columns` pub fn serialize_batch( - columns: &Chunk>, + chunk: &Chunk>, fields: &[IpcField], options: &WriteOptions, ) -> Result<(Vec, FlightData)> { - if fields.len() != columns.arrays().len() { + if fields.len() != chunk.arrays().len() { return Err(Error::InvalidArgumentError("The argument `fields` must be consistent with the columns' schema. Use e.g. &arrow2::io::flight::default_ipc_fields(&schema.fields)".to_string())); } @@ -38,7 +38,7 @@ pub fn serialize_batch( }; let (encoded_dictionaries, encoded_batch) = - encode_chunk(columns, fields, &mut dictionary_tracker, options) + encode_chunk(chunk, fields, &mut dictionary_tracker, options) .expect("DictionaryTracker configured above to not error on replacement"); let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); @@ -63,15 +63,14 @@ pub fn serialize_schema_to_result( ipc_fields: Option<&[IpcField]>, ) -> SchemaResult { SchemaResult { - schema: schema_as_flatbuffer(schema, ipc_fields), + schema: _serialize_schema(schema, ipc_fields), } } /// Serializes a [`Schema`] to [`FlightData`]. pub fn serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> FlightData { - let data_header = schema_as_flatbuffer(schema, ipc_fields); FlightData { - data_header, + data_header: _serialize_schema(schema, ipc_fields), ..Default::default() } } @@ -93,7 +92,7 @@ pub fn serialize_schema_to_info( Ok(schema) } -fn schema_as_flatbuffer(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec { +fn _serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec { if let Some(ipc_fields) = ipc_fields { write::schema_to_bytes(schema, ipc_fields) } else { diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 5638a5289e6..306df0061cf 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -226,8 +226,8 @@ fn first_dict_field<'a>( Err(Error::from(OutOfSpecKind::InvalidId { requested_id: id })) } -/// Read the dictionary from the buffer and provided metadata, -/// updating the `dictionaries` with the resulting dictionary +/// Reads a dictionary from the reader, +/// updating `dictionaries` with the resulting dictionary #[allow(clippy::too_many_arguments)] pub fn read_dictionary( batch: arrow_format::ipc::DictionaryBatchRef, @@ -269,7 +269,7 @@ pub fn read_dictionary( fields: vec![first_ipc_field.clone()], is_little_endian: ipc_schema.is_little_endian, }; - let columns = read_record_batch( + let chunk = read_record_batch( batch, &fields, &ipc_schema, @@ -282,8 +282,7 @@ pub fn read_dictionary( file_size, scratch, )?; - let mut arrays = columns.into_arrays(); - arrays.pop().unwrap() + chunk.into_arrays().pop().unwrap() } _ => { return Err(Error::from(OutOfSpecKind::InvalidIdDataType { diff --git a/src/io/ipc/read/schema.rs b/src/io/ipc/read/schema.rs index b54292baab8..7f970786aa1 100644 --- a/src/io/ipc/read/schema.rs +++ b/src/io/ipc/read/schema.rs @@ -328,17 +328,16 @@ fn get_data_type( } /// Deserialize an flatbuffers-encoded Schema message into [`Schema`] and [`IpcSchema`]. -pub fn deserialize_schema(bytes: &[u8]) -> Result<(Schema, IpcSchema)> { - let message = arrow_format::ipc::MessageRef::read_as_root(bytes) +pub fn deserialize_schema(message: &[u8]) -> Result<(Schema, IpcSchema)> { + let message = arrow_format::ipc::MessageRef::read_as_root(message) .map_err(|err| Error::oos(format!("Unable deserialize message: {:?}", err)))?; - let schema = match message.header()?.ok_or_else(|| { - Error::oos("Unable to convert flight data header to a record batch".to_string()) - })? { + let schema = match message + .header()? + .ok_or_else(|| Error::oos("Unable to convert header to a schema".to_string()))? + { arrow_format::ipc::MessageHeaderRef::Schema(schema) => Ok(schema), - _ => Err(Error::nyi( - "flight currently only supports reading RecordBatch messages", - )), + _ => Err(Error::nyi("The message is expected to be a Schema message")), }?; fb_to_schema(schema)