feat(ingestion/kafka)-Add support for ingesting schemas from schema registry #10612

Merged
@@ -231,16 +231,25 @@ def get_schemas_from_confluent_ref_json(
return all_schemas

def _get_schema_and_fields(
self, topic: str, is_key_schema: bool
self, topic: str, is_key_schema: bool, is_subject: bool
) -> Tuple[Optional[Schema], List[SchemaField]]:
schema: Optional[Schema] = None
schema_type_str: str = "key" if is_key_schema else "value"
topic_subject: Optional[str] = self._get_subject_for_topic(
topic=topic, is_key_schema=is_key_schema
)
kafka_entity = "subject" if is_subject else "topic"

# If a subject was provided in place of a topic, treat it as the value subject
schema_type_str: Optional[str] = "value"
topic_subject: Optional[str] = None
if not is_subject:
schema_type_str = "key" if is_key_schema else "value"
topic_subject = self._get_subject_for_topic(
topic=topic, is_key_schema=is_key_schema
)
else:
topic_subject = topic

if topic_subject is not None:
logger.debug(
f"The {schema_type_str} schema subject:'{topic_subject}' is found for topic:'{topic}'."
f"The {schema_type_str} schema subject:'{topic_subject}' is found for {kafka_entity}:'{topic}'."
)
try:
registered_schema = self.schema_registry_client.get_latest_version(
@@ -249,29 +258,31 @@ def _get_schema_and_fields(
schema = registered_schema.schema
except Exception as e:
logger.warning(
f"For topic: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
f"For {kafka_entity}: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
)
self.report.report_warning(
topic,
f"failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}.",
)
else:
logger.debug(
f"For topic: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
f"For {kafka_entity}: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
)
if not is_key_schema:
# Value schema is always expected. Report a warning.
self.report.report_warning(
topic,
f"The schema registry subject for the {schema_type_str} schema is not found."
f" The topic is either schema-less, or no messages have been written to the topic yet.",
f" The {kafka_entity} is either schema-less, or no messages have been written to the {kafka_entity} yet.",
)

# Obtain the schema fields from schema for the topic.
fields: List[SchemaField] = []
if schema is not None:
fields = self._get_schema_fields(
topic=topic, schema=schema, is_key_schema=is_key_schema
topic=topic,
schema=schema,
is_key_schema=is_key_schema,
)
return (schema, fields)

@@ -352,16 +363,21 @@ def _get_schema_fields(
return fields

def _get_schema_metadata(
self, topic: str, platform_urn: str
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:

# Process the value schema
schema, fields = self._get_schema_and_fields(
topic=topic, is_key_schema=False
topic=topic,
is_key_schema=False,
is_subject=is_subject,
) # type: Tuple[Optional[Schema], List[SchemaField]]

# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
topic=topic, is_key_schema=True
topic=topic,
is_key_schema=True,
is_subject=is_subject,
) # type:Tuple[Optional[Schema], List[SchemaField]]

# Create the schemaMetadata aspect.
@@ -388,17 +404,22 @@ def _get_schema_metadata(
return None

def get_schema_metadata(
self, topic: str, platform_urn: str
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:
logger.debug(f"Inside _get_schema_metadata {topic} {platform_urn}")
logger.debug(f"Inside get_schema_metadata {topic} {platform_urn}")

# Process the value schema
schema, fields = self._get_schema_and_fields(
topic=topic, is_key_schema=False
topic=topic,
is_key_schema=False,
is_subject=is_subject,
) # type: Tuple[Optional[Schema], List[SchemaField]]

# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
topic=topic, is_key_schema=True
topic=topic,
is_key_schema=True,
is_subject=is_subject,
) # type:Tuple[Optional[Schema], List[SchemaField]]

# Create the schemaMetadata aspect.
@@ -423,3 +444,6 @@ def get_schema_metadata(
fields=key_fields + fields,
)
return None

def get_subjects(self) -> List[str]:
return self.known_schema_registry_subjects
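
For context on the widened call surface: the registry client's get_schema_metadata now takes an is_subject flag, and get_subjects() exposes the registry's subject list. Below is a minimal, illustrative sketch of how a caller might drive the two ingestion passes. It is not code from this PR; FakeRegistryClient and the sample subject names are hypothetical stand-ins.

# Illustrative sketch only: hypothetical stand-ins, not code from this PR.
from typing import List, Optional

class FakeRegistryClient:
    """Hypothetical stand-in for the Confluent schema registry client wrapper."""

    def __init__(self) -> None:
        self.known_schema_registry_subjects: List[str] = [
            "orders-value",
            "orders-key",
            "customer-profile",  # subject with no matching Kafka topic
        ]

    def get_subjects(self) -> List[str]:
        return self.known_schema_registry_subjects

    def get_schema_metadata(
        self, topic: str, platform_urn: str, is_subject: bool
    ) -> Optional[str]:
        # The real client returns a SchemaMetadata aspect; a string is enough
        # to illustrate the control flow here.
        kind = "subject" if is_subject else "topic"
        return f"schema metadata for {kind} '{topic}'"

client = FakeRegistryClient()
platform_urn = "urn:li:dataPlatform:kafka"

# Pass 1: Kafka topics (key/value subjects resolved from the topic name).
for topic in ["orders"]:
    print(client.get_schema_metadata(topic, platform_urn, is_subject=False))

# Pass 2: every registry subject, to be emitted with the SCHEMA subtype.
for subject in client.get_subjects():
    print(client.get_schema_metadata(subject, platform_urn, is_subject=True))
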
metadata-ingestion/src/datahub/ingestion/source/kafka.py (80 changes: 58 additions & 22 deletions)
@@ -303,34 +303,63 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
).topics
extra_topic_details = self.fetch_extra_topic_details(topics.keys())

for t, t_detail in topics.items():
self.report.report_topic_scanned(t)
if self.source_config.topic_patterns.allowed(t):
for topic, topic_detail in topics.items():
self.report.report_topic_scanned(topic)
if self.source_config.topic_patterns.allowed(topic):
try:
yield from self._extract_record(
t, t_detail, extra_topic_details.get(t)
topic, False, topic_detail, extra_topic_details.get(topic)
)
except Exception as e:
logger.warning(f"Failed to extract topic {t}", exc_info=True)
logger.warning(f"Failed to extract topic {topic}", exc_info=True)
self.report.report_warning(
"topic", f"Exception while extracting topic {t}: {e}"
"topic", f"Exception while extracting topic {topic}: {e}"
)
else:
self.report.report_dropped(t)
self.report.report_dropped(topic)

# Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
for subject in self.schema_registry_client.get_subjects():
try:
yield from self._extract_record(
subject, True, topic_detail=None, extra_topic_config=None
)
except Exception as e:
logger.warning(f"Failed to extract subject {subject}", exc_info=True)
self.report.report_warning(
"subject", f"Exception while extracting topic {subject}: {e}"
)

def _extract_record(
self,
topic: str,
is_subject: bool,
topic_detail: Optional[TopicMetadata],
extra_topic_config: Optional[Dict[str, ConfigEntry]],
) -> Iterable[MetadataWorkUnit]:
logger.debug(f"topic = {topic}")

AVRO = "AVRO"

# 1. Create the default dataset snapshot for the topic.
dataset_name = topic
kafka_entity = "subject" if is_subject else "topic"

logger.debug(f"extracting schema metadata from kafka entity = {kafka_entity}")

platform_urn = make_data_platform_urn(self.platform)

# 1. Create schemaMetadata aspect (pass control to SchemaRegistry)
schema_metadata = self.schema_registry_client.get_schema_metadata(
topic, platform_urn, is_subject
)

# A topic may have no associated subject and can still be ingested without a schema;
# a subject (schema), however, is ingested only if a valid schema is resolved
if is_subject:
if schema_metadata is None:
return
dataset_name = schema_metadata.schemaName
else:
dataset_name = topic

# 2. Create the default dataset snapshot for the topic.
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=dataset_name,
@@ -342,10 +371,6 @@ def _extract_record(
aspects=[Status(removed=False)], # we append to this list later on
)

# 2. Attach schemaMetadata aspect (pass control to SchemaRegistry)
schema_metadata = self.schema_registry_client.get_schema_metadata(
topic, platform_urn
)
if schema_metadata is not None:
dataset_snapshot.aspects.append(schema_metadata)

@@ -356,9 +381,19 @@ def _extract_record(
browse_path = BrowsePathsClass([browse_path_str])
dataset_snapshot.aspects.append(browse_path)

custom_props = self.build_custom_properties(
topic, topic_detail, extra_topic_config
)
# Build custom properties for topics only; schema-specific properties can be added here later if needed
custom_props: Dict[str, str] = {}
if not is_subject:
custom_props = self.build_custom_properties(
topic, topic_detail, extra_topic_config
)
schema_name: Optional[
str
] = self.schema_registry_client._get_subject_for_topic(
topic, is_key_schema=False
)
if schema_name is not None:
custom_props["Schema Name"] = schema_name

# 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro
description: Optional[str] = None
@@ -414,7 +449,7 @@ def _extract_record(
)

dataset_properties = DatasetPropertiesClass(
name=topic, customProperties=custom_props, description=description
name=dataset_name, customProperties=custom_props, description=description
)
dataset_snapshot.aspects.append(dataset_properties)

@@ -431,12 +466,13 @@ def _extract_record(

# 6. Emit the datasetSnapshot MCE
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
yield MetadataWorkUnit(id=f"kafka-{topic}", mce=mce)
yield MetadataWorkUnit(id=f"kafka-{kafka_entity}", mce=mce)

# 7. Add the subtype aspect marking this as a "topic"
# 7. Add the subtype aspect marking this as a "topic" or "schema"
typeName = DatasetSubTypes.SCHEMA if is_subject else DatasetSubTypes.TOPIC
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypesClass(typeNames=[DatasetSubTypes.TOPIC]),
aspect=SubTypesClass(typeNames=[typeName]),
).as_workunit()

domain_urn: Optional[str] = None
@@ -1,12 +1,22 @@
from abc import ABC, abstractmethod
from typing import Optional
from typing import List, Optional

from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata


class KafkaSchemaRegistryBase(ABC):
@abstractmethod
def get_schema_metadata(
self, topic: str, platform_urn: str
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:
pass

@abstractmethod
def get_subjects(self) -> List[str]:
pass

@abstractmethod
def _get_subject_for_topic(
self, dataset_subtype: str, is_key_schema: bool
) -> Optional[str]:
pass
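
To make the updated interface concrete, here is a minimal sketch of a subclass that satisfies the new abstract methods. It is hypothetical and illustrative only; InMemorySchemaRegistry and the import path for the base class are assumptions, not part of this PR.

# Hypothetical example implementation; the module path for the base class is assumed.
from typing import Dict, List, Optional

from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata


class InMemorySchemaRegistry(KafkaSchemaRegistryBase):
    """Serves pre-built SchemaMetadata aspects from a dict instead of a live registry."""

    def __init__(self, subjects: Dict[str, SchemaMetadata]) -> None:
        # Maps subject name (e.g. "orders-value") -> SchemaMetadata aspect.
        self._subjects = subjects

    def get_schema_metadata(
        self, topic: str, platform_urn: str, is_subject: bool
    ) -> Optional[SchemaMetadata]:
        # Subjects are looked up directly; topics fall back to the
        # TopicNameStrategy "<topic>-value" naming convention.
        subject = (
            topic
            if is_subject
            else self._get_subject_for_topic(topic, is_key_schema=False)
        )
        return self._subjects.get(subject) if subject else None

    def get_subjects(self) -> List[str]:
        return list(self._subjects)

    def _get_subject_for_topic(
        self, dataset_subtype: str, is_key_schema: bool
    ) -> Optional[str]:
        suffix = "-key" if is_key_schema else "-value"
        candidate = f"{dataset_subtype}{suffix}"
        return candidate if candidate in self._subjects else None
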
@@ -59,7 +59,6 @@ def __init__(
def get_assertion_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:

self.connection = self.create_connection()
if self.connection is None:
return
@@ -80,7 +79,6 @@ def get_assertion_workunits(
yield self._gen_platform_instance_wu(mcp.entityUrn)

def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:

# Construct a MetadataChangeProposalWrapper object for assertion platform
return MetadataChangeProposalWrapper(
entityUrn=urn,
@@ -1019,7 +1019,6 @@ def table_upstreams_only(

@staticmethod
def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str:

pattern = r"datahub\\_\\_%"
escape_pattern = r"\\"
return f"""