Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Minor cleanup of internal namings #1160

Merged
merged 1 commit into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ pub use crate::io::ipc::write::common::WriteOptions;
/// # Errors
/// This function errors iff `fields` is not consistent with `columns`
pub fn serialize_batch(
columns: &Chunk<Box<dyn Array>>,
chunk: &Chunk<Box<dyn Array>>,
fields: &[IpcField],
options: &WriteOptions,
) -> Result<(Vec<FlightData>, FlightData)> {
if fields.len() != columns.arrays().len() {
if fields.len() != chunk.arrays().len() {
return Err(Error::InvalidArgumentError("The argument `fields` must be consistent with the columns' schema. Use e.g. &arrow2::io::flight::default_ipc_fields(&schema.fields)".to_string()));
}

Expand All @@ -38,7 +38,7 @@ pub fn serialize_batch(
};

let (encoded_dictionaries, encoded_batch) =
encode_chunk(columns, fields, &mut dictionary_tracker, options)
encode_chunk(chunk, fields, &mut dictionary_tracker, options)
.expect("DictionaryTracker configured above to not error on replacement");

let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
Expand All @@ -63,15 +63,14 @@ pub fn serialize_schema_to_result(
ipc_fields: Option<&[IpcField]>,
) -> SchemaResult {
SchemaResult {
schema: schema_as_flatbuffer(schema, ipc_fields),
schema: _serialize_schema(schema, ipc_fields),
}
}

/// Serializes a [`Schema`] to [`FlightData`].
pub fn serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> FlightData {
let data_header = schema_as_flatbuffer(schema, ipc_fields);
FlightData {
data_header,
data_header: _serialize_schema(schema, ipc_fields),
..Default::default()
}
}
Expand All @@ -93,7 +92,7 @@ pub fn serialize_schema_to_info(
Ok(schema)
}

fn schema_as_flatbuffer(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec<u8> {
fn _serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec<u8> {
if let Some(ipc_fields) = ipc_fields {
write::schema_to_bytes(schema, ipc_fields)
} else {
Expand Down
9 changes: 4 additions & 5 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ fn first_dict_field<'a>(
Err(Error::from(OutOfSpecKind::InvalidId { requested_id: id }))
}

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries` with the resulting dictionary
/// Reads a dictionary from the reader,
/// updating `dictionaries` with the resulting dictionary
#[allow(clippy::too_many_arguments)]
pub fn read_dictionary<R: Read + Seek>(
batch: arrow_format::ipc::DictionaryBatchRef,
Expand Down Expand Up @@ -269,7 +269,7 @@ pub fn read_dictionary<R: Read + Seek>(
fields: vec![first_ipc_field.clone()],
is_little_endian: ipc_schema.is_little_endian,
};
let columns = read_record_batch(
let chunk = read_record_batch(
batch,
&fields,
&ipc_schema,
Expand All @@ -282,8 +282,7 @@ pub fn read_dictionary<R: Read + Seek>(
file_size,
scratch,
)?;
let mut arrays = columns.into_arrays();
arrays.pop().unwrap()
chunk.into_arrays().pop().unwrap()
}
_ => {
return Err(Error::from(OutOfSpecKind::InvalidIdDataType {
Expand Down
15 changes: 7 additions & 8 deletions src/io/ipc/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,16 @@ fn get_data_type(
}

/// Deserialize an flatbuffers-encoded Schema message into [`Schema`] and [`IpcSchema`].
pub fn deserialize_schema(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
let message = arrow_format::ipc::MessageRef::read_as_root(bytes)
pub fn deserialize_schema(message: &[u8]) -> Result<(Schema, IpcSchema)> {
let message = arrow_format::ipc::MessageRef::read_as_root(message)
.map_err(|err| Error::oos(format!("Unable deserialize message: {:?}", err)))?;

let schema = match message.header()?.ok_or_else(|| {
Error::oos("Unable to convert flight data header to a record batch".to_string())
})? {
let schema = match message
.header()?
.ok_or_else(|| Error::oos("Unable to convert header to a schema".to_string()))?
{
arrow_format::ipc::MessageHeaderRef::Schema(schema) => Ok(schema),
_ => Err(Error::nyi(
"flight currently only supports reading RecordBatch messages",
)),
_ => Err(Error::nyi("The message is expected to be a Schema message")),
}?;

fb_to_schema(schema)
Expand Down