From 860d475c5edb68baf868c2a78db5c2ae7aedbc89 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 11 Jul 2022 09:37:38 -0700 Subject: [PATCH 01/11] feat(ingest): improve domain ingestion usability (#5366) --- docs/domains.md | 44 +++++++- .../src/datahub/configuration/common.py | 4 +- .../src/datahub/emitter/rest_emitter.py | 10 ++ .../src/datahub/ingestion/api/sink.py | 4 + .../src/datahub/ingestion/graph/client.py | 83 +++++++++++++- .../src/datahub/ingestion/run/pipeline.py | 101 +++++++++++++----- .../datahub/ingestion/sink/datahub_rest.py | 6 ++ .../src/datahub/ingestion/source/kafka.py | 10 +- .../ingestion/source/sql/sql_common.py | 31 ++++-- .../datahub/utilities/registries/__init__.py | 0 .../utilities/registries/domain_registry.py | 43 ++++++++ .../tests/integration/hana/hana_to_file.yml | 2 +- .../tests/integration/kafka/kafka_to_file.yml | 2 +- .../tests/integration/mysql/mysql_to_file.yml | 2 +- .../mysql/mysql_to_file_dbalias.yml | 2 +- 15 files changed, 303 insertions(+), 41 deletions(-) create mode 100644 metadata-ingestion/src/datahub/utilities/registries/__init__.py create mode 100644 metadata-ingestion/src/datahub/utilities/registries/domain_registry.py diff --git a/docs/domains.md b/docs/domains.md index a2a6f9b262440c..3f409ead1495b9 100644 --- a/docs/domains.md +++ b/docs/domains.md @@ -47,8 +47,11 @@ By default, you don't need to worry about this. DataHub will auto-generate an un Once you've chosen a name and a description, click 'Create' to create the new Domain. -## Assigning an Asset to a Domain +## Assigning an Asset to a Domain +You can assign assets to Domain using the UI or programmatically using the API or during ingestion. + +### UI-Based Assignment To assign an asset to a Domain, simply navigate to the asset's profile page. At the bottom left-side menu bar, you'll see a 'Domain' section. Click 'Set Domain', and then search for the Domain you'd like to add to. When you're done, click 'Add'. @@ -59,6 +62,45 @@ To remove an asset from a Domain, click the 'x' icon on the Domain tag. > Notice: Adding or removing an asset from a Domain requires the `Edit Domain` Metadata Privilege, which can be granted > by a [Policy](authorization/policies.md). +### Ingestion-time Assignment +All SQL-based ingestion sources support assigning domains during ingestion using the `domain` configuration. Consult your source's configuration details page (e.g. [Snowflake](./generated/ingestion/sources/snowflake.md)), to verify that it supports the Domain capability. + +:::note + +Assignment of domains during ingestion will overwrite domains that you have assigned in the UI. A single table can only belong to one domain. + +::: + + +Here is a quick example of a snowflake ingestion recipe that has been enhanced to attach the **Analytics** domain to all tables in the **long_tail_companions** database in the **analytics** schema, and the **Finance** domain to all tables in the **long_tail_companions** database in the **ecommerce** schema. 
+ +```yaml +source: + type: snowflake + config: + username: ${SNOW_USER} + password: ${SNOW_PASS} + account_id: + warehouse: COMPUTE_WH + role: accountadmin + database_pattern: + allow: + - "long_tail_companions" + schema_pattern: + deny: + - information_schema + profiling: + enabled: False + domain: + Analytics: + allow: + - "long_tail_companions.analytics.*" + Finance: + allow: + - "long_tail_companions.ecommerce.*" +``` + + ## Searching by Domain diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index be3c6a13d3599b..c342a7cd33424b 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -105,11 +105,11 @@ class AllowDenyPattern(ConfigModel): allow: List[str] = Field( default=[".*"], - description="List of regex patterns for process groups to include in ingestion", + description="List of regex patterns to include in ingestion", ) deny: List[str] = Field( default=[], - description="List of regex patterns for process groups to exclude from ingestion.", + description="List of regex patterns to exclude from ingestion.", ) ignoreCase: Optional[bool] = Field( default=True, diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index ab96fea8c2c983..a20489163c0de3 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -249,6 +249,16 @@ def _emit_generic(self, url: str, payload: str) -> None: "Unable to emit metadata to DataHub GMS", {"message": str(e)} ) from e + def __repr__(self) -> str: + token_str = ( + f" with token: {self._token[:4]}**********{self._token[-4:]}" + if self._token + else "" + ) + return ( + f"DataHubRestEmitter: configured to talk to {self._gms_server}{token_str}" + ) + class DatahubRestEmitter(DataHubRestEmitter): """This class exists as a pass-through for backwards compatibility""" diff --git a/metadata-ingestion/src/datahub/ingestion/api/sink.py b/metadata-ingestion/src/datahub/ingestion/api/sink.py index 56030987bce5da..1fa961dd42836a 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/sink.py +++ b/metadata-ingestion/src/datahub/ingestion/api/sink.py @@ -109,3 +109,7 @@ def get_report(self) -> SinkReport: @abstractmethod def close(self) -> None: pass + + def configured(self) -> str: + """Override this method to output a human-readable and scrubbed version of the configured sink""" + return "" diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 04f792f7244714..2e876bfa8d15bc 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -2,7 +2,7 @@ import logging import os from json.decoder import JSONDecodeError -from typing import Any, Dict, List, Optional, Type +from typing import Any, Dict, Iterable, List, Optional, Type from avro.schema import RecordSchema from deprecated import deprecated @@ -287,3 +287,84 @@ def get_latest_timeseries_value( f"Failed to find {aspect_type} in response {aspect_json}" ) return None + + def _get_search_endpoint(self): + return f"{self.config.server}/entities?action=search" + + def get_domain_urn_by_name(self, domain_name: str) -> Optional[str]: + """Retrieve a domain urn based on its name. 
Returns None if there is no match found""" + + filters = [] + filter_criteria = [ + { + "field": "name", + "value": domain_name, + "condition": "EQUAL", + } + ] + + filters.append({"and": filter_criteria}) + search_body = { + "input": "*", + "entity": "domain", + "start": 0, + "count": 10, + "filter": {"or": filters}, + } + results: Dict = self._post_generic(self._get_search_endpoint(), search_body) + num_entities = results.get("value", {}).get("numEntities", 0) + if num_entities > 1: + logger.warning( + f"Got {num_entities} results for domain name {domain_name}. Will return the first match." + ) + entities_yielded: int = 0 + entities = [] + for x in results["value"]["entities"]: + entities_yielded += 1 + logger.debug(f"yielding {x['entity']}") + entities.append(x["entity"]) + return entities[0] if entities_yielded else None + + def get_container_urns_by_filter( + self, + env: Optional[str] = None, + search_query: str = "*", + ) -> Iterable[str]: + """Return container urns that match based on query""" + url = self._get_search_endpoint() + + container_filters = [] + for container_subtype in ["Database", "Schema", "Project", "Dataset"]: + filter_criteria = [] + + filter_criteria.append( + { + "field": "customProperties", + "value": f"instance={env}", + "condition": "EQUAL", + } + ) + + filter_criteria.append( + { + "field": "typeNames", + "value": container_subtype, + "condition": "EQUAL", + } + ) + container_filters.append({"and": filter_criteria}) + search_body = { + "input": search_query, + "entity": "container", + "start": 0, + "count": 10000, + "filter": {"or": container_filters}, + } + results: Dict = self._post_generic(url, search_body) + num_entities = results["value"]["numEntities"] + logger.debug(f"Matched {num_entities} containers") + entities_yielded: int = 0 + for x in results["value"]["entities"]: + entities_yielded += 1 + logger.debug(f"yielding {x['entity']}") + yield x["entity"] diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index d0b1e300b9184b..0c2b4082f68251 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -120,6 +120,11 @@ class Pipeline: sink: Sink transformers: List[Transformer] + def _record_initialization_failure(self, e: Exception, msg: str) -> None: + self.pipeline_init_exception: Optional[Exception] = e + self.pipeline_init_failures: Optional[str] = f"{msg} due to {e}" + logger.error(e) + def __init__( self, config: PipelineConfig, @@ -138,23 +143,59 @@ def __init__( dry_run=dry_run, preview_mode=preview_mode, ) + self.pipeline_init_failures = None + self.pipeline_init_exception = None sink_type = self.config.sink.type - sink_class = sink_registry.get(sink_type) - sink_config = self.config.sink.dict().get("config") or {} - self.sink: Sink = sink_class.create(sink_config, self.ctx) - logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") - - source_type = self.config.source.type - source_class = source_registry.get(source_type) - self.source: Source = source_class.create( - self.config.source.dict().get("config", {}), self.ctx - ) - logger.debug(f"Source type:{source_type},{source_class} configured") + try: + sink_class = sink_registry.get(sink_type) + except Exception as e: + self._record_initialization_failure(e, "Failed to create a sink") + return + + try: + sink_config = self.config.sink.dict().get("config") or {} + self.sink: Sink = sink_class.create(sink_config, self.ctx) + 
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") + logger.info(f"Sink configured successfully. {self.sink.configured()}") + except Exception as e: + self._record_initialization_failure( + e, f"Failed to configure sink ({sink_type})" + ) + return + + try: + source_type = self.config.source.type + source_class = source_registry.get(source_type) + except Exception as e: + self._record_initialization_failure(e, "Failed to create source") + return + + try: + self.source: Source = source_class.create( + self.config.source.dict().get("config", {}), self.ctx + ) + logger.debug(f"Source type:{source_type},{source_class} configured") + except Exception as e: + self._record_initialization_failure( + e, f"Failed to configure source ({source_type})" + ) + return - self.extractor_class = extractor_registry.get(self.config.source.extractor) + try: + self.extractor_class = extractor_registry.get(self.config.source.extractor) + except Exception as e: + self._record_initialization_failure( + e, f"Failed to configure extractor ({self.config.source.extractor})" + ) + return + + try: + self._configure_transforms() + except ValueError as e: + self._record_initialization_failure(e, "Failed to configure transformers") + return - self._configure_transforms() self._configure_reporting() def _configure_transforms(self) -> None: @@ -209,6 +250,10 @@ def create( def run(self) -> None: callback = LoggingCallback() + if self.pipeline_init_failures: + # no point continuing, return early + return + extractor: Extractor = self.extractor_class() for wu in itertools.islice( self.source.get_workunits(), @@ -296,6 +341,9 @@ def process_commits(self) -> None: logger.info(f"Successfully committed changes for {name}.") def raise_from_status(self, raise_warnings: bool = False) -> None: + if self.pipeline_init_exception: + raise self.pipeline_init_exception + if self.source.get_report().failures: raise PipelineExecutionError( "Source reported errors", self.source.get_report() @@ -310,18 +358,18 @@ def raise_from_status(self, raise_warnings: bool = False) -> None: ) def log_ingestion_stats(self) -> None: - - telemetry.telemetry_instance.ping( - "ingest_stats", - { - "source_type": self.config.source.type, - "sink_type": self.config.sink.type, - "records_written": stats.discretize( - self.sink.get_report().records_written - ), - }, - self.ctx.graph, - ) + if not self.pipeline_init_failures: + telemetry.telemetry_instance.ping( + "ingest_stats", + { + "source_type": self.config.source.type, + "sink_type": self.config.sink.type, + "records_written": stats.discretize( + self.sink.get_report().records_written + ), + }, + self.ctx.graph, + ) def _count_all_vals(self, d: Dict[str, List]) -> int: result = 0 @@ -331,6 +379,9 @@ def _count_all_vals(self, d: Dict[str, List]) -> int: def pretty_print_summary(self, warnings_as_failure: bool = False) -> int: click.echo() + if self.pipeline_init_failures: + click.secho(f"{self.pipeline_init_failures}", fg="red") + return 1 click.secho(f"Source ({self.config.source.type}) report:", bold=True) click.echo(self.source.get_report().as_string()) click.secho(f"Sink ({self.config.sink.type}) report:", bold=True) diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index d95eb245ccfb50..45e9a28c763a9f 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -157,3 +157,9 @@ def get_report(self) -> SinkReport: 
def close(self): self.executor.shutdown(wait=True) + + def __repr__(self) -> str: + return self.emitter.__repr__() + + def configured(self) -> str: + return self.__repr__() diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 98551bd66e73cb..97db75c525f754 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -47,6 +47,7 @@ JobStatusClass, SubTypesClass, ) +from datahub.utilities.registries.domain_registry import DomainRegistry logger = logging.getLogger(__name__) @@ -150,6 +151,11 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext): self.schema_registry_client: KafkaSchemaRegistryBase = ( KafkaSource.create_schema_registry(config, self.report) ) + if self.source_config.domain: + self.domain_registry = DomainRegistry( + cached_domains=[k for k in self.source_config.domain], + graph=self.ctx.graph, + ) def is_checkpointing_enabled(self, job_id: JobId) -> bool: if ( @@ -333,7 +339,9 @@ def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]: # 6. Emit domains aspect MCPW for domain, pattern in self.source_config.domain.items(): if pattern.allowed(dataset_name): - domain_urn = make_domain_urn(domain) + domain_urn = make_domain_urn( + self.domain_registry.get_domain_urn(domain) + ) if domain_urn: wus = add_domain_to_entity_wu( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 034445bd853c55..56b82adbbab156 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -92,6 +92,7 @@ ViewPropertiesClass, ) from datahub.telemetry import telemetry +from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport if TYPE_CHECKING: @@ -241,23 +242,23 @@ class SQLAlchemyConfig(StatefulIngestionConfigBase): # them out afterwards via the table_pattern. schema_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="regex patterns for schemas to filter in ingestion.", + description="Regex patterns for schemas to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'", ) table_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="regex patterns for tables to filter in ingestion.", + description="Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", ) view_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="regex patterns for views to filter in ingestion.", + description="Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. 
to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", ) profile_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="regex patterns for profiles to filter in ingestion, allowed by the `table_pattern`.", + description="Regex patterns to filter tables for profiling during ingestion. Allowed by the `table_pattern`.", ) domain: Dict[str, AllowDenyPattern] = Field( default=dict(), - description=' regex patterns for tables/schemas to descide domain_key domain key (domain_key can be any string like "sales".) There can be multiple domain key specified.', + description='Attach domains to databases, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.', ) include_views: Optional[bool] = Field( @@ -273,6 +274,17 @@ class SQLAlchemyConfig(StatefulIngestionConfigBase): # Custom Stateful Ingestion settings stateful_ingestion: Optional[SQLAlchemyStatefulIngestionConfig] = None + @pydantic.root_validator(pre=True) + def view_pattern_is_table_pattern_unless_specified( + cls, values: Dict[str, Any] + ) -> Dict[str, Any]: + view_pattern = values.get("view_pattern") + table_pattern = values.get("table_pattern") + if table_pattern and not view_pattern: + logger.info(f"Applying table_pattern {table_pattern} to view_pattern.") + values["view_pattern"] = table_pattern + return values + @pydantic.root_validator() def ensure_profiling_pattern_is_passed_to_profiling( cls, values: Dict[str, Any] @@ -497,6 +509,10 @@ def __init__(self, config: SQLAlchemyConfig, ctx: PipelineContext, platform: str for config_flag in profiling_flags_to_report }, ) + if self.config.domain: + self.domain_registry = DomainRegistry( + cached_domains=[k for k in self.config.domain], graph=self.ctx.graph + ) def warn(self, log: logging.Logger, key: str, reason: str) -> None: self.report.report_warning(key, reason) @@ -809,7 +825,9 @@ def _gen_domain_urn(self, dataset_name: str) -> Optional[str]: for domain, pattern in self.config.domain.items(): if pattern.allowed(dataset_name): - domain_urn = make_domain_urn(domain) + domain_urn = make_domain_urn( + self.domain_registry.get_domain_urn(domain) + ) return domain_urn @@ -857,7 +875,6 @@ def loop_tables( # noqa: C901 continue self.report.report_entity_scanned(dataset_name, ent_type="table") - if not sql_config.table_pattern.allowed(dataset_name): self.report.report_dropped(dataset_name) continue diff --git a/metadata-ingestion/src/datahub/utilities/registries/__init__.py b/metadata-ingestion/src/datahub/utilities/registries/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py b/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py new file mode 100644 index 00000000000000..6ae55dee509677 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py @@ -0,0 +1,43 @@ +import logging +from typing import List, Optional + +from datahub.ingestion.graph.client import DataHubGraph + +logger = logging.getLogger(__name__) + + +class DomainRegistry: + """A class that makes it easy to resolve domains using DataHub""" + + def __init__( + self, + cached_domains: Optional[List[str]] = 
[], + graph: Optional[DataHubGraph] = None, + ): + self.domain_registry = {} + if cached_domains: + # isolate the domains that don't seem fully specified + domains_needing_resolution = [ + d + for d in cached_domains + if (not d.startswith("urn:li:domain") and d.count("-") != 4) + ] + if domains_needing_resolution and not graph: + raise ValueError( + f"Following domains need server-side resolution {domains_needing_resolution} but a DataHub server wasn't provided. Either use fully qualified domain ids (e.g. urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba) or provide a datahub_api config in your recipe." + ) + for domain_identifier in domains_needing_resolution: + assert graph + domain_urn = graph.get_domain_urn_by_name(domain_identifier) + if domain_urn: + self.domain_registry[domain_identifier] = domain_urn + else: + logger.error( + f"Failed to retrieve domain id for domain {domain_identifier}" + ) + raise ValueError( + f"domain {domain_identifier} doesn't seem to be provisioned on DataHub. Either provision it first and re-run ingestion, or provide a fully qualified domain id (e.g. urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba) to skip this check." + ) + + def get_domain_urn(self, domain_identifier: str) -> str: + return self.domain_registry.get(domain_identifier) or domain_identifier diff --git a/metadata-ingestion/tests/integration/hana/hana_to_file.yml b/metadata-ingestion/tests/integration/hana/hana_to_file.yml index c37f37b884bc6b..8900a5a3bfbadf 100644 --- a/metadata-ingestion/tests/integration/hana/hana_to_file.yml +++ b/metadata-ingestion/tests/integration/hana/hana_to_file.yml @@ -32,7 +32,7 @@ source: include_field_histogram: true include_field_sample_values: true domain: - sales: + "urn:li:domain:sales": allow: - "HOTEL" sink: diff --git a/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml b/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml index 69c43c57aa44bd..b345ba86799371 100644 --- a/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml +++ b/metadata-ingestion/tests/integration/kafka/kafka_to_file.yml @@ -7,7 +7,7 @@ source: bootstrap: "localhost:59092" schema_registry_url: "http://localhost:58081" domain: - sales: + "urn:li:domain:sales": allow: - "key_value_topic" sink: diff --git a/metadata-ingestion/tests/integration/mysql/mysql_to_file.yml b/metadata-ingestion/tests/integration/mysql/mysql_to_file.yml index 988dbca18ff366..8d4a5b84b91d20 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_to_file.yml +++ b/metadata-ingestion/tests/integration/mysql/mysql_to_file.yml @@ -31,7 +31,7 @@ source: include_field_histogram: true include_field_sample_values: true domain: - sales: + "urn:li:domain:sales": allow: - "^metagalaxy" sink: diff --git a/metadata-ingestion/tests/integration/mysql/mysql_to_file_dbalias.yml b/metadata-ingestion/tests/integration/mysql/mysql_to_file_dbalias.yml index 86e5915a0d1d25..1c324641fe1583 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_to_file_dbalias.yml +++ b/metadata-ingestion/tests/integration/mysql/mysql_to_file_dbalias.yml @@ -32,7 +32,7 @@ source: include_field_histogram: true include_field_sample_values: true domain: - sales: + "urn:li:domain:sales": allow: - "^metagalaxy" sink: From 070dfa0eaf1afa6d23350514d498497104d4277f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Mon, 11 Jul 2022 18:57:20 +0200 Subject: [PATCH 02/11] fix(config): fixes config key in DataHubAuthorizerFactory (#5371) --- 
.../com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java index 997709b2c90f52..e5e377b5777c1e 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/auth/DataHubAuthorizerFactory.java @@ -32,7 +32,7 @@ public class DataHubAuthorizerFactory { @Value("${authorization.defaultAuthorizer.cacheRefreshIntervalSecs}") private Integer policyCacheRefreshIntervalSeconds; - @Value("${authorization.defaultAuthorizer..enabled:true}") + @Value("${authorization.defaultAuthorizer.enabled:true}") private Boolean policiesEnabled; @Bean(name = "dataHubAuthorizer") From 489b5bb5b450570de92a1fa44c1a76befb286ee2 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 11 Jul 2022 15:08:26 -0700 Subject: [PATCH 03/11] fix(ingest): domains - check whether urn based domain exists during resolution (#5373) --- docs/domains.md | 37 ++++++++++++++++++- .../utilities/registries/domain_registry.py | 29 ++++++++++----- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/docs/domains.md b/docs/domains.md index 3f409ead1495b9..436f7ba1f332a5 100644 --- a/docs/domains.md +++ b/docs/domains.md @@ -100,6 +100,41 @@ source: - "long_tail_companions.ecommerce.*" ``` +:::note + +When bare domain names like `Analytics` is used, the ingestion system will first check if a domain like `urn:li:domain:Analytics` is provisioned, failing that; it will check for a provisioned domain that has the same name. If we are unable to resolve bare domain names to provisioned domains, then ingestion will refuse to proceeed until the domain is provisioned on DataHub. + +::: + +You can also provide fully-qualified domain names to ensure that no ingestion-time domain resolution is needed. For example, the following recipe shows an example using fully qualified domain names: + +```yaml +source: + type: snowflake + config: + username: ${SNOW_USER} + password: ${SNOW_PASS} + account_id: + warehouse: COMPUTE_WH + role: accountadmin + database_pattern: + allow: + - "long_tail_companions" + schema_pattern: + deny: + - information_schema + profiling: + enabled: False + domain: + "urn:li:domain:6289fccc-4af2-4cbb-96ed-051e7d1de93c": + allow: + - "long_tail_companions.analytics.*" + "urn:li:domain:07155b15-cee6-4fda-b1c1-5a19a6b74c3a": + allow: + - "long_tail_companions.ecommerce.*" +``` + + ## Searching by Domain @@ -179,4 +214,4 @@ Click [here](https://www.loom.com/share/72b3bcc2729b4df0982fa63ae3a8cb21) to see ## Feedback / Questions / Concerns -We want to hear from you! For any inquiries, including Feedback, Questions, or Concerns, reach out on Slack! \ No newline at end of file +We want to hear from you! For any inquiries, including Feedback, Questions, or Concerns, reach out on Slack! 
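The resolution order described in the note above — try the urn-style id first, then fall back to a lookup by display name — is what the `domain_registry.py` change below implements. As a rough sketch of how the same check could be run ahead of ingestion from the Python SDK (assumptions: a GMS instance at `http://localhost:8080` and a bare domain name `Analytics`; both are placeholders to adapt to your deployment):

```python
import logging

from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import DomainPropertiesClass

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Assumption: a locally running GMS; point this at your own deployment.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

domain_name = "Analytics"  # hypothetical bare domain name taken from a recipe

# 1. Treat the bare name as if it were already a fully-qualified domain id.
maybe_domain_urn = f"urn:li:domain:{domain_name}"
props = graph.get_aspect_v2(maybe_domain_urn, DomainPropertiesClass, "domainProperties")
if props:
    log.info(f"{maybe_domain_urn} is provisioned")
else:
    # 2. Otherwise, resolve the display name via search.
    resolved_urn = graph.get_domain_urn_by_name(domain_name)
    if resolved_urn:
        log.info(f"Resolved {domain_name} to {resolved_urn}")
    else:
        log.warning(
            f"Domain {domain_name} is not provisioned; create it before running ingestion"
        )
```

Checking the urn form first mirrors the behavior of `DomainRegistry` in the diff that follows, so fully-qualified ids never require a server-side search.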
diff --git a/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py b/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py index 6ae55dee509677..4e719c939b6f2b 100644 --- a/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py +++ b/metadata-ingestion/src/datahub/utilities/registries/domain_registry.py @@ -28,16 +28,27 @@ def __init__( ) for domain_identifier in domains_needing_resolution: assert graph - domain_urn = graph.get_domain_urn_by_name(domain_identifier) - if domain_urn: - self.domain_registry[domain_identifier] = domain_urn + # first try to check if this domain exists by urn + maybe_domain_urn = f"urn:li:domain:{domain_identifier}" + from datahub.metadata.schema_classes import DomainPropertiesClass + + maybe_domain_properties = graph.get_aspect_v2( + maybe_domain_urn, DomainPropertiesClass, "domainProperties" + ) + if maybe_domain_properties: + self.domain_registry[domain_identifier] = maybe_domain_urn else: - logger.error( - f"Failed to retrieve domain id for domain {domain_identifier}" - ) - raise ValueError( - f"domain {domain_identifier} doesn't seem to be provisioned on DataHub. Either provision it first and re-run ingestion, or provide a fully qualified domain id (e.g. urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba) to skip this check." - ) + # try to get this domain by name + domain_urn = graph.get_domain_urn_by_name(domain_identifier) + if domain_urn: + self.domain_registry[domain_identifier] = domain_urn + else: + logger.error( + f"Failed to retrieve domain id for domain {domain_identifier}" + ) + raise ValueError( + f"domain {domain_identifier} doesn't seem to be provisioned on DataHub. Either provision it first and re-run ingestion, or provide a fully qualified domain id (e.g. urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba) to skip this check." 
+ ) def get_domain_urn(self, domain_identifier: str) -> str: return self.domain_registry.get(domain_identifier) or domain_identifier From f3e5afdba957e3943c28d9a01ebea0967175b010 Mon Sep 17 00:00:00 2001 From: Navin Sharma <103643430+NavinSharma13@users.noreply.github.com> Date: Tue, 12 Jul 2022 11:03:24 +0530 Subject: [PATCH 04/11] feat(quickstart): Adding env variables and cli options for customizing mapped ports in quickstart (#5353) Co-authored-by: Shirshanka Das --- docker/docker-compose.override.yml | 2 +- docker/docker-compose.yml | 17 +- ...er-compose-without-neo4j-m1.quickstart.yml | 15 +- .../quickstart/docker-compose.quickstart.yml | 19 ++- docs/quickstart.md | 110 +++++++++---- metadata-ingestion/setup.py | 2 +- metadata-ingestion/src/datahub/cli/docker.py | 149 +++++++++++++++++- metadata-ingestion/src/datahub/entrypoints.py | 2 +- 8 files changed, 248 insertions(+), 68 deletions(-) diff --git a/docker/docker-compose.override.yml b/docker/docker-compose.override.yml index b6a095d8836222..e8336d8349b2ab 100644 --- a/docker/docker-compose.override.yml +++ b/docker/docker-compose.override.yml @@ -9,7 +9,7 @@ services: env_file: mysql/env/docker.env command: --character-set-server=utf8mb4 --collation-server=utf8mb4_bin ports: - - "3306:3306" + - ${DATAHUB_MAPPED_MYSQL_PORT:-3306}:3306 volumes: - ./mysql/init.sql:/docker-entrypoint-initdb.d/init.sql - mysqldata:/var/lib/mysql diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6063626ab04b29..402916fd717ccd 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -12,7 +12,7 @@ services: hostname: zookeeper container_name: zookeeper ports: - - "2181:2181" + - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181 volumes: - zkdata:/var/opt/zookeeper @@ -24,8 +24,7 @@ services: depends_on: - zookeeper ports: - - "29092:29092" - - "9092:9092" + - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092 volumes: - broker:/var/lib/kafka/data/ @@ -50,7 +49,7 @@ services: - zookeeper - broker ports: - - "8081:8081" + - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081 elasticsearch: image: elasticsearch:7.9.3 @@ -58,7 +57,7 @@ services: container_name: elasticsearch hostname: elasticsearch ports: - - "9200:9200" + - ${DATAHUB_MAPPED_ELASTIC_PORT:-9200}:9200 environment: - discovery.type=single-node - xpack.security.enabled=false @@ -75,8 +74,8 @@ services: hostname: neo4j container_name: neo4j ports: - - "7474:7474" - - "7687:7687" + - ${DATAHUB_MAPPED_NEO4J_HTTP_PORT:-7474}:7474 + - ${DATAHUB_MAPPED_NEO4J_BOLT_PORT:-7687}:7687 volumes: - neo4jdata:/data @@ -100,7 +99,7 @@ services: hostname: datahub-gms container_name: datahub-gms ports: - - "8080:8080" + - ${DATAHUB_MAPPED_GMS_PORT:-8080}:8080 depends_on: - elasticsearch-setup - kafka-setup @@ -116,7 +115,7 @@ services: hostname: datahub-frontend-react container_name: datahub-frontend-react ports: - - "9002:9002" + - ${DATAHUB_MAPPED_FRONTEND_PORT:-9002}:9002 depends_on: - datahub-gms volumes: diff --git a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml index 76c0fc823c3503..49dbbc5b9f3086 100644 --- a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml @@ -17,8 +17,7 @@ services: hostname: broker image: kymeric/cp-kafka:latest ports: - - 29092:29092 - - 9092:9092 + - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092 datahub-actions: depends_on: - datahub-gms @@ -57,7 +56,7 @@ services: hostname: 
datahub-frontend-react image: linkedin/datahub-frontend-react:${DATAHUB_VERSION:-head} ports: - - 9002:9002 + - ${DATAHUB_MAPPED_FRONTEND_PORT:-9002}:9002 volumes: - ${HOME}/.datahub/plugins:/etc/datahub/plugins datahub-gms: @@ -86,7 +85,7 @@ services: hostname: datahub-gms image: linkedin/datahub-gms:${DATAHUB_VERSION:-head} ports: - - 8080:8080 + - ${DATAHUB_MAPPED_GMS_PORT:-8080}:8080 volumes: - ${HOME}/.datahub/plugins:/etc/datahub/plugins elasticsearch: @@ -106,7 +105,7 @@ services: image: elasticsearch:7.9.3 mem_limit: 1g ports: - - 9200:9200 + - ${DATAHUB_MAPPED_ELASTIC_PORT:-9200}:9200 volumes: - esdata:/usr/share/elasticsearch/data elasticsearch-setup: @@ -140,7 +139,7 @@ services: hostname: mysql image: mariadb:10.5.8 ports: - - 3306:3306 + - ${DATAHUB_MAPPED_MYSQL_PORT:-3306}:3306 volumes: - ../mysql/init.sql:/docker-entrypoint-initdb.d/init.sql - mysqldata:/var/lib/mysql @@ -167,7 +166,7 @@ services: hostname: schema-registry image: eugenetea/schema-registry-arm64:latest ports: - - 8081:8081 + - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081 zookeeper: container_name: zookeeper environment: @@ -176,7 +175,7 @@ services: hostname: zookeeper image: kymeric/cp-zookeeper:latest ports: - - 2181:2181 + - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181 volumes: - zkdata:/var/opt/zookeeper version: '2.3' diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index 7cdee0e7084a35..6e46f3f16ab208 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -17,8 +17,7 @@ services: hostname: broker image: confluentinc/cp-kafka:5.4.0 ports: - - 29092:29092 - - 9092:9092 + - ${DATAHUB_MAPPED_KAFKA_BROKER_PORT:-9092}:9092 volumes: - broker:/var/lib/kafka/data/ datahub-actions: @@ -59,7 +58,7 @@ services: hostname: datahub-frontend-react image: linkedin/datahub-frontend-react:${DATAHUB_VERSION:-head} ports: - - 9002:9002 + - ${DATAHUB_MAPPED_FRONTEND_PORT:-9002}:9002 volumes: - ${HOME}/.datahub/plugins:/etc/datahub/plugins datahub-gms: @@ -94,7 +93,7 @@ services: hostname: datahub-gms image: linkedin/datahub-gms:${DATAHUB_VERSION:-head} ports: - - 8080:8080 + - ${DATAHUB_MAPPED_GMS_PORT:-8080}:8080 volumes: - ${HOME}/.datahub/plugins/:/etc/datahub/plugins elasticsearch: @@ -114,7 +113,7 @@ services: image: elasticsearch:7.9.3 mem_limit: 1g ports: - - 9200:9200 + - ${DATAHUB_MAPPED_ELASTIC_PORT:-9200}:9200 volumes: - esdata:/usr/share/elasticsearch/data elasticsearch-setup: @@ -148,7 +147,7 @@ services: hostname: mysql image: mysql:5.7 ports: - - 3306:3306 + - ${DATAHUB_MAPPED_MYSQL_PORT:-3306}:3306 volumes: - ../mysql/init.sql:/docker-entrypoint-initdb.d/init.sql - mysqldata:/var/lib/mysql @@ -173,8 +172,8 @@ services: hostname: neo4j image: neo4j:4.0.6 ports: - - 7474:7474 - - 7687:7687 + - ${DATAHUB_MAPPED_NEO4J_HTTP_PORT:-7474}:7474 + - ${DATAHUB_MAPPED_NEO4J_BOLT_PORT:-7687}:7687 volumes: - neo4jdata:/data schema-registry: @@ -188,7 +187,7 @@ services: hostname: schema-registry image: confluentinc/cp-schema-registry:5.4.0 ports: - - 8081:8081 + - ${DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT:-8081}:8081 zookeeper: container_name: zookeeper environment: @@ -197,7 +196,7 @@ services: hostname: zookeeper image: confluentinc/cp-zookeeper:5.4.0 ports: - - 2181:2181 + - ${DATAHUB_MAPPED_ZK_PORT:-2181}:2181 volumes: - zkdata:/var/opt/zookeeper version: '2.3' diff --git a/docs/quickstart.md b/docs/quickstart.md index 5e83efe3e2aab8..4118314a099240 100644 --- a/docs/quickstart.md +++ 
b/docs/quickstart.md @@ -42,10 +42,6 @@ To deploy a new instance of DataHub, perform the following steps. at [http://localhost:9002](http://localhost:9002) in your browser. You can sign in using `datahub` as both the username and password. - If you would like to modify/configure the DataHub installation in some way, please download the [docker-compose.yaml](https://raw.githubusercontent.com/datahub-project/datahub/master/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml) used by the cli tool, modify it as necessary and deploy DataHub by passing the downloaded docker-compose file: - ``` - datahub docker quickstart --quickstart-compose-file - ``` 5. To ingest the sample metadata, run the following CLI command from your terminal @@ -62,11 +58,73 @@ using the `--token ` parameter in the command. That's it! Now feel free to play around with DataHub! +## Troubleshooting Issues + +
+<details>
+<summary>
+Command not found: datahub
+</summary>
+
+If running the datahub cli produces "command not found" errors inside your terminal, your system may be defaulting to an
+older version of Python. Try prefixing your `datahub` commands with `python3 -m`:
+
+```
+python3 -m datahub docker quickstart
+```
+
+Another possibility is that your system PATH does not include pip's `$HOME/.local/bin` directory. On linux, you can add this to your `~/.bashrc`:
+
+```
+if [ -d "$HOME/.local/bin" ] ; then
+  PATH="$HOME/.local/bin:$PATH"
+fi
+```
+
+</details>
+
+<details>
+<summary>
+Port Conflicts
+</summary>
+
+By default the quickstart deploy will require the following ports to be free on your local machine:
+  - 3306 for MySQL
+  - 9200 for Elasticsearch
+  - 9092 for the Kafka broker
+  - 8081 for Schema Registry
+  - 2181 for ZooKeeper
+  - 9002 for the DataHub Web Application (datahub-frontend)
+  - 8080 for the DataHub Metadata Service (datahub-gms)
+
+ In case the default ports conflict with software you are already running on your machine, you can override these ports by passing additional flags to the `datahub docker quickstart` command.
+ e.g. To override the MySQL port with 53306 (instead of the default 3306), you can say: `datahub docker quickstart --mysql-port 53306`. Use `datahub docker quickstart --help` to see all the supported options.
+
+</details>
+
+<details>
+<summary>
+Miscellaneous Docker issues
+</summary>
+
+There can be misc issues with Docker, like conflicting containers and dangling volumes, that can often be resolved by
+pruning your Docker state with the following command. Note that this command removes all unused containers, networks,
+images (both dangling and unreferenced), and optionally, volumes.
+
+```
+docker system prune
+```
+
+</details>
+
+<details>
+<summary>
+Still stuck?
+</summary>
+
+Hop over to our [Slack community](https://slack.datahubproject.io) and ask for help in the [#troubleshoot](https://datahubspace.slack.com/archives/C029A3M079U) channel!
+
+</details>
+ ## Next Steps ### Ingest Metadata -To start pushing your company's metadata into DataHub, take a look at the [Metadata Ingestion Framework](../metadata-ingestion/README.md). +To start pushing your company's metadata into DataHub, take a look at [UI-based Ingestion Guide](./ui-ingestion.md), or to run ingestion using the cli, look at the [Metadata Ingestion Guide](../metadata-ingestion/README.md). ### Invite Users @@ -82,50 +140,36 @@ To enable backend Authentication, check out [authentication in DataHub's backend We recommend deploying DataHub to production using Kubernetes. We provide helpful [Helm Charts](https://artifacthub.io/packages/helm/datahub/datahub) to help you quickly get up and running. Check out [Deploying DataHub to Kubernetes](./deploy/kubernetes.md) for a step-by-step walkthrough. -## Resetting DataHub +## Other Common Operations + +### Stopping DataHub -To cleanse DataHub of all of it's state (e.g. before ingesting your own), you can use the CLI `nuke` command. +To stop DataHub's quickstart, you can issue the following command. ``` -datahub docker nuke +datahub docker quickstart --stop ``` -## Updating DataHub locally +### Resetting DataHub -If you have been testing DataHub locally, a new version of DataHub got released and you want to try the new version then you can use below commands. +To cleanse DataHub of all of its state (e.g. before ingesting your own), you can use the CLI `nuke` command. ``` -datahub docker nuke --keep-data -datahub docker quickstart +datahub docker nuke ``` -This will keep the data that you have ingested so far in DataHub and start a new quickstart with the latest version of DataHub. +### Upgrading your local DataHub -## Troubleshooting - -### Command not found: datahub - -If running the datahub cli produces "command not found" errors inside your terminal, your system may be defaulting to an -older version of Python. Try prefixing your `datahub` commands with `python3 -m`: +If you have been testing DataHub locally, a new version of DataHub got released and you want to try the new version then you can just issue the quickstart command again. It will pull down newer images and restart your instance without losing any data. ``` -python3 -m datahub docker quickstart +datahub docker quickstart ``` -Another possibility is that your system PATH does not include pip's `$HOME/.local/bin` directory. On linux, you can add this to your `~/.bashrc`: +### Customization +If you would like to customize the DataHub installation further, please download the [docker-compose.yaml](https://raw.githubusercontent.com/datahub-project/datahub/master/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml) used by the cli tool, modify it as necessary and deploy DataHub by passing the downloaded docker-compose file: ``` -if [ -d "$HOME/.local/bin" ] ; then - PATH="$HOME/.local/bin:$PATH" -fi +datahub docker quickstart --quickstart-compose-file ``` -### Miscellaneous Docker issues - -There can be misc issues with Docker, like conflicting containers and dangling volumes, that can often be resolved by -pruning your Docker state with the following command. Note that this command removes all unused containers, networks, -images (both dangling and unreferenced), and optionally, volumes. 
- -``` -docker system prune -``` diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index b07650b1ba5055..929208eaa8891e 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -68,7 +68,7 @@ def get_long_description(): # At the same time, we use Kafka's AvroSerializer, which internally relies on # fastavro for serialization. We do not use confluent_kafka[avro], since it # is incompatible with its own dep on avro-python3. - "confluent_kafka>=1.5.0,<1.9.0", + "confluent_kafka>=1.5.0", "fastavro>=1.2.0", } diff --git a/metadata-ingestion/src/datahub/cli/docker.py b/metadata-ingestion/src/datahub/cli/docker.py index 6e42b4e5adb020..f2e580a2122bcc 100644 --- a/metadata-ingestion/src/datahub/cli/docker.py +++ b/metadata-ingestion/src/datahub/cli/docker.py @@ -11,6 +11,7 @@ from typing import List, NoReturn, Optional import click +import pydantic import requests from datahub.cli.docker_check import ( @@ -131,6 +132,80 @@ def should_use_neo4j_for_graph_service(graph_service_override: Optional[str]) -> return False +def _set_environment_variables( + version: Optional[str], + mysql_port: Optional[pydantic.PositiveInt], + zk_port: Optional[pydantic.PositiveInt], + kafka_broker_port: Optional[pydantic.PositiveInt], + schema_registry_port: Optional[pydantic.PositiveInt], + elastic_port: Optional[pydantic.PositiveInt], +) -> None: + if version is not None: + os.environ["DATAHUB_VERSION"] = version + if mysql_port is not None: + os.environ["DATAHUB_MAPPED_MYSQL_PORT"] = str(mysql_port) + + if zk_port is not None: + os.environ["DATAHUB_MAPPED_ZK_PORT"] = str(zk_port) + + if kafka_broker_port is not None: + os.environ["DATAHUB_MAPPED_KAFKA_BROKER_PORT"] = str(kafka_broker_port) + + if schema_registry_port is not None: + os.environ["DATAHUB_MAPPED_SCHEMA_REGISTRY_PORT"] = str(schema_registry_port) + + if elastic_port is not None: + os.environ["DATAHUB_MAPPED_ELASTIC_PORT"] = str(elastic_port) + + +def _get_default_quickstart_compose_file() -> Optional[str]: + home = os.environ["HOME"] + if home: + try: + os.makedirs(f"{home}/.datahub/quickstart", exist_ok=True) + return f"{home}/.datahub/quickstart/docker-compose.yml" + except Exception as e: + logger.debug( + f"Failed to identify a default quickstart compose file due to {e}" + ) + + return None + + +def _attempt_stop(quickstart_compose_file: List[pathlib.Path]) -> None: + default_quickstart_compose_file = _get_default_quickstart_compose_file() + compose_files_for_stopping = ( + quickstart_compose_file + if quickstart_compose_file + else [pathlib.Path(default_quickstart_compose_file)] + if default_quickstart_compose_file + else None + ) + if compose_files_for_stopping: + # docker-compose stop + base_command: List[str] = [ + "docker-compose", + *itertools.chain.from_iterable( + ("-f", f"{path}") for path in compose_files_for_stopping + ), + "-p", + "datahub", + ] + try: + logger.debug(f"Executing {base_command} stop") + subprocess.run( + [*base_command, "stop"], + check=True, + ) + click.secho("Stopped datahub successfully.", fg="green") + except subprocess.CalledProcessError: + click.secho( + "Error while stopping.", + fg="red", + ) + return + + @docker.command() @click.option( "--version", @@ -166,6 +241,48 @@ def should_use_neo4j_for_graph_service(graph_service_override: Optional[str]) -> default=None, help="If set, forces docker-compose to use that graph service implementation", ) +@click.option( + "--mysql-port", + type=pydantic.PositiveInt, + is_flag=False, + default=None, + help="If there is an existing mysql 
instance running on port 3306, set this to a free port to avoid port conflicts on startup", +) +@click.option( + "--zk-port", + type=pydantic.PositiveInt, + is_flag=False, + default=None, + help="If there is an existing zookeeper instance running on port 2181, set this to a free port to avoid port conflicts on startup", +) +@click.option( + "--kafka-broker-port", + type=pydantic.PositiveInt, + is_flag=False, + default=None, + help="If there is an existing Kafka broker running on port 9092, set this to a free port to avoid port conflicts on startup", +) +@click.option( + "--schema-registry-port", + type=pydantic.PositiveInt, + is_flag=False, + default=None, + help="If there is an existing process running on port 8081, set this to a free port to avoid port conflicts with Kafka schema registry on startup", +) +@click.option( + "--elastic-port", + type=pydantic.PositiveInt, + is_flag=False, + default=None, + help="If there is an existing Elasticsearch instance running on port 9092, set this to a free port to avoid port conflicts on startup", +) +@click.option( + "--stop", + type=bool, + is_flag=True, + default=False, + help="Use this flag to stop the running containers", +) @upgrade.check_upgrade @telemetry.with_telemetry def quickstart( @@ -174,6 +291,12 @@ def quickstart( quickstart_compose_file: List[pathlib.Path], dump_logs_on_failure: bool, graph_service_impl: Optional[str], + mysql_port: Optional[pydantic.PositiveInt], + zk_port: Optional[pydantic.PositiveInt], + kafka_broker_port: Optional[pydantic.PositiveInt], + schema_registry_port: Optional[pydantic.PositiveInt], + elastic_port: Optional[pydantic.PositiveInt], + stop: bool, ) -> None: """Start an instance of DataHub locally using docker-compose. @@ -185,7 +308,7 @@ def quickstart( running_on_m1 = is_m1() if running_on_m1: - click.echo("Detected M1 machine") + click.secho("Detected M1 machine", fg="yellow") # Run pre-flight checks. 
issues = check_local_docker_containers(preflight_only=True) @@ -195,7 +318,13 @@ def quickstart( quickstart_compose_file = list( quickstart_compose_file ) # convert to list from tuple - if not quickstart_compose_file: + + default_quickstart_compose_file = _get_default_quickstart_compose_file() + if stop: + _attempt_stop(quickstart_compose_file) + return + elif not quickstart_compose_file: + # download appropriate quickstart file should_use_neo4j = should_use_neo4j_for_graph_service(graph_service_impl) if should_use_neo4j and running_on_m1: click.secho( @@ -210,7 +339,11 @@ def quickstart( else GITHUB_M1_QUICKSTART_COMPOSE_URL ) - with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp_file: + with open( + default_quickstart_compose_file, "wb" + ) if default_quickstart_compose_file else tempfile.NamedTemporaryFile( + suffix=".yml", delete=False + ) as tmp_file: path = pathlib.Path(tmp_file.name) quickstart_compose_file.append(path) click.echo(f"Fetching docker-compose file {github_file} from GitHub") @@ -221,8 +354,14 @@ def quickstart( logger.debug(f"Copied to {path}") # set version - if version is not None: - os.environ["DATAHUB_VERSION"] = version + _set_environment_variables( + version=version, + mysql_port=mysql_port, + zk_port=zk_port, + kafka_broker_port=kafka_broker_port, + schema_registry_port=schema_registry_port, + elastic_port=elastic_port, + ) base_command: List[str] = [ "docker-compose", diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index 81cc375c15bc07..41404122979a11 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) # Configure some loggers. -logging.getLogger("urllib3").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.ERROR) logging.getLogger("snowflake").setLevel(level=logging.WARNING) # logging.getLogger("botocore").setLevel(logging.INFO) # logging.getLogger("google").setLevel(logging.INFO) From 9fca5277dc6d603e619c1a7c46f0d7990318a3d6 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Tue, 12 Jul 2022 12:37:47 +0530 Subject: [PATCH 05/11] fix(build): tweak ingestion build (#5374) --- .github/workflows/docker-ingestion-base.yml | 2 + .github/workflows/docker-ingestion-smoke.yml | 42 +++++++++++++++++++ docker/datahub-ingestion/Dockerfile | 3 +- .../datahub-ingestion/base-requirements.txt | 1 + docker/datahub-ingestion/base.Dockerfile | 3 +- docker/datahub-ingestion/smoke.Dockerfile | 18 ++++++++ 6 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/docker-ingestion-smoke.yml create mode 100644 docker/datahub-ingestion/smoke.Dockerfile diff --git a/.github/workflows/docker-ingestion-base.yml b/.github/workflows/docker-ingestion-base.yml index 9616d211cd1610..adcf8b14ab40bc 100644 --- a/.github/workflows/docker-ingestion-base.yml +++ b/.github/workflows/docker-ingestion-base.yml @@ -1,5 +1,7 @@ name: ingestion base on: + release: + types: [published, edited] push: branches: - master diff --git a/.github/workflows/docker-ingestion-smoke.yml b/.github/workflows/docker-ingestion-smoke.yml new file mode 100644 index 00000000000000..66762b78ccd902 --- /dev/null +++ b/.github/workflows/docker-ingestion-smoke.yml @@ -0,0 +1,42 @@ +name: ingestion smoke +on: + release: + types: [published, edited] + push: + branches: + - master + paths: + - "docker/datahub-ingestion/**" + workflow_dispatch: + +concurrency: + group: ${{ github.workflow 
}}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + + build-smoke: + name: Build and Push Docker Image to Docker Hub + runs-on: ubuntu-latest + steps: + - name: Check out the repo + uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + - name: Login to DockerHub + uses: docker/login-action@v1 + with: + username: ${{ secrets.ACRYL_DOCKER_USERNAME }} + password: ${{ secrets.ACRYL_DOCKER_PASSWORD }} + - name: Build and Push image + uses: docker/build-push-action@v2 + with: + context: ./docker/datahub-ingestion + file: ./docker/datahub-ingestion/smoke.Dockerfile + platforms: linux/amd64,linux/arm64 + tags: acryldata/datahub-ingestion-base:smoke + push: true diff --git a/docker/datahub-ingestion/Dockerfile b/docker/datahub-ingestion/Dockerfile index 899919b2978c41..aaea8b04f38362 100644 --- a/docker/datahub-ingestion/Dockerfile +++ b/docker/datahub-ingestion/Dockerfile @@ -25,7 +25,8 @@ ARG RELEASE_VERSION RUN cd /datahub-ingestion && \ sed -i.bak "s/__version__ = \"0.0.0.dev0\"/__version__ = \"$RELEASE_VERSION\"/" src/datahub/__init__.py && \ cat src/datahub/__init__.py && \ - pip install ".[all]" + pip install ".[all]" && \ + pip freeze FROM base as dev-install # Dummy stage for development. Assumes code is built on your machine and mounted to this image. diff --git a/docker/datahub-ingestion/base-requirements.txt b/docker/datahub-ingestion/base-requirements.txt index ea0fc24498e58c..788baab9de5338 100644 --- a/docker/datahub-ingestion/base-requirements.txt +++ b/docker/datahub-ingestion/base-requirements.txt @@ -1,3 +1,4 @@ +acryl-datahub absl-py==1.1.0 acryl-iceberg-legacy==0.0.4 acryl-PyHive==0.6.13 diff --git a/docker/datahub-ingestion/base.Dockerfile b/docker/datahub-ingestion/base.Dockerfile index e104b9b34f8adb..dfffa64ea3a090 100644 --- a/docker/datahub-ingestion/base.Dockerfile +++ b/docker/datahub-ingestion/base.Dockerfile @@ -12,4 +12,5 @@ RUN apt-get update && apt-get install -y \ COPY ./base-requirements.txt requirements.txt -RUN pip install -r requirements.txt \ No newline at end of file +RUN pip install -r requirements.txt && \ + pip uninstall -y acryl-datahub \ No newline at end of file diff --git a/docker/datahub-ingestion/smoke.Dockerfile b/docker/datahub-ingestion/smoke.Dockerfile new file mode 100644 index 00000000000000..3bfdc9ccd0d770 --- /dev/null +++ b/docker/datahub-ingestion/smoke.Dockerfile @@ -0,0 +1,18 @@ +FROM acryldata/datahub-ingestion-base as base + +RUN apt-get update && apt-get install -y \ + sudo \ + python3-dev \ + libgtk2.0-0 \ + libgtk-3-0 \ + libgbm-dev \ + libnotify-dev \ + libgconf-2-4 \ + libnss3 \ + libxss1 \ + libasound2 \ + libxtst6 \ + xauth \ + xvfb + +RUN DEBIAN_FRONTEND=noninteractive apt-get install -y openjdk-11-jdk \ No newline at end of file From ff0aa3f24b4496c6e4c26f95c332882ee296a55a Mon Sep 17 00:00:00 2001 From: Aezo <45879156+aezomz@users.noreply.github.com> Date: Wed, 13 Jul 2022 01:10:07 +0800 Subject: [PATCH 06/11] feat(sdk): python - add get_aspects_for_entity (#5255) Co-authored-by: Shirshanka Das --- .../library/dataset_query_entity_v2.py | 33 ++++++++++++ .../src/datahub/ingestion/graph/client.py | 53 +++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 metadata-ingestion/examples/library/dataset_query_entity_v2.py diff --git a/metadata-ingestion/examples/library/dataset_query_entity_v2.py 
b/metadata-ingestion/examples/library/dataset_query_entity_v2.py new file mode 100644 index 00000000000000..a8b8439079f22c --- /dev/null +++ b/metadata-ingestion/examples/library/dataset_query_entity_v2.py @@ -0,0 +1,33 @@ +import logging + +from datahub.emitter.mce_builder import make_dataset_urn + +# read-modify-write requires access to the DataHubGraph (RestEmitter is not enough) +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph + +# Imports for metadata model classes +from datahub.metadata.schema_classes import ( + DataPlatformInstanceClass, + DatasetKeyClass, + StatusClass, +) + +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD") + +gms_endpoint = "http://localhost:8080" +graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint)) + +# Query multiple aspects from entity +result = graph.get_aspects_for_entity( + entity_urn=dataset_urn, + aspects=["status", "dataPlatformInstance", "datasetKey"], + aspect_types=[StatusClass, DataPlatformInstanceClass, DatasetKeyClass], +) + +# result are typed according to their class if exist +if result is not None: + if result["datasetKey"]: + log.info(result["datasetKey"].name) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 2e876bfa8d15bc..9b069f148f7f94 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -288,6 +288,59 @@ def get_latest_timeseries_value( ) return None + def get_aspects_for_entity( + self, + entity_urn: str, + aspects: List[str], + aspect_types: List[Type[Aspect]], + ) -> Optional[Dict[str, Optional[Aspect]]]: + """ + Get multiple aspects for an entity. To get a single aspect for an entity, use the `get_aspect_v2` method. + Warning: Do not use this method to determine if an entity exists! + This method will always return an entity, even if it doesn't exist. This is an issue with how DataHub server + responds to these calls, and will be fixed automatically when the server-side issue is fixed. + + :param str entity_urn: The urn of the entity + :param List[Type[Aspect]] aspect_type_list: List of aspect type classes being requested (e.g. [datahub.metadata.schema_classes.DatasetProperties]) + :param List[str] aspects_list: List of aspect names being requested (e.g. [schemaMetadata, datasetProperties]) + :return: Optionally, a map of aspect_name to aspect_value as a dictionary if present, aspect_value will be set to None if that aspect was not found. Returns None on HTTP status 404. 
+        :rtype: Optional[Dict[str, Optional[Aspect]]]
+        :raises HttpError: if the HTTP response is not a 200 or a 404
+        """
+        assert len(aspects) == len(
+            aspect_types
+        ), f"number of aspects requested ({len(aspects)}) should be the same as number of aspect types provided ({len(aspect_types)})"
+        aspects_list = ",".join(aspects)
+        url: str = f"{self._gms_server}/entitiesV2/{Urn.url_encode(entity_urn)}?aspects=List({aspects_list})"
+
+        response = self._session.get(url)
+        if response.status_code == 404:
+            # not found
+            return None
+        response.raise_for_status()
+        response_json = response.json()
+
+        result: Dict[str, Optional[Aspect]] = {}
+        for aspect_type in aspect_types:
+            record_schema: RecordSchema = aspect_type.__getattribute__(
+                aspect_type, "RECORD_SCHEMA"
+            )
+            if not record_schema:
+                logger.warning(
+                    f"Failed to infer type name of the aspect from the aspect type class {aspect_type}. Continuing, but this will fail."
+                )
+            else:
+                aspect_type_name = record_schema.props["Aspect"]["name"]
+            aspect_json = response_json.get("aspects", {}).get(aspect_type_name)
+            if aspect_json:
+                # need to apply a transform to the response to match rest.li and avro serialization
+                post_json_obj = post_json_transform(aspect_json)
+                result[aspect_type_name] = aspect_type.from_obj(post_json_obj["value"])
+            else:
+                result[aspect_type_name] = None
+
+        return result
+
     def _get_search_endpoint(self):
         return f"{self.config.server}/entities?action=search"
 

From f0281f32abe8ce3163fe6ab6427a03af192c0fd5 Mon Sep 17 00:00:00 2001
From: Tamas Nemeth
Date: Tue, 12 Jul 2022 19:20:27 +0200
Subject: [PATCH 07/11] fix(airflow): fix for failing serialisation when Param was specified + support for external task sensor (#5368)

fixes #4546
---
 .../client/airflow_generator.py | 58 +++++++++++--------
 1 file changed, 33 insertions(+), 25 deletions(-)

diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py
index b5c389d298969e..b7864ddb71ea60 100644
--- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py
+++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py
@@ -1,4 +1,4 @@
-from typing import TYPE_CHECKING, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Union, cast
 
 from airflow.configuration import conf
 
@@ -87,6 +87,27 @@ def _get_dependencies(
             if subdag_task_id in upstream_task._downstream_task_ids:
                 upstream_subdag_triggers.append(upstream_task_urn)
 
+        # If the operator is an ExternalTaskSensor then we set the remote task as upstream.
+        # It is possible to tie an external sensor to a DAG if external_task_id is omitted, but currently
+        # we can't tie one jobflow to another jobflow.
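+        # (Editorial note, not part of the original commit; the sensor and names below are
+        # hypothetical. For a sensor such as
+        # ExternalTaskSensor(task_id="wait_upstream", external_dag_id="upstream_dag", external_task_id="upstream_task"),
+        # the block below would add an upstream urn of roughly
+        # urn:li:dataJob:(urn:li:dataFlow:(airflow,upstream_dag,prod),upstream_task),
+        # where "prod" is whatever env/cluster the flow_urn carries.)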
+ external_task_upstreams = [] + if task.task_type == "ExternalTaskSensor": + from airflow.sensors.external_task_sensor import ExternalTaskSensor + + task = cast(ExternalTaskSensor, task) + if hasattr(task, "external_task_id") and task.external_task_id is not None: + external_task_upstreams = [ + DataJobUrn.create_from_ids( + job_id=task.external_task_id, + data_flow_urn=str( + DataFlowUrn.create_from_ids( + orchestrator=flow_urn.get_orchestrator_name(), + flow_id=task.external_dag_id, + env=flow_urn.get_env(), + ) + ), + ) + ] # exclude subdag operator tasks since these are not emitted, resulting in empty metadata upstream_tasks = ( [ @@ -96,6 +117,7 @@ def _get_dependencies( ] + upstream_subdag_task_urns + upstream_subdag_triggers + + external_task_upstreams ) return upstream_tasks @@ -114,8 +136,6 @@ def generate_dataflow( :param capture_owner: :return: DataFlow - Data generated dataflow """ - from airflow.serialization.serialized_objects import SerializedDAG - id = dag.dag_id orchestrator = "airflow" description = f"{dag.description}\n\n{dag.doc_md or ''}" @@ -123,13 +143,7 @@ def generate_dataflow( cluster=cluster, id=id, orchestrator=orchestrator, description=description ) - flow_property_bag: Dict[str, str] = { - key: repr(value) - for (key, value) in SerializedDAG.serialize_dag(dag).items() - } - for key in dag.get_serialized_fields(): - if key not in flow_property_bag: - flow_property_bag[key] = repr(getattr(dag, key)) + flow_property_bag: Dict[str, str] = {} allowed_flow_keys = [ "_access_control", @@ -142,9 +156,10 @@ def generate_dataflow( "tags", "timezone", ] - flow_property_bag = { - k: v for (k, v) in flow_property_bag.items() if k in allowed_flow_keys - } + + for key in allowed_flow_keys: + if hasattr(dag, key): + flow_property_bag[key] = repr(getattr(dag, key)) data_flow.properties = flow_property_bag base_url = conf.get("webserver", "base_url") @@ -191,21 +206,13 @@ def generate_datajob( :param capture_tags: bool - whether to set tags automatically from airflow task :return: DataJob - returns the generated DataJob object """ - from airflow.serialization.serialized_objects import SerializedBaseOperator - dataflow_urn = DataFlowUrn.create_from_ids( orchestrator="airflow", env=cluster, flow_id=dag.dag_id ) datajob = DataJob(id=task.task_id, flow_urn=dataflow_urn) datajob.description = AirflowGenerator._get_description(task) - job_property_bag: Dict[str, str] = { - key: repr(value) - for (key, value) in SerializedBaseOperator.serialize_operator(task).items() - } - for key in task.get_serialized_fields(): - if key not in job_property_bag: - job_property_bag[key] = repr(getattr(task, key)) + job_property_bag: Dict[str, str] = {} allowed_task_keys = [ "_downstream_task_ids", @@ -223,9 +230,10 @@ def generate_datajob( "trigger_rule", "wait_for_downstream", ] - job_property_bag = { - k: v for (k, v) in job_property_bag.items() if k in allowed_task_keys - } + + for key in allowed_task_keys: + if hasattr(task, key): + job_property_bag[key] = repr(getattr(task, key)) datajob.properties = job_property_bag base_url = conf.get("webserver", "base_url") From e2a0fddcaa8c30babd6be07a1941793a775f5e19 Mon Sep 17 00:00:00 2001 From: Aditya Radhakrishnan Date: Tue, 12 Jul 2022 10:20:35 -0700 Subject: [PATCH 08/11] fix(users): fix to not get invite token unless the invite token modal is visible (#5380) --- .../src/app/identity/user/ViewInviteTokenModal.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datahub-web-react/src/app/identity/user/ViewInviteTokenModal.tsx 
b/datahub-web-react/src/app/identity/user/ViewInviteTokenModal.tsx index a353af4677f026..d8d2d9ccc38ecb 100644 --- a/datahub-web-react/src/app/identity/user/ViewInviteTokenModal.tsx +++ b/datahub-web-react/src/app/identity/user/ViewInviteTokenModal.tsx @@ -42,7 +42,7 @@ type Props = { export default function ViewInviteTokenModal({ visible, onClose }: Props) { const baseUrl = window.location.origin; - const { data: getNativeUserInviteTokenData } = useGetNativeUserInviteTokenQuery({}); + const { data: getNativeUserInviteTokenData } = useGetNativeUserInviteTokenQuery({ skip: !visible }); const [createNativeUserInviteToken, { data: createNativeUserInviteTokenData }] = useCreateNativeUserInviteTokenMutation({}); From 5011c2f6db565c95eee9f1bb0c90528f104aebc3 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Wed, 13 Jul 2022 00:02:28 +0100 Subject: [PATCH 09/11] fix(gms) Propagate cache exception upstream (#5381) --- .../datahub/authentication/token/StatefulTokenService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java b/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java index b35ad32ee88abf..19efd4f6139f5d 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java @@ -161,7 +161,7 @@ public TokenClaims validateAccessToken(@Nonnull String accessToken) throws Token this.revokeAccessToken(hash(accessToken)); throw e; } catch (final ExecutionException e) { - throw new TokenException("Failed to validate DataHub token: Unable to load token information from store"); + throw new TokenException("Failed to validate DataHub token: Unable to load token information from store", e); } } @@ -174,7 +174,7 @@ public void revokeAccessToken(@Nonnull String hashedToken) throws TokenException return; } } catch (ExecutionException e) { - throw new TokenException("Failed to validate DataHub token from cache"); + throw new TokenException("Failed to validate DataHub token from cache", e); } throw new TokenException("Access token no longer exists"); } From 3a2fec341aa2dac2d59eb51290a4f3e5756ca092 Mon Sep 17 00:00:00 2001 From: Aditya Radhakrishnan Date: Tue, 12 Jul 2022 17:23:41 -0700 Subject: [PATCH 10/11] fix(bootstrap): skip ingesting data platforms that already exist (#5382) --- .../boot/steps/IngestDataPlatformsStep.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java index 56910cf24baea0..7d460419adf7b2 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java @@ -1,5 +1,6 @@ package com.linkedin.metadata.boot.steps; +import com.datahub.util.RecordUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.common.AuditStamp; @@ -7,12 +8,9 @@ import com.linkedin.dataplatform.DataPlatformInfo; import com.linkedin.metadata.Constants; import com.linkedin.metadata.boot.BootstrapStep; -import com.datahub.util.RecordUtils; 
import com.linkedin.metadata.entity.EntityService; - import java.io.IOException; import java.net.URISyntaxException; - import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.core.io.ClassPathResource; @@ -46,13 +44,24 @@ public void execute() throws IOException, URISyntaxException { // 2. For each JSON object, cast into a DataPlatformSnapshot object. for (final JsonNode dataPlatform : dataPlatforms) { + final String urnString; final Urn urn; try { - urn = Urn.createFromString(dataPlatform.get("urn").asText()); + urnString = dataPlatform.get("urn").asText(); + urn = Urn.createFromString(urnString); } catch (URISyntaxException e) { log.error("Malformed urn: {}", dataPlatform.get("urn").asText()); throw new RuntimeException("Malformed urn", e); } + + final DataPlatformInfo existingInfo = + (DataPlatformInfo) _entityService.getLatestAspect(urn, PLATFORM_ASPECT_NAME); + // Skip ingesting for this JSON object if info already exists. + if (existingInfo != null) { + log.debug(String.format("%s already exists for %s. Skipping...", PLATFORM_ASPECT_NAME, urnString)); + continue; + } + final DataPlatformInfo info = RecordUtils.toRecordTemplate(DataPlatformInfo.class, dataPlatform.get("aspect").toString()); From 4c6d42780005bf9da313d57c05b3a8e140f3799f Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Wed, 13 Jul 2022 11:24:18 +0200 Subject: [PATCH 11/11] fix(cli): respect server telemetry settings correctly (#5384) Co-authored-by: Shirshanka Das --- metadata-ingestion/src/datahub/telemetry/telemetry.py | 10 +++++----- .../src/datahub/utilities/server_config_util.py | 7 ++++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py index 0a346d09373850..c7c0e8e51526f5 100644 --- a/metadata-ingestion/src/datahub/telemetry/telemetry.py +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -262,11 +262,11 @@ def _server_props(self, server: Optional[DataHubGraph]) -> Dict[str, str]: T = TypeVar("T") -def set_telemetry_enable(enable: bool) -> Any: - telemetry_instance.enabled = enable - if not enable: - logger.info("Disabling Telemetry locally due to server config") - telemetry_instance.update_config() +def suppress_telemetry() -> Any: + """disables telemetry for this invocation, doesn't affect persistent client settings""" + if telemetry_instance.enabled: + logger.debug("Disabling telemetry locally due to server config") + telemetry_instance.enabled = False def get_full_class_name(obj): diff --git a/metadata-ingestion/src/datahub/utilities/server_config_util.py b/metadata-ingestion/src/datahub/utilities/server_config_util.py index 40841321ad2778..1b8c05b6091347 100644 --- a/metadata-ingestion/src/datahub/utilities/server_config_util.py +++ b/metadata-ingestion/src/datahub/utilities/server_config_util.py @@ -1,6 +1,6 @@ from typing import Any, Dict, Optional -from datahub.telemetry.telemetry import set_telemetry_enable +from datahub.telemetry.telemetry import suppress_telemetry # Only to be written to for logging server related information global_debug: Dict[str, Any] = {} @@ -10,8 +10,9 @@ def set_gms_config(config: Dict) -> Any: global_debug["gms_config"] = config cli_telemtry_enabled = is_cli_telemetry_enabled() - if cli_telemtry_enabled is not None: - set_telemetry_enable(cli_telemtry_enabled) + if cli_telemtry_enabled is not None and not cli_telemtry_enabled: + # server requires telemetry to be disabled on client + 
suppress_telemetry() def get_gms_config() -> Dict: