From dcfd533101dd88c377dcdbb08f0132fa657de200 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 13 Sep 2021 06:38:07 +0000 Subject: [PATCH 1/2] Fixed clippy warnings. --- benches/serde.rs | 2 +- examples/benchmark.rs | 2 +- src/de.rs | 10 ++--- src/encode.rs | 4 +- src/lib.rs | 2 +- src/reader.rs | 2 +- src/schema.rs | 2 +- src/schema_compatibility.rs | 81 +++++++++++++++++-------------------- src/ser.rs | 4 +- src/util.rs | 2 +- src/writer.rs | 4 +- tests/io.rs | 4 +- tests/schema.rs | 15 +++---- 13 files changed, 64 insertions(+), 70 deletions(-) diff --git a/benches/serde.rs b/benches/serde.rs index 7d5dd77..84e5a5b 100644 --- a/benches/serde.rs +++ b/benches/serde.rs @@ -159,7 +159,7 @@ fn make_records(record: Value, count: usize) -> Vec { } fn write(schema: &Schema, records: &[Value]) -> Vec { - let mut writer = Writer::new(&schema, Vec::new()); + let mut writer = Writer::new(schema, Vec::new()); writer.extend_from_slice(records).unwrap(); writer.into_inner().unwrap() } diff --git a/examples/benchmark.rs b/examples/benchmark.rs index 1ebd88d..057a916 100644 --- a/examples/benchmark.rs +++ b/examples/benchmark.rs @@ -32,7 +32,7 @@ fn benchmark(schema: &Schema, record: &Value, s: &str, count: usize, runs: usize let records = records.clone(); let start = Instant::now(); - let mut writer = Writer::new(&schema, Vec::new()); + let mut writer = Writer::new(schema, Vec::new()); writer.extend(records.into_iter()).unwrap(); let duration = Instant::now().duration_since(start); diff --git a/src/de.rs b/src/de.rs index 5894298..214a752 100644 --- a/src/de.rs +++ b/src/de.rs @@ -436,9 +436,9 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> { { match *self.input { // This branch can be anything... - Value::Record(ref fields) => visitor.visit_enum(EnumDeserializer::new(&fields)), + Value::Record(ref fields) => visitor.visit_enum(EnumDeserializer::new(fields)), // This has to be a unit Enum - Value::Enum(_index, ref field) => visitor.visit_enum(EnumUnitDeserializer::new(&field)), + Value::Enum(_index, ref field) => visitor.visit_enum(EnumUnitDeserializer::new(field)), _ => Err(de::Error::custom("not an enum")), } } @@ -466,7 +466,7 @@ impl<'de> de::SeqAccess<'de> for SeqDeserializer<'de> { T: DeserializeSeed<'de>, { match self.input.next() { - Some(item) => seed.deserialize(&Deserializer::new(&item)).map(Some), + Some(item) => seed.deserialize(&Deserializer::new(item)).map(Some), None => Ok(None), } } @@ -480,7 +480,7 @@ impl<'de> de::MapAccess<'de> for MapDeserializer<'de> { K: DeserializeSeed<'de>, { match self.input_keys.next() { - Some(ref key) => seed + Some(key) => seed .deserialize(StringDeserializer { input: (*key).clone(), }) @@ -494,7 +494,7 @@ impl<'de> de::MapAccess<'de> for MapDeserializer<'de> { V: DeserializeSeed<'de>, { match self.input_values.next() { - Some(ref value) => seed.deserialize(&Deserializer::new(value)), + Some(value) => seed.deserialize(&Deserializer::new(value)), None => Err(de::Error::custom("should not happen - too many values")), } } diff --git a/src/encode.rs b/src/encode.rs index bc892df..1e907b5 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -11,7 +11,7 @@ use std::convert::TryInto; /// be valid with regards to the schema. Schema are needed only to guide the /// encoding for complex type values. pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec) { - encode_ref(&value, schema, buffer) + encode_ref(value, schema, buffer) } fn encode_bytes + ?Sized>(s: &B, buffer: &mut Vec) { @@ -136,7 +136,7 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec) { pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec { let mut buffer = Vec::new(); - encode(&value, schema, &mut buffer); + encode(value, schema, &mut buffer); buffer } diff --git a/src/lib.rs b/src/lib.rs index d9164f8..901bffa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -953,7 +953,7 @@ mod tests { // Would allocated 18446744073709551605 bytes let illformed: &[u8] = &[0x3e, 0x15, 0xff, 0x1f, 0x15, 0xff]; - let value = from_avro_datum(&schema, &mut &illformed[..], None); + let value = from_avro_datum(&schema, &mut &*illformed, None); assert!(value.is_err()); } } diff --git a/src/reader.rs b/src/reader.rs index bbd070e..9fc8639 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -279,7 +279,7 @@ pub fn from_avro_datum( ) -> AvroResult { let value = decode(writer_schema, reader)?; match reader_schema { - Some(ref schema) => value.resolve(schema), + Some(schema) => value.resolve(schema), None => Ok(value), } } diff --git a/src/schema.rs b/src/schema.rs index aec8ff9..d26638e 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -447,7 +447,7 @@ impl Schema { for js in input { let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?; if let Value::Object(inner) = &schema { - let fullname = Name::parse(&inner)?.fullname(None); + let fullname = Name::parse(inner)?.fullname(None); let previous_value = input_schemas.insert(fullname.clone(), schema); if previous_value.is_some() { return Err(Error::NameCollision(fullname)); diff --git a/src/schema_compatibility.rs b/src/schema_compatibility.rs index 71680d2..c85aab4 100644 --- a/src/schema_compatibility.rs +++ b/src/schema_compatibility.rs @@ -79,7 +79,7 @@ impl Checker { symbols: r_symbols, .. } = readers_schema { - return w_symbols.iter().find(|e| !r_symbols.contains(e)).is_none(); + return !w_symbols.iter().any(|e| !r_symbols.contains(e)); } } false @@ -580,10 +580,10 @@ mod tests { &writer_schema(), &reader_schema, )); - assert_eq!( - SchemaCompatibility::can_read(&reader_schema, &writer_schema()), - false - ); + assert!(!SchemaCompatibility::can_read( + &reader_schema, + &writer_schema() + )); } #[test] @@ -600,10 +600,10 @@ mod tests { &writer_schema(), &reader_schema )); - assert_eq!( - SchemaCompatibility::can_read(&reader_schema, &writer_schema()), - false - ); + assert!(!SchemaCompatibility::can_read( + &reader_schema, + &writer_schema() + )); } #[test] @@ -642,10 +642,10 @@ mod tests { &writer_schema(), &reader_schema )); - assert_eq!( - SchemaCompatibility::can_read(&reader_schema, &writer_schema()), - false - ); + assert!(!SchemaCompatibility::can_read( + &reader_schema, + &writer_schema() + )); } #[test] @@ -659,14 +659,14 @@ mod tests { "#, ) .unwrap(); - assert_eq!( - SchemaCompatibility::can_read(&writer_schema(), &reader_schema), - false - ); - assert_eq!( - SchemaCompatibility::can_read(&reader_schema, &writer_schema()), - false - ); + assert!(!SchemaCompatibility::can_read( + &writer_schema(), + &reader_schema + )); + assert!(!SchemaCompatibility::can_read( + &reader_schema, + &writer_schema() + )); } #[test] @@ -678,10 +678,10 @@ mod tests { &string_array_schema(), &valid_reader )); - assert_eq!( - SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader), - false - ); + assert!(!SchemaCompatibility::can_read( + &string_array_schema(), + &invalid_reader + )); } #[test] @@ -691,10 +691,10 @@ mod tests { &Schema::String, &valid_reader )); - assert_eq!( - SchemaCompatibility::can_read(&Schema::Int, &Schema::String), - false - ); + assert!(!SchemaCompatibility::can_read( + &Schema::Int, + &Schema::String + )); } #[test] @@ -703,10 +703,7 @@ mod tests { let union_writer = union_schema(vec![Schema::Int, Schema::String]); let union_reader = union_schema(vec![Schema::String]); - assert_eq!( - SchemaCompatibility::can_read(&union_writer, &union_reader), - false - ); + assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader)); assert!(SchemaCompatibility::can_read(&union_reader, &union_writer)); } @@ -730,10 +727,7 @@ mod tests { ) .unwrap(); - assert_eq!( - SchemaCompatibility::can_read(&string_schema, &int_schema), - false - ); + assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema)); } #[test] @@ -747,10 +741,7 @@ mod tests { let enum_schema2 = Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#) .unwrap(); - assert_eq!( - SchemaCompatibility::can_read(&enum_schema2, &enum_schema1), - false - ); + assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1)); assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2)); } @@ -827,10 +818,10 @@ mod tests { fn test_union_resolution_no_structure_match() { // short name match, but no structure match let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]); - assert_eq!( - SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema), - false - ); + assert!(!SchemaCompatibility::can_read( + &point_2d_fullname_schema(), + &read_schema + )); } // TODO(nlopes): the below require named schemas to be fully supported. See: diff --git a/src/ser.rs b/src/ser.rs index b1445be..275b203 100644 --- a/src/ser.rs +++ b/src/ser.rs @@ -358,7 +358,7 @@ impl<'a> ser::SerializeTupleVariant for SeqVariantSerializer<'a> { } fn end(self) -> Result { - Ok(ser::SerializeSeq::end(self)?) + ser::SerializeSeq::end(self) } } @@ -775,7 +775,7 @@ mod tests { a: SingleValueInternalEnum::Double(64.0), }; - assert_eq!(to_value(test).is_err(), true); + assert!(to_value(test).is_err()); let test = TestSingleValueAdjacentEnum { a: SingleValueAdjacentEnum::Double(64.0), diff --git a/src/util.rs b/src/util.rs index c5f1ad6..8f04256 100644 --- a/src/util.rs +++ b/src/util.rs @@ -190,7 +190,7 @@ mod tests { #[test] fn test_overflow() { let causes_left_shift_overflow: &[u8] = &[0xe1, 0xe1, 0xe1, 0xe1, 0xe1]; - assert!(decode_variable(&mut &causes_left_shift_overflow[..]).is_err()); + assert!(decode_variable(&mut &*causes_left_shift_overflow).is_err()); } #[test] diff --git a/src/writer.rs b/src/writer.rs index c071125..805d364 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -261,7 +261,7 @@ impl<'a, W: Write> Writer<'a, W> { /// Append a raw Avro Value to the payload avoiding to encode it again. fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult { - self.append_bytes(encode_to_vec(&value, schema).as_ref()) + self.append_bytes(encode_to_vec(value, schema).as_ref()) } /// Append pure bytes to the payload. @@ -416,7 +416,7 @@ mod tests { assert_eq!(&schema, expected_schema); // The serialized format should be the same as the schema. let ser = to_avro_datum(&schema, value.clone())?; - let raw_ser = to_avro_datum(&raw_schema, raw_value)?; + let raw_ser = to_avro_datum(raw_schema, raw_value)?; assert_eq!(ser, raw_ser); // Should deserialize from the schema into the logical type. diff --git a/tests/io.rs b/tests/io.rs index b8b4d6e..2327ba6 100644 --- a/tests/io.rs +++ b/tests/io.rs @@ -85,7 +85,9 @@ fn test_validate() { let schema = Schema::parse_str(raw_schema).unwrap(); assert!( value.validate(&schema), - format!("value {:?} does not validate schema: {}", value, raw_schema) + "value {:?} does not validate schema: {}", + value, + raw_schema ); } } diff --git a/tests/schema.rs b/tests/schema.rs index 7d38616..b1cad75 100644 --- a/tests/schema.rs +++ b/tests/schema.rs @@ -690,8 +690,8 @@ fn test_parse_list_with_cross_deps_basic() { let schemas_first = Schema::parse_list(&schema_strs_first).expect("Test failed"); let schemas_second = Schema::parse_list(&schema_strs_second).expect("Test failed"); - let parsed_1 = Schema::parse_str(&schema_str_1).expect("Test failed"); - let parsed_2 = Schema::parse_str(&schema_composite).expect("Test failed"); + let parsed_1 = Schema::parse_str(schema_str_1).expect("Test failed"); + let parsed_2 = Schema::parse_str(schema_composite).expect("Test failed"); assert_eq!(schemas_first, vec!(parsed_1.clone(), parsed_2.clone())); assert_eq!(schemas_second, vec!(parsed_2, parsed_1)); } @@ -760,8 +760,8 @@ fn test_parse_list_with_cross_deps_and_namespaces() { let schemas_first = Schema::parse_list(&schema_strs_first).expect("Test failed"); let schemas_second = Schema::parse_list(&schema_strs_second).expect("Test failed"); - let parsed_1 = Schema::parse_str(&schema_str_1).expect("Test failed"); - let parsed_2 = Schema::parse_str(&schema_composite).expect("Test failed"); + let parsed_1 = Schema::parse_str(schema_str_1).expect("Test failed"); + let parsed_2 = Schema::parse_str(schema_composite).expect("Test failed"); assert_eq!(schemas_first, vec!(parsed_1.clone(), parsed_2.clone())); assert_eq!(schemas_second, vec!(parsed_2, parsed_1)); } @@ -1010,8 +1010,8 @@ fn test_namespace_prevents_collisions() { }"#; let parsed = Schema::parse_list(&[schema_str_1, schema_str_2]).expect("Test failed"); - let parsed_1 = Schema::parse_str(&schema_str_1).expect("Test failed"); - let parsed_2 = Schema::parse_str(&schema_str_2).expect("Test failed"); + let parsed_1 = Schema::parse_str(schema_str_1).expect("Test failed"); + let parsed_2 = Schema::parse_str(schema_str_2).expect("Test failed"); assert_eq!(parsed, vec!(parsed_1, parsed_2)); } @@ -1153,7 +1153,8 @@ fn test_root_error_is_not_swallowed_on_parse_error() -> Result<(), String> { if let Error::ParseSchemaJson(e) = error { assert!( e.to_string().contains("expected value at line 1 column 1"), - e.to_string() + "{}", + e ); Ok(()) } else { From 9cc2cc81efb36a3da4b8ccf193d01ff11b523236 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 13 Sep 2021 21:01:22 +0000 Subject: [PATCH 2/2] Made read_header a separate function. --- src/reader.rs | 92 ++++++++++++++++++++++++++++----------------------- 1 file changed, 51 insertions(+), 41 deletions(-) diff --git a/src/reader.rs b/src/reader.rs index 9fc8639..65e3bbd 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -6,6 +6,54 @@ use std::{ str::FromStr, }; +/// Reads the schema from `reader`, returning the file's [`Schema`] and [`Codec`]. +/// # Error +/// This function errors iff the header is not a valid avro file header. +pub fn read_schema(reader: &mut R) -> AvroResult<(Schema, Codec)> { + let meta_schema = Schema::Map(Box::new(Schema::Bytes)); + + let mut buf = [0u8; 4]; + reader.read_exact(&mut buf).map_err(Error::ReadHeader)?; + + if buf != [b'O', b'b', b'j', 1u8] { + return Err(Error::HeaderMagic); + } + + if let Value::Map(meta) = decode(&meta_schema, reader)? { + // TODO: surface original parse schema errors instead of coalescing them here + let json = meta + .get("avro.schema") + .and_then(|bytes| { + if let Value::Bytes(ref bytes) = *bytes { + from_slice(bytes.as_ref()).ok() + } else { + None + } + }) + .ok_or(Error::GetAvroSchemaFromMap)?; + let schema = Schema::parse(&json)?; + + let codec = if let Some(codec) = meta + .get("avro.codec") + .and_then(|codec| { + if let Value::Bytes(ref bytes) = *codec { + std::str::from_utf8(bytes.as_ref()).ok() + } else { + None + } + }) + .and_then(|codec| Codec::from_str(codec).ok()) + { + codec + } else { + Codec::Null + }; + Ok((schema, codec)) + } else { + Err(Error::GetHeaderMetadata) + } +} + // Internal Block reader. #[derive(Debug, Clone)] struct Block { @@ -39,47 +87,9 @@ impl Block { /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on /// its content. fn read_header(&mut self) -> AvroResult<()> { - let meta_schema = Schema::Map(Box::new(Schema::Bytes)); - - let mut buf = [0u8; 4]; - self.reader - .read_exact(&mut buf) - .map_err(Error::ReadHeader)?; - - if buf != [b'O', b'b', b'j', 1u8] { - return Err(Error::HeaderMagic); - } - - if let Value::Map(meta) = decode(&meta_schema, &mut self.reader)? { - // TODO: surface original parse schema errors instead of coalescing them here - let json = meta - .get("avro.schema") - .and_then(|bytes| { - if let Value::Bytes(ref bytes) = *bytes { - from_slice(bytes.as_ref()).ok() - } else { - None - } - }) - .ok_or(Error::GetAvroSchemaFromMap)?; - self.writer_schema = Schema::parse(&json)?; - - if let Some(codec) = meta - .get("avro.codec") - .and_then(|codec| { - if let Value::Bytes(ref bytes) = *codec { - std::str::from_utf8(bytes.as_ref()).ok() - } else { - None - } - }) - .and_then(|codec| Codec::from_str(codec).ok()) - { - self.codec = codec; - } - } else { - return Err(Error::GetHeaderMetadata); - } + let (schema, codec) = read_schema(&mut self.reader)?; + self.writer_schema = schema; + self.codec = codec; self.reader .read_exact(&mut self.marker)