Skip to content

Commit

Permalink
Fix testQuick issues
Browse files Browse the repository at this point in the history
  • Loading branch information
aabharti-visa committed May 31, 2024
1 parent f73b8ab commit 2e9948a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
20 changes: 15 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,21 +338,31 @@ def _extract_record(
extra_topic_config: Optional[Dict[str, ConfigEntry]],
) -> Iterable[MetadataWorkUnit]:
AVRO = "AVRO"

kafka_entity = topic if len(topic) != 0 else subject
is_subject = False if len(topic) != 0 else True

logger.debug(f"kafka entity name = {kafka_entity}")

platform_urn = make_data_platform_urn(self.platform)

# 1. Create schemaMetadata aspect (pass control to SchemaRegistry)
schema_metadata = self.schema_registry_client.get_schema_metadata(
kafka_entity, platform_urn, is_subject=False
kafka_entity, platform_urn, is_subject
)
if schema_metadata is None:
return

logger.debug(f"schema name = {schema_metadata.schemaName}")
# topic can have no associated subject, but still it can be ingested without schema
if is_subject:
if schema_metadata is None:
return
dataset_name = schema_metadata.schemaName
else:
dataset_name = topic

# dataset_name = schema_metadata.schemaName if len(topic) == 0 else topic
# 2. Create the default dataset snapshot for the topic.
dataset_name = schema_metadata.schemaName if len(topic) == 0 else topic
# if schema_metadata is not None:
# dataset_name = schema_metadata.schemaName if len(topic) == 0 else topic
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=dataset_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def __init__(
def get_assertion_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:

self.connection = self.create_connection()
if self.connection is None:
return
Expand All @@ -80,7 +79,6 @@ def get_assertion_workunits(
yield self._gen_platform_instance_wu(mcp.entityUrn)

def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:

# Construct a MetadataChangeProposalWrapper object for assertion platform
return MetadataChangeProposalWrapper(
entityUrn=urn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,6 @@ def table_upstreams_only(

@staticmethod
def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str:

pattern = r"datahub\\_\\_%"
escape_pattern = r"\\"
return f"""
Expand Down

0 comments on commit 2e9948a

Please sign in to comment.