From 0c050bef1e4bbb4f09ae05621e000d1895dcb3b2 Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Sun, 7 Apr 2024 14:39:10 -0400 Subject: [PATCH 01/18] feat: First pass at enabling getting projects based on folder ids --- .../ingestion/source/bigquery_v2/bigquery.py | 24 +++++++++++++++++-- .../source/bigquery_v2/bigquery_config.py | 7 ++++++ .../source/bigquery_v2/bigquery_schema.py | 22 ++++++++++++++++- 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index ffdb604c52731d..68e18862989c56 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -615,8 +615,28 @@ def _get_projects(self) -> List[BigqueryProject]: BigqueryProject(id=project_id, name=project_id) for project_id in project_ids ] - else: - return list(self._query_project_list()) + + if self.config.folder_ids: + return list(self._query_project_list_from_folders()) + + return list(self._query_project_list()) + + def _query_project_list_from_folders(self) -> Iterable[BigqueryProject]: + projects = self.bigquery_data_dictionary.get_projects_in_folders(self.config.folder_ids) + if not projects: # Report failure on exception and if empty list is returned + self.report.report_failure( + "metadata-extraction", + "Get projects didn't return any project in the specified folder(s). " + "Maybe resourcemanager.projects.list permission is missing for the service account. 
" + "You can assign predefined roles/bigquery.metadataViewer role to your service account.", + ) + return [] + + for project in projects: + if self.config.project_id_pattern.allowed(project.id): + yield project + else: + self.report.report_dropped(project.id) def _query_project_list(self) -> Iterable[BigqueryProject]: projects = self.bigquery_data_dictionary.get_projects() diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 3fbac069a18206..2472d68d456e2b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -193,6 +193,13 @@ class BigQueryV2Config( "Overrides `project_id_pattern`." ), ) + folder_ids: List[str] = Field( + default_factory=list, + description=( + "Ingests projects that are direct children of the specified folders. Use this property if you want to specify what " + "projects to ingest based on their direct parent folders. Your service account will require resourcemanager.projects.list." 
+ ), + ) storage_project_id: None = Field(default=None, hidden_from_docs=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index d918782691c778..eeded4ce7909b6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -11,6 +11,7 @@ TimePartitioning, TimePartitioningType, ) +from google.cloud import resourcemanager_v3 from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_report import ( @@ -137,9 +138,13 @@ class BigqueryProject: class BigQuerySchemaApi: def __init__( - self, report: BigQuerySchemaApiPerfReport, client: bigquery.Client + self, + report: BigQuerySchemaApiPerfReport, + client: bigquery.Client, + projects_client: resourcemanager_v3.ProjectsClient() ) -> None: self.bq_client = client + self.projects_client = projects_client self.report = report def get_query_result(self, query: str) -> RowIterator: @@ -159,6 +164,21 @@ def get_projects(self) -> List[BigqueryProject]: except Exception as e: logger.error(f"Error getting projects. {e}", exc_info=True) return [] + + def get_projects_in_folders(self, folder_ids: List[str]) -> List[BigqueryProject]: + try: + projects = [] + for folder_id in folder_ids: + for project in self.projects_client.list_projects(parent=f"folders/{folder_id}"): + projects.append( + BigqueryProject(id=project.project_id, name=project.display_name) + ) + + return projects + + except Exception as e: + logger.error(f"Error getting projects in Folder: {folder_id}. 
{e}", exc_info=True) + return [] def get_datasets_for_project_id( self, project_id: str, maxResults: Optional[int] = None From 69cf8e09cd6d3f31f14cb775e7e5bf9cf6535ecf Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Mon, 8 Apr 2024 07:34:03 -0400 Subject: [PATCH 02/18] chore: Add google-cloud-resource-manager dependency --- .../base-requirements.txt | 74 ++++++++++--------- metadata-ingestion/setup.py | 1 + 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/docker/datahub-ingestion-base/base-requirements.txt b/docker/datahub-ingestion-base/base-requirements.txt index f2d7675d32ae35..46ef46325765d0 100644 --- a/docker/datahub-ingestion-base/base-requirements.txt +++ b/docker/datahub-ingestion-base/base-requirements.txt @@ -1,16 +1,17 @@ # Generated requirements file. Run ./regenerate-base-requirements.sh to regenerate. -acryl-datahub-classify==0.0.9 +acryl-datahub-classify==0.0.10 acryl-PyHive==0.6.16 -acryl-sqlglot==22.4.1.dev4 +acryl-sqlglot==23.2.1.dev5 aenum==3.1.15 aiohttp==3.9.3 aiosignal==1.3.1 alembic==1.13.1 altair==4.2.0 anyio==4.3.0 -apache-airflow==2.8.4 +apache-airflow==2.9.0 apache-airflow-providers-common-io==1.3.0 apache-airflow-providers-common-sql==1.11.1 +apache-airflow-providers-fab==1.0.2 apache-airflow-providers-ftp==3.7.0 apache-airflow-providers-http==4.10.0 apache-airflow-providers-imap==3.5.0 @@ -35,8 +36,8 @@ beautifulsoup4==4.12.3 bleach==6.1.0 blinker==1.7.0 blis==0.7.11 -boto3==1.34.71 -botocore==1.34.71 +boto3==1.34.79 +botocore==1.34.79 bracex==2.4 cached-property==1.5.2 cachelib==0.9.0 @@ -68,14 +69,14 @@ cryptography==42.0.5 cx_Oracle==8.3.0 cymem==2.0.8 databricks-dbapi==0.6.0 -databricks-sdk==0.23.0 +databricks-sdk==0.24.0 databricks-sql-connector==2.9.5 dataflows-tabulator==1.54.3 db-dtypes==1.2.0 debugpy==1.8.1 decorator==5.1.1 defusedxml==0.7.1 -deltalake==0.16.3 +deltalake==0.16.4 Deprecated==1.2.14 dill==0.3.8 dnspython==2.6.1 @@ -83,7 +84,7 @@ docker==7.0.0 docutils==0.20.1 ecdsa==0.18.0 
elasticsearch==7.13.4 -email-validator==1.3.1 +email_validator==2.1.1 entrypoints==0.4 et-xmlfile==1.1.0 exceptiongroup==1.2.0 @@ -97,17 +98,18 @@ flatdict==4.0.1 frozenlist==1.4.1 fsspec==2023.12.2 future==1.0.0 -GeoAlchemy2==0.14.6 +GeoAlchemy2==0.14.7 gitdb==4.0.11 -GitPython==3.1.42 +GitPython==3.1.43 google-api-core==2.18.0 google-auth==2.29.0 google-cloud-appengine-logging==1.4.3 google-cloud-audit-log==0.2.5 -google-cloud-bigquery==3.19.0 +google-cloud-bigquery==3.20.1 google-cloud-core==2.4.1 google-cloud-datacatalog-lineage==0.2.2 google-cloud-logging==3.5.0 +google-cloud-resource-manager==1.12.3 google-crc32c==1.5.0 google-re2==1.1 google-resumable-media==2.7.0 @@ -123,7 +125,8 @@ grpcio-tools==1.62.1 gssapi==1.8.3 gunicorn==21.2.0 h11==0.14.0 -httpcore==1.0.4 +hdbcli==2.20.15 +httpcore==1.0.5 httpx==0.27.0 humanfriendly==10.0 idna==3.6 @@ -151,7 +154,7 @@ jsonschema==4.21.1 jsonschema-specifications==2023.12.1 jupyter-server==1.16.0 jupyter_client==7.4.9 -jupyter_core==5.0.0 +jupyter_core==4.12.0 jupyterlab_pygments==0.3.0 jupyterlab_widgets==3.0.10 langcodes==3.3.0 @@ -164,7 +167,7 @@ linkify-it-py==2.0.3 lkml==1.3.4 lockfile==0.12.2 looker-sdk==23.0.0 -lxml==5.1.0 +lxml==5.2.1 lz4==4.3.3 makefun==1.15.2 Mako==1.3.2 @@ -172,7 +175,7 @@ markdown-it-py==3.0.0 MarkupSafe==2.1.5 marshmallow==3.21.1 marshmallow-oneofschema==3.1.1 -marshmallow-sqlalchemy==0.26.1 +marshmallow-sqlalchemy==0.28.2 matplotlib-inline==0.1.6 mdit-py-plugins==0.4.0 mdurl==0.1.2 @@ -189,18 +192,18 @@ mypy-extensions==1.0.0 nbclassic==1.0.0 nbclient==0.6.3 nbconvert==7.16.3 -nbformat==5.10.3 +nbformat==5.10.4 nest-asyncio==1.6.0 -networkx==3.2.1 +networkx==3.3 notebook==6.5.6 notebook_shim==0.2.4 numpy==1.26.4 oauthlib==3.2.2 okta==1.7.0 -openlineage-airflow==1.2.0 -openlineage-integration-common==1.2.0 -openlineage-python==1.2.0 -openlineage_sql==1.2.0 +openlineage-airflow==1.7.0 +openlineage-integration-common==1.7.0 +openlineage-python==1.7.0 +openlineage_sql==1.7.0 openpyxl==3.1.2 
opentelemetry-api==1.16.0 opentelemetry-exporter-otlp==1.16.0 @@ -211,10 +214,10 @@ opentelemetry-sdk==1.16.0 opentelemetry-semantic-conventions==0.37b0 ordered-set==4.1.0 packaging==23.2 -pandas==2.2.1 +pandas==2.1.4 pandocfilters==1.5.1 parse==1.20.1 -parso==0.8.3 +parso==0.8.4 pathlib_abc==0.1.1 pathspec==0.12.1 pathy==0.11.0 @@ -242,14 +245,14 @@ pyasn1==0.6.0 pyasn1_modules==0.4.0 pyathena==2.25.2 pycountry==23.12.11 -pycparser==2.21 +pycparser==2.22 pycryptodome==3.20.0 -pydantic==1.10.14 -pydash==7.0.7 +pydantic==1.10.15 +pydash==8.0.0 pydruid==0.6.6 Pygments==2.17.2 pyiceberg==0.4.0 -pymongo==4.6.2 +pymongo==4.6.3 PyMySQL==1.1.0 pyOpenSSL==24.1.0 pyparsing==3.0.9 @@ -285,10 +288,10 @@ rsa==4.9 ruamel.yaml==0.17.17 s3transfer==0.10.1 schwifty==2024.1.1.post0 -scipy==1.12.0 +scipy==1.13.0 scramp==1.4.4 -Send2Trash==1.8.2 -sentry-sdk==1.43.0 +Send2Trash==1.8.3 +sentry-sdk==1.44.1 setproctitle==1.3.3 simple-salesforce==1.12.5 six==1.16.0 @@ -306,6 +309,7 @@ spacy-loggers==1.0.5 sql-metadata==2.2.2 SQLAlchemy==1.4.44 sqlalchemy-bigquery==1.10.0 +sqlalchemy-hana==2.0.0 SQLAlchemy-JSONField==1.0.2 sqlalchemy-pytds==0.3.5 sqlalchemy-redshift==0.8.14 @@ -316,11 +320,11 @@ srsly==2.4.8 stack-data==0.6.3 strictyaml==1.7.3 tableauserverclient==0.25 -tableschema==1.20.10 +tableschema==1.20.11 tabulate==0.9.0 tenacity==8.2.3 -teradatasql==20.0.0.8 -teradatasqlalchemy==17.20.0.0 +teradatasql==20.0.0.9 +teradatasqlalchemy==20.0.0.0 termcolor==2.4.0 terminado==0.18.1 text-unidecode==1.3 @@ -338,13 +342,13 @@ traitlets==5.2.1.post0 trino==0.328.0 typer==0.7.0 typing-inspect==0.9.0 -typing_extensions==4.10.0 +typing_extensions==4.11.0 tzdata==2024.1 tzlocal==5.2 uc-micro-py==1.0.3 ujson==5.9.0 unicodecsv==0.14.1 -universal-pathlib==0.1.4 +universal_pathlib==0.2.2 urllib3==1.26.18 vertica-python==1.3.8 vertica-sqlalchemy-dialect==0.0.8.1 diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index bc70c1d8cee208..84fd2260df7377 100644 --- 
a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -165,6 +165,7 @@ # Google cloud logging library "google-cloud-logging<=3.5.0", "google-cloud-bigquery", + "google-cloud-resource-manager", "more-itertools>=8.12.0", "sqlalchemy-bigquery>=1.4.1", } From df9698c51ac94e81502d3139546ff4ef7c556073 Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Mon, 8 Apr 2024 08:24:45 -0400 Subject: [PATCH 03/18] chore: Update some methods to account for new projects_client --- .../ingestion/source/bigquery_v2/bigquery.py | 12 ++++++---- .../source/bigquery_v2/bigquery_config.py | 3 +++ .../source/bigquery_v2/bigquery_report.py | 1 + .../source/bigquery_v2/bigquery_schema.py | 23 ++++++++++--------- .../ingestion/source/bigquery_v2/lineage.py | 4 +++- 5 files changed, 27 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 68e18862989c56..3e4deb7299b47e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -232,7 +232,9 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = "" self.bigquery_data_dictionary = BigQuerySchemaApi( - self.report.schema_api_perf, self.config.get_bigquery_client() + self.report.schema_api_perf, + self.config.get_bigquery_client(), + self.config.get_projects_client() ) self.sql_parser_schema_resolver = self._init_schema_resolver() @@ -337,10 +339,12 @@ def metadata_read_capability_test( for project_id in project_ids: try: logger.info((f"Metadata read capability test for project {project_id}")) - client: bigquery.Client = config.get_bigquery_client() - assert client + bq_client: bigquery.Client = config.get_bigquery_client() + assert bq_client + proj_client: resourcemanager_v3.Client = config.get_projects_client() + assert 
proj_client bigquery_data_dictionary = BigQuerySchemaApi( - BigQueryV2Report().schema_api_perf, client + BigQueryV2Report().schema_api_perf, bq_client, proj_client ) result = bigquery_data_dictionary.get_datasets_for_project_id( project_id, 10 diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 2472d68d456e2b..b3399fbbcaf71c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -69,6 +69,9 @@ def __init__(self, **data: Any): def get_bigquery_client(self) -> bigquery.Client: client_options = self.extra_client_options return bigquery.Client(self.project_on_behalf, **client_options) + + def get_projects_client(self) -> resourcemanager_v3.ProjectsClient: + return resourcemanager_v3.ProjectsClient() def make_gcp_logging_client( self, project_id: Optional[str] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 54eca61dfe1c9a..60d0266492ec97 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -21,6 +21,7 @@ @dataclass class BigQuerySchemaApiPerfReport(Report): list_projects: PerfTimer = field(default_factory=PerfTimer) + list_projects_in_folders: PerfTimer = field(default_factory=PerfTimer) list_datasets: PerfTimer = field(default_factory=PerfTimer) get_columns_for_dataset: PerfTimer = field(default_factory=PerfTimer) get_tables_for_dataset: PerfTimer = field(default_factory=PerfTimer) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 
eeded4ce7909b6..f802a74a1a156f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -166,19 +166,20 @@ def get_projects(self) -> List[BigqueryProject]: return [] def get_projects_in_folders(self, folder_ids: List[str]) -> List[BigqueryProject]: - try: - projects = [] - for folder_id in folder_ids: - for project in self.projects_client.list_projects(parent=f"folders/{folder_id}"): - projects.append( - BigqueryProject(id=project.project_id, name=project.display_name) - ) + with self.report.list_projects_in_folders: + try: + projects = [] + for folder_id in folder_ids: + for project in self.projects_client.list_projects(parent=f"folders/{folder_id}"): + projects.append( + BigqueryProject(id=project.project_id, name=project.display_name) + ) - return projects + return projects - except Exception as e: - logger.error(f"Error getting projects in Folder: {folder_id}. {e}", exc_info=True) - return [] + except Exception as e: + logger.error(f"Error getting projects in Folder: {folder_id}. 
{e}", exc_info=True) + return [] def get_datasets_for_project_id( self, project_id: str, maxResults: Optional[int] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 5586f942015d47..303dccc6bb7dc8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -484,7 +484,9 @@ def lineage_via_catalog_lineage_api( lineage_client: lineage_v1.LineageClient = lineage_v1.LineageClient() data_dictionary = BigQuerySchemaApi( - self.report.schema_api_perf, self.config.get_bigquery_client() + self.report.schema_api_perf, + self.config.get_bigquery_client(), + self.config.get_projects_client(), ) # Filtering datasets From 6c77b1e6ef28a9e74ad724ab0638136e5e207b46 Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Mon, 8 Apr 2024 08:42:20 -0400 Subject: [PATCH 04/18] chore: Add a few tests, more to come --- .../tests/unit/test_bigquery_source.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 426d4dc12f2086..9863e145c8f14f 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -192,6 +192,36 @@ def test_get_projects_with_project_ids(get_bq_client_mock): ] assert client_mock.list_projects.call_count == 0 +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_projects_with_project_ids_and_folder_ids(get_bq_client_mock): + bq_client_mock = MagicMock() + get_bq_client_mock.return_value = bq_client_mock + config = BigQueryV2Config.parse_obj( + { + "project_ids": ["test-1", "test-2"], + "folder_ids": ["123", "456"], + } + ) + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) + assert source._get_projects() == [ + 
BigqueryProject("test-1", "test-1"), + BigqueryProject("test-2", "test-2"), + ] + assert bq_client_mock.list_projects.call_count == 0 + + config = BigQueryV2Config.parse_obj( + { + "project_ids": ["test-1", "test-2"], + "project_id": "test-3", + "folder_ids": ["123", "456"], + } + ) + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test2")) + assert source._get_projects() == [ + BigqueryProject("test-1", "test-1"), + BigqueryProject("test-2", "test-2"), + ] + assert bq_client_mock.list_projects.call_count == 0 @patch.object(BigQueryV2Config, "get_bigquery_client") def test_get_projects_with_project_ids_overrides_project_id_pattern( @@ -310,6 +340,25 @@ def test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): BigqueryProject(id="test-project-2", name="Test Project 2"), ] +@patch.object(BigQuerySchemaApi, "get_projects_in_folders") +def test_get_projects_with_folder_ids_filter_by_pattern(get_projects_in_folders_mock): + get_projects_in_folders_mock.return_value = [ + BigqueryProject("test-project", "Test Project"), + BigqueryProject("test-project-2", "Test Project 2"), + ] + + config = BigQueryV2Config.parse_obj( + { + "folder_ids": ["123"], + "project_id_pattern": {"deny": ["^test-project$"]}, + } + ) + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) + projects = source._get_projects() + assert projects == [ + BigqueryProject(id="test-project-2", name="Test Project 2"), + ] + @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") From 7e3f8292c90e937d9f70e45210d91117b5d8721c Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Mon, 8 Apr 2024 08:54:23 -0400 Subject: [PATCH 05/18] chore: Add another unit test --- .../tests/unit/test_bigquery_source.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 
9863e145c8f14f..277390d47df31d 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -240,6 +240,36 @@ def test_get_projects_with_project_ids_overrides_project_id_pattern( BigqueryProject(id="test-project-2", name="test-project-2"), ] +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_project_ids_overrides_folder_ids( + get_proj_client_mock, +): + client_mock = MagicMock() + get_proj_client_mock.return_value = client_mock + client_mock.list_projects.return_value = [ + SimpleNamespace( + project_id="test-1", + friendly_name="one", + ), + SimpleNamespace( + project_id="test-2", + friendly_name="two", + ), + ] + config = BigQueryV2Config.parse_obj( + { + "project_ids": ["test-project", "test-project-2"], + "folder_ids": ["123", "456"], + } + ) + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) + projects = source._get_projects() + assert projects == [ + BigqueryProject(id="test-project", name="test-project"), + BigqueryProject(id="test-project-2", name="test-project-2"), + ] + + def test_platform_instance_config_always_none(): config = BigQueryV2Config.parse_obj( From 463a18a0d2111c4e2747ceb467a2ae5a0d2cf461 Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Mon, 8 Apr 2024 15:03:19 -0400 Subject: [PATCH 06/18] feat: Switch to using labels instead of folders, more flexible --- .../ingestion/source/bigquery_v2/bigquery.py | 8 +- .../source/bigquery_v2/bigquery_config.py | 6 +- .../source/bigquery_v2/bigquery_report.py | 2 +- .../source/bigquery_v2/bigquery_schema.py | 18 ++-- .../tests/unit/test_bigquery_source.py | 82 ------------------- 5 files changed, 17 insertions(+), 99 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 3e4deb7299b47e..f54f530a4ba968 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -620,13 +620,13 @@ def _get_projects(self) -> List[BigqueryProject]: for project_id in project_ids ] - if self.config.folder_ids: - return list(self._query_project_list_from_folders()) + if self.config.project_labels: + return list(self._query_project_list_from_labels()) return list(self._query_project_list()) - def _query_project_list_from_folders(self) -> Iterable[BigqueryProject]: - projects = self.bigquery_data_dictionary.get_projects_in_folders(self.config.folder_ids) + def _query_project_list_from_labels(self) -> Iterable[BigqueryProject]: + projects = self.bigquery_data_dictionary.get_projects_with_labels(self.config.project_labels) if not projects: # Report failure on exception and if empty list is returned self.report.report_failure( "metadata-extraction", diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index b3399fbbcaf71c..650cf6abf75c44 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -196,11 +196,11 @@ class BigQueryV2Config( "Overrides `project_id_pattern`." ), ) - folder_ids: List[str] = Field( + project_labels: List[str] = Field( default_factory=list, description=( - "Ingests projects that are direct children of the specified folders. Use this property if you want to specify what " - "projects to ingest based on their direct parent folders. Your service account will require resourcemanager.projects.list." + "Ingests projects with the specified labels. Use this properly if you want to specify what " + "projects to ignest based on project-level labels. Your service account will require resourcemanager.projects.list." 
), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 60d0266492ec97..e084f11a42259d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -21,7 +21,7 @@ @dataclass class BigQuerySchemaApiPerfReport(Report): list_projects: PerfTimer = field(default_factory=PerfTimer) - list_projects_in_folders: PerfTimer = field(default_factory=PerfTimer) + list_projects_with_labels: PerfTimer = field(default_factory=PerfTimer) list_datasets: PerfTimer = field(default_factory=PerfTimer) get_columns_for_dataset: PerfTimer = field(default_factory=PerfTimer) get_tables_for_dataset: PerfTimer = field(default_factory=PerfTimer) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index f802a74a1a156f..c9cc276a946005 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -165,21 +165,21 @@ def get_projects(self) -> List[BigqueryProject]: logger.error(f"Error getting projects. 
{e}", exc_info=True) return [] - def get_projects_in_folders(self, folder_ids: List[str]) -> List[BigqueryProject]: - with self.report.list_projects_in_folders: + def get_projects_with_labels(self, labels: List[str]) -> List[BigqueryProject]: + with self.report.list_projects_with_labels: try: projects = [] - for folder_id in folder_ids: - for project in self.projects_client.list_projects(parent=f"folders/{folder_id}"): - projects.append( - BigqueryProject(id=project.project_id, name=project.display_name) - ) + labels_query = " OR ".join([f"labels.{label}" for label in labels]) + for project in self.projects_client.search_projects(query=labels_query): + projects.append( + BigqueryProject(id=project.project_id, name=project.display_name) + ) return projects except Exception as e: - logger.error(f"Error getting projects in Folder: {folder_id}. {e}", exc_info=True) - return [] + logger.error(f"Error getting projects with labels: {labels}. {e}", exc_info=True) + return [] def get_datasets_for_project_id( self, project_id: str, maxResults: Optional[int] = None diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 277390d47df31d..b6621c7ac7386f 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -192,37 +192,6 @@ def test_get_projects_with_project_ids(get_bq_client_mock): ] assert client_mock.list_projects.call_count == 0 -@patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_with_project_ids_and_folder_ids(get_bq_client_mock): - bq_client_mock = MagicMock() - get_bq_client_mock.return_value = bq_client_mock - config = BigQueryV2Config.parse_obj( - { - "project_ids": ["test-1", "test-2"], - "folder_ids": ["123", "456"], - } - ) - source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) - assert source._get_projects() == [ - BigqueryProject("test-1", "test-1"), - 
BigqueryProject("test-2", "test-2"), - ] - assert bq_client_mock.list_projects.call_count == 0 - - config = BigQueryV2Config.parse_obj( - { - "project_ids": ["test-1", "test-2"], - "project_id": "test-3", - "folder_ids": ["123", "456"], - } - ) - source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test2")) - assert source._get_projects() == [ - BigqueryProject("test-1", "test-1"), - BigqueryProject("test-2", "test-2"), - ] - assert bq_client_mock.list_projects.call_count == 0 - @patch.object(BigQueryV2Config, "get_bigquery_client") def test_get_projects_with_project_ids_overrides_project_id_pattern( get_bq_client_mock, @@ -240,37 +209,6 @@ def test_get_projects_with_project_ids_overrides_project_id_pattern( BigqueryProject(id="test-project-2", name="test-project-2"), ] -@patch.object(BigQueryV2Config, "get_projects_client") -def test_get_projects_with_project_ids_overrides_folder_ids( - get_proj_client_mock, -): - client_mock = MagicMock() - get_proj_client_mock.return_value = client_mock - client_mock.list_projects.return_value = [ - SimpleNamespace( - project_id="test-1", - friendly_name="one", - ), - SimpleNamespace( - project_id="test-2", - friendly_name="two", - ), - ] - config = BigQueryV2Config.parse_obj( - { - "project_ids": ["test-project", "test-project-2"], - "folder_ids": ["123", "456"], - } - ) - source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - projects = source._get_projects() - assert projects == [ - BigqueryProject(id="test-project", name="test-project"), - BigqueryProject(id="test-project-2", name="test-project-2"), - ] - - - def test_platform_instance_config_always_none(): config = BigQueryV2Config.parse_obj( {"include_data_platform_instance": True, "platform_instance": "something"} @@ -370,26 +308,6 @@ def test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): BigqueryProject(id="test-project-2", name="Test Project 2"), ] -@patch.object(BigQuerySchemaApi, 
"get_projects_in_folders") -def test_get_projects_with_folder_ids_filter_by_pattern(get_projects_in_folders_mock): - get_projects_in_folders_mock.return_value = [ - BigqueryProject("test-project", "Test Project"), - BigqueryProject("test-project-2", "Test Project 2"), - ] - - config = BigQueryV2Config.parse_obj( - { - "folder_ids": ["123"], - "project_id_pattern": {"deny": ["^test-project$"]}, - } - ) - source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - projects = source._get_projects() - assert projects == [ - BigqueryProject(id="test-project-2", name="Test Project 2"), - ] - - @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): From bd1560deb2d56705d336817644b716ae7c5d3bd3 Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Mon, 8 Apr 2024 15:05:13 -0400 Subject: [PATCH 07/18] chore: Remove line deletions --- metadata-ingestion/tests/unit/test_bigquery_source.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index b6621c7ac7386f..426d4dc12f2086 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -192,6 +192,7 @@ def test_get_projects_with_project_ids(get_bq_client_mock): ] assert client_mock.list_projects.call_count == 0 + @patch.object(BigQueryV2Config, "get_bigquery_client") def test_get_projects_with_project_ids_overrides_project_id_pattern( get_bq_client_mock, @@ -209,6 +210,7 @@ def test_get_projects_with_project_ids_overrides_project_id_pattern( BigqueryProject(id="test-project-2", name="test-project-2"), ] + def test_platform_instance_config_always_none(): config = BigQueryV2Config.parse_obj( {"include_data_platform_instance": True, "platform_instance": "something"} @@ -308,6 +310,7 @@ def 
test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): BigqueryProject(id="test-project-2", name="Test Project 2"), ] + @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): From 0aea92b5639a1702539cf5a8fd4a08655c17a140 Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Mon, 8 Apr 2024 15:14:18 -0400 Subject: [PATCH 08/18] chore: Update failure report message to be accurate --- .../src/datahub/ingestion/source/bigquery_v2/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index f54f530a4ba968..976d7bf2b138b1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -630,7 +630,7 @@ def _query_project_list_from_labels(self) -> Iterable[BigqueryProject]: if not projects: # Report failure on exception and if empty list is returned self.report.report_failure( "metadata-extraction", - "Get projects didn't return any project in the specified folder(s). " + "Get projects didn't return any project with any of the specified label(s). " "Maybe resourcemanager.projects.list permission is missing for the service account. 
" "You can assign predefined roles/bigquery.metadataViewer role to your service account.", ) From 82147f3fdc83afb5229cb8d10617bf8bdb26a08f Mon Sep 17 00:00:00 2001 From: Alice Naghshineh <45885699+anaghshineh@users.noreply.github.com> Date: Wed, 17 Apr 2024 09:50:08 -0400 Subject: [PATCH 09/18] Update metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py Co-authored-by: Tamas Nemeth --- .../src/datahub/ingestion/source/bigquery_v2/bigquery_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 650cf6abf75c44..b41c868bf10334 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -199,7 +199,7 @@ class BigQueryV2Config( project_labels: List[str] = Field( default_factory=list, description=( - "Ingests projects with the specified labels. Use this properly if you want to specify what " + "Ingests projects with the specified labels. Use this property if you want to specify what " "projects to ignest based on project-level labels. Your service account will require resourcemanager.projects.list." 
), ) From d00d4dbfed90d6885a15eeb31201a745ab92ef8f Mon Sep 17 00:00:00 2001 From: Alice Naghshineh Date: Wed, 17 Apr 2024 09:58:08 -0400 Subject: [PATCH 10/18] docs: Update inline docs for new BQ config var --- .../datahub/ingestion/source/bigquery_v2/bigquery_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 650cf6abf75c44..4ec89791dc1e47 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -199,8 +199,8 @@ class BigQueryV2Config( project_labels: List[str] = Field( default_factory=list, description=( - "Ingests projects with the specified labels. Use this properly if you want to specify what " - "projects to ignest based on project-level labels. Your service account will require resourcemanager.projects.list." + "Ingests projects with the specified labels. Use this property if you want to specify what " + "projects to ingest based on project-level labels." 
), ) From f6af306a1365fe162bae1af021fc52a11ad8b88a Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Wed, 14 Aug 2024 19:43:08 +0530 Subject: [PATCH 11/18] test case --- .../bigquery_project_label_mcp_golden.json | 452 ++++++++++++++++++ .../integration/bigquery_v2/test_bigquery.py | 155 +++++- 2 files changed, 581 insertions(+), 26 deletions(-) create mode 100644 metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json new file mode 100644 index 00000000000000..a529ddc6221a7a --- /dev/null +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json @@ -0,0 +1,452 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "env": "PROD", + "project_id": "dev" + }, + "name": "dev" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": 
"no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "env": "PROD", + "project_id": "dev", + "dataset_id": "bigquery-dataset-1" + }, + "externalUrl": "https://console.cloud.google.com/bigquery?project=dev&ws=!1m4!1m3!3m2!1sdev!2sbigquery-dataset-1", + "name": "bigquery-dataset-1" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 
1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "urn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "dev.bigquery-dataset-1.table-1", + 
"platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "age", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "INT", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Test Policy Tag" + } + ] + }, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:Age" + } + ], + "auditStamp": { + "time": 1643871600000, + "actor": "urn:li:corpuser:datahub" + } + }, + "isPartOfKey": false + }, + { + "fieldPath": "email", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "STRING", + "recursive": false, + "globalTags": { + "tags": [] + }, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:Email_Address" + } + ], + "auditStamp": { + "time": 1643871600000, + "actor": "urn:li:corpuser:datahub" + } + }, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/bigquery?project=dev&ws=!1m5!1m4!4m3!1sdev!2sbigquery-dataset-1!3stable-1", + "name": "table-1", + "qualifiedName": "dev.bigquery-dataset-1.table-1", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + 
"lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,dev)" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "urn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + }, + { + "id": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "urn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } 
+}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Age", + "changeType": "UPSERT", + "aspectName": "glossaryTermKey", + "aspect": { + "json": { + "name": "Age" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Email_Address", + "changeType": "UPSERT", + "aspectName": "glossaryTermKey", + "aspect": { + "json": { + "name": "Email_Address" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Test Policy Tag", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Test Policy Tag" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 762c73d2a55c60..4b8a40ca74fd6d 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -16,7 +16,7 @@ BigqueryColumn, BigqueryDataset, BigQuerySchemaApi, - BigqueryTable, + BigqueryTable, BigqueryProject, ) from datahub.ingestion.source.bigquery_v2.bigquery_schema_gen import ( BigQuerySchemaGenerator, @@ -39,6 +39,33 @@ def random_email(): ) +def recipe(mcp_output_path: str, override: dict = {}) -> dict: + return { + "source": { + "type": "bigquery", + "config": { + "project_ids": ["project-id-1"], + "include_usage_statistics": False, + "include_table_lineage": False, + "include_data_platform_instance": True, + "classification": ClassificationConfig( + enabled=True, + classifiers=[ + 
DynamicTypedClassifierConfig( + type="datahub", + config=DataHubClassifierConfig( + minimum_values_threshold=1, + ), + ) + ], + max_workers=1, + ).dict(), + }, + }, + "sink": {"type": "file", "config": {"filename": mcp_output_path}}, + } + + @freeze_time(FROZEN_TIME) @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQuerySchemaGenerator, "get_core_table_details") @@ -47,9 +74,11 @@ def random_email(): @patch.object(BigQueryDataReader, "get_sample_data_for_table") @patch("google.cloud.bigquery.Client") @patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") +@patch("google.cloud.resourcemanager_v3.ProjectsClient") def test_bigquery_v2_ingest( client, policy_tag_manager_client, + projects_client, get_sample_data_for_table, get_columns_for_dataset, get_datasets_for_project_id, @@ -111,37 +140,111 @@ def test_bigquery_v2_ingest( ) get_tables_for_dataset.return_value = iter([bigquery_table]) - source_config_dict: Dict[str, Any] = { - "project_ids": ["project-id-1"], - "include_usage_statistics": False, - "include_table_lineage": False, - "include_data_platform_instance": True, - "classification": ClassificationConfig( - enabled=True, - classifiers=[ - DynamicTypedClassifierConfig( - type="datahub", - config=DataHubClassifierConfig( - minimum_values_threshold=1, - ), - ) - ], - max_workers=1, - ).dict(), - } + pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path) - pipeline_config_dict: Dict[str, Any] = { - "source": { - "type": "bigquery", - "config": source_config_dict, - }, - "sink": {"type": "file", "config": {"filename": mcp_output_path}}, + run_and_get_pipeline(pipeline_config_dict) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=mcp_output_path, + golden_path=mcp_golden_path, + ) + + +@freeze_time(FROZEN_TIME) +@patch.object(BigQuerySchemaApi, attribute="get_projects_with_labels") +@patch.object(BigQuerySchemaApi, "get_tables_for_dataset") +@patch.object(BigQuerySchemaGenerator, 
"get_core_table_details") +@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") +@patch.object(BigQuerySchemaApi, "get_columns_for_dataset") +@patch.object(BigQueryDataReader, "get_sample_data_for_table") +@patch("google.cloud.bigquery.Client") +@patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") +@patch("google.cloud.resourcemanager_v3.ProjectsClient") +def test_bigquery_v2_project_labels_ingest( + client, + policy_tag_manager_client, + projects_client, + get_sample_data_for_table, + get_columns_for_dataset, + get_datasets_for_project_id, + get_core_table_details, + get_tables_for_dataset, + get_projects_with_labels, + pytestconfig, + tmp_path, +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2" + mcp_golden_path = f"{test_resources_dir}/bigquery_project_label_mcp_golden.json" + mcp_output_path = "{}/{}".format(tmp_path, "bigquery_project_label_mcp_output.json") + + get_datasets_for_project_id.return_value = [ + BigqueryDataset(name="bigquery-dataset-1") + ] + + get_projects_with_labels.return_value = [ + BigqueryProject( + id="dev", name="development" + ) + ] + + table_list_item = TableListItem( + {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} + ) + table_name = "table-1" + get_core_table_details.return_value = {table_name: table_list_item} + get_columns_for_dataset.return_value = { + table_name: [ + BigqueryColumn( + name="age", + ordinal_position=1, + is_nullable=False, + field_path="col_1", + data_type="INT", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + policy_tags=["Test Policy Tag"], + ), + BigqueryColumn( + name="email", + ordinal_position=1, + is_nullable=False, + field_path="col_2", + data_type="STRING", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + ), + ] + } + get_sample_data_for_table.return_value = { + "age": [random.randint(1, 80) for i in range(20)], + "email": [random_email() for i in range(20)], } + 
bigquery_table = BigqueryTable( + name=table_name, + comment=None, + created=None, + last_altered=None, + size_in_bytes=None, + rows_count=None, + ) + get_tables_for_dataset.return_value = iter([bigquery_table]) + + pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path) + + del pipeline_config_dict["source"]["config"]["project_ids"] + + pipeline_config_dict["source"]["config"]["project_labels"] = [ + "development" + ] + run_and_get_pipeline(pipeline_config_dict) mce_helpers.check_golden_file( pytestconfig, output_path=mcp_output_path, golden_path=mcp_golden_path, - ) + ) \ No newline at end of file From 24204f02a35a386f8bba1227629fad5c03d96c42 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Wed, 14 Aug 2024 19:46:08 +0530 Subject: [PATCH 12/18] revert the base-requirements --- .../base-requirements.txt | 134 +++++++++--------- 1 file changed, 66 insertions(+), 68 deletions(-) diff --git a/docker/datahub-ingestion-base/base-requirements.txt b/docker/datahub-ingestion-base/base-requirements.txt index fd34a1ff80647b..fa07b4184a6bc0 100644 --- a/docker/datahub-ingestion-base/base-requirements.txt +++ b/docker/datahub-ingestion-base/base-requirements.txt @@ -1,24 +1,24 @@ # Generated requirements file. Run ./regenerate-base-requirements.sh to regenerate. 
-acryl-datahub-classify==0.0.10 +acryl-datahub-classify==0.0.11 acryl-PyHive==0.6.16 -acryl-sqlglot==23.2.1.dev5 +acryl-sqlglot==25.3.1.dev3 aenum==3.1.15 aiohappyeyeballs==2.3.2 aiohttp==3.10.0 aiosignal==1.3.1 alembic==1.13.2 altair==4.2.0 -anyio==4.3.0 -apache-airflow==2.9.0 -apache-airflow-providers-common-io==1.3.0 -apache-airflow-providers-common-sql==1.11.1 -apache-airflow-providers-fab==1.0.2 -apache-airflow-providers-ftp==3.7.0 -apache-airflow-providers-http==4.10.0 -apache-airflow-providers-imap==3.5.0 -apache-airflow-providers-smtp==1.6.1 -apache-airflow-providers-sqlite==3.7.1 -apispec==6.6.0 +anyio==4.4.0 +apache-airflow==2.9.3 +apache-airflow-providers-common-io==1.3.2 +apache-airflow-providers-common-sql==1.14.2 +apache-airflow-providers-fab==1.2.2 +apache-airflow-providers-ftp==3.10.0 +apache-airflow-providers-http==4.12.0 +apache-airflow-providers-imap==3.6.1 +apache-airflow-providers-smtp==1.7.1 +apache-airflow-providers-sqlite==3.8.1 +apispec==6.6.1 appnope==0.1.4 argcomplete==3.4.0 argon2-cffi==23.1.0 @@ -42,8 +42,8 @@ beautifulsoup4==4.12.3 bleach==6.1.0 blinker==1.8.2 blis==0.7.11 -boto3==1.34.79 -botocore==1.34.79 +boto3==1.34.151 +botocore==1.34.151 bracex==2.4 cached-property==1.5.2 cachelib==0.9.0 @@ -76,14 +76,14 @@ cryptography==42.0.8 cx_Oracle==8.3.0 cymem==2.0.8 databricks-dbapi==0.6.0 -databricks-sdk==0.24.0 -databricks-sql-connector==2.9.5 +databricks-sdk==0.29.0 +databricks-sql-connector==2.9.6 dataflows-tabulator==1.54.3 db-dtypes==1.2.0 debugpy==1.8.2 decorator==5.1.1 defusedxml==0.7.1 -deltalake==0.16.4 +deltalake==0.17.4 Deprecated==1.2.14 dill==0.3.8 dnspython==2.6.1 @@ -91,7 +91,7 @@ docker==7.1.0 docutils==0.21.2 ecdsa==0.19.0 elasticsearch==7.13.4 -email_validator==2.1.1 +email_validator==2.2.0 entrypoints==0.4 et-xmlfile==1.1.0 exceptiongroup==1.2.2 @@ -105,19 +105,18 @@ flatdict==4.0.1 frozenlist==1.4.1 fsspec==2023.12.2 future==1.0.0 -GeoAlchemy2==0.14.7 +GeoAlchemy2==0.15.2 gitdb==4.0.11 GitPython==3.1.43 
-google-api-core==2.18.0 -google-auth==2.29.0 -google-cloud-appengine-logging==1.4.3 +google-api-core==2.19.1 +google-auth==2.32.0 +google-cloud-appengine-logging==1.4.5 google-cloud-audit-log==0.2.5 -google-cloud-bigquery==3.20.1 +google-cloud-bigquery==3.25.0 google-cloud-core==2.4.1 google-cloud-datacatalog==3.20.0 google-cloud-datacatalog-lineage==0.2.2 google-cloud-logging==3.5.0 -google-cloud-resource-manager==1.12.3 google-crc32c==1.5.0 google-re2==1.1.20240702 google-resumable-media==2.7.1 @@ -133,7 +132,6 @@ grpcio-tools==1.62.2 gssapi==1.8.3 gunicorn==22.0.0 h11==0.14.0 -hdbcli==2.20.15 httpcore==1.0.5 httpx==0.27.0 humanfriendly==10.0 @@ -175,7 +173,7 @@ linkify-it-py==2.0.3 lkml==1.3.5 lockfile==0.12.2 looker-sdk==23.0.0 -lxml==5.2.1 +lxml==5.2.2 lz4==4.3.3 makefun==1.15.4 Mako==1.3.5 @@ -185,8 +183,8 @@ MarkupSafe==2.1.5 marshmallow==3.21.3 marshmallow-oneofschema==3.1.1 marshmallow-sqlalchemy==0.28.2 -matplotlib-inline==0.1.6 -mdit-py-plugins==0.4.0 +matplotlib-inline==0.1.7 +mdit-py-plugins==0.4.1 mdurl==0.1.2 methodtools==0.4.7 mistune==3.0.2 @@ -202,34 +200,34 @@ murmurhash==1.0.10 mypy-extensions==1.0.0 nbclassic==1.1.0 nbclient==0.6.3 -nbconvert==7.16.3 +nbconvert==7.16.4 nbformat==5.10.4 nest-asyncio==1.6.0 networkx==3.3 -notebook==6.5.6 +notebook==6.5.7 notebook_shim==0.2.4 numpy==1.26.4 oauthlib==3.2.2 okta==1.7.0 -openlineage-airflow==1.7.0 -openlineage-integration-common==1.7.0 -openlineage-python==1.7.0 -openlineage_sql==1.7.0 -openpyxl==3.1.2 -opentelemetry-api==1.16.0 -opentelemetry-exporter-otlp==1.16.0 -opentelemetry-exporter-otlp-proto-grpc==1.16.0 -opentelemetry-exporter-otlp-proto-http==1.16.0 -opentelemetry-proto==1.16.0 -opentelemetry-sdk==1.16.0 -opentelemetry-semantic-conventions==0.37b0 +openlineage-airflow==1.18.0 +openlineage-integration-common==1.18.0 +openlineage-python==1.18.0 +openlineage_sql==1.18.0 +openpyxl==3.1.5 +opentelemetry-api==1.26.0 +opentelemetry-exporter-otlp==1.26.0 
+opentelemetry-exporter-otlp-proto-common==1.26.0 +opentelemetry-exporter-otlp-proto-grpc==1.26.0 +opentelemetry-exporter-otlp-proto-http==1.26.0 +opentelemetry-proto==1.26.0 +opentelemetry-sdk==1.26.0 +opentelemetry-semantic-conventions==0.47b0 ordered-set==4.1.0 -packaging==23.2 +packaging==24.1 pandas==2.1.4 pandocfilters==1.5.1 -parse==1.20.1 +parse==1.20.2 parso==0.8.4 -pathlib_abc==0.1.1 pathspec==0.12.1 pendulum==3.0.0 pexpect==4.9.0 @@ -255,17 +253,17 @@ pyarrow-hotfix==0.6 pyasn1==0.6.0 pyasn1_modules==0.4.0 pyathena==2.25.2 -pycountry==23.12.11 +pycountry==24.6.1 pycparser==2.22 pycryptodome==3.20.0 -pydantic==1.10.15 -pydash==8.0.0 -pydruid==0.6.6 -Pygments==2.17.2 +pydantic==1.10.17 +pydash==8.0.3 +pydruid==0.6.9 +Pygments==2.18.0 pyiceberg==0.4.0 -pymongo==4.6.3 -PyMySQL==1.1.0 -pyOpenSSL==24.1.0 +pymongo==4.8.0 +PyMySQL==1.1.1 +pyOpenSSL==24.2.1 pyparsing==3.0.9 pyspnego==0.11.1 python-daemon==3.0.1 @@ -299,12 +297,12 @@ rpds-py==0.19.1 rsa==4.9 rstr==3.2.2 ruamel.yaml==0.17.17 -s3transfer==0.10.1 -schwifty==2024.1.1.post0 -scipy==1.13.0 -scramp==1.4.4 +s3transfer==0.10.2 +schwifty==2024.6.1 +scipy==1.14.0 +scramp==1.4.5 Send2Trash==1.8.3 -sentry-sdk==1.44.1 +sentry-sdk==2.12.0 setproctitle==1.3.3 shellingham==1.5.4 simple-salesforce==1.12.6 @@ -322,8 +320,8 @@ spacy-legacy==3.0.12 spacy-loggers==1.0.5 sql_metadata==2.12.0 SQLAlchemy==1.4.44 -sqlalchemy-bigquery==1.10.0 -sqlalchemy-hana==2.0.0 +sqlalchemy-bigquery==1.11.0 +sqlalchemy-cockroachdb==1.4.4 SQLAlchemy-JSONField==1.0.2 sqlalchemy-pytds==0.3.5 sqlalchemy-redshift==0.8.14 @@ -337,9 +335,9 @@ strictyaml==1.7.3 tableauserverclient==0.25 tableschema==1.20.11 tabulate==0.9.0 -tenacity==8.2.3 -teradatasql==20.0.0.9 -teradatasqlalchemy==20.0.0.0 +tenacity==9.0.0 +teradatasql==20.0.0.14 +teradatasqlalchemy==20.0.0.1 termcolor==2.4.0 terminado==0.18.1 text-unidecode==1.3 @@ -357,16 +355,16 @@ traitlets==5.2.1.post0 trino==0.329.0 typer==0.12.3 typing-inspect==0.9.0 -typing_extensions==4.11.0 
+typing_extensions==4.12.2 tzdata==2024.1 tzlocal==5.2 uc-micro-py==1.0.3 ujson==5.10.0 unicodecsv==0.14.1 universal_pathlib==0.2.2 -urllib3==1.26.18 -vertica-python==1.3.8 -vertica-sqlalchemy-dialect==0.0.8.1 +urllib3==1.26.19 +vertica-python==1.4.0 +vertica-sqlalchemy-dialect==0.0.8.2 vininfo==1.8.0 wasabi==1.1.3 wcmatch==8.5.2 From 68915a7c0da7ccf3d18cf033c68a8aa2eef7413d Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Wed, 14 Aug 2024 23:09:28 +0530 Subject: [PATCH 13/18] fix mock --- .../ingestion/source/bigquery_v2/bigquery.py | 1 + .../source/bigquery_v2/bigquery_schema.py | 1 + .../tests/unit/test_bigquery_source.py | 79 +++++++++++++++---- 3 files changed, 64 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 6a968cee1b5e26..0d73c9ad028972 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -258,6 +258,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def _get_projects(self) -> List[BigqueryProject]: logger.info("Getting projects") + if self.config.project_ids or self.config.project_id: project_ids = self.config.project_ids or [self.config.project_id] # type: ignore return [ diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 4326ff7a35527f..06a6a22d704803 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -165,6 +165,7 @@ def _should_retry(exc: BaseException) -> bool: self.report.num_list_projects_retry_request += 1 return True + page_token = None projects: List[BigqueryProject] = [] with self.report.list_projects: diff --git 
a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 6b7a6ab5708e50..017b2487361bbe 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -170,7 +170,11 @@ def test_bigquery_uri_with_credential(): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_with_project_ids(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_project_ids( + get_projects_client, + get_bq_client_mock, +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj( @@ -197,8 +201,10 @@ def test_get_projects_with_project_ids(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_with_project_ids_overrides_project_id_pattern( - get_bq_client_mock, + get_projects_client, + get_bigquery_client, ): config = BigQueryV2Config.parse_obj( { @@ -226,7 +232,11 @@ def test_platform_instance_config_always_none(): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_dataplatform_instance_aspect_returns_project_id(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_dataplatform_instance_aspect_returns_project_id( + get_projects_client, + get_bq_client_mock, +): project_id = "project_id" expected_instance = ( f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,{project_id})" @@ -247,7 +257,11 @@ def test_get_dataplatform_instance_aspect_returns_project_id(get_bq_client_mock) @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_dataplatform_instance_default_no_instance(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_dataplatform_instance_default_no_instance( + get_projects_client, + get_bq_client_mock, +): config = 
BigQueryV2Config.parse_obj({}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) schema_gen = source.bq_schema_extractor @@ -263,7 +277,11 @@ def test_get_dataplatform_instance_default_no_instance(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_with_single_project_id(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_single_project_id( + get_projects_client, + get_bq_client_mock, +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj({"project_id": "test-3"}) @@ -275,9 +293,13 @@ def test_get_projects_with_single_project_id(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_by_list(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_by_list( + get_projects_client, + get_bigquery_client +): client_mock = MagicMock() - get_bq_client_mock.return_value = client_mock + get_bigquery_client.return_value = client_mock first_page = MagicMock() first_page.__iter__.return_value = iter( @@ -296,6 +318,7 @@ def test_get_projects_by_list(get_bq_client_mock): ] ) second_page.next_page_token = None + client_mock.list_projects.side_effect = [first_page, second_page] config = BigQueryV2Config.parse_obj({}) @@ -311,7 +334,8 @@ def test_get_projects_by_list(get_bq_client_mock): @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_filter_by_pattern(get_projects_client, get_bq_client_mock, get_projects_mock): get_projects_mock.return_value = [ BigqueryProject("test-project", "Test Project"), BigqueryProject("test-project-2", "Test Project 2"), @@ -329,7 +353,8 @@ def 
test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_list_empty(get_projects_client, get_bq_client_mock, get_projects_mock): get_projects_mock.return_value = [] config = BigQueryV2Config.parse_obj( @@ -342,7 +367,9 @@ def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_list_failure( + get_projects_client: MagicMock, get_bq_client_mock: MagicMock, caplog: pytest.LogCaptureFixture, ) -> None: @@ -366,7 +393,8 @@ def test_get_projects_list_failure( @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock, get_projects_client): get_projects_mock.return_value = [BigqueryProject("test-project", "Test Project")] config = BigQueryV2Config.parse_obj( @@ -399,7 +427,8 @@ def bigquery_table() -> BigqueryTable: @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_gen_table_dataset_workunits(get_projects_client, get_bq_client_mock, bigquery_table): project_id = "test-project" dataset_name = "test-dataset" config = BigQueryV2Config.parse_obj( @@ -471,7 +500,8 @@ def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_simple_upstream_table_generation(get_bq_client_mock): 
+@patch.object(BigQueryV2Config, "get_projects_client") +def test_simple_upstream_table_generation(get_bq_client_mock, get_projects_client): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( project_id="test-project", dataset="test-dataset", table="a" @@ -503,8 +533,10 @@ def test_simple_upstream_table_generation(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_upstream_table_generation_with_temporary_table_without_temp_upstream( get_bq_client_mock, + get_projects_client, ): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( @@ -536,7 +568,8 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream( @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock, get_projects_client): from datahub.ingestion.api.common import PipelineContext a: BigQueryTableRef = BigQueryTableRef( @@ -611,8 +644,10 @@ def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream( get_bq_client_mock, + get_projects_client ): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( @@ -675,7 +710,8 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_table_processing_logic(get_bq_client_mock, data_dictionary_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_table_processing_logic(get_projects_client, get_bq_client_mock, data_dictionary_mock): client_mock = 
MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj( @@ -747,8 +783,9 @@ def test_table_processing_logic(get_bq_client_mock, data_dictionary_mock): @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_table_processing_logic_date_named_tables( - get_bq_client_mock, data_dictionary_mock + get_projects_client, get_bq_client_mock, data_dictionary_mock ): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock @@ -859,8 +896,10 @@ def bigquery_view_2() -> BigqueryView: @patch.object(BigQuerySchemaApi, "get_query_result") @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_views_for_dataset( get_bq_client_mock: Mock, + get_projects_client: MagicMock, query_mock: Mock, bigquery_view_1: BigqueryView, bigquery_view_2: BigqueryView, @@ -907,8 +946,9 @@ def test_get_views_for_dataset( BigQuerySchemaGenerator, "gen_dataset_workunits", lambda *args, **kwargs: [] ) @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_gen_view_dataset_workunits( - get_bq_client_mock, bigquery_view_1, bigquery_view_2 + get_projects_client, get_bq_client_mock, bigquery_view_1, bigquery_view_2 ): project_id = "test-project" dataset_name = "test-dataset" @@ -965,7 +1005,9 @@ def bigquery_snapshot() -> BigqueryTableSnapshot: @patch.object(BigQuerySchemaApi, "get_query_result") @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_snapshots_for_dataset( + get_projects_client: MagicMock, get_bq_client_mock: Mock, query_mock: Mock, bigquery_snapshot: BigqueryTableSnapshot, @@ -1005,7 +1047,8 @@ def test_get_snapshots_for_dataset( @patch.object(BigQueryV2Config, "get_bigquery_client") -def 
test_gen_snapshot_dataset_workunits(get_bq_client_mock, bigquery_snapshot): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_gen_snapshot_dataset_workunits(get_bq_client_mock, get_projects_client, bigquery_snapshot): project_id = "test-project" dataset_name = "test-dataset" config = BigQueryV2Config.parse_obj( @@ -1144,7 +1187,9 @@ def test_default_config_for_excluding_projects_and_datasets(): @patch.object(BigQueryConnectionConfig, "get_bigquery_client", new=lambda self: None) @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") +@patch.object(BigQueryV2Config, "get_projects_client") def test_excluding_empty_projects_from_ingestion( + get_projects_client, get_datasets_for_project_id_mock, ): project_id_with_datasets = "project-id-with-datasets" From 8525355d11fcb50fa19c2d71936196e8bb8567ae Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Wed, 14 Aug 2024 23:42:23 +0530 Subject: [PATCH 14/18] test case --- .../source/bigquery_v2/bigquery_schema.py | 1 - .../integration/bigquery_v2/test_bigquery.py | 13 ++- .../tests/unit/test_bigquery_source.py | 85 ++++++++++++++----- 3 files changed, 67 insertions(+), 32 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 06a6a22d704803..4326ff7a35527f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -165,7 +165,6 @@ def _should_retry(exc: BaseException) -> bool: self.report.num_list_projects_retry_request += 1 return True - page_token = None projects: List[BigqueryProject] = [] with self.report.list_projects: diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 4b8a40ca74fd6d..45cf6ca6851572 100644 --- 
a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -15,8 +15,9 @@ from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryColumn, BigqueryDataset, + BigqueryProject, BigQuerySchemaApi, - BigqueryTable, BigqueryProject, + BigqueryTable, ) from datahub.ingestion.source.bigquery_v2.bigquery_schema_gen import ( BigQuerySchemaGenerator, @@ -183,9 +184,7 @@ def test_bigquery_v2_project_labels_ingest( ] get_projects_with_labels.return_value = [ - BigqueryProject( - id="dev", name="development" - ) + BigqueryProject(id="dev", name="development") ] table_list_item = TableListItem( @@ -237,9 +236,7 @@ def test_bigquery_v2_project_labels_ingest( del pipeline_config_dict["source"]["config"]["project_ids"] - pipeline_config_dict["source"]["config"]["project_labels"] = [ - "development" - ] + pipeline_config_dict["source"]["config"]["project_labels"] = ["development"] run_and_get_pipeline(pipeline_config_dict) @@ -247,4 +244,4 @@ def test_bigquery_v2_project_labels_ingest( pytestconfig, output_path=mcp_output_path, golden_path=mcp_golden_path, - ) \ No newline at end of file + ) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 017b2487361bbe..2652db02e783fa 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -172,8 +172,8 @@ def test_bigquery_uri_with_credential(): @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_with_project_ids( - get_projects_client, - get_bq_client_mock, + get_projects_client, + get_bq_client_mock, ): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock @@ -203,8 +203,8 @@ def test_get_projects_with_project_ids( @patch.object(BigQueryV2Config, "get_bigquery_client") 
@patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_with_project_ids_overrides_project_id_pattern( - get_projects_client, - get_bigquery_client, + get_projects_client, + get_bigquery_client, ): config = BigQueryV2Config.parse_obj( { @@ -234,8 +234,8 @@ def test_platform_instance_config_always_none(): @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") def test_get_dataplatform_instance_aspect_returns_project_id( - get_projects_client, - get_bq_client_mock, + get_projects_client, + get_bq_client_mock, ): project_id = "project_id" expected_instance = ( @@ -259,8 +259,8 @@ def test_get_dataplatform_instance_aspect_returns_project_id( @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") def test_get_dataplatform_instance_default_no_instance( - get_projects_client, - get_bq_client_mock, + get_projects_client, + get_bq_client_mock, ): config = BigQueryV2Config.parse_obj({}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) @@ -279,8 +279,8 @@ def test_get_dataplatform_instance_default_no_instance( @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_with_single_project_id( - get_projects_client, - get_bq_client_mock, + get_projects_client, + get_bq_client_mock, ): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock @@ -294,10 +294,7 @@ def test_get_projects_with_single_project_id( @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") -def test_get_projects_by_list( - get_projects_client, - get_bigquery_client -): +def test_get_projects_by_list(get_projects_client, get_bigquery_client): client_mock = MagicMock() get_bigquery_client.return_value = client_mock @@ -335,7 +332,9 @@ def test_get_projects_by_list( @patch.object(BigQuerySchemaApi, "get_projects") 
@patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") -def test_get_projects_filter_by_pattern(get_projects_client, get_bq_client_mock, get_projects_mock): +def test_get_projects_filter_by_pattern( + get_projects_client, get_bq_client_mock, get_projects_mock +): get_projects_mock.return_value = [ BigqueryProject("test-project", "Test Project"), BigqueryProject("test-project-2", "Test Project 2"), @@ -354,7 +353,9 @@ def test_get_projects_filter_by_pattern(get_projects_client, get_bq_client_mock, @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") -def test_get_projects_list_empty(get_projects_client, get_bq_client_mock, get_projects_mock): +def test_get_projects_list_empty( + get_projects_client, get_bq_client_mock, get_projects_mock +): get_projects_mock.return_value = [] config = BigQueryV2Config.parse_obj( @@ -394,7 +395,9 @@ def test_get_projects_list_failure( @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") -def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock, get_projects_client): +def test_get_projects_list_fully_filtered( + get_projects_mock, get_bq_client_mock, get_projects_client +): get_projects_mock.return_value = [BigqueryProject("test-project", "Test Project")] config = BigQueryV2Config.parse_obj( @@ -428,7 +431,9 @@ def bigquery_table() -> BigqueryTable: @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") -def test_gen_table_dataset_workunits(get_projects_client, get_bq_client_mock, bigquery_table): +def test_gen_table_dataset_workunits( + get_projects_client, get_bq_client_mock, bigquery_table +): project_id = "test-project" dataset_name = "test-dataset" config = BigQueryV2Config.parse_obj( @@ 
-569,7 +574,9 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream( @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") -def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock, get_projects_client): +def test_upstream_table_column_lineage_with_temp_table( + get_bq_client_mock, get_projects_client +): from datahub.ingestion.api.common import PipelineContext a: BigQueryTableRef = BigQueryTableRef( @@ -646,8 +653,7 @@ def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock, get_p @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream( - get_bq_client_mock, - get_projects_client + get_bq_client_mock, get_projects_client ): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( @@ -711,7 +717,9 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") -def test_table_processing_logic(get_projects_client, get_bq_client_mock, data_dictionary_mock): +def test_table_processing_logic( + get_projects_client, get_bq_client_mock, data_dictionary_mock +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj( @@ -1048,7 +1056,9 @@ def test_get_snapshots_for_dataset( @patch.object(BigQueryV2Config, "get_bigquery_client") @patch.object(BigQueryV2Config, "get_projects_client") -def test_gen_snapshot_dataset_workunits(get_bq_client_mock, get_projects_client, bigquery_snapshot): +def test_gen_snapshot_dataset_workunits( + get_bq_client_mock, get_projects_client, bigquery_snapshot +): project_id = "test-project" dataset_name = "test-dataset" config = 
BigQueryV2Config.parse_obj( @@ -1222,3 +1232,32 @@ def get_datasets_for_project_id_side_effect( config = BigQueryV2Config.parse_obj({**base_config, "exclude_empty_projects": True}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-2")) assert len({wu.metadata.entityUrn for wu in source.get_workunits()}) == 1 # type: ignore + + +@patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_project_labels( + get_projects_client, + get_bq_client_mock, +): + client_mock = MagicMock() + + get_projects_client.return_value = client_mock + + client_mock.search_projects.return_value = [ + SimpleNamespace(project_id="dev", display_name="dev_project"), + SimpleNamespace(project_id="qa", display_name="qa_project"), + ] + + config = BigQueryV2Config.parse_obj( + { + "project_labels": ["dev", "qa"], + } + ) + + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) + + assert source._get_projects() == [ + BigqueryProject("dev", "dev_project"), + BigqueryProject("qa", "qa_project"), + ] From 35842981612d298e3a94c0851d25da5ee9693c00 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Mon, 19 Aug 2024 23:55:37 +0530 Subject: [PATCH 15/18] address review comments --- .../source/bigquery_v2/bigquery_config.py | 38 +++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 8bdf96d603f5a5..8e96194461b848 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -34,12 +34,16 @@ class BigQueryUsageConfig(BaseUsageConfig): max_query_duration: timedelta = Field( default=timedelta(minutes=15), - description="Correction to pad start_time and end_time with. 
For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.", + description="Correction to pad start_time and end_time with. For handling the case where the read happens " + "within our time range but the query completion event is delayed and happens after the configured" + " end time.", ) apply_view_usage_to_tables: bool = Field( default=False, - description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies usage to views / tables mentioned in the query. If set to True, usage is applied to base tables only.", + description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies " + "usage to views / tables mentioned in the query. If set to True, usage is applied to base tables " + "only.", ) @@ -146,12 +150,14 @@ class BigQueryV2Config( dataset_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'", + description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. " + "e.g. to match all tables in schema analytics, use the regex 'analytics'", ) match_fully_qualified_names: bool = Field( default=True, - description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name `.`.", + description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name " + "`.`.", ) include_external_url: bool = Field( @@ -172,7 +178,9 @@ class BigQueryV2Config( table_snapshot_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire snapshot name in database.schema.snapshot format. e.g. 
to match all snapshots starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", + description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire " + "snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with " + "customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", ) debug_include_full_payloads: bool = Field( @@ -183,17 +191,22 @@ class BigQueryV2Config( number_of_datasets_process_in_batch: int = Field( hidden_from_docs=True, default=10000, - description="Number of table queried in batch when getting metadata. This is a low level config property which should be touched with care.", + description="Number of table queried in batch when getting metadata. This is a low level config property " + "which should be touched with care.", ) number_of_datasets_process_in_batch_if_profiling_enabled: int = Field( default=1000, - description="Number of partitioned table queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query partitions system view which throws error if we try to touch too many tables.", + description="Number of partitioned table queried in batch when getting metadata. This is a low level config " + "property which should be touched with care. This restriction is needed because we query " + "partitions system view which throws error if we try to touch too many tables.", ) use_tables_list_query_v2: bool = Field( default=False, - description="List tables using an improved query that extracts partitions and last modified timestamps more accurately. Requires the ability to read table data. Automatically enabled when profiling is enabled.", + description="List tables using an improved query that extracts partitions and last modified timestamps more " + "accurately. Requires the ability to read table data. 
Automatically enabled when profiling is " + "enabled.", ) @property @@ -202,7 +215,9 @@ def have_table_data_read_permission(self) -> bool: column_limit: int = Field( default=300, - description="Maximum number of columns to process in a table. This is a low level config property which should be touched with care. This restriction is needed because excessively wide tables can result in failure to ingest the schema.", + description="Maximum number of columns to process in a table. This is a low level config property which " + "should be touched with care. This restriction is needed because excessively wide tables can " + "result in failure to ingest the schema.", ) # The inheritance hierarchy is wonky here, but these options need modifications. project_id: Optional[str] = Field( @@ -220,8 +235,9 @@ def have_table_data_read_permission(self) -> bool: project_labels: List[str] = Field( default_factory=list, description=( - "Ingests projects with the specified labels. Use this property if you want to specify what " - "projects to ingest based on project-level labels." + "Ingests projects with the specified labels. Use this property to define which projects to ingest based " + "on project-level labels. If project_ids or project_id is set, this configuration has no effect. The " + "ingestion process filters projects by label first, and then applies the project_id_pattern." 
         ),
     )

From 56cafba24e03d08dc6c90110c9a06a3ee280f969 Mon Sep 17 00:00:00 2001
From: Siddique Bagwan
Date: Tue, 20 Aug 2024 11:15:07 +0530
Subject: [PATCH 16/18] working with portal

---
 .../datahub/ingestion/source/bigquery_v2/bigquery_config.py | 3 ++-
 .../tests/integration/bigquery_v2/test_bigquery.py          | 2 +-
 metadata-ingestion/tests/unit/test_bigquery_source.py       | 2 +-
 3 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index 8e96194461b848..af9256d8877f50 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -235,7 +235,8 @@ def have_table_data_read_permission(self) -> bool:
     project_labels: List[str] = Field(
         default_factory=list,
         description=(
-            "Ingests projects with the specified labels. Use this property to define which projects to ingest based "
+            "Ingests projects with the specified labels. Set value in the format of `key:value`. Use this property to "
+            "define which projects to ingest based "
             "on project-level labels. If project_ids or project_id is set, this configuration has no effect. The "
             "ingestion process filters projects by label first, and then applies the project_id_pattern."
), diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 45cf6ca6851572..fbaca81f2db017 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -236,7 +236,7 @@ def test_bigquery_v2_project_labels_ingest( del pipeline_config_dict["source"]["config"]["project_ids"] - pipeline_config_dict["source"]["config"]["project_labels"] = ["development"] + pipeline_config_dict["source"]["config"]["project_labels"] = ["environment:development"] run_and_get_pipeline(pipeline_config_dict) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 2652db02e783fa..d12ffbcbbcf10b 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -1251,7 +1251,7 @@ def test_get_projects_with_project_labels( config = BigQueryV2Config.parse_obj( { - "project_labels": ["dev", "qa"], + "project_labels": ["environment:dev", "environment:qa"], } ) From d38dd9e3625ca98d05ee742b3d9585227168b7f6 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Tue, 20 Aug 2024 11:20:14 +0530 Subject: [PATCH 17/18] update quick start --- docs/quick-ingestion-guides/bigquery/setup.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/quick-ingestion-guides/bigquery/setup.md b/docs/quick-ingestion-guides/bigquery/setup.md index 10351d6572c531..96850f2deb68ed 100644 --- a/docs/quick-ingestion-guides/bigquery/setup.md +++ b/docs/quick-ingestion-guides/bigquery/setup.md @@ -38,7 +38,9 @@ Please refer to the BigQuery [Permissions](https://cloud.google.com/iam/docs/per You can always add/remove roles to Service Accounts later on. 
 Please refer to the BigQuery [Manage access to projects, folders, and organizations](https://cloud.google.com/iam/docs/granting-changing-revoking-access) guide for more details.
 :::
 
-3. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub.
+3. To filter projects based on the `project_labels` configuration, first visit [cloudresourcemanager.googleapis.com](https://console.developers.google.com/apis/api/cloudresourcemanager.googleapis.com/overview) and enable the `Cloud Resource Manager API`.
+
+4. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub.
 
 The key file looks like this:

From 38c3496e82efd5469385445d1cbe80258f3e9e5e Mon Sep 17 00:00:00 2001
From: Siddique Bagwan
Date: Tue, 20 Aug 2024 23:36:41 +0530
Subject: [PATCH 18/18] lint fix

---
 .../tests/integration/bigquery_v2/test_bigquery.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
index fbaca81f2db017..dff7f18db6135c 100644
--- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
+++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py
@@ -236,7 +236,9 @@ def test_bigquery_v2_project_labels_ingest(
 
     del pipeline_config_dict["source"]["config"]["project_ids"]
 
-    pipeline_config_dict["source"]["config"]["project_labels"] = ["environment:development"]
+    pipeline_config_dict["source"]["config"]["project_labels"] = [
+        "environment:development"
+    ]
 
     run_and_get_pipeline(pipeline_config_dict)