Skip to content

Commit

Permalink
fix(ingestion): properly detect optional fields in avro schemas (#2343)
Browse files Browse the repository at this point in the history
Co-authored-by: thomas.larsson <[email protected]>
  • Loading branch information
thomasplarsson and thomas.larsson authored Apr 8, 2021
1 parent 4a47abf commit 4215dcd
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ def _get_column_type(field_type) -> SchemaFieldDataType:
return dt


def _is_nullable(field):
if isinstance(field.type, avro.schema.UnionSchema):
if any(schema.name == "null" for schema in field.type.schemas):
return True
else:
return False
elif isinstance(field.type, avro.schema.PrimitiveSchema):
return field.type.name == "null"
else:
return False


def avro_schema_to_mce_fields(avro_schema_string: str) -> List[SchemaField]:
"""Converts an avro schema into a schema compatible with MCE"""

Expand All @@ -71,7 +83,7 @@ def avro_schema_to_mce_fields(avro_schema_string: str) -> List[SchemaField]:
type=_get_column_type(parsed_field.type),
description=parsed_field.props.get("doc", None),
recursive=False,
nullable=(parsed_field.type == "null"),
nullable=_is_nullable(parsed_field),
)

fields.append(field)
Expand Down
63 changes: 63 additions & 0 deletions metadata-ingestion/tests/unit/test_schema_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import unittest

from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields

EXAMPLE_EVENT_OPTIONAL_FIELD_VIA_UNION_TYPE = """
{
"type": "record",
"name": "some.event.name",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": ["null", "string"],
"doc": "some.doc"
}
]
}
"""

EXAMPLE_EVENT_OPTIONAL_FIELD_VIA_UNION_TYPE_NULL_ISNT_FIRST_IN_UNION = """
{
"type": "record",
"name": "some.event.name",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": ["string", "null"],
"doc": "some.doc"
}
]
}
"""

EXAMPLE_EVENT_OPTIONAL_FIELD_VIA_PRIMITIVE_TYPE = """
{
"type": "record",
"name": "some.event.name",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": "null",
"doc": "some.doc"
}
]
}
"""


class SchemaUtilTest(unittest.TestCase):
def test_avro_schema_to_mce_fields_events_with_nullable_fields(self):

events = [
EXAMPLE_EVENT_OPTIONAL_FIELD_VIA_UNION_TYPE,
EXAMPLE_EVENT_OPTIONAL_FIELD_VIA_UNION_TYPE_NULL_ISNT_FIRST_IN_UNION,
EXAMPLE_EVENT_OPTIONAL_FIELD_VIA_PRIMITIVE_TYPE,
]

for event in events:
fields = avro_schema_to_mce_fields(event)
self.assertEqual(1, len(fields))
self.assertTrue(fields[0].nullable)

0 comments on commit 4215dcd

Please sign in to comment.