
feat(ingestion/bigquery): Add ability to filter GCP project ingestion based on project labels #11169

Merged
Changes from 23 commits (31 commits total)
0c050be
feat: First pass at enabling getting projects based on folder ids
Apr 7, 2024
69cf8e0
chore: Add google-cloud-resource-manager dependency
Apr 8, 2024
df9698c
chore: Update some methods to account for new projects_client
Apr 8, 2024
6c77b1e
chore: Add a few tests, more to come
Apr 8, 2024
7b1efab
Merge branch 'master' into add-support-for-ingestion-based-on-folder-ids
Apr 8, 2024
7e3f829
chore: Add another unit test
Apr 8, 2024
463a18a
feat: Switch to using labels instead of folders, more flexible
Apr 8, 2024
bd1560d
chore: Remove line deletions
Apr 8, 2024
0aea92b
chore: Update failure report message to be accurate
Apr 8, 2024
82147f3
Update metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bi…
anaghshineh Apr 17, 2024
d00d4db
docs: Update inline docs for new BQ config var
Apr 17, 2024
110bb7e
Merge branch 'bq-filter-ingestion-based-on-proj-labels' of github.com…
Apr 17, 2024
b5a932b
resolve conflict
sid-acryl Aug 13, 2024
2f51f8b
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 13, 2024
f6af306
test case
sid-acryl Aug 14, 2024
6379304
Merge branch 'ing-690-bq-filter-ingestion-based-on-proj-labels' of gi…
sid-acryl Aug 14, 2024
24204f0
revert the base-requirements
sid-acryl Aug 14, 2024
68915a7
fix mock
sid-acryl Aug 14, 2024
8525355
test case
sid-acryl Aug 14, 2024
46a2447
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 14, 2024
3149ae7
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
david-leifker Aug 14, 2024
a4d235a
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 16, 2024
f049769
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 19, 2024
3584298
address review comments
sid-acryl Aug 19, 2024
f71f40f
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 19, 2024
56cafba
working with portal
sid-acryl Aug 20, 2024
624288c
Merge branch 'ing-690-bq-filter-ingestion-based-on-proj-labels' of gi…
sid-acryl Aug 20, 2024
38d3dfd
Merge branch 'master' into ing-690-bq-filter-ingestion-based-on-proj-…
sid-acryl Aug 20, 2024
d38dd9e
update quick start
sid-acryl Aug 20, 2024
ffa2dd4
Merge branch 'ing-690-bq-filter-ingestion-based-on-proj-labels' of gi…
sid-acryl Aug 20, 2024
38c3496
lint fix
sid-acryl Aug 20, 2024
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -181,6 +181,7 @@
"google-cloud-logging<=3.5.0",
"google-cloud-bigquery",
"google-cloud-datacatalog>=1.5.0",
"google-cloud-resource-manager",
"more-itertools>=8.12.0",
"sqlalchemy-bigquery>=1.4.1",
}
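This new dependency sits inside the BigQuery dependency set, so it should presumably arrive through the existing BigQuery plugin install (e.g. `pip install 'acryl-datahub[bigquery]'`); that extra name is the plugin's usual one and is not shown in this hunk.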
@@ -113,8 +113,9 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf,
self.config.get_bigquery_client(),
report=BigQueryV2Report().schema_api_perf,
projects_client=config.get_projects_client(),
client=config.get_bigquery_client(),
)
if self.config.extract_policy_tags_from_catalog:
self.bigquery_data_dictionary.datacatalog_client = (
@@ -257,14 +258,37 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

def _get_projects(self) -> List[BigqueryProject]:
logger.info("Getting projects")

if self.config.project_ids or self.config.project_id:
project_ids = self.config.project_ids or [self.config.project_id] # type: ignore
return [
BigqueryProject(id=project_id, name=project_id)
for project_id in project_ids
]
else:
return list(self._query_project_list())

if self.config.project_labels:
return list(self._query_project_list_from_labels())

return list(self._query_project_list())

def _query_project_list_from_labels(self) -> Iterable[BigqueryProject]:
projects = self.bigquery_data_dictionary.get_projects_with_labels(
self.config.project_labels
)

if not projects: # Report failure on exception and if empty list is returned
self.report.report_failure(
"metadata-extraction",
"Get projects didn't return any project with any of the specified label(s). "
"Maybe resourcemanager.projects.list permission is missing for the service account. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account.",
)

for project in projects:
if self.config.project_id_pattern.allowed(project.id):
yield project
else:
self.report.report_dropped(project.id)

def _query_project_list(self) -> Iterable[BigqueryProject]:
try:
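With this hunk, `_get_projects` resolves the project set in order: explicit `project_ids`/`project_id` first, then `project_labels` via the Resource Manager search API, and finally the plain `list_projects` path. A minimal sketch of how the new option could be wired up, assuming the existing `BigQueryV2Config` module path, pydantic's `parse_obj`, and the usual `project_id_pattern` allow/deny semantics (none of which this diff introduces):

```python
# Hypothetical recipe-style config, for illustration only.
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

config = BigQueryV2Config.parse_obj(
    {
        # Projects carrying any of these labels are fetched through the
        # Resource Manager search API; project_ids/project_id, if set,
        # take precedence and skip the label lookup entirely.
        "project_labels": ["env:prod", "team:data-platform"],
        # Label matches are still filtered by project_id_pattern (see
        # _query_project_list_from_labels above).
        "project_id_pattern": {"deny": ["^sandbox-.*"]},
    }
)
assert config.project_labels == ["env:prod", "team:data-platform"]
```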
@@ -3,7 +3,7 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union

from google.cloud import bigquery, datacatalog_v1
from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

@@ -74,6 +74,9 @@ def get_bigquery_client(self) -> bigquery.Client:
client_options = self.extra_client_options
return bigquery.Client(self.project_on_behalf, **client_options)

def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
return resourcemanager_v3.ProjectsClient()

def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
return datacatalog_v1.PolicyTagManagerClient()

@@ -214,6 +217,13 @@ def have_table_data_read_permission(self) -> bool:
"Overrides `project_id_pattern`."
),
)
project_labels: List[str] = Field(
default_factory=list,
description=(
"Ingests projects with the specified labels. Use this property if you want to specify what "
"projects to ingest based on project-level labels."
),
)

storage_project_id: None = Field(default=None, hidden_from_docs=True)

@@ -31,6 +31,7 @@ class BigQuerySchemaApiPerfReport(Report):
num_get_snapshots_for_dataset_api_requests: int = 0

list_projects: PerfTimer = field(default_factory=PerfTimer)
list_projects_with_labels: PerfTimer = field(default_factory=PerfTimer)
list_datasets: PerfTimer = field(default_factory=PerfTimer)

get_columns_for_dataset_sec: float = 0
@@ -5,7 +5,7 @@
from typing import Any, Dict, Iterable, Iterator, List, Optional

from google.api_core import retry
from google.cloud import bigquery, datacatalog_v1
from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
from google.cloud.bigquery.table import (
RowIterator,
TableListItem,
@@ -144,9 +144,11 @@ def __init__(
self,
report: BigQuerySchemaApiPerfReport,
client: bigquery.Client,
projects_client: resourcemanager_v3.ProjectsClient,
datacatalog_client: Optional[datacatalog_v1.PolicyTagManagerClient] = None,
) -> None:
self.bq_client = client
self.projects_client = projects_client
self.report = report
self.datacatalog_client = datacatalog_client

@@ -175,7 +177,7 @@ def _should_retry(exc: BaseException) -> bool:
# 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.'
# Hence, added the api request retry of 15 min.
# We already tried adding rate_limit externally, proving max_result and page_size
# to restrict the request calls inside list_project but issue still occured.
# to restrict the request calls inside list_project but issue still occurred.
projects_iterator = self.bq_client.list_projects(
max_results=max_results_per_page,
page_token=page_token,
@@ -202,6 +204,26 @@ def _should_retry(exc: BaseException) -> bool:
return []
return projects

def get_projects_with_labels(self, labels: List[str]) -> List[BigqueryProject]:
with self.report.list_projects_with_labels:
try:
projects = []
labels_query = " OR ".join([f"labels.{label}" for label in labels])
for project in self.projects_client.search_projects(query=labels_query):
projects.append(
BigqueryProject(
id=project.project_id, name=project.display_name
)
)

return projects

except Exception as e:
logger.error(
f"Error getting projects with labels: {labels}. {e}", exc_info=True
)
return []

def get_datasets_for_project_id(
self, project_id: str, maxResults: Optional[int] = None
) -> List[BigqueryDataset]:
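For reference, `get_projects_with_labels` ORs each configured label into a single Cloud Resource Manager search query of `labels.<key>:<value>` terms. A standalone sketch of the underlying call, assuming application-default credentials and the `google-cloud-resource-manager` dependency added in setup.py above (the label values are illustrative):

```python
from google.cloud import resourcemanager_v3

labels = ["env:prod", "team:data-platform"]
# Mirrors the query built above: "labels.env:prod OR labels.team:data-platform"
query = " OR ".join(f"labels.{label}" for label in labels)

client = resourcemanager_v3.ProjectsClient()
for project in client.search_projects(query=query):
    # project_id and display_name are the fields mapped onto BigqueryProject above.
    print(project.project_id, project.display_name)
```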
@@ -96,7 +96,9 @@ def metadata_read_capability_test(
client: bigquery.Client = config.get_bigquery_client()
assert client
bigquery_data_dictionary = BigQuerySchemaApi(
BigQueryV2Report().schema_api_perf, client
report=BigQueryV2Report().schema_api_perf,
projects_client=config.get_projects_client(),
client=client,
)
result = bigquery_data_dictionary.get_datasets_for_project_id(
project_id, 10
@@ -479,7 +479,9 @@ def lineage_via_catalog_lineage_api(
lineage_client: lineage_v1.LineageClient = lineage_v1.LineageClient()

data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
self.report.schema_api_perf,
self.config.get_bigquery_client(),
self.config.get_projects_client(),
)

# Filtering datasets