Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added read_schema #196

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ fn make_records(record: Value, count: usize) -> Vec<Value> {
}

fn write(schema: &Schema, records: &[Value]) -> Vec<u8> {
let mut writer = Writer::new(&schema, Vec::new());
let mut writer = Writer::new(schema, Vec::new());
writer.extend_from_slice(records).unwrap();
writer.into_inner().unwrap()
}
Expand Down
2 changes: 1 addition & 1 deletion examples/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn benchmark(schema: &Schema, record: &Value, s: &str, count: usize, runs: usize
let records = records.clone();

let start = Instant::now();
let mut writer = Writer::new(&schema, Vec::new());
let mut writer = Writer::new(schema, Vec::new());
writer.extend(records.into_iter()).unwrap();

let duration = Instant::now().duration_since(start);
Expand Down
10 changes: 5 additions & 5 deletions src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
{
match *self.input {
// This branch can be anything...
Value::Record(ref fields) => visitor.visit_enum(EnumDeserializer::new(&fields)),
Value::Record(ref fields) => visitor.visit_enum(EnumDeserializer::new(fields)),
// This has to be a unit Enum
Value::Enum(_index, ref field) => visitor.visit_enum(EnumUnitDeserializer::new(&field)),
Value::Enum(_index, ref field) => visitor.visit_enum(EnumUnitDeserializer::new(field)),
_ => Err(de::Error::custom("not an enum")),
}
}
Expand Down Expand Up @@ -466,7 +466,7 @@ impl<'de> de::SeqAccess<'de> for SeqDeserializer<'de> {
T: DeserializeSeed<'de>,
{
match self.input.next() {
Some(item) => seed.deserialize(&Deserializer::new(&item)).map(Some),
Some(item) => seed.deserialize(&Deserializer::new(item)).map(Some),
None => Ok(None),
}
}
Expand All @@ -480,7 +480,7 @@ impl<'de> de::MapAccess<'de> for MapDeserializer<'de> {
K: DeserializeSeed<'de>,
{
match self.input_keys.next() {
Some(ref key) => seed
Some(key) => seed
.deserialize(StringDeserializer {
input: (*key).clone(),
})
Expand All @@ -494,7 +494,7 @@ impl<'de> de::MapAccess<'de> for MapDeserializer<'de> {
V: DeserializeSeed<'de>,
{
match self.input_values.next() {
Some(ref value) => seed.deserialize(&Deserializer::new(value)),
Some(value) => seed.deserialize(&Deserializer::new(value)),
None => Err(de::Error::custom("should not happen - too many values")),
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::convert::TryInto;
/// be valid with regards to the schema. Schema are needed only to guide the
/// encoding for complex type values.
pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
encode_ref(&value, schema, buffer)
encode_ref(value, schema, buffer)
}

fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
Expand Down Expand Up @@ -136,7 +136,7 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {

pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
let mut buffer = Vec::new();
encode(&value, schema, &mut buffer);
encode(value, schema, &mut buffer);
buffer
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ mod tests {
// Would allocate 18446744073709551605 bytes
let illformed: &[u8] = &[0x3e, 0x15, 0xff, 0x1f, 0x15, 0xff];

let value = from_avro_datum(&schema, &mut &illformed[..], None);
let value = from_avro_datum(&schema, &mut &*illformed, None);
assert!(value.is_err());
}
}
94 changes: 52 additions & 42 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,54 @@ use std::{
str::FromStr,
};

/// Reads the schema from `reader`, returning the file's [`Schema`] and [`Codec`].
/// # Error
/// This function errors iff the header is not a valid avro file header.
pub fn read_schema<R: Read>(reader: &mut R) -> AvroResult<(Schema, Codec)> {
let meta_schema = Schema::Map(Box::new(Schema::Bytes));

let mut buf = [0u8; 4];
reader.read_exact(&mut buf).map_err(Error::ReadHeader)?;

if buf != [b'O', b'b', b'j', 1u8] {
return Err(Error::HeaderMagic);
}

if let Value::Map(meta) = decode(&meta_schema, reader)? {
// TODO: surface original parse schema errors instead of coalescing them here
let json = meta
.get("avro.schema")
.and_then(|bytes| {
if let Value::Bytes(ref bytes) = *bytes {
from_slice(bytes.as_ref()).ok()
} else {
None
}
})
.ok_or(Error::GetAvroSchemaFromMap)?;
let schema = Schema::parse(&json)?;

let codec = if let Some(codec) = meta
.get("avro.codec")
.and_then(|codec| {
if let Value::Bytes(ref bytes) = *codec {
std::str::from_utf8(bytes.as_ref()).ok()
} else {
None
}
})
.and_then(|codec| Codec::from_str(codec).ok())
{
codec
} else {
Codec::Null
};
Ok((schema, codec))
} else {
Err(Error::GetHeaderMetadata)
}
}

// Internal Block reader.
#[derive(Debug, Clone)]
struct Block<R> {
Expand Down Expand Up @@ -39,47 +87,9 @@ impl<R: Read> Block<R> {
/// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
/// its content.
fn read_header(&mut self) -> AvroResult<()> {
let meta_schema = Schema::Map(Box::new(Schema::Bytes));

let mut buf = [0u8; 4];
self.reader
.read_exact(&mut buf)
.map_err(Error::ReadHeader)?;

if buf != [b'O', b'b', b'j', 1u8] {
return Err(Error::HeaderMagic);
}

if let Value::Map(meta) = decode(&meta_schema, &mut self.reader)? {
// TODO: surface original parse schema errors instead of coalescing them here
let json = meta
.get("avro.schema")
.and_then(|bytes| {
if let Value::Bytes(ref bytes) = *bytes {
from_slice(bytes.as_ref()).ok()
} else {
None
}
})
.ok_or(Error::GetAvroSchemaFromMap)?;
self.writer_schema = Schema::parse(&json)?;

if let Some(codec) = meta
.get("avro.codec")
.and_then(|codec| {
if let Value::Bytes(ref bytes) = *codec {
std::str::from_utf8(bytes.as_ref()).ok()
} else {
None
}
})
.and_then(|codec| Codec::from_str(codec).ok())
{
self.codec = codec;
}
} else {
return Err(Error::GetHeaderMetadata);
}
let (schema, codec) = read_schema(&mut self.reader)?;
self.writer_schema = schema;
self.codec = codec;

self.reader
.read_exact(&mut self.marker)
Expand Down Expand Up @@ -279,7 +289,7 @@ pub fn from_avro_datum<R: Read>(
) -> AvroResult<Value> {
let value = decode(writer_schema, reader)?;
match reader_schema {
Some(ref schema) => value.resolve(schema),
Some(schema) => value.resolve(schema),
None => Ok(value),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ impl Schema {
for js in input {
let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?;
if let Value::Object(inner) = &schema {
let fullname = Name::parse(&inner)?.fullname(None);
let fullname = Name::parse(inner)?.fullname(None);
let previous_value = input_schemas.insert(fullname.clone(), schema);
if previous_value.is_some() {
return Err(Error::NameCollision(fullname));
Expand Down
81 changes: 36 additions & 45 deletions src/schema_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Checker {
symbols: r_symbols, ..
} = readers_schema
{
return w_symbols.iter().find(|e| !r_symbols.contains(e)).is_none();
return !w_symbols.iter().any(|e| !r_symbols.contains(e));
}
}
false
Expand Down Expand Up @@ -580,10 +580,10 @@ mod tests {
&writer_schema(),
&reader_schema,
));
assert_eq!(
SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
false
);
assert!(!SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
}

#[test]
Expand All @@ -600,10 +600,10 @@ mod tests {
&writer_schema(),
&reader_schema
));
assert_eq!(
SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
false
);
assert!(!SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
}

#[test]
Expand Down Expand Up @@ -642,10 +642,10 @@ mod tests {
&writer_schema(),
&reader_schema
));
assert_eq!(
SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
false
);
assert!(!SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
}

#[test]
Expand All @@ -659,14 +659,14 @@ mod tests {
"#,
)
.unwrap();
assert_eq!(
SchemaCompatibility::can_read(&writer_schema(), &reader_schema),
false
);
assert_eq!(
SchemaCompatibility::can_read(&reader_schema, &writer_schema()),
false
);
assert!(!SchemaCompatibility::can_read(
&writer_schema(),
&reader_schema
));
assert!(!SchemaCompatibility::can_read(
&reader_schema,
&writer_schema()
));
}

#[test]
Expand All @@ -678,10 +678,10 @@ mod tests {
&string_array_schema(),
&valid_reader
));
assert_eq!(
SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader),
false
);
assert!(!SchemaCompatibility::can_read(
&string_array_schema(),
&invalid_reader
));
}

#[test]
Expand All @@ -691,10 +691,10 @@ mod tests {
&Schema::String,
&valid_reader
));
assert_eq!(
SchemaCompatibility::can_read(&Schema::Int, &Schema::String),
false
);
assert!(!SchemaCompatibility::can_read(
&Schema::Int,
&Schema::String
));
}

#[test]
Expand All @@ -703,10 +703,7 @@ mod tests {
let union_writer = union_schema(vec![Schema::Int, Schema::String]);
let union_reader = union_schema(vec![Schema::String]);

assert_eq!(
SchemaCompatibility::can_read(&union_writer, &union_reader),
false
);
assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader));
assert!(SchemaCompatibility::can_read(&union_reader, &union_writer));
}

Expand All @@ -730,10 +727,7 @@ mod tests {
)
.unwrap();

assert_eq!(
SchemaCompatibility::can_read(&string_schema, &int_schema),
false
);
assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema));
}

#[test]
Expand All @@ -747,10 +741,7 @@ mod tests {
let enum_schema2 =
Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)
.unwrap();
assert_eq!(
SchemaCompatibility::can_read(&enum_schema2, &enum_schema1),
false
);
assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1));
assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2));
}

Expand Down Expand Up @@ -827,10 +818,10 @@ mod tests {
fn test_union_resolution_no_structure_match() {
// short name match, but no structure match
let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
assert_eq!(
SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema),
false
);
assert!(!SchemaCompatibility::can_read(
&point_2d_fullname_schema(),
&read_schema
));
}

// TODO(nlopes): the below require named schemas to be fully supported. See:
Expand Down
4 changes: 2 additions & 2 deletions src/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl<'a> ser::SerializeTupleVariant for SeqVariantSerializer<'a> {
}

fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(ser::SerializeSeq::end(self)?)
ser::SerializeSeq::end(self)
}
}

Expand Down Expand Up @@ -775,7 +775,7 @@ mod tests {
a: SingleValueInternalEnum::Double(64.0),
};

assert_eq!(to_value(test).is_err(), true);
assert!(to_value(test).is_err());

let test = TestSingleValueAdjacentEnum {
a: SingleValueAdjacentEnum::Double(64.0),
Expand Down
2 changes: 1 addition & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ mod tests {
#[test]
fn test_overflow() {
let causes_left_shift_overflow: &[u8] = &[0xe1, 0xe1, 0xe1, 0xe1, 0xe1];
assert!(decode_variable(&mut &causes_left_shift_overflow[..]).is_err());
assert!(decode_variable(&mut &*causes_left_shift_overflow).is_err());
}

#[test]
Expand Down
Loading