Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added avro record names when converting arrow schema to avro #1279

Merged
merged 1 commit into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions src/io/avro/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use crate::error::{Error, Result};

/// Converts a [`Schema`] to an Avro [`Record`].
pub fn to_record(schema: &Schema) -> Result<Record> {
let mut name_counter: i32 = 0;
let fields = schema
.fields
.iter()
.map(field_to_field)
.map(|f| field_to_field(f, &mut name_counter))
.collect::<Result<_>>()?;
Ok(Record {
name: "".to_string(),
Expand All @@ -22,20 +23,32 @@ pub fn to_record(schema: &Schema) -> Result<Record> {
})
}

fn field_to_field(field: &Field) -> Result<AvroField> {
let schema = type_to_schema(field.data_type(), field.is_nullable)?;
fn field_to_field(field: &Field, name_counter: &mut i32) -> Result<AvroField> {
let schema = type_to_schema(field.data_type(), field.is_nullable, name_counter)?;
Ok(AvroField::new(&field.name, schema))
}

fn type_to_schema(data_type: &DataType, is_nullable: bool) -> Result<AvroSchema> {
fn type_to_schema(
data_type: &DataType,
is_nullable: bool,
name_counter: &mut i32,
) -> Result<AvroSchema> {
Ok(if is_nullable {
AvroSchema::Union(vec![AvroSchema::Null, _type_to_schema(data_type)?])
AvroSchema::Union(vec![
AvroSchema::Null,
_type_to_schema(data_type, name_counter)?,
])
} else {
_type_to_schema(data_type)?
_type_to_schema(data_type, name_counter)?
})
}

fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
fn _get_field_name(name_counter: &mut i32) -> String {
*name_counter += 1;
format!("r{}", name_counter)
}

fn _type_to_schema(data_type: &DataType, name_counter: &mut i32) -> Result<AvroSchema> {
Ok(match data_type.to_logical_type() {
DataType::Null => AvroSchema::Null,
DataType::Boolean => AvroSchema::Boolean,
Expand All @@ -48,13 +61,13 @@ fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
DataType::Utf8 => AvroSchema::String(None),
DataType::LargeUtf8 => AvroSchema::String(None),
DataType::LargeList(inner) | DataType::List(inner) => AvroSchema::Array(Box::new(
type_to_schema(&inner.data_type, inner.is_nullable)?,
type_to_schema(&inner.data_type, inner.is_nullable, name_counter)?,
)),
DataType::Struct(fields) => AvroSchema::Record(Record::new(
"",
_get_field_name(name_counter),
fields
.iter()
.map(field_to_field)
.map(|f| field_to_field(f, name_counter))
.collect::<Result<Vec<_>>>()?,
)),
DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
Expand Down
91 changes: 91 additions & 0 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use arrow2::io::avro::avro_schema::file::{Block, CompressedBlock, Compression};
use arrow2::io::avro::avro_schema::write::{compress, write_block, write_metadata};
use arrow2::io::avro::write;
use arrow2::types::months_days_ns;
use avro_schema::schema::{Field as AvroField, Record, Schema as AvroSchema};

use super::read::read_avro;

Expand Down Expand Up @@ -284,6 +285,96 @@ fn struct_data() -> Chunk<Box<dyn Array>> {
])
}

fn avro_record() -> Record {
Record {
name: "".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "struct".to_string(),
doc: None,
schema: AvroSchema::Record(Record {
name: "r1".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "item1".to_string(),
doc: None,
schema: AvroSchema::Int(None),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "item2".to_string(),
doc: None,
schema: AvroSchema::Union(vec![
AvroSchema::Null,
AvroSchema::Int(None),
]),
default: None,
order: None,
aliases: vec![],
},
],
}),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "struct nullable".to_string(),
doc: None,
schema: AvroSchema::Union(vec![
AvroSchema::Null,
AvroSchema::Record(Record {
name: "r2".to_string(),
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroField {
name: "item1".to_string(),
doc: None,
schema: AvroSchema::Int(None),
default: None,
order: None,
aliases: vec![],
},
AvroField {
name: "item2".to_string(),
doc: None,
schema: AvroSchema::Union(vec![
AvroSchema::Null,
AvroSchema::Int(None),
]),
default: None,
order: None,
aliases: vec![],
},
],
}),
]),
default: None,
order: None,
aliases: vec![],
},
],
}
}

#[test]
fn avro_record_schema() -> Result<()> {
let arrow_schema = struct_schema();
let record = write::to_record(&arrow_schema)?;
assert_eq!(record, avro_record());
Ok(())
}

#[test]
fn struct_() -> Result<()> {
let write_schema = struct_schema();
Expand Down