diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 8ba83768512a5..5bc0e66fa2ff1 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -31,7 +31,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe urn:li:dataset:(urn:li:dataPlatform:powerbi,[.]...,) ``` - The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup. + The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatibility, However, we recommend enabling this flag after performing the necessary cleanup. If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI: `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true. @@ -39,6 +39,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information. - #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2. - #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation. +- #12077: `Kafka` source no longer ingests schemas from schema registry as separate entities by default, set `ingest_schemas_as_entities` to `true` to ingest them - OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set. - OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings. diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py index 709ba431f0f87..fa842a15ba732 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py @@ -141,6 +141,10 @@ class KafkaSourceConfig( default=False, description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy", ) + ingest_schemas_as_entities: bool = pydantic.Field( + default=False, + description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics", + ) def get_kafka_consumer( @@ -343,17 +347,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: else: self.report.report_dropped(topic) - # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes - for subject in self.schema_registry_client.get_subjects(): - try: - yield from self._extract_record( - subject, True, topic_detail=None, extra_topic_config=None - ) - except Exception as e: - logger.warning(f"Failed to extract subject {subject}", exc_info=True) - self.report.report_warning( - "subject", f"Exception while extracting topic {subject}: {e}" - ) + if self.source_config.ingest_schemas_as_entities: + # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes + for subject in self.schema_registry_client.get_subjects(): + try: + yield from self._extract_record( + subject, True, topic_detail=None, extra_topic_config=None + ) + except Exception as e: + logger.warning( + f"Failed to extract subject {subject}", exc_info=True + ) + self.report.report_warning( + "subject", f"Exception while extracting topic {subject}: {e}" + ) def _extract_record( self, diff --git a/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml b/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml index 380df845e737c..cde21d85ed2d9 100644 --- a/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml +++ b/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml @@ -3,6 +3,7 @@ run_id: kafka-test source: type: kafka config: + ingest_schemas_as_entities: true connection: bootstrap: "localhost:29092" schema_registry_url: "http://localhost:28081" diff --git a/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_mces_golden.json b/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_mces_golden.json new file mode 100644 index 0000000000000..7810c8077b31d --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_mces_golden.json @@ -0,0 +1,575 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "44fd7a7b325d6fdd4275b1f02a79c1a8", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false" + }, + "name": "key_topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_value_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "a79a2fe3adab60b21d272a9cc3e93595", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false", + "Schema Name": "key_value_topic-value" + }, + "name": "key_value_topic", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:sales" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "value_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "62c7c400ec5760797a59c45e59c2f2dc", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "\"string\"", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=string]", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false", + "Schema Name": "value_topic-value" + }, + "name": "value_topic", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Email", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Email" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Name", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Name" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:PII", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "PII" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_to_file.yml b/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_to_file.yml new file mode 100644 index 0000000000000..7f44e43c3c490 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka/kafka_without_schemas_to_file.yml @@ -0,0 +1,16 @@ +run_id: kafka-test + +source: + type: kafka + config: + connection: + bootstrap: "localhost:29092" + schema_registry_url: "http://localhost:28081" + domain: + "urn:li:domain:sales": + allow: + - "key_value_topic" +sink: + type: file + config: + filename: "./kafka_without_schemas_mces.json" diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py index bf0ec1845a66c..0d9a714625e96 100644 --- a/metadata-ingestion/tests/integration/kafka/test_kafka.py +++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py @@ -43,20 +43,21 @@ def mock_kafka_service(docker_compose_runner, test_resources_dir): yield docker_compose_runner +@pytest.mark.parametrize("approach", ["kafka_without_schemas", "kafka"]) @freeze_time(FROZEN_TIME) @pytest.mark.integration def test_kafka_ingest( - mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time + mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time, approach ): # Run the metadata ingestion pipeline. - config_file = (test_resources_dir / "kafka_to_file.yml").resolve() + config_file = (test_resources_dir / f"{approach}_to_file.yml").resolve() run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path) # Verify the output. mce_helpers.check_golden_file( pytestconfig, - output_path=tmp_path / "kafka_mces.json", - golden_path=test_resources_dir / "kafka_mces_golden.json", + output_path=tmp_path / f"{approach}_mces.json", + golden_path=test_resources_dir / f"{approach}_mces_golden.json", ignore_paths=[], ) diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index cab0a2bce7ba8..1a8afe1b956fa 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -330,6 +330,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: "topic2-key": "test.acryl.Topic2Key", "topic2-value": "test.acryl.Topic2Value", }, + "ingest_schemas_as_entities": True, } ctx = PipelineContext(run_id="test") kafka_source = KafkaSource.create(source_config, ctx) @@ -478,8 +479,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: kafka_source = KafkaSource.create(source_config, ctx) workunits = list(kafka_source.get_workunits()) - - assert len(workunits) == 6 + assert len(workunits) == 2 if ignore_warnings_on_schema_type: assert not kafka_source.report.warnings else: @@ -622,6 +622,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: kafka_source = KafkaSource.create( { "connection": {"bootstrap": "localhost:9092"}, + "ingest_schemas_as_entities": True, "meta_mapping": { "owner": { "match": "^@(.*)",