Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Minor cleanup of internal naming (#1160)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jul 13, 2022
1 parent 91e1ef7 commit bfc6df8
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 20 deletions.
13 changes: 6 additions & 7 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ pub use crate::io::ipc::write::common::WriteOptions;
/// # Errors
/// This function errors iff `fields` is not consistent with `columns`
pub fn serialize_batch(
columns: &Chunk<Box<dyn Array>>,
chunk: &Chunk<Box<dyn Array>>,
fields: &[IpcField],
options: &WriteOptions,
) -> Result<(Vec<FlightData>, FlightData)> {
if fields.len() != columns.arrays().len() {
if fields.len() != chunk.arrays().len() {
return Err(Error::InvalidArgumentError("The argument `fields` must be consistent with the columns' schema. Use e.g. &arrow2::io::flight::default_ipc_fields(&schema.fields)".to_string()));
}

Expand All @@ -38,7 +38,7 @@ pub fn serialize_batch(
};

let (encoded_dictionaries, encoded_batch) =
encode_chunk(columns, fields, &mut dictionary_tracker, options)
encode_chunk(chunk, fields, &mut dictionary_tracker, options)
.expect("DictionaryTracker configured above to not error on replacement");

let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
Expand All @@ -63,15 +63,14 @@ pub fn serialize_schema_to_result(
ipc_fields: Option<&[IpcField]>,
) -> SchemaResult {
SchemaResult {
schema: schema_as_flatbuffer(schema, ipc_fields),
schema: _serialize_schema(schema, ipc_fields),
}
}

/// Serializes a [`Schema`] to [`FlightData`].
pub fn serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> FlightData {
let data_header = schema_as_flatbuffer(schema, ipc_fields);
FlightData {
data_header,
data_header: _serialize_schema(schema, ipc_fields),
..Default::default()
}
}
Expand All @@ -93,7 +92,7 @@ pub fn serialize_schema_to_info(
Ok(schema)
}

fn schema_as_flatbuffer(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec<u8> {
fn _serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec<u8> {
if let Some(ipc_fields) = ipc_fields {
write::schema_to_bytes(schema, ipc_fields)
} else {
Expand Down
9 changes: 4 additions & 5 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ fn first_dict_field<'a>(
Err(Error::from(OutOfSpecKind::InvalidId { requested_id: id }))
}

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries` with the resulting dictionary
/// Reads a dictionary from the reader,
/// updating `dictionaries` with the resulting dictionary
#[allow(clippy::too_many_arguments)]
pub fn read_dictionary<R: Read + Seek>(
batch: arrow_format::ipc::DictionaryBatchRef,
Expand Down Expand Up @@ -269,7 +269,7 @@ pub fn read_dictionary<R: Read + Seek>(
fields: vec![first_ipc_field.clone()],
is_little_endian: ipc_schema.is_little_endian,
};
let columns = read_record_batch(
let chunk = read_record_batch(
batch,
&fields,
&ipc_schema,
Expand All @@ -282,8 +282,7 @@ pub fn read_dictionary<R: Read + Seek>(
file_size,
scratch,
)?;
let mut arrays = columns.into_arrays();
arrays.pop().unwrap()
chunk.into_arrays().pop().unwrap()
}
_ => {
return Err(Error::from(OutOfSpecKind::InvalidIdDataType {
Expand Down
15 changes: 7 additions & 8 deletions src/io/ipc/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,16 @@ fn get_data_type(
}

/// Deserialize an flatbuffers-encoded Schema message into [`Schema`] and [`IpcSchema`].
pub fn deserialize_schema(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
let message = arrow_format::ipc::MessageRef::read_as_root(bytes)
pub fn deserialize_schema(message: &[u8]) -> Result<(Schema, IpcSchema)> {
let message = arrow_format::ipc::MessageRef::read_as_root(message)
.map_err(|err| Error::oos(format!("Unable deserialize message: {:?}", err)))?;

let schema = match message.header()?.ok_or_else(|| {
Error::oos("Unable to convert flight data header to a record batch".to_string())
})? {
let schema = match message
.header()?
.ok_or_else(|| Error::oos("Unable to convert header to a schema".to_string()))?
{
arrow_format::ipc::MessageHeaderRef::Schema(schema) => Ok(schema),
_ => Err(Error::nyi(
"flight currently only supports reading RecordBatch messages",
)),
_ => Err(Error::nyi("The message is expected to be a Schema message")),
}?;

fb_to_schema(schema)
Expand Down

0 comments on commit bfc6df8

Please sign in to comment.