From 21d3349564c04a2021be2cd1182d63b4228c7113 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Mon, 14 Mar 2022 21:20:29 +0530 Subject: [PATCH] feat(ingestion): improve logging, docs for bigquery, snowflake, redshift (#4344) --- metadata-ingestion/setup.py | 4 +- metadata-ingestion/source_docs/bigquery.md | 35 ++- metadata-ingestion/source_docs/redshift.md | 37 ++- metadata-ingestion/source_docs/snowflake.md | 91 +++--- .../src/datahub/emitter/rest_emitter.py | 8 +- metadata-ingestion/src/datahub/entrypoints.py | 9 + .../src/datahub/ingestion/api/source.py | 9 +- .../src/datahub/ingestion/run/pipeline.py | 14 +- .../datahub/ingestion/sink/datahub_rest.py | 10 +- .../src/datahub/ingestion/source/kafka.py | 4 +- .../datahub/ingestion/source/sql/bigquery.py | 15 +- .../datahub/ingestion/source/sql/redshift.py | 100 +++--- .../datahub/ingestion/source/sql/snowflake.py | 264 +++++++--------- .../ingestion/source/sql/sql_common.py | 8 +- .../source/state/stateful_ingestion_base.py | 17 +- .../ingestion/source/usage/bigquery_usage.py | 7 +- .../ingestion/source/usage/redshift_usage.py | 14 +- .../ingestion/source/usage/snowflake_usage.py | 297 +++++++++--------- .../ingestion/source_config/__init__.py | 0 .../ingestion/source_config/sql/__init__.py | 0 .../ingestion/source_config/sql/snowflake.py | 210 +++++++++++++ .../ingestion/source_config/usage/__init__.py | 0 .../source_config/usage/snowflake_usage.py | 58 ++++ .../ingestion/source_report/__init__.py | 0 .../ingestion/source_report/sql/__init__.py | 0 .../ingestion/source_report/sql/snowflake.py | 29 ++ .../ingestion/source_report/time_window.py | 9 + .../ingestion/source_report/usage/__init__.py | 0 .../source_report/usage/snowflake_usage.py | 23 ++ .../src/datahub/telemetry/telemetry.py | 34 +- .../src/datahub/utilities/config_clean.py | 12 +- .../datahub/utilities/server_config_util.py | 22 ++ .../tests/unit/test_config_clean.py | 14 + .../tests/unit/test_snowflake_source.py | 6 - .../config/IngestionConfiguration.java | 4 +- .../factory/config/ConfigurationProvider.java | 5 + .../config/TelemetryConfiguration.java | 17 + .../src/main/resources/application.yml | 4 + metadata-service/servlet/build.gradle | 1 + .../java/com/datahub/gms/servlet/Config.java | 19 ++ 40 files changed, 953 insertions(+), 457 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source_config/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_config/sql/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_config/usage/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_report/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_report/sql/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_report/sql/snowflake.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_report/time_window.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_report/usage/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_report/usage/snowflake_usage.py create mode 100644 metadata-ingestion/src/datahub/utilities/server_config_util.py create mode 100644 metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/TelemetryConfiguration.java diff --git 
a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index be0d0aa7f7a0b3..df726c74f72079 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -211,6 +211,8 @@ def get_long_description(): "types-click==0.1.12", "boto3-stubs[s3,glue,sagemaker]", "types-tabulate", + # avrogen package requires this + "types-pytz", } base_dev_requirements = { @@ -223,7 +225,7 @@ def get_long_description(): "flake8>=3.8.3", "flake8-tidy-imports>=4.3.0", "isort>=5.7.0", - "mypy>=0.920", + "mypy>=0.920,<0.940", # pydantic 1.8.2 is incompatible with mypy 0.910. # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910. "pydantic>=1.9.0", diff --git a/metadata-ingestion/source_docs/bigquery.md b/metadata-ingestion/source_docs/bigquery.md index b4a93a321f57b4..75a92b862fc77b 100644 --- a/metadata-ingestion/source_docs/bigquery.md +++ b/metadata-ingestion/source_docs/bigquery.md @@ -1,14 +1,17 @@ # BigQuery +To get all metadata from BigQuery you need to use two plugins `bigquery` and `bigquery-usage`. Both of them are described in this page. These will require 2 separate recipes. We understand this is not ideal and we plan to make this easier in the future. + For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). -## Setup +## `bigquery` +### Setup To install this plugin, run `pip install 'acryl-datahub[bigquery]'`. -## Prerequisites -### Create a datahub profile in GCP: -1. Create a custom role for datahub (https://cloud.google.com/iam/docs/creating-custom-roles#creating_a_custom_role) +### Prerequisites +#### Create a datahub profile in GCP +1. Create a custom role for datahub as per [BigQuery docs](https://cloud.google.com/iam/docs/creating-custom-roles#creating_a_custom_role) 2. Grant the following permissions to this role: ``` bigquery.datasets.get @@ -27,9 +30,9 @@ To install this plugin, run `pip install 'acryl-datahub[bigquery]'`. logging.logEntries.list # Needs for lineage generation resourcemanager.projects.get ``` -### Create a service account: +#### Create a service account -1. Setup a ServiceAccount (https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) +1. Setup a ServiceAccount as per [BigQuery docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and assign the previously created role to this service account. 2. Download a service account JSON keyfile. Example credential file: @@ -64,7 +67,7 @@ and assign the previously created role to this service account. client_id: "123456678890" ``` -## Capabilities +### Capabilities This plugin extracts the following: @@ -81,11 +84,11 @@ This plugin extracts the following: :::tip -You can also get fine-grained usage statistics for BigQuery using the `bigquery-usage` source described below. +You can also get fine-grained usage statistics for BigQuery using the `bigquery-usage` source described [below](#bigquery-usage-plugin). ::: -## Quickstart recipe +### Quickstart recipe Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options. @@ -102,7 +105,7 @@ sink: # sink configs ``` -## Config details +### Config details Note that a `.` is used to denote nested fields in the YAML recipe. 
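The quickstart recipe above is normally run with `datahub ingest -c recipe.yml`. As a minimal sketch (not part of this patch) for readers who prefer to drive it from Python, the same recipe can be fed to DataHub's programmatic `Pipeline` API; the project id and GMS address below are placeholder values.

```python
# Minimal sketch, not part of this patch: running the bigquery recipe
# programmatically via DataHub's Pipeline API instead of `datahub ingest`.
# "my_project_id" and the GMS address are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "bigquery",
            "config": {"project_id": "my_project_id"},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()  # surface any failures recorded in the source/sink reports
```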
@@ -155,7 +158,7 @@ Note: the bigquery_audit_metadata_datasets parameter receives a list of datasets Note: Since bigquery source also supports dataset level lineage, the auth client will require additional permissions to be able to access the google audit logs. Refer to the permissions section in the bigquery-usage section below, which also accesses the audit logs. -## Profiling +### Profiling Profiling can profile normal/partitioned and sharded tables as well but due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. If the limit/offset parameter is set, or when profiling a partitioned or sharded table, Great Expectations (the profiling framework we use) needs to create temporary @@ -175,11 +178,11 @@ Due to performance reasons, we only profile the latest partition for Partitioned You can set partition explicitly with `partition.partition_datetime` property if you want. (partition will be applied to all partitioned tables) ::: -# BigQuery Usage Stats +## `bigquery-usage` For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). -## Setup +### Setup To install this plugin, run `pip install 'acryl-datahub[bigquery-usage]'`. @@ -194,7 +197,7 @@ The Google Identity must have one of the following OAuth scopes granted to it: And should be authorized on all projects you'd like to ingest usage stats from. -## Capabilities +### Capabilities This plugin extracts the following: @@ -208,7 +211,7 @@ This plugin extracts the following: ::: -## Quickstart recipe +### Quickstart recipe Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options. @@ -230,7 +233,7 @@ sink: # sink configs ``` -## Config details +### Config details Note that a `.` is used to denote nested fields in the YAML recipe. diff --git a/metadata-ingestion/source_docs/redshift.md b/metadata-ingestion/source_docs/redshift.md index bce07dc5026ea6..6cd9cd1fc51453 100644 --- a/metadata-ingestion/source_docs/redshift.md +++ b/metadata-ingestion/source_docs/redshift.md @@ -1,8 +1,12 @@ # Redshift +To get all metadata from Redshift you need to use two plugins `redshift` and `redshift-usage`. Both of them are described in this page. These will require 2 separate recipes. We understand this is not ideal and we plan to make this easier in the future. + For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). -## Setup +## `redshift` + +### Setup To install this plugin, run `pip install 'acryl-datahub[redshift]'`. @@ -19,7 +23,7 @@ Giving a user unrestricted access to system tables gives the user visibility to ::: -## Capabilities +### Capabilities This plugin extracts the following: @@ -41,7 +45,7 @@ You can also get fine-grained usage statistics for Redshift using the `redshift- | Data Containers | ✔️ | | | Data Domains | ✔️ | [link](../../docs/domains.md) | -## Quickstart recipe +### Quickstart recipe Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options. @@ -93,7 +97,7 @@ sink: -## Config details +### Config details Like all SQL-based sources, the Redshift integration supports: - Stale Metadata Deletion: See [here](./stateful_ingestion.md) for more details on configuration. @@ -130,11 +134,11 @@ Note that a `.` is used to denote nested fields in the YAML recipe. | `domain.domain_key.deny` | | | List of regex patterns for tables/schemas to not assign domain_key.
There can be multiple domain key specified. | | `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching.There can be multiple domain key specified. | -## Lineage +### Lineage There are multiple lineage collector implementations as Redshift does not support table lineage out of the box. -### stl_scan_based +#### stl_scan_based The stl_scan based collector uses Redshift's [stl_insert](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_INSERT.html) and [stl_scan](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_SCAN.html) system tables to discover lineage between tables. Pros: @@ -145,7 +149,7 @@ Cons: - Does not work with Spectrum/external tables because those scans do not show up in stl_scan table. - If a table depends on a view then the view won't be listed as a dependency. Instead the table will be connected with the view's dependencies. -### sql_based +#### sql_based The sql_based collector uses Redshift's [stl_insert](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_INSERT.html) to discover all the insert queries and uses sql parsing to discover the dependencies. @@ -157,7 +161,7 @@ Cons: - Slow. - Less reliable as the query parser can fail on certain queries -### mixed +#### mixed Using both collectors above, first applying the sql_based collector and then the stl_scan_based one. Pros: @@ -169,10 +173,13 @@ Cons: - Slow - May be incorrect at times as the query parser can fail on certain queries -# Note -- The redshift stl redshift tables which are used for getting data lineage only retain approximately two to five days of log history. This means you cannot extract lineage from queries issued outside that window. +:::note + +The Redshift stl tables which are used for getting data lineage only retain approximately two to five days of log history. This means you cannot extract lineage from queries issued outside that window. + +::: -# Redshift Usage Stats +## `redshift-usage` This plugin extracts usage statistics for datasets in Amazon Redshift. For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). @@ -187,10 +194,10 @@ To grant access this plugin for all system tables, please alter your datahub Red ``` ALTER USER datahub_user WITH SYSLOG ACCESS UNRESTRICTED; ``` -## Setup +### Setup To install this plugin, run `pip install 'acryl-datahub[redshift-usage]'`. -## Capabilities +### Capabilities | Capability | Status | Details | | -----------| ------ | ---- | @@ -210,7 +217,7 @@ This source only does usage statistics. To get the tables, views, and schemas in ::: -## Quickstart recipe +### Quickstart recipe Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options. @@ -233,7 +240,7 @@ sink: # sink configs ``` -## Config details +### Config details Note that a `.` is used to denote nested fields in the YAML recipe. By default, we extract usage stats for the last day, with the recommendation that this source is executed every day. diff --git a/metadata-ingestion/source_docs/snowflake.md b/metadata-ingestion/source_docs/snowflake.md index 45c2eb2106131d..6505d01f1704f1 100644 --- a/metadata-ingestion/source_docs/snowflake.md +++ b/metadata-ingestion/source_docs/snowflake.md @@ -1,16 +1,22 @@ # Snowflake +To get all metadata from Snowflake you need to use two plugins `snowflake` and `snowflake-usage`. Both of them are described in this page. These will require 2 separate recipes.
We understand this is not ideal and we plan to make this easier in the future. + For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). -## Setup +## `snowflake` +### Setup To install this plugin, run `pip install 'acryl-datahub[snowflake]'`. ### Prerequisites In order to execute this source, your Snowflake user will need to have specific privileges granted to it for reading metadata -from your warehouse. You can create a DataHub-specific role, assign it the required privileges, and assign it to a new DataHub user -by executing the following Snowflake commands from a user with the `ACCOUNTADMIN` role: +from your warehouse. + +You can use the `provision_role` block in the recipe to grant the required roles. + +If your system admins prefer running the commands themselves, they can follow this guide to create a DataHub-specific role, assign it the required privileges, and assign it to a new DataHub user by executing the following Snowflake commands from a user with the `ACCOUNTADMIN` role: ```sql create or replace role datahub_role; @@ -36,25 +42,21 @@ grant role datahub_role to user datahub_user; This represents the bare minimum privileges required to extract databases, schemas, views, tables from Snowflake. -If you plan to enable extraction of table lineage, via the `include_table_lineage` config flag, you'll also need to grant privileges -to access the Snowflake Account Usage views. You can execute the following using the `ACCOUNTADMIN` role to do so: +If you plan to enable extraction of table lineage, via the `include_table_lineage` config flag, you'll need to grant additional privileges. See [snowflake usage prerequisites](#prerequisites-1) as the same privilege is required for this purpose too. -```sql -grant imported privileges on database snowflake to role datahub_role; -``` -## Capabilities +### Capabilities This plugin extracts the following: - Metadata for databases, schemas, views and tables - Column types associated with each table - Table, row, and column statistics via optional [SQL profiling](./sql_profiles.md) -- Table lineage. +- Table lineage :::tip -You can also get fine-grained usage statistics for Snowflake using the `snowflake-usage` source described below. +You can also get fine-grained usage statistics for Snowflake using the `snowflake-usage` source described [below](#snowflake-usage-plugin). ::: @@ -64,7 +66,7 @@ You can also get fine-grained usage statistics for Snowflak | Data Containers | ✔️ | | | Data Domains | ✔️ | [link](../../docs/domains.md) | -## Quickstart recipe +### Quickstart recipe Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options. @@ -74,20 +76,28 @@ For general pointers on writing and running a recipe, see our [main recipe guide source: type: snowflake config: + + provision_role: # Optional + enabled: false + dry_run: true + run_ingestion: false + admin_username: "${SNOWFLAKE_ADMIN_USER}" + admin_password: "${SNOWFLAKE_ADMIN_PASS}" + # Coordinates host_port: account_name warehouse: "COMPUTE_WH" # Credentials - username: user - password: pass - role: "accountadmin" + username: "${SNOWFLAKE_USER}" + password: "${SNOWFLAKE_PASS}" + role: "datahub_role" sink: # sink configs ``` -## Config details +### Config details Like all SQL-based sources, the Snowflake integration supports: - Stale Metadata Deletion: See [here](./stateful_ingestion.md) for more details on configuration.
@@ -122,8 +132,8 @@ Note that a `.` is used to denote nested fields in the YAML recipe. | `view_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. | | `include_tables` | | `True` | Whether tables should be ingested. | | `include_views` | | `True` | Whether views should be ingested. | -| `include_table_lineage` | | `True` | If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires role to be `accountadmin` | -| `include_view_lineage` | | `True` | If enabled, populates the snowflake view->table and table->view lineages (no view->view lineage yet). Requires role to be `accountadmin`, and `include_table_lineage` to be `True`. | +| `include_table_lineage` | | `True` | If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role. | +| `include_view_lineage` | | `True` | If enabled, populates the snowflake view->table and table->view lineages (no view->view lineage yet). Requires appropriate grants given to the role, and `include_table_lineage` to be `True`. | | `bucket_duration` | | `"DAY"` | Duration to bucket lineage data extraction by. Can be `"DAY"` or `"HOUR"`. | | `start_time` | | Start of last full day in UTC (or hour, depending on `bucket_duration`) | Earliest time of lineage data to consider. For the bootstrap run, set it as far back in time as possible. | | `end_time` | | End of last full day in UTC (or hour, depending on `bucket_duration`) | Latest time of lineage data to consider. | @@ -131,41 +141,46 @@ Note that a `.` is used to denote nested fields in the YAML recipe. | `domain.domain_key.allow` | | | List of regex patterns for tables/schemas to set domain_key domain key (domain_key can be any string like `sales`. There can be multiple domain key specified. | | `domain.domain_key.deny` | | | List of regex patterns for tables/schemas to not assign domain_key. There can be multiple domain key specified. | | `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching.There can be multiple domain key specified. | +| `provision_role.enabled` | | `False` | Whether provisioning of Snowflake role (used for ingestion) is enabled or not | +| `provision_role.dry_run` | | `False` | If `provision_role` is enabled, whether to dry run the sql commands for system admins to see what sql grant commands would be run without actually running the grant commands | +| `provision_role.drop_role_if_exists` | | `False` | Useful during testing to ensure you have a clean slate role. Not recommended for production use cases | +| `provision_role.run_ingestion` | | `False` | If system admins wish to skip actual ingestion of metadata during testing of the provisioning of `role` | +| `provision_role.admin_role` | | `accountadmin` | The Snowflake role of admin user used for provisioning of the role specified by `role` config. System admins can audit the open source code and decide to use a different role | +| `provision_role.admin_username` | ✅ | | The username to be used for provisioning of role | +| `provision_role.admin_password` | ✅ | | The password to be used for provisioning of role | -## Compatibility - -Table lineage requires Snowflake's [Access History](https://docs.snowflake.com/en/user-guide/access-history.html) feature. - -# Snowflake Usage Stats +## `snowflake-usage` For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). 
-## Setup +### Setup To install this plugin, run `pip install 'acryl-datahub[snowflake-usage]'`. ### Prerequisites -In order to execute the snowflake-usage source, your Snowflake user will need to have specific privileges granted to it. Specifically, -you'll need to grant access to the [Account Usage](https://docs.snowflake.com/en/sql-reference/account-usage.html) system tables, using which the DataHub source extracts information. Assuming -you've followed the steps outlined above to create a DataHub-specific User & Role, you'll simply need to execute the following commands -in Snowflake from a user with the `ACCOUNTADMIN` role: +:::note + +Table lineage requires Snowflake's [Access History](https://docs.snowflake.com/en/user-guide/access-history.html) feature. The "accountadmin" role has this by default. + +The underlying access history views that we use are only available in Snowflake's enterprise edition or higher. + +::: + +In order to execute the snowflake-usage source, your Snowflake user will need to have specific privileges granted to it. Specifically, you'll need to grant access to the [Account Usage](https://docs.snowflake.com/en/sql-reference/account-usage.html) system tables, using which the DataHub source extracts information. Assuming you've followed the steps outlined in `snowflake` plugin to create a DataHub-specific User & Role, you'll simply need to execute the following commands in Snowflake. This will require a user with the `ACCOUNTADMIN` role (or a role granted the IMPORT SHARES global privilege). Please see [Snowflake docs for more details](https://docs.snowflake.com/en/user-guide/data-share-consumers.html). ```sql grant imported privileges on database snowflake to role datahub_role; ``` -## Capabilities +### Capabilities This plugin extracts the following: - Statistics on queries issued and tables and columns accessed (excludes views) - Aggregation of these statistics into buckets, by day or hour granularity -Note: the user/role must have access to the account usage table. The "accountadmin" role has this by default, and other roles can be [granted this permission](https://docs.snowflake.com/en/sql-reference/account-usage.html#enabling-account-usage-for-other-roles). - -Note: the underlying access history views that we use are only available in Snowflake's enterprise edition or higher. :::note @@ -173,7 +188,7 @@ This source only does usage statistics. To get the tables, views, and schemas in ::: -## Quickstart recipe +### Quickstart recipe Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options. @@ -188,9 +203,9 @@ source: warehouse: "COMPUTE_WH" # Credentials - username: user - password: pass - role: "sysadmin" + username: "${SNOWFLAKE_USER}" + password: "${SNOWFLAKE_PASS}" + role: "datahub_role" # Options top_n_queries: 10 @@ -199,7 +214,7 @@ sink: # sink configs ``` -## Config details +### Config details Snowflake integration also supports prevention of redundant reruns for the same data. See [here](./stateful_ingestion.md) for more details on configuration. @@ -235,7 +250,7 @@ User's without email address will be ignored from usage if you don't set `email_ -# Compatibility +## Compatibility Coming soon! 
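The snowflake-usage prerequisites above reduce to a single grant on the shared `snowflake` database. Below is a hedged sanity check (not part of this patch) that the grant took effect, assuming the `snowflake-sqlalchemy` dialect pulled in by `acryl-datahub[snowflake]`; the account, user, password and warehouse values are placeholders.

```python
# Illustrative check that datahub_role can read the account_usage views the
# snowflake-usage source queries. Placeholder credentials; do not hard-code real ones.
from sqlalchemy import create_engine

ACCOUNT, USER, PASSWORD = "my_account", "datahub_user", "..."

engine = create_engine(
    f"snowflake://{USER}:{PASSWORD}@{ACCOUNT}/snowflake"
    "?role=datahub_role&warehouse=COMPUTE_WH"
)
with engine.connect() as conn:
    count = conn.execute(
        "select count(*) from snowflake.account_usage.access_history"
    ).scalar()
    print(f"access_history rows visible to datahub_role: {count}")
```

If the `imported privileges` grant is missing, this query fails with an authorization error, which is the same failure mode an ingestion run would hit.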
diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index e9beb06b9a9a2e..5aadaf37652583 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -136,16 +136,12 @@ def __init__( self._session.mount("http://", adapter) self._session.mount("https://", adapter) - def test_connection(self) -> str: + def test_connection(self) -> dict: response = self._session.get(f"{self._gms_server}/config") if response.status_code == 200: config: dict = response.json() if config.get("noCode") == "true": - return ( - config.get("versions", {}) - .get("linkedin/datahub", {}) - .get("version", "") - ) + return config else: # Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index f7b60622d3bdc1..c897eb6bd79bbd 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -1,5 +1,6 @@ import logging import os +import platform import sys import click @@ -18,6 +19,7 @@ from datahub.cli.timeline_cli import timeline from datahub.configuration import SensitiveError from datahub.telemetry import telemetry +from datahub.utilities.server_config_util import get_gms_config logger = logging.getLogger(__name__) @@ -156,4 +158,11 @@ def main(**kwargs): **kwargs, ) ) + logger.info( + f"DataHub CLI version: {datahub_package.__version__} at {datahub_package.__file__}" + ) + logger.info( + f"Python version: {sys.version} at {sys.executable} on {platform.platform()}" + ) + logger.info(f"GMS config {get_gms_config()}") sys.exit(1) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index dbb1d9981ebfb7..4b5c5068757beb 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -1,7 +1,10 @@ +import platform +import sys from abc import ABCMeta, abstractmethod from dataclasses import dataclass, field from typing import Dict, Generic, Iterable, List, TypeVar +import datahub from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit from datahub.ingestion.api.report import Report @@ -14,7 +17,11 @@ class SourceReport(Report): warnings: Dict[str, List[str]] = field(default_factory=dict) failures: Dict[str, List[str]] = field(default_factory=dict) - cli_version: str = "" + cli_version: str = datahub.nice_version_name() + cli_entry_location: str = datahub.__file__ + py_version: str = sys.version + py_exec_path: str = sys.executable + os_details: str = platform.platform() def report_workunit(self, wu: WorkUnit) -> None: self.workunits_produced += 1 diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 02b81e0e968e8c..b2ebc3d984facc 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -8,7 +8,6 @@ import click from pydantic import validator -import datahub from datahub.configuration.common import ( ConfigModel, DynamicTypedConfig, @@ -117,6 +116,12 @@ def __init__( preview_mode=preview_mode, ) + sink_type = self.config.sink.type + sink_class = sink_registry.get(sink_type) + 
sink_config = self.config.sink.dict().get("config", {}) + self.sink: Sink = sink_class.create(sink_config, self.ctx) + logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") + source_type = self.config.source.type source_class = source_registry.get(source_type) self.source: Source = source_class.create( @@ -124,12 +129,6 @@ def __init__( ) logger.debug(f"Source type:{source_type},{source_class} configured") - sink_type = self.config.sink.type - sink_class = sink_registry.get(sink_type) - sink_config = self.config.sink.dict().get("config", {}) - self.sink: Sink = sink_class.create(sink_config, self.ctx) - logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") - self.extractor_class = extractor_registry.get(self.config.source.extractor) self._configure_transforms() @@ -179,7 +178,6 @@ def run(self) -> None: callback = LoggingCallback() extractor: Extractor = self.extractor_class() - self.source.get_report().cli_version = datahub.nice_version_name() for wu in itertools.islice( self.source.get_workunits(), 10 if self.preview_mode else None ): diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 7e028b7d5eabcc..2755751856c683 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -16,6 +16,7 @@ MetadataChangeProposal, ) from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation +from datahub.utilities.server_config_util import set_gms_config logger = logging.getLogger(__name__) @@ -50,7 +51,14 @@ def __init__(self, ctx: PipelineContext, config: DatahubRestSinkConfig): extra_headers=self.config.extra_headers, ca_certificate_path=self.config.ca_certificate_path, ) - self.report.gms_version = self.emitter.test_connection() + gms_config = self.emitter.test_connection() + self.report.gms_version = ( + gms_config.get("versions", {}) + .get("linkedin/datahub", {}) + .get("version", "") + ) + logger.info("Setting gms config") + set_gms_config(gms_config) self.executor = concurrent.futures.ThreadPoolExecutor( max_workers=self.config.max_threads ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index d9d6b96dadce04..41158c86595248 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -20,7 +20,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import add_domain_to_entity_wu from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.api.source import SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.extractor import schema_util from datahub.ingestion.source.state.checkpoint import Checkpoint @@ -29,6 +28,7 @@ JobId, StatefulIngestionConfig, StatefulIngestionConfigBase, + StatefulIngestionReport, StatefulIngestionSourceBase, ) from datahub.metadata.com.linkedin.pegasus2avro.common import Status @@ -71,7 +71,7 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase): @dataclass -class KafkaSourceReport(SourceReport): +class KafkaSourceReport(StatefulIngestionReport): topics_scanned: int = 0 filtered: List[str] = field(default_factory=list) soft_deleted_stale_entities: List[str] = field(default_factory=list) diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 7cf432451b42d0..cfea3ceaeebc61 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -30,6 +30,7 @@ from datahub.ingestion.source.sql.sql_common import ( SQLAlchemyConfig, SQLAlchemySource, + SQLSourceReport, SqlWorkUnit, make_sqlalchemy_type, register_custom_type, @@ -284,13 +285,18 @@ class BigQueryDatasetKey(ProjectIdKey): dataset_id: str -class BigQuerySource(SQLAlchemySource): - config: BigQueryConfig - maximum_shard_ids: Dict[str, str] = dict() - lineage_metadata: Optional[Dict[str, Set[str]]] = None +@dataclass +class BigQueryReport(SQLSourceReport): + pass + +class BigQuerySource(SQLAlchemySource): def __init__(self, config, ctx): super().__init__(config, ctx, "bigquery") + self.config: BigQueryConfig = config + self.report: BigQueryReport = BigQueryReport() + self.lineage_metadata: Optional[Dict[str, Set[str]]] = None + self.maximum_shard_ids: Dict[str, str] = dict() def get_db_name(self, inspector: Inspector = None) -> str: if self.config.project_id: @@ -772,7 +778,6 @@ def prepare_profiler_args( partition: Optional[str], custom_sql: Optional[str] = None, ) -> dict: - self.config: BigQueryConfig return dict( schema=self.config.project_id, table=f"{schema}.{table}", diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py index 1417b121cdd792..c1845ece508ef2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py @@ -1,12 +1,14 @@ +import logging from collections import defaultdict from dataclasses import dataclass, field from enum import Enum -from typing import Dict, Iterable, List, Optional, Set, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union from urllib.parse import urlparse # These imports verify that the dependencies are available. import psycopg2 # noqa: F401 import pydantic # noqa: F401 +import sqlalchemy import sqlalchemy_redshift # noqa: F401 from sqlalchemy import create_engine, inspect from sqlalchemy.engine import Connection, reflection @@ -24,8 +26,8 @@ from datahub.ingestion.source.sql.postgres import PostgresConfig from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, + SQLSourceReport, SqlWorkUnit, - logger, ) # TRICKY: it's necessary to import the Postgres source because @@ -41,6 +43,8 @@ UpstreamClass, ) +logger: logging.Logger = logging.getLogger(__name__) + class LineageMode(Enum): SQL_BASED = "sql_based" @@ -336,18 +340,11 @@ def _get_schema_column_info(self, connection, schema=None, **kw): def _get_external_db_mapping(connection): # SQL query to get mapping of external schemas in redshift to its external database. 
- try: - result = connection.execute( - """ - select * from svv_external_schemas - """ - ) - return result - except Exception as e: - logger.error( - "Error querying svv_external_schemas to get external database mapping.", e - ) - return None + return connection.execute( + """ + select * from svv_external_schemas + """ + ) # This monkey-patching enables us to batch fetch the table descriptions, rather than @@ -360,15 +357,23 @@ def _get_external_db_mapping(connection): redshift_datetime_format = "%Y-%m-%d %H:%M:%S" +@dataclass +class RedshiftReport(SQLSourceReport): + # https://forums.aws.amazon.com/ann.jspa?annID=9105 + saas_version: str = "" + upstream_lineage: Dict[str, List[str]] = field(default_factory=dict) + + class RedshiftSource(SQLAlchemySource): - config: RedshiftConfig - catalog_metadata: Dict = {} eskind_to_platform = {1: "glue", 2: "hive", 3: "postgres", 4: "redshift"} def __init__(self, config: RedshiftConfig, ctx: PipelineContext): super().__init__(config, ctx, "redshift") + self.catalog_metadata: Dict = {} + self.config: RedshiftConfig = config self._lineage_map: Optional[Dict[str, LineageItem]] = None self._all_tables_set: Optional[Set[str]] = None + self.report: RedshiftReport = RedshiftReport() @classmethod def create(cls, config_dict, ctx): @@ -376,9 +381,12 @@ def create(cls, config_dict, ctx): return cls(config, ctx) def get_catalog_metadata(self, conn: Connection) -> None: - catalog_metadata = _get_external_db_mapping(conn) - if catalog_metadata is None: + try: + catalog_metadata = _get_external_db_mapping(conn) + except Exception as e: + self.error(logger, "external-svv_external_schemas", f"Error was {e}") return + db_name = self.get_db_name() external_schema_mapping = {} @@ -401,15 +409,30 @@ def get_catalog_metadata(self, conn: Connection) -> None: def get_inspectors(self) -> Iterable[Inspector]: # This method can be overridden in the case that you want to dynamically # run on multiple databases. 
- url = self.config.get_sql_alchemy_url() - logger.debug(f"sql_alchemy_url={url}") - engine = create_engine(url, **self.config.options) + engine = self.get_metadata_engine() with engine.connect() as conn: self.get_catalog_metadata(conn) inspector = inspect(conn) yield inspector + def get_metadata_engine(self) -> sqlalchemy.engine.Engine: + url = self.config.get_sql_alchemy_url() + logger.debug(f"sql_alchemy_url={url}") + return create_engine(url, **self.config.options) + + def inspect_version(self) -> Any: + db_engine = self.get_metadata_engine() + logger.info("Checking current version") + for db_row in db_engine.execute("select version()"): + self.report.saas_version = db_row[0] + def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: + try: + self.inspect_version() + except Exception as e: + self.report.report_failure("version", f"Error: {e}") + return + for wu in super().get_workunits(): yield wu if ( @@ -498,9 +521,7 @@ def _get_all_tables(self) -> Set[str]: db_name = self.get_db_name() all_tables_set = set() - url = self.config.get_sql_alchemy_url() - logger.debug(f"sql_alchemy_url={url}") - engine = create_engine(url, **self.config.options) + engine = self.get_metadata_engine() for db_row in engine.execute(all_tables_query): all_tables_set.add( f'{db_name}.{db_row["schemaname"]}.{db_row["tablename"]}' @@ -554,9 +575,7 @@ def _populate_lineage_map( if not self._all_tables_set: self._all_tables_set = self._get_all_tables() - url = self.config.get_sql_alchemy_url() - logger.debug(f"sql_alchemy_url={url}") - engine = create_engine(url, **self.config.options) + engine = self.get_metadata_engine() db_name = self.get_db_name() @@ -593,17 +612,21 @@ def _populate_lineage_map( ) except Exception as e: target.query_parser_failed_sqls.append(db_row["ddl"]) - logger.warning( + self.warn( + logger, + "parsing-query", f'Error parsing query {db_row["ddl"]} for getting lineage .' - f"\nError was {e}." + f"\nError was {e}.", ) else: if lineage_type == lineage_type.COPY: platform = LineageDatasetPlatform.S3 path = db_row["filename"].strip() if urlparse(path).scheme != "s3": - logger.warning( - f"Only s3 source supported with copy. The source was: {path}. ." + self.warn( + logger, + "non-s3-lineage", + f"Only s3 source supported with copy. The source was: {path}.", ) continue else: @@ -624,7 +647,9 @@ def _populate_lineage_map( source.platform == LineageDatasetPlatform.REDSHIFT and source.path not in self._all_tables_set ): - logger.warning(f"{source.path} missing table") + self.warn( + logger, "missing-table", f"{source.path} missing table" + ) continue target.upstreams.add(source) @@ -648,10 +673,7 @@ def _populate_lineage_map( ) except Exception as e: - logger.warning( - f"Extracting {lineage_type.name} lineage from Redshift failed." - f"Continuing...\nError was {e}." 
- ) + self.warn(logger, f"extract-{lineage_type.name}", f"Error was {e}") def _populate_lineage(self) -> None: @@ -957,7 +979,11 @@ def get_lineage_mcp( if custom_properties: properties = DatasetPropertiesClass(customProperties=custom_properties) - if not upstream_lineage: + if upstream_lineage: + self.report.upstream_lineage[dataset_urn] = [ + u.dataset for u in upstream_lineage + ] + else: return None, properties mcp = MetadataChangeProposalWrapper( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py index 3e5299f06fd576..ab454ee20cd3eb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py @@ -1,7 +1,6 @@ import json import logging from collections import defaultdict -from dataclasses import dataclass, field from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union import pydantic @@ -9,35 +8,25 @@ # This import verifies that the dependencies are available. import snowflake.sqlalchemy # noqa: F401 import sqlalchemy.engine -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization -from snowflake.connector.network import ( - DEFAULT_AUTHENTICATOR, - EXTERNAL_BROWSER_AUTHENTICATOR, - KEY_PAIR_AUTHENTICATOR, -) from snowflake.sqlalchemy import custom_types, snowdialect from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector from sqlalchemy.sql import sqltypes, text import datahub.emitter.mce_builder as builder -from datahub.configuration.common import AllowDenyPattern -from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.ingestion.source.sql.sql_common import ( RecordTypeClass, - SQLAlchemyConfig, SQLAlchemySource, - SQLSourceReport, SqlWorkUnit, TimeTypeClass, - make_sqlalchemy_uri, register_custom_type, ) +from datahub.ingestion.source_config.sql.snowflake import SnowflakeConfig +from datahub.ingestion.source_report.sql.snowflake import SnowflakeReport from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( DatasetLineageTypeClass, UpstreamClass, @@ -54,146 +43,9 @@ logger: logging.Logger = logging.getLogger(__name__) -APPLICATION_NAME = "acryl_datahub" - snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType -@dataclass -class SnowflakeReport(SQLSourceReport): - num_table_to_table_edges_scanned: int = 0 - num_table_to_view_edges_scanned: int = 0 - num_view_to_table_edges_scanned: int = 0 - num_external_table_edges_scanned: int = 0 - upstream_lineage: Dict[str, List[str]] = field(default_factory=dict) - # https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases - saas_version: str = "" - role: str = "" - role_grants: List[str] = field(default_factory=list) - - -class BaseSnowflakeConfig(BaseTimeWindowConfig): - # Note: this config model is also used by the snowflake-usage source. 
- - scheme = "snowflake" - - username: Optional[str] = None - password: Optional[pydantic.SecretStr] = pydantic.Field(default=None, exclude=True) - private_key_path: Optional[str] - private_key_password: Optional[pydantic.SecretStr] = pydantic.Field( - default=None, exclude=True - ) - authentication_type: Optional[str] = "DEFAULT_AUTHENTICATOR" - host_port: str - warehouse: Optional[str] - role: Optional[str] - include_table_lineage: Optional[bool] = True - include_view_lineage: Optional[bool] = True - - connect_args: Optional[dict] - - @pydantic.validator("authentication_type", always=True) - def authenticator_type_is_valid(cls, v, values, **kwargs): - valid_auth_types = { - "DEFAULT_AUTHENTICATOR": DEFAULT_AUTHENTICATOR, - "EXTERNAL_BROWSER_AUTHENTICATOR": EXTERNAL_BROWSER_AUTHENTICATOR, - "KEY_PAIR_AUTHENTICATOR": KEY_PAIR_AUTHENTICATOR, - } - if v not in valid_auth_types.keys(): - raise ValueError( - f"unsupported authenticator type '{v}' was provided," - f" use one of {list(valid_auth_types.keys())}" - ) - else: - if v == "KEY_PAIR_AUTHENTICATOR": - # If we are using key pair auth, we need the private key path and password to be set - if values.get("private_key_path") is None: - raise ValueError( - f"'private_key_path' was none " - f"but should be set when using {v} authentication" - ) - if values.get("private_key_password") is None: - raise ValueError( - f"'private_key_password' was none " - f"but should be set when using {v} authentication" - ) - logger.info(f"using authenticator type '{v}'") - return valid_auth_types.get(v) - - @pydantic.validator("include_view_lineage") - def validate_include_view_lineage(cls, v, values): - if not values.get("include_table_lineage") and v: - raise ValueError( - "include_table_lineage must be True for include_view_lineage to be set." - ) - return v - - def get_sql_alchemy_url(self, database=None): - return make_sqlalchemy_uri( - self.scheme, - self.username, - self.password.get_secret_value() if self.password else None, - self.host_port, - f'"{database}"' if database is not None else database, - uri_opts={ - # Drop the options if value is None. 
- key: value - for (key, value) in { - "authenticator": self.authentication_type, - "warehouse": self.warehouse, - "role": self.role, - "application": APPLICATION_NAME, - }.items() - if value - }, - ) - - def get_sql_alchemy_connect_args(self) -> dict: - if self.authentication_type != KEY_PAIR_AUTHENTICATOR: - return {} - if self.connect_args is None: - if self.private_key_path is None: - raise ValueError("missing required private key path to read key from") - if self.private_key_password is None: - raise ValueError("missing required private key password") - with open(self.private_key_path, "rb") as key: - p_key = serialization.load_pem_private_key( - key.read(), - password=self.private_key_password.get_secret_value().encode(), - backend=default_backend(), - ) - - pkb = p_key.private_bytes( - encoding=serialization.Encoding.DER, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - ) - self.connect_args = {"private_key": pkb} - return self.connect_args - - -class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig): - database_pattern: AllowDenyPattern = AllowDenyPattern( - deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"] - ) - - database: Optional[str] # deprecated - - @pydantic.validator("database") - def note_database_opt_deprecation(cls, v, values, **kwargs): - logger.warning( - "snowflake's `database` option has been deprecated; use database_pattern instead" - ) - values["database_pattern"].allow = f"^{v}$" - return None - - def get_sql_alchemy_url(self, database: str = None) -> str: - return super().get_sql_alchemy_url(database=database) - - def get_sql_alchemy_connect_args(self) -> dict: - return super().get_sql_alchemy_connect_args() - - class SnowflakeSource(SQLAlchemySource): def __init__(self, config: SnowflakeConfig, ctx: PipelineContext): super().__init__(config, ctx, "snowflake") @@ -201,6 +53,7 @@ def __init__(self, config: SnowflakeConfig, ctx: PipelineContext): self._external_lineage_map: Optional[Dict[str, Set[str]]] = None self.report: SnowflakeReport = SnowflakeReport() self.config: SnowflakeConfig = config + self.provision_role_in_progress: bool = False @classmethod def create(cls, config_dict, ctx): @@ -210,7 +63,20 @@ def create(cls, config_dict, ctx): def get_metadata_engine( self, database: Optional[str] = None ) -> sqlalchemy.engine.Engine: - url = self.config.get_sql_alchemy_url(database=database) + if self.provision_role_in_progress and self.config.provision_role is not None: + username: Optional[str] = self.config.provision_role.admin_username + password: Optional[ + pydantic.SecretStr + ] = self.config.provision_role.admin_password + role: Optional[str] = self.config.provision_role.admin_role + else: + username = self.config.username + password = self.config.password + role = self.config.role + + url = self.config.get_sql_alchemy_url( + database=database, username=username, password=password, role=role + ) logger.debug(f"sql_alchemy_url={url}") return create_engine( url, @@ -643,8 +509,104 @@ def _get_upstream_lineage_info( return UpstreamLineage(upstreams=upstream_tables), column_lineage return None + def add_config_to_report(self): + self.report.cleaned_host_port = self.config.host_port + + if self.config.provision_role is not None: + self.report.run_ingestion = self.config.provision_role.run_ingestion + + def do_provision_role_internal(self): + provision_role_block = self.config.provision_role + if provision_role_block is None: + return + self.report.provision_role_done = not 
provision_role_block.dry_run + + role = self.config.role + if role is None: + role = "datahub_role" + self.warn( + logger, + "role-grant", + f"role not specified during provision role using {role} as default", + ) + self.report.role = role + + warehouse = self.config.warehouse + + logger.info("Creating connection for provision_role") + engine = self.get_metadata_engine(database=None) + + sqls: List[str] = [] + if provision_role_block.drop_role_if_exists: + sqls.append(f"DROP ROLE IF EXISTS {role}") + + sqls.append(f"CREATE ROLE IF NOT EXISTS {role}") + + if warehouse is None: + self.warn( + logger, "role-grant", "warehouse not specified during provision role" + ) + else: + sqls.append(f"grant operate, usage on warehouse {warehouse} to role {role}") + + for inspector in self.get_inspectors(): + db_name = self.get_db_name(inspector) + sqls.extend( + [ + f"grant usage on DATABASE {db_name} to role {role}", + f"grant usage on all schemas in database {db_name} to role {role}", + f"grant select on all tables in database {db_name} to role {role}", + f"grant select on all external tables in database {db_name} to role {role}", + f"grant select on all views in database {db_name} to role {role}", + f"grant usage on future schemas in database {db_name} to role {role}", + f"grant select on future tables in database {db_name} to role {role}", + ] + ) + if self.config.username is not None: + sqls.append(f"grant role {role} to user {self.config.username}") + + sqls.append(f"grant imported privileges on database snowflake to role {role}") + + dry_run_str = "[DRY RUN] " if provision_role_block.dry_run else "" + for sql in sqls: + logger.info(f"{dry_run_str} Attempting to run sql {sql}") + if provision_role_block.dry_run: + continue + try: + engine.execute(sql) + except Exception as e: + self.error(logger, "role-grant", f"Exception: {e}") + + self.report.provision_role_success = not provision_role_block.dry_run + + def do_provision_role(self): + if ( + self.config.provision_role is None + or self.config.provision_role.enabled is False + ): + return + + try: + self.provision_role_in_progress = True + self.do_provision_role_internal() + finally: + self.provision_role_in_progress = False + + def should_run_ingestion(self) -> bool: + return ( + self.config.provision_role is None + or self.config.provision_role.enabled is False + or self.config.provision_role.run_ingestion + ) + # Override the base class method. 
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: + self.add_config_to_report() + + self.do_provision_role() + if not self.should_run_ingestion(): + return + try: self.inspect_version() except Exception as e: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 2ada238ae17bc6..fbbdf93b7e0b10 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -42,7 +42,6 @@ gen_containers, ) from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.api.source import SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.state.checkpoint import Checkpoint from datahub.ingestion.source.state.sql_common_state import ( @@ -52,6 +51,7 @@ JobId, StatefulIngestionConfig, StatefulIngestionConfigBase, + StatefulIngestionReport, StatefulIngestionSourceBase, ) from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass @@ -161,7 +161,7 @@ class SqlContainerSubTypes(str, Enum): @dataclass -class SQLSourceReport(SourceReport): +class SQLSourceReport(StatefulIngestionReport): tables_scanned: int = 0 views_scanned: int = 0 entities_profiled: int = 0 @@ -413,7 +413,7 @@ def __init__(self, config: SQLAlchemyConfig, ctx: PipelineContext, platform: str super(SQLAlchemySource, self).__init__(config, ctx) self.config = config self.platform = platform - self.report = SQLSourceReport() + self.report: SQLSourceReport = SQLSourceReport() config_report = { config_option: config.dict().get(config_option) @@ -447,7 +447,7 @@ def warn(self, log: logging.Logger, key: str, reason: str) -> Any: def error(self, log: logging.Logger, key: str, reason: str) -> Any: self.report.report_failure(key, reason) - log.error(reason) + log.error(f"{key} => {reason}") def get_inspectors(self) -> Iterable[Inspector]: # This method can be overridden in the case that you want to dynamically diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index a6968146313cfa..22b6290f1f9eb2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -1,5 +1,6 @@ import logging import platform +from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, Optional, Type, cast @@ -20,7 +21,7 @@ from datahub.ingestion.api.ingestion_job_reporting_provider_base import ( IngestionReportingProviderBase, ) -from datahub.ingestion.api.source import Source +from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.state.checkpoint import Checkpoint, CheckpointStateBase from datahub.ingestion.source.state_provider.state_provider_registry import ( ingestion_checkpoint_provider_registry, @@ -65,6 +66,11 @@ class StatefulIngestionConfigBase(DatasetSourceConfigBase): stateful_ingestion: Optional[StatefulIngestionConfig] = None +@dataclass +class StatefulIngestionReport(SourceReport): + pass + + class StatefulIngestionSourceBase(Source): """ Defines the base class for all stateful sources. 
@@ -80,6 +86,15 @@ def __init__( self.cur_checkpoints: Dict[JobId, Optional[Checkpoint]] = {} self.run_summaries_to_report: Dict[JobId, DatahubIngestionRunSummaryClass] = {} self._initialize_checkpointing_state_provider() + self.report: StatefulIngestionReport = StatefulIngestionReport() + + def warn(self, log: logging.Logger, key: str, reason: str) -> Any: + self.report.report_warning(key, reason) + log.warning(reason) + + def error(self, log: logging.Logger, key: str, reason: str) -> Any: + self.report.report_failure(key, reason) + log.error(f"{key} => {reason}") # # Checkpointing specific support. diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index 2cec7afd7c1c7b..1bda876415c9db 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -572,13 +572,10 @@ def report_dropped(self, key: str) -> None: class BigQueryUsageSource(Source): - config: BigQueryUsageConfig - report: BigQueryUsageSourceReport - def __init__(self, config: BigQueryUsageConfig, ctx: PipelineContext): super().__init__(ctx) - self.config = config - self.report = BigQueryUsageSourceReport() + self.config: BigQueryUsageConfig = config + self.report: BigQueryUsageSourceReport = BigQueryUsageSourceReport() @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigQueryUsageSource": diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py index 3c75665964be09..c75e00ff0fbd80 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py @@ -1,6 +1,7 @@ import collections import dataclasses import logging +from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Union @@ -13,6 +14,7 @@ import datahub.emitter.mce_builder as builder from datahub.configuration.time_window_config import get_time_bucket from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.sql.redshift import RedshiftConfig @@ -87,15 +89,21 @@ def get_sql_alchemy_url(self): return super().get_sql_alchemy_url() +@dataclass +class RedshiftUsageReport(SourceReport): + pass + + @dataclasses.dataclass class RedshiftUsageSource(Source): - config: RedshiftUsageConfig - report: SourceReport = dataclasses.field(default_factory=SourceReport) + def __init__(self, config: RedshiftUsageConfig, ctx: PipelineContext): + self.config: RedshiftUsageConfig = config + self.report: RedshiftUsageReport = RedshiftUsageReport() @classmethod def create(cls, config_dict, ctx): config = RedshiftUsageConfig.parse_obj(config_dict) - return cls(ctx, config) + return cls(config, ctx) def get_workunits(self) -> Iterable[MetadataWorkUnit]: """Gets Redshift usage stats as work units""" diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py index 8b20e928768323..a9c31517e76829 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py @@ -15,21 +15,16 @@ from datahub.configuration.time_window_config import get_time_bucket from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.api.source import SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.sql.snowflake import BaseSnowflakeConfig from datahub.ingestion.source.state.checkpoint import Checkpoint from datahub.ingestion.source.state.stateful_ingestion_base import ( JobId, - StatefulIngestionConfig, - StatefulIngestionConfigBase, StatefulIngestionSourceBase, ) from datahub.ingestion.source.state.usage_common_state import BaseUsageCheckpointState -from datahub.ingestion.source.usage.usage_common import ( - BaseUsageConfig, - GenericAggregatedDataset, -) +from datahub.ingestion.source.usage.usage_common import GenericAggregatedDataset +from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsageConfig +from datahub.ingestion.source_report.usage.snowflake_usage import SnowflakeUsageReport from datahub.metadata.schema_classes import ( ChangeTypeClass, JobStatusClass, @@ -37,6 +32,7 @@ OperationTypeClass, TimeWindowSizeClass, ) +from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) @@ -126,50 +122,11 @@ class SnowflakeJoinedAccessEvent(PermissiveModel): role_name: str -class SnowflakeStatefulIngestionConfig(StatefulIngestionConfig): - """ - Specialization of basic StatefulIngestionConfig to adding custom config. - This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase - in the SnowflakeUsageConfig. - """ - - ignore_old_state = pydantic.Field(False, alias="force_rerun") - - -class SnowflakeUsageConfig( - BaseSnowflakeConfig, BaseUsageConfig, StatefulIngestionConfigBase -): - options: dict = {} - database_pattern: AllowDenyPattern = AllowDenyPattern( - deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"] - ) - email_domain: Optional[str] - schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() - table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() - view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() - apply_view_usage_to_tables: bool = False - stateful_ingestion: Optional[SnowflakeStatefulIngestionConfig] = None - - @pydantic.validator("role", always=True) - def role_accountadmin(cls, v): - if not v or v.lower() != "accountadmin": - # This isn't an error, since the privileges can be delegated to other - # roles as well: https://docs.snowflake.com/en/sql-reference/account-usage.html#enabling-account-usage-for-other-roles - logger.info( - 'snowflake usage tables are only accessible by role "accountadmin" by default; you set %s', - v, - ) - return v - - def get_sql_alchemy_url(self): - return super().get_sql_alchemy_url(database="snowflake") - - class SnowflakeUsageSource(StatefulIngestionSourceBase): def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext): super(SnowflakeUsageSource, self).__init__(config, ctx) self.config: SnowflakeUsageConfig = config - self.report: SourceReport = SourceReport() + self.report: SnowflakeUsageReport = SnowflakeUsageReport() self.should_skip_this_run = self._should_skip_this_run() @classmethod @@ -275,7 +232,23 @@ def update_default_job_summary(self) -> None: summary.config = self.config.json() summary.custom_summary = self.report.as_string() + def check_email_domain_missing(self) -> 
Any: + if self.config.email_domain is not None and self.config.email_domain != "": + return + + self.warn( + logger, + "missing-email-domain", + "User's without email address will be ignored from usage if you don't set email_domain property", + ) + + def add_config_to_report(self): + self.report.start_time = self.config.start_time + self.report.end_time = self.config.end_time + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + self.add_config_to_report() + self.check_email_domain_missing() if not self.should_skip_this_run: # Initialize the checkpoints self._init_checkpoints() @@ -316,107 +289,143 @@ def _make_sql_engine(self) -> Engine: ) return engine + def _check_usage_date_ranges(self, engine: Engine) -> Any: + + query = """ + select + min(query_start_time) as min_time, + max(query_start_time) as max_time + from snowflake.account_usage.access_history + where ARRAY_SIZE(base_objects_accessed) > 0 + """ + with PerfTimer() as timer: + for db_row in engine.execute(query): + self.report.min_access_history_time = db_row[0].astimezone( + tz=timezone.utc + ) + self.report.max_access_history_time = db_row[1].astimezone( + tz=timezone.utc + ) + self.report.access_history_range_query_secs = round( + timer.elapsed_seconds(), 2 + ) + break + + def _is_unsupported_object_accessed(self, obj: Dict[str, Any]) -> bool: + unsupported_keys = ["locations"] + return any([obj.get(key) is not None for key in unsupported_keys]) + + def _is_object_valid(self, obj: Dict[str, Any]) -> bool: + if self._is_unsupported_object_accessed( + obj + ) or not self._is_dataset_pattern_allowed( + obj.get("objectName"), obj.get("objectDomain") + ): + return False + return True + + def _is_dataset_pattern_allowed( + self, dataset_name: Optional[Any], dataset_type: Optional[Any] + ) -> bool: + # TODO: support table/view patterns for usage logs by pulling that information as well from the usage query + if not dataset_type or not dataset_name: + return True + + table_or_view_pattern: Optional[AllowDenyPattern] = AllowDenyPattern.allow_all() + # Test domain type = external_table and then add it + table_or_view_pattern = ( + self.config.table_pattern + if dataset_type.lower() in {"table"} + else ( + self.config.view_pattern + if dataset_type.lower() in {"view", "materialized_view"} + else None + ) + ) + if table_or_view_pattern is None: + return True + + dataset_params = dataset_name.split(".") + assert len(dataset_params) == 3 + if ( + not self.config.database_pattern.allowed(dataset_params[0]) + or not self.config.schema_pattern.allowed(dataset_params[1]) + or not table_or_view_pattern.allowed(dataset_params[2]) + ): + return False + return True + + def _process_snowflake_history_row( + self, row: Any + ) -> Iterable[SnowflakeJoinedAccessEvent]: + self.report.rows_processed += 1 + # Make some minor type conversions. + if hasattr(row, "_asdict"): + # Compat with SQLAlchemy 1.3 and 1.4 + # See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#rowproxy-is-no-longer-a-proxy-is-now-called-row-and-behaves-like-an-enhanced-named-tuple. 
+ event_dict = row._asdict() + else: + event_dict = dict(row) + + # no use processing events that don't have a query text + if not event_dict["query_text"]: + self.report.rows_missing_query_text += 1 + return + + event_dict["base_objects_accessed"] = [ + obj + for obj in json.loads(event_dict["base_objects_accessed"]) + if self._is_object_valid(obj) + ] + if len(event_dict["base_objects_accessed"]) == 0: + self.report.rows_zero_base_objects_accessed += 1 + + event_dict["direct_objects_accessed"] = [ + obj + for obj in json.loads(event_dict["direct_objects_accessed"]) + if self._is_object_valid(obj) + ] + if len(event_dict["direct_objects_accessed"]) == 0: + self.report.rows_zero_direct_objects_accessed += 1 + + event_dict["query_start_time"] = (event_dict["query_start_time"]).astimezone( + tz=timezone.utc + ) + + if not event_dict["email"] and self.config.email_domain: + if not event_dict["user_name"]: + self.report.report_warning("user-name-miss", f"Missing in {event_dict}") + logger.warning( + f"The user_name is missing from {event_dict}. Skipping ...." + ) + self.report.rows_missing_email += 1 + return + + event_dict[ + "email" + ] = f'{event_dict["user_name"]}@{self.config.email_domain}'.lower() + + try: # big hammer try block to ensure we don't fail on parsing events + event = SnowflakeJoinedAccessEvent(**event_dict) + yield event + except Exception as e: + self.report.rows_parsing_error += 1 + self.warn(logger, "usage", f"Failed to parse usage line {event_dict}, {e}") + def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]: - query = self._make_usage_query() engine = self._make_sql_engine() - results = engine.execute(query) + logger.info("Checking usage date ranges") + self._check_usage_date_ranges(engine) + + logger.info("Getting usage history") + with PerfTimer() as timer: + query = self._make_usage_query() + results = engine.execute(query) + self.report.access_history_query_secs = round(timer.elapsed_seconds(), 2) for row in results: - # Make some minor type conversions. - if hasattr(row, "_asdict"): - # Compat with SQLAlchemy 1.3 and 1.4 - # See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#rowproxy-is-no-longer-a-proxy-is-now-called-row-and-behaves-like-an-enhanced-named-tuple. 
- event_dict = row._asdict() - else: - event_dict = dict(row) - - # no use processing events that don't have a query text - if not event_dict["query_text"]: - continue - - def is_unsupported_object_accessed(obj: Dict[str, Any]) -> bool: - unsupported_keys = ["locations"] - return any([obj.get(key) is not None for key in unsupported_keys]) - - def is_dataset_pattern_allowed( - dataset_name: Optional[Any], dataset_type: Optional[Any] - ) -> bool: - # TODO: support table/view patterns for usage logs by pulling that information as well from the usage query - if not dataset_type or not dataset_name: - return True - - table_or_view_pattern: Optional[ - AllowDenyPattern - ] = AllowDenyPattern.allow_all() - # Test domain type = external_table and then add it - table_or_view_pattern = ( - self.config.table_pattern - if dataset_type.lower() in {"table"} - else ( - self.config.view_pattern - if dataset_type.lower() in {"view", "materialized_view"} - else None - ) - ) - if table_or_view_pattern is None: - return True - - dataset_params = dataset_name.split(".") - assert len(dataset_params) == 3 - if ( - not self.config.database_pattern.allowed(dataset_params[0]) - or not self.config.schema_pattern.allowed(dataset_params[1]) - or not table_or_view_pattern.allowed(dataset_params[2]) - ): - return False - return True - - def is_object_valid(obj: Dict[str, Any]) -> bool: - if is_unsupported_object_accessed( - obj - ) or not is_dataset_pattern_allowed( - obj.get("objectName"), obj.get("objectDomain") - ): - return False - return True - - event_dict["base_objects_accessed"] = [ - obj - for obj in json.loads(event_dict["base_objects_accessed"]) - if is_object_valid(obj) - ] - event_dict["direct_objects_accessed"] = [ - obj - for obj in json.loads(event_dict["direct_objects_accessed"]) - if is_object_valid(obj) - ] - event_dict["query_start_time"] = ( - event_dict["query_start_time"] - ).astimezone(tz=timezone.utc) - - if not event_dict["email"] and self.config.email_domain: - if not event_dict["user_name"]: - self.report.report_warning( - "user-name-miss", f"Missing in {event_dict}" - ) - logger.warning( - f"The user_name is missing from {event_dict}. Skipping ...." 
- ) - continue - - event_dict[ - "email" - ] = f'{event_dict["user_name"]}@{self.config.email_domain}'.lower() - - try: # big hammer try block to ensure we don't fail on parsing events - event = SnowflakeJoinedAccessEvent(**event_dict) - yield event - except Exception as e: - logger.warning(f"Failed to parse usage line {event_dict}", e) - self.report.report_warning( - "usage", f"Failed to parse usage line {event_dict}" - ) + yield from self._process_snowflake_history_row(row) def _get_operation_aspect_work_unit( self, event: SnowflakeJoinedAccessEvent diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/__init__.py b/metadata-ingestion/src/datahub/ingestion/source_config/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/__init__.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py new file mode 100644 index 00000000000000..87477cbbe86907 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py @@ -0,0 +1,210 @@ +import logging +from typing import Optional + +import pydantic +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from snowflake.connector.network import ( + DEFAULT_AUTHENTICATOR, + EXTERNAL_BROWSER_AUTHENTICATOR, + KEY_PAIR_AUTHENTICATOR, +) + +from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.configuration.time_window_config import BaseTimeWindowConfig +from datahub.ingestion.source.sql.sql_common import ( + SQLAlchemyConfig, + make_sqlalchemy_uri, +) +from datahub.utilities.config_clean import ( + remove_protocol, + remove_suffix, + remove_trailing_slashes, +) + +APPLICATION_NAME = "acryl_datahub" + +logger: logging.Logger = logging.getLogger(__name__) + + +class SnowflakeProvisionRoleConfig(ConfigModel): + enabled: bool = False + + # Can be used by account admin to test what sql statements will be run + dry_run: bool = False + + # Setting this to True is helpful in case you want a clean role without any extra privileges + # Not set to True by default because multiple parallel + # snowflake ingestions can be dependent on single role + drop_role_if_exists: bool = False + + # When Account admin is testing they might not want to actually do the ingestion + # Set this to False in case the account admin would want to + # create role + # grant role to user in main config + # run ingestion as the user in main config + run_ingestion: bool = False + + admin_role: Optional[str] = "accountadmin" + + admin_username: str + admin_password: pydantic.SecretStr = pydantic.Field(default=None, exclude=True) + + @pydantic.validator("admin_username", always=True) + def username_not_empty(cls, v, values, **kwargs): + v_str: str = str(v) + if v_str.strip() == "": + raise ValueError("username is empty") + return v + + +class BaseSnowflakeConfig(BaseTimeWindowConfig): + # Note: this config model is also used by the snowflake-usage source. 
+ + scheme = "snowflake" + + username: Optional[str] = None + password: Optional[pydantic.SecretStr] = pydantic.Field(default=None, exclude=True) + private_key_path: Optional[str] + private_key_password: Optional[pydantic.SecretStr] = pydantic.Field( + default=None, exclude=True + ) + authentication_type: Optional[str] = "DEFAULT_AUTHENTICATOR" + host_port: str + warehouse: Optional[str] + role: Optional[str] + include_table_lineage: Optional[bool] = True + include_view_lineage: Optional[bool] = True + + connect_args: Optional[dict] + + @pydantic.validator("host_port", always=True) + def host_port_is_valid(cls, v, values, **kwargs): + v = remove_protocol(v) + v = remove_trailing_slashes(v) + v = remove_suffix(v, ".snowflakecomputing.com") + logger.info(f"Cleaned Host port is {v}") + return v + + @pydantic.validator("authentication_type", always=True) + def authenticator_type_is_valid(cls, v, values, **kwargs): + valid_auth_types = { + "DEFAULT_AUTHENTICATOR": DEFAULT_AUTHENTICATOR, + "EXTERNAL_BROWSER_AUTHENTICATOR": EXTERNAL_BROWSER_AUTHENTICATOR, + "KEY_PAIR_AUTHENTICATOR": KEY_PAIR_AUTHENTICATOR, + } + if v not in valid_auth_types.keys(): + raise ValueError( + f"unsupported authenticator type '{v}' was provided," + f" use one of {list(valid_auth_types.keys())}" + ) + else: + if v == "KEY_PAIR_AUTHENTICATOR": + # If we are using key pair auth, we need the private key path and password to be set + if values.get("private_key_path") is None: + raise ValueError( + f"'private_key_path' was none " + f"but should be set when using {v} authentication" + ) + if values.get("private_key_password") is None: + raise ValueError( + f"'private_key_password' was none " + f"but should be set when using {v} authentication" + ) + logger.info(f"using authenticator type '{v}'") + return valid_auth_types.get(v) + + @pydantic.validator("include_view_lineage") + def validate_include_view_lineage(cls, v, values): + if not values.get("include_table_lineage") and v: + raise ValueError( + "include_table_lineage must be True for include_view_lineage to be set." + ) + return v + + def get_sql_alchemy_url( + self, + database: Optional[str] = None, + username: Optional[str] = None, + password: Optional[pydantic.SecretStr] = None, + role: Optional[str] = None, + ) -> str: + if username is None: + username = self.username + if password is None: + password = self.password + if role is None: + role = self.role + return make_sqlalchemy_uri( + self.scheme, + username, + password.get_secret_value() if password else None, + self.host_port, + f'"{database}"' if database is not None else database, + uri_opts={ + # Drop the options if value is None. 
+ key: value + for (key, value) in { + "authenticator": self.authentication_type, + "warehouse": self.warehouse, + "role": role, + "application": APPLICATION_NAME, + }.items() + if value + }, + ) + + def get_sql_alchemy_connect_args(self) -> dict: + if self.authentication_type != KEY_PAIR_AUTHENTICATOR: + return {} + if self.connect_args is None: + if self.private_key_path is None: + raise ValueError("missing required private key path to read key from") + if self.private_key_password is None: + raise ValueError("missing required private key password") + with open(self.private_key_path, "rb") as key: + p_key = serialization.load_pem_private_key( + key.read(), + password=self.private_key_password.get_secret_value().encode(), + backend=default_backend(), + ) + + pkb = p_key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + self.connect_args = {"private_key": pkb} + return self.connect_args + + +class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig): + database_pattern: AllowDenyPattern = AllowDenyPattern( + deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"] + ) + + database: Optional[str] # deprecated + + provision_role: Optional[SnowflakeProvisionRoleConfig] = None + + @pydantic.validator("database") + def note_database_opt_deprecation(cls, v, values, **kwargs): + logger.warning( + "snowflake's `database` option has been deprecated; use database_pattern instead" + ) + values["database_pattern"].allow = f"^{v}$" + return None + + def get_sql_alchemy_url( + self, + database: str = None, + username: Optional[str] = None, + password: Optional[pydantic.SecretStr] = None, + role: Optional[str] = None, + ) -> str: + return super().get_sql_alchemy_url( + database=database, username=username, password=password, role=role + ) + + def get_sql_alchemy_connect_args(self) -> dict: + return super().get_sql_alchemy_connect_args() diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/usage/__init__.py b/metadata-ingestion/src/datahub/ingestion/source_config/usage/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py new file mode 100644 index 00000000000000..415bfb163ab1ec --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py @@ -0,0 +1,58 @@ +import logging +from typing import Optional + +import pydantic + +from datahub.configuration.common import AllowDenyPattern +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionConfig, + StatefulIngestionConfigBase, +) +from datahub.ingestion.source.usage.usage_common import BaseUsageConfig +from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig + +logger = logging.getLogger(__name__) + + +class SnowflakeStatefulIngestionConfig(StatefulIngestionConfig): + """ + Specialization of basic StatefulIngestionConfig to adding custom config. + This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase + in the SnowflakeUsageConfig. 
+ """ + + ignore_old_state = pydantic.Field(False, alias="force_rerun") + + +class SnowflakeUsageConfig( + BaseSnowflakeConfig, BaseUsageConfig, StatefulIngestionConfigBase +): + options: dict = {} + database_pattern: AllowDenyPattern = AllowDenyPattern( + deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"] + ) + email_domain: Optional[str] + schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + apply_view_usage_to_tables: bool = False + stateful_ingestion: Optional[SnowflakeStatefulIngestionConfig] = None + + @pydantic.validator("role", always=True) + def role_accountadmin(cls, v): + if not v or v.lower() != "accountadmin": + # This isn't an error, since the privileges can be delegated to other + # roles as well: https://docs.snowflake.com/en/sql-reference/account-usage.html#enabling-account-usage-for-other-roles + logger.info( + 'snowflake usage tables are only accessible by role "accountadmin" by default; you set %s', + v, + ) + return v + + def get_sql_alchemy_url(self): + return super().get_sql_alchemy_url( + database="snowflake", + username=self.username, + password=self.password, + role=self.role, + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/__init__.py b/metadata-ingestion/src/datahub/ingestion/source_report/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/sql/__init__.py b/metadata-ingestion/src/datahub/ingestion/source_report/sql/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source_report/sql/snowflake.py new file mode 100644 index 00000000000000..81b1a95d682708 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source_report/sql/snowflake.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass, field +from typing import Dict, List + +from datahub.ingestion.source.sql.sql_common import SQLSourceReport +from datahub.ingestion.source_report.time_window import BaseTimeWindowReport + + +@dataclass +class BaseSnowflakeReport(BaseTimeWindowReport): + pass + + +@dataclass +class SnowflakeReport(BaseSnowflakeReport, SQLSourceReport): + num_table_to_table_edges_scanned: int = 0 + num_table_to_view_edges_scanned: int = 0 + num_view_to_table_edges_scanned: int = 0 + num_external_table_edges_scanned: int = 0 + upstream_lineage: Dict[str, List[str]] = field(default_factory=dict) + + cleaned_host_port: str = "" + run_ingestion: bool = False + provision_role_done: bool = False + provision_role_success: bool = False + + # https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases + saas_version: str = "" + role: str = "" + role_grants: List[str] = field(default_factory=list) diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/time_window.py b/metadata-ingestion/src/datahub/ingestion/source_report/time_window.py new file mode 100644 index 00000000000000..db8cbbca5ad0b5 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source_report/time_window.py @@ -0,0 +1,9 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + + +@dataclass +class BaseTimeWindowReport: + end_time: Optional[datetime] = None + start_time: Optional[datetime] = None diff --git 
a/metadata-ingestion/src/datahub/ingestion/source_report/usage/__init__.py b/metadata-ingestion/src/datahub/ingestion/source_report/usage/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/usage/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source_report/usage/snowflake_usage.py new file mode 100644 index 00000000000000..5f7962fc367104 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source_report/usage/snowflake_usage.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionReport, +) +from datahub.ingestion.source_report.sql.snowflake import BaseSnowflakeReport + + +@dataclass +class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport): + min_access_history_time: Optional[datetime] = None + max_access_history_time: Optional[datetime] = None + access_history_range_query_secs: float = -1 + access_history_query_secs: float = -1 + + rows_processed: int = 0 + rows_missing_query_text: int = 0 + rows_zero_base_objects_accessed: int = 0 + rows_zero_direct_objects_accessed: int = 0 + rows_missing_email: int = 0 + rows_parsing_error: int = 0 diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py index 746cf9c20638d6..5e276d61c08b2c 100644 --- a/metadata-ingestion/src/datahub/telemetry/telemetry.py +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -29,6 +29,7 @@ class Telemetry: client_id: str enabled: bool = True + tracking_init: bool = False def __init__(self): @@ -47,14 +48,6 @@ def __init__(self): self.mp = Mixpanel( MIXPANEL_TOKEN, consumer=Consumer(request_timeout=int(TIMEOUT)) ) - self.mp.people_set( - self.client_id, - { - "datahub_version": datahub_package.nice_version_name(), - "os": platform.system(), - "python_version": platform.python_version(), - }, - ) except Exception as e: logger.debug(f"Error connecting to mixpanel: {e}") @@ -62,6 +55,7 @@ def update_config(self) -> None: """ Update the config file with the current client ID and enabled status. """ + logger.info("Updating telemetry config") if not DATAHUB_FOLDER.exists(): os.makedirs(DATAHUB_FOLDER) @@ -124,6 +118,21 @@ def load_config(self): f"{CONFIG_FILE} had an IOError, please inspect this file for issues." 
) + def init_tracking(self) -> None: + if not self.enabled or self.mp is None or self.tracking_init is True: + return + + logger.info("Sending init Telemetry") + self.mp.people_set( + self.client_id, + { + "datahub_version": datahub_package.nice_version_name(), + "os": platform.system(), + "python_version": platform.python_version(), + }, + ) + self.init_track = True + def ping( self, action: str, @@ -144,6 +153,7 @@ def ping( # send event try: + logger.info("Sending Telemetry") self.mp.track(self.client_id, action, properties) except Exception as e: @@ -155,6 +165,13 @@ def ping( T = TypeVar("T") +def set_telemetry_enable(enable: bool) -> Any: + telemetry_instance.enabled = enable + if not enable: + logger.info("Disabling Telemetry locally due to server config") + telemetry_instance.update_config() + + def get_full_class_name(obj): module = obj.__class__.__module__ if module is None or module == str.__class__.__module__: @@ -168,6 +185,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: action = f"function:{func.__module__}.{func.__name__}" + telemetry_instance.init_tracking() telemetry_instance.ping(action) try: res = func(*args, **kwargs) diff --git a/metadata-ingestion/src/datahub/utilities/config_clean.py b/metadata-ingestion/src/datahub/utilities/config_clean.py index 8208772b6021f5..64df846ae5bde7 100644 --- a/metadata-ingestion/src/datahub/utilities/config_clean.py +++ b/metadata-ingestion/src/datahub/utilities/config_clean.py @@ -1,10 +1,16 @@ import re +def remove_suffix(inp: str, suffix: str, remove_all: bool = False) -> str: + while suffix and inp.endswith(suffix): + inp = inp[: -len(suffix)] + if not remove_all: + break + return inp + + def remove_trailing_slashes(url: str) -> str: - while url.endswith("/"): - url = url[:-1] - return url + return remove_suffix(url, "/", remove_all=True) def remove_protocol(url: str) -> str: diff --git a/metadata-ingestion/src/datahub/utilities/server_config_util.py b/metadata-ingestion/src/datahub/utilities/server_config_util.py new file mode 100644 index 00000000000000..c919a1356f2642 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/server_config_util.py @@ -0,0 +1,22 @@ +from typing import Any, Dict, Optional + +from datahub.telemetry.telemetry import set_telemetry_enable + +# Only to be written to for logging server related information +global_debug: Dict[str, Any] = dict() + + +def set_gms_config(config: Dict) -> Any: + global_debug["gms_config"] = config + + cli_telemtry_enabled = is_cli_telemetry_enabled() + if cli_telemtry_enabled is not None: + set_telemetry_enable(cli_telemtry_enabled) + + +def get_gms_config() -> Dict: + return global_debug.get("gms_config", {}) + + +def is_cli_telemetry_enabled() -> Optional[bool]: + return get_gms_config().get("telemetry", {}).get("enabledCli", None) diff --git a/metadata-ingestion/tests/unit/test_config_clean.py b/metadata-ingestion/tests/unit/test_config_clean.py index 218ddfc90b3d3e..178030c773e7f3 100644 --- a/metadata-ingestion/tests/unit/test_config_clean.py +++ b/metadata-ingestion/tests/unit/test_config_clean.py @@ -1,6 +1,20 @@ from datahub.utilities import config_clean +def test_remove_suffix(): + assert ( + config_clean.remove_suffix( + "xaaabcdef.snowflakecomputing.com", ".snowflakecomputing.com" + ) + == "xaaabcdef" + ) + + assert ( + config_clean.remove_suffix("xaaabcdef", ".snowflakecomputing.com") + == "xaaabcdef" + ) + + def test_url_without_slash_suffix(): assert ( config_clean.remove_trailing_slashes("http://example.com") diff --git 
a/metadata-ingestion/tests/unit/test_snowflake_source.py b/metadata-ingestion/tests/unit/test_snowflake_source.py index 49b0a379b62ed9..071eb267263df6 100644 --- a/metadata-ingestion/tests/unit/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/test_snowflake_source.py @@ -1,7 +1,3 @@ -import pytest - - -@pytest.mark.integration def test_snowflake_uri_default_authentication(): from datahub.ingestion.source.sql.snowflake import SnowflakeConfig @@ -23,7 +19,6 @@ def test_snowflake_uri_default_authentication(): ) -@pytest.mark.integration def test_snowflake_uri_external_browser_authentication(): from datahub.ingestion.source.sql.snowflake import SnowflakeConfig @@ -45,7 +40,6 @@ def test_snowflake_uri_external_browser_authentication(): ) -@pytest.mark.integration def test_snowflake_uri_key_pair_authentication(): from datahub.ingestion.source.sql.snowflake import SnowflakeConfig diff --git a/metadata-io/src/main/java/com/linkedin/metadata/config/IngestionConfiguration.java b/metadata-io/src/main/java/com/linkedin/metadata/config/IngestionConfiguration.java index 9c85c4398ad0bd..5b10b59ff0c205 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/config/IngestionConfiguration.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/config/IngestionConfiguration.java @@ -10,10 +10,10 @@ public class IngestionConfiguration { /** * Whether managed ingestion is enabled */ - private boolean enabled; + public boolean enabled; /** * The default CLI version to use in managed ingestion */ - private String defaultCliVersion; + public String defaultCliVersion; } \ No newline at end of file diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java index 039a62bee05343..2b075ec7ae233b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/ConfigurationProvider.java @@ -22,4 +22,9 @@ public class ConfigurationProvider { * Ingestion related configs */ private IngestionConfiguration ingestion; + + /** + * Telemetry related configs + */ + private TelemetryConfiguration telemetry; } \ No newline at end of file diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/TelemetryConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/TelemetryConfiguration.java new file mode 100644 index 00000000000000..b78a1cbec5c85b --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/config/TelemetryConfiguration.java @@ -0,0 +1,17 @@ +package com.linkedin.gms.factory.config; + +import lombok.Data; +/** + * POJO representing the "telemtry" configuration block in application.yml. 
+ */ +@Data +public class TelemetryConfiguration { + /** + * Whether cli telemtry is enabled + */ + public boolean enabledCli; + /** + * Whether reporting telemetry is enabled + */ + public boolean enabledIngestion; +} \ No newline at end of file diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml index 52ab6d5d72021b..211c64d2ab6200 100644 --- a/metadata-service/factories/src/main/resources/application.yml +++ b/metadata-service/factories/src/main/resources/application.yml @@ -28,6 +28,10 @@ ingestion: enabled: ${UI_INGESTION_ENABLED:true} defaultCliVersion: '${UI_INGESTION_DEFAULT_CLI_VERSION:0.8.26.6}' +telemetry: + enabledCli: ${CLI_TELEMETRY_ENABLED:true} + enabledIngestion: ${INGESTION_REPORTING_ENABLED:false} + secretService: encryptionKey: ${SECRET_SERVICE_ENCRYPTION_KEY:ENCRYPTION_KEY} diff --git a/metadata-service/servlet/build.gradle b/metadata-service/servlet/build.gradle index 3cf2e62882f251..9242d212018860 100644 --- a/metadata-service/servlet/build.gradle +++ b/metadata-service/servlet/build.gradle @@ -9,4 +9,5 @@ dependencies { compile externalDependency.springWebMVC annotationProcessor externalDependency.lombok compile project(':entity-registry') + compile project(':metadata-service:factories') } diff --git a/metadata-service/servlet/src/main/java/com/datahub/gms/servlet/Config.java b/metadata-service/servlet/src/main/java/com/datahub/gms/servlet/Config.java index bef66f40b3159e..c857b2b051b7d6 100644 --- a/metadata-service/servlet/src/main/java/com/datahub/gms/servlet/Config.java +++ b/metadata-service/servlet/src/main/java/com/datahub/gms/servlet/Config.java @@ -20,6 +20,7 @@ import org.apache.maven.artifact.versioning.ComparableVersion; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; +import com.linkedin.gms.factory.config.ConfigurationProvider; // Return a 200 for health checks @@ -50,6 +51,10 @@ private Map> getPluginM return patchDiagnostics; } + private ConfigurationProvider getConfigProvider(WebApplicationContext ctx) { + return (ConfigurationProvider) ctx.getBean("configurationProvider"); + } + private GitVersion getGitVersion(WebApplicationContext ctx) { return (GitVersion) ctx.getBean("gitVersion"); } @@ -71,6 +76,20 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IO versionConfig.put("linkedin/datahub", version.toConfig()); config.put("versions", versionConfig); + ConfigurationProvider configProvider = getConfigProvider(ctx); + + Map telemetryConfig = new HashMap() {{ + put("enabledCli", configProvider.getTelemetry().enabledCli); + put("enabledIngestion", configProvider.getTelemetry().enabledIngestion); + }}; + config.put("telemetry", telemetryConfig); + + Map ingestionConfig = new HashMap() {{ + put("enabled", configProvider.getIngestion().enabled); + put("defaultCliVersion", configProvider.getIngestion().defaultCliVersion); + }}; + config.put("managedIngestion", ingestionConfig); + resp.setContentType("application/json"); PrintWriter out = resp.getWriter();